From acb2e967b8257b152697952460a67195717b38bc Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 13 Jan 2025 17:41:39 +0100 Subject: [PATCH] wip(sync): POC of using Redis for pubsub Co-authored-by: David Larlet --- pyproject.toml | 3 + umap/settings/base.py | 3 +- umap/sync/app.py | 120 ++++++++++++++++++++- umap/sync/apps.py | 6 -- umap/sync/migrations/0001_initial.py | 23 ---- umap/sync/migrations/__init__.py | 0 umap/sync/models.py | 150 --------------------------- 7 files changed, 120 insertions(+), 185 deletions(-) delete mode 100644 umap/sync/apps.py delete mode 100644 umap/sync/migrations/0001_initial.py delete mode 100644 umap/sync/migrations/__init__.py delete mode 100644 umap/sync/models.py diff --git a/pyproject.toml b/pyproject.toml index 578da9c9..bc2370c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -102,3 +102,6 @@ format_css=true blank_line_after_tag="load,extends" line_break_after_multiline_tag=true +[lint] +# Disable autoremove of unused import. +unfixable = ["F401"] diff --git a/umap/settings/base.py b/umap/settings/base.py index e89f17af..1dc7748f 100644 --- a/umap/settings/base.py +++ b/umap/settings/base.py @@ -129,7 +129,6 @@ INSTALLED_APPS = ( "django.contrib.gis", "django_probes", "umap", - "umap.sync", "social_django", # See https://github.com/peopledoc/django-agnocomplete/commit/26eda2dfa4a2f8a805ca2ea19a0c504b9d773a1c # Django does not find the app config in the default place, so the app is not loaded @@ -344,3 +343,5 @@ WEBSOCKET_ENABLED = env.bool("WEBSOCKET_ENABLED", default=False) WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost") WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001) WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001") + +REDIS_URL = "redis://localhost:6379" diff --git a/umap/sync/app.py b/umap/sync/app.py index cf526722..18db1c86 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -1,13 +1,26 @@ +import asyncio +import logging import uuid +import redis.asyncio as redis +from django.conf import settings +from django.core.signing import TimestampSigner from django.urls.resolvers import RoutePattern +from pydantic import ValidationError + +from .payloads import ( + JoinRequest, + JoinResponse, + ListPeersResponse, + OperationMessage, + PeerMessage, + Request, +) ws_pattern = RoutePattern("/ws/sync/") async def application(scope, receive, send): - from .models import Peer - matched = ws_pattern.match(scope["path"]) print(matched) if not matched: @@ -16,8 +29,7 @@ async def application(scope, receive, send): _, _, kwargs = matched map_id = kwargs["map_id"] - room_id = f"room{map_id}" - peer = Peer(uuid=uuid.uuid4(), name="FooBar", room_id=room_id) + peer = Peer(uuid=uuid.uuid4(), map_id=map_id) print(peer) peer._send = send while True: @@ -27,8 +39,10 @@ async def application(scope, receive, send): if event["type"] == "websocket.connect": try: print("Let's accept") - await send({"type": "websocket.accept"}) await peer.connect() + print("After connect") + await send({"type": "websocket.accept"}) + print("After accept") except ValueError: await send({"type": "websocket.close"}) @@ -43,3 +57,99 @@ async def application(scope, receive, send): await send({"type": "websocket.send", "text": "pong"}) else: await peer.receive(event["text"]) + + +class Peer: + def __init__(self, uuid, map_id, username=None): + self.uuid = uuid + self.user_id = f"user:{uuid}" + self.username = username or "" + self.room_id = f"umap:{map_id}" + self.is_authenticated = False + + async def get_peers(self): + peers = await self.client.hgetall(self.room_id) + # Send only ids for now (values are client names). + return peers.keys() + + async def listen_to_channel(self, channel_name): + async def reader(pubsub): + await pubsub.subscribe(channel_name) + while True: + try: + message = await pubsub.get_message(ignore_subscribe_messages=True) + except Exception as err: + print(err) + break + if message is not None: + if message["data"].decode() == "STOP": + break + await self.send(message["data"].decode()) + + async with self.client.pubsub() as pubsub: + asyncio.create_task(reader(pubsub)) + + async def listen(self): + await self.listen_to_channel(self.room_id) + await self.listen_to_channel(self.user_id) + + async def connect(self): + self.client = redis.from_url(settings.REDIS_URL) + + async def disconnect(self): + await self.client.hdel(self.room_id, self.user_id) + await self.send_peers_list() + await self.client.aclose() + await self.client.publish(self.room_id, "STOP") + await self.client.publish(self.user_id, "STOP") + + async def send_peers_list(self): + message = ListPeersResponse(peers=await self.get_peers()) + await self.broadcast(message.model_dump_json()) + + async def broadcast(self, message): + print("BROADCASTING", message) + # Send to all channels (including sender!) + await self.client.publish(self.room_id, message) + + async def send_to(self, peer_id, message): + print("SEND TO", peer_id, message) + # Send to one given channel + await self.client.publish(peer_id, message) + + async def receive(self, text_data): + if not self.is_authenticated: + print("AUTHENTICATING", self.uuid) + message = JoinRequest.model_validate_json(text_data) + signed = TimestampSigner().unsign_object(message.token, max_age=30) + user, room_id, permissions = signed.values() + if "edit" not in permissions: + return await self.disconnect() + await self.client.hset(self.room_id, self.user_id, self.username) + await self.listen() + response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers()) + await self.send(response.model_dump_json()) + await self.send_peers_list() + self.is_authenticated = True + return + + try: + incoming = Request.model_validate_json(text_data) + except ValidationError as error: + message = ( + f"An error occurred when receiving the following message: {text_data!r}" + ) + logging.error(message, error) + else: + match incoming.root: + # Broadcast all operation messages to connected peers + case OperationMessage(): + await self.broadcast(text_data) + + # Send peer messages to the proper peer + case PeerMessage(): + await self.send_to(incoming.root.recipient, text_data) + + async def send(self, text): + print("SEND", text) + await self._send({"type": "websocket.send", "text": text}) diff --git a/umap/sync/apps.py b/umap/sync/apps.py deleted file mode 100644 index b4c9c694..00000000 --- a/umap/sync/apps.py +++ /dev/null @@ -1,6 +0,0 @@ -from django.apps import AppConfig - - -class UmapConfig(AppConfig): - name = "umap.sync" - verbose_name = "uMap Sync" diff --git a/umap/sync/migrations/0001_initial.py b/umap/sync/migrations/0001_initial.py deleted file mode 100644 index a8893dfa..00000000 --- a/umap/sync/migrations/0001_initial.py +++ /dev/null @@ -1,23 +0,0 @@ -# Generated by Django 5.1.4 on 2024-12-27 16:14 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - initial = True - - dependencies = [] - - operations = [ - migrations.CreateModel( - name="Peer", - fields=[ - ( - "uuid", - models.UUIDField(primary_key=True, serialize=False, unique=True), - ), - ("name", models.CharField(max_length=200)), - ("room_id", models.CharField(max_length=200)), - ], - ), - ] diff --git a/umap/sync/migrations/__init__.py b/umap/sync/migrations/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/umap/sync/models.py b/umap/sync/models.py deleted file mode 100644 index 18a40076..00000000 --- a/umap/sync/models.py +++ /dev/null @@ -1,150 +0,0 @@ -import asyncio -import logging - -import psycopg -from django.core.signing import TimestampSigner -from django.db import connection, models -from psycopg import sql -from pydantic import ValidationError - -from .payloads import ( - JoinRequest, - JoinResponse, - ListPeersResponse, - OperationMessage, - PeerMessage, - Request, -) - - -class Peer(models.Model): - uuid = models.UUIDField(unique=True, primary_key=True) - name = models.CharField(max_length=200) - room_id = models.CharField(max_length=200) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.is_authenticated = False - - async def get_peers(self): - qs = Peer.objects.filter(room_id=self.room_id).values_list("uuid", flat=True) - peers = [] - async for peer in qs: - peers.append(peer) - return peers - - async def listen_public(self): - # We need a dedicated connection for the LISTEN - aconn = await psycopg.AsyncConnection.connect( - **self.connection_params, - autocommit=True, - ) - async with aconn: - async with aconn.cursor() as acursor: - await acursor.execute( - sql.SQL("LISTEN {chan}").format( - chan=sql.Identifier(str(self.room_id)) - ) - ) - print("LISTEN", self.room_id) - gen = aconn.notifies() - async for notify in gen: - await self.send(notify.payload) - - async def listen_private(self): - aconn = await psycopg.AsyncConnection.connect( - **self.connection_params, - autocommit=True, - ) - async with aconn: - async with aconn.cursor() as acursor: - await acursor.execute( - sql.SQL("LISTEN {chan}").format(chan=sql.Identifier(str(self.uuid))) - ) - print("LISTEN", self.uuid) - gen = aconn.notifies() - async for notify in gen: - await self.send(notify.payload) - - async def connect(self): - # Join room for this map - connection_params = connection.get_connection_params() - connection_params.pop("cursor_factory") - self.connection_params = connection_params - self.connection = await psycopg.AsyncConnection.connect( - **connection_params, - autocommit=True, - ) - - async def listen(self): - asyncio.create_task(self.listen_public()) - asyncio.create_task(self.listen_private()) - - async def disconnect(self): - await self.adelete() - await self.send_peers_list() - - async def send_peers_list(self): - message = ListPeersResponse(peers=await self.get_peers()) - await self.broadcast(message.model_dump_json()) - - async def broadcast(self, message): - print("BROADCASTING", message) - # Send to all channels (including sender!) - async with self.connection.cursor() as cursor: - await cursor.execute( - sql.SQL("NOTIFY {chan}, {message}").format( - chan=sql.Identifier(str(self.room_id)), - message=message, - ) - ) - - async def send_to(self, peer_id, message): - print("SEND TO", peer_id, message) - # Send to one given channel - async with self.connection.cursor() as cursor: - await cursor.execute( - sql.SQL("NOTIFY {chan}, {message}").format( - chan=sql.Identifier(str(peer_id)), message=message - ) - ) - - async def receive(self, text_data): - if not self.is_authenticated: - print("AUTHENTICATING", self.uuid) - message = JoinRequest.model_validate_json(text_data) - signed = TimestampSigner().unsign_object(message.token, max_age=30) - user, map_id, permissions = signed.values() - if "edit" not in permissions: - return await self.disconnect() - await self.asave() - await self.listen() - response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers()) - await self.send(response.model_dump_json()) - await self.send_peers_list() - self.is_authenticated = True - return - - if text_data == "ping": - return await self.send("pong") - - try: - incoming = Request.model_validate_json(text_data) - except ValidationError as error: - message = ( - f"An error occurred when receiving the following message: {text_data!r}" - ) - logging.error(message, error) - else: - match incoming.root: - # Broadcast all operation messages to connected peers - case OperationMessage(): - await self.broadcast(text_data) - - # Send peer messages to the proper peer - case PeerMessage(): - await self.send_to(incoming.root.recipient, text_data) - - async def send(self, text): - print("SEND", text) - await self._send({"type": "websocket.send", "text": text})