From 31546d6ff42639f3d06a04876c5e934fa56ac9c8 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 23 Dec 2024 13:37:01 +0100 Subject: [PATCH 01/18] wip(sync): use django-channels to serve websockets Co-authored-by: David Larlet --- umap/asgi.py | 9 +- .../commands/run_websocket_server.py | 23 -- umap/settings/base.py | 1 + umap/static/umap/js/modules/sync/engine.js | 7 +- umap/static/umap/js/modules/sync/websocket.js | 3 +- umap/sync/__init__.py | 0 umap/sync/consumers.py | 86 ++++++++ umap/sync/payloads.py | 47 ++++ umap/tests/integration/conftest.py | 13 ++ umap/tests/integration/test_websocket_sync.py | 16 +- umap/tests/test_websocket_server.py | 22 -- umap/websocket_server.py | 202 ------------------ 12 files changed, 169 insertions(+), 260 deletions(-) delete mode 100644 umap/management/commands/run_websocket_server.py create mode 100644 umap/sync/__init__.py create mode 100644 umap/sync/consumers.py create mode 100644 umap/sync/payloads.py delete mode 100644 umap/tests/test_websocket_server.py delete mode 100644 umap/websocket_server.py diff --git a/umap/asgi.py b/umap/asgi.py index 2ca12ddc..1f9d618a 100644 --- a/umap/asgi.py +++ b/umap/asgi.py @@ -1,15 +1,22 @@ import os -from channels.routing import ProtocolTypeRouter +from channels.routing import ProtocolTypeRouter, URLRouter +from channels.security.websocket import AllowedHostsOriginValidator from django.core.asgi import get_asgi_application +from django.urls import re_path + +from .sync import consumers os.environ.setdefault("DJANGO_SETTINGS_MODULE", "umap.settings") # Initialize Django ASGI application early to ensure the AppRegistry # is populated before importing code that may import ORM models. django_asgi_app = get_asgi_application() +urlpatterns = (re_path(r"ws/sync/(?P\w+)/$", consumers.SyncConsumer.as_asgi()),) + application = ProtocolTypeRouter( { "http": django_asgi_app, + "websocket": AllowedHostsOriginValidator(URLRouter(urlpatterns)), } ) diff --git a/umap/management/commands/run_websocket_server.py b/umap/management/commands/run_websocket_server.py deleted file mode 100644 index 2cb2db89..00000000 --- a/umap/management/commands/run_websocket_server.py +++ /dev/null @@ -1,23 +0,0 @@ -from django.conf import settings -from django.core.management.base import BaseCommand - -from umap import websocket_server - - -class Command(BaseCommand): - help = "Run the websocket server" - - def add_arguments(self, parser): - parser.add_argument( - "--host", - help="The server host to bind to.", - default=settings.WEBSOCKET_BACK_HOST, - ) - parser.add_argument( - "--port", - help="The server port to bind to.", - default=settings.WEBSOCKET_BACK_PORT, - ) - - def handle(self, *args, **options): - websocket_server.run(options["host"], options["port"]) diff --git a/umap/settings/base.py b/umap/settings/base.py index f47ad236..fedefe88 100644 --- a/umap/settings/base.py +++ b/umap/settings/base.py @@ -343,3 +343,4 @@ WEBSOCKET_ENABLED = env.bool("WEBSOCKET_ENABLED", default=False) WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost") WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001) WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001") +CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}} diff --git a/umap/static/umap/js/modules/sync/engine.js b/umap/static/umap/js/modules/sync/engine.js index 212c2528..c9b32167 100644 --- a/umap/static/umap/js/modules/sync/engine.js +++ b/umap/static/umap/js/modules/sync/engine.js @@ -77,7 +77,7 @@ export class SyncEngine { start(authToken) { this.transport = new WebSocketTransport( - this._umap.properties.websocketURI, + Utils.template(this._umap.properties.websocketURI, { id: this._umap.id }), authToken, this ) @@ -125,7 +125,7 @@ export class SyncEngine { if (this.offline) return if (this.transport) { - this.transport.send('OperationMessage', message) + this.transport.send('OperationMessage', { sender: this.uuid, ...message }) } } @@ -177,6 +177,7 @@ export class SyncEngine { * @param {Object} payload */ onOperationMessage(payload) { + if (payload.sender === this.uuid) return this._operations.storeRemoteOperations([payload]) this._applyOperation(payload) } @@ -484,7 +485,7 @@ export class Operations { return ( Utils.deepEqual(local.subject, remote.subject) && Utils.deepEqual(local.metadata, remote.metadata) && - (!shouldCheckKey || (shouldCheckKey && local.key == remote.key)) + (!shouldCheckKey || (shouldCheckKey && local.key === remote.key)) ) } } diff --git a/umap/static/umap/js/modules/sync/websocket.js b/umap/static/umap/js/modules/sync/websocket.js index 26c99f26..accdbcc3 100644 --- a/umap/static/umap/js/modules/sync/websocket.js +++ b/umap/static/umap/js/modules/sync/websocket.js @@ -6,7 +6,7 @@ export class WebSocketTransport { constructor(webSocketURI, authToken, messagesReceiver) { this.receiver = messagesReceiver - this.websocket = new WebSocket(webSocketURI) + this.websocket = new WebSocket(`${webSocketURI}`) this.websocket.onopen = () => { this.send('JoinRequest', { token: authToken }) @@ -48,6 +48,7 @@ export class WebSocketTransport { } onMessage(wsMessage) { + console.log(wsMessage) if (wsMessage.data === 'pong') { this.pongReceived = true } else { diff --git a/umap/sync/__init__.py b/umap/sync/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/umap/sync/consumers.py b/umap/sync/consumers.py new file mode 100644 index 00000000..dc722279 --- /dev/null +++ b/umap/sync/consumers.py @@ -0,0 +1,86 @@ +import logging + +from channels.generic.websocket import AsyncWebsocketConsumer +from django.core.signing import TimestampSigner +from pydantic import ValidationError + +from .payloads import ( + JoinRequest, + JoinResponse, + ListPeersResponse, + OperationMessage, + PeerMessage, + Request, +) + + +class SyncConsumer(AsyncWebsocketConsumer): + @property + def peers(self): + return self.channel_layer.groups[self.map_id].keys() + + async def connect(self): + self.map_id = self.scope["url_route"]["kwargs"]["map_id"] + + # Join room group + await self.channel_layer.group_add(self.map_id, self.channel_name) + + self.is_authenticated = False + await self.accept() + + async def disconnect(self, close_code): + await self.channel_layer.group_discard(self.map_id, self.channel_name) + await self.send_peers_list() + + async def send_peers_list(self): + message = ListPeersResponse(peers=self.peers) + await self.broadcast(message.model_dump_json()) + + async def broadcast(self, message): + # Send to all channels (including sender!) + await self.channel_layer.group_send( + self.map_id, {"message": message, "type": "on_message"} + ) + + async def send_to(self, channel, message): + # Send to one given channel + await self.channel_layer.send( + channel, {"message": message, "type": "on_message"} + ) + + async def on_message(self, event): + # Send to self channel + await self.send(event["message"]) + + async def receive(self, text_data): + if not self.is_authenticated: + message = JoinRequest.model_validate_json(text_data) + signed = TimestampSigner().unsign_object(message.token, max_age=30) + user, map_id, permissions = signed.values() + if "edit" not in permissions: + return await self.disconnect() + response = JoinResponse(uuid=self.channel_name, peers=self.peers) + await self.send(response.model_dump_json()) + await self.send_peers_list() + self.is_authenticated = True + return + + if text_data == "ping": + return await self.send("pong") + + try: + incoming = Request.model_validate_json(text_data) + except ValidationError as error: + message = ( + f"An error occurred when receiving the following message: {text_data!r}" + ) + logging.error(message, error) + else: + match incoming.root: + # Broadcast all operation messages to connected peers + case OperationMessage(): + await self.broadcast(text_data) + + # Send peer messages to the proper peer + case PeerMessage(): + await self.send_to(incoming.root.recipient, text_data) diff --git a/umap/sync/payloads.py b/umap/sync/payloads.py new file mode 100644 index 00000000..6a15a3f1 --- /dev/null +++ b/umap/sync/payloads.py @@ -0,0 +1,47 @@ +from typing import Literal, Optional, Union + +from pydantic import BaseModel, Field, RootModel + + +class JoinRequest(BaseModel): + kind: Literal["JoinRequest"] = "JoinRequest" + token: str + + +class OperationMessage(BaseModel): + """Message sent from one peer to all the others""" + + kind: Literal["OperationMessage"] = "OperationMessage" + verb: Literal["upsert", "update", "delete"] + subject: Literal["map", "datalayer", "feature"] + metadata: Optional[dict] = None + key: Optional[str] = None + + +class PeerMessage(BaseModel): + """Message sent from a specific peer to another one""" + + kind: Literal["PeerMessage"] = "PeerMessage" + sender: str + recipient: str + # The message can be whatever the peers want. It's not checked by the server. + message: dict + + +class Request(RootModel): + """Any message coming from the websocket should be one of these, and will be rejected otherwise.""" + + root: Union[PeerMessage, OperationMessage] = Field(discriminator="kind") + + +class JoinResponse(BaseModel): + """Server response containing the list of peers""" + + kind: Literal["JoinResponse"] = "JoinResponse" + peers: list + uuid: str + + +class ListPeersResponse(BaseModel): + kind: Literal["ListPeersResponse"] = "ListPeersResponse" + peers: list diff --git a/umap/tests/integration/conftest.py b/umap/tests/integration/conftest.py index 4601a709..620ab5ec 100644 --- a/umap/tests/integration/conftest.py +++ b/umap/tests/integration/conftest.py @@ -5,6 +5,7 @@ import time from pathlib import Path import pytest +from channels.testing import ChannelsLiveServerTestCase from playwright.sync_api import expect from ..base import mock_tiles @@ -87,3 +88,15 @@ def websocket_server(): yield ds_proc # Shut it down at the end of the pytest session ds_proc.terminate() + + +@pytest.fixture(scope="function") +def channels_live_server(request, settings): + server = ChannelsLiveServerTestCase() + server.serve_static = False + server._pre_setup() + settings.WEBSOCKET_FRONT_URI = f"{server.live_server_ws_url}/ws/sync/{{id}}/" + + yield server + + server._post_teardown() diff --git a/umap/tests/integration/test_websocket_sync.py b/umap/tests/integration/test_websocket_sync.py index 96946ce8..b5e5fc16 100644 --- a/umap/tests/integration/test_websocket_sync.py +++ b/umap/tests/integration/test_websocket_sync.py @@ -12,7 +12,7 @@ DATALAYER_UPDATE = re.compile(r".*/datalayer/update/.*") @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_markers( - new_page, live_server, websocket_server, tilelayer + new_page, live_server, channels_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -80,7 +80,7 @@ def test_websocket_connection_can_sync_markers( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_polygons( - context, live_server, websocket_server, tilelayer + context, live_server, channels_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -164,7 +164,7 @@ def test_websocket_connection_can_sync_polygons( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_map_properties( - new_page, live_server, websocket_server, tilelayer + new_page, live_server, channels_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -198,7 +198,7 @@ def test_websocket_connection_can_sync_map_properties( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_datalayer_properties( - new_page, live_server, websocket_server, tilelayer + new_page, live_server, channels_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -227,7 +227,7 @@ def test_websocket_connection_can_sync_datalayer_properties( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_cloned_polygons( - context, live_server, websocket_server, tilelayer + context, live_server, channels_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -290,7 +290,7 @@ def test_websocket_connection_can_sync_cloned_polygons( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_late_joining_peer( - new_page, live_server, websocket_server, tilelayer + new_page, live_server, channels_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -351,7 +351,7 @@ def test_websocket_connection_can_sync_late_joining_peer( @pytest.mark.xdist_group(name="websockets") -def test_should_sync_datalayers(new_page, live_server, websocket_server, tilelayer): +def test_should_sync_datalayers(new_page, live_server, channels_live_server, tilelayer): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True map.save() @@ -490,7 +490,7 @@ def test_should_sync_datalayers_delete( @pytest.mark.xdist_group(name="websockets") def test_create_and_sync_map( - new_page, live_server, websocket_server, tilelayer, login, user + new_page, live_server, channels_live_server, tilelayer, login, user ): # Create a syncable map with peerA peerA = login(user, prefix="Page A") diff --git a/umap/tests/test_websocket_server.py b/umap/tests/test_websocket_server.py deleted file mode 100644 index 62bc93e9..00000000 --- a/umap/tests/test_websocket_server.py +++ /dev/null @@ -1,22 +0,0 @@ -from umap.websocket_server import OperationMessage, PeerMessage, Request, ServerRequest - - -def test_messages_are_parsed_correctly(): - server = Request.model_validate(dict(kind="Server", action="list-peers")).root - assert type(server) is ServerRequest - - operation = Request.model_validate( - dict( - kind="OperationMessage", - verb="upsert", - subject="map", - metadata={}, - key="key", - ) - ).root - assert type(operation) is OperationMessage - - peer_message = Request.model_validate( - dict(kind="PeerMessage", sender="Alice", recipient="Bob", message={}) - ).root - assert type(peer_message) is PeerMessage diff --git a/umap/websocket_server.py b/umap/websocket_server.py deleted file mode 100644 index 6483d648..00000000 --- a/umap/websocket_server.py +++ /dev/null @@ -1,202 +0,0 @@ -#!/usr/bin/env python - -import asyncio -import logging -import uuid -from collections import defaultdict -from typing import Literal, Optional, Union - -import websockets -from django.conf import settings -from django.core.signing import TimestampSigner -from pydantic import BaseModel, Field, RootModel, ValidationError -from websockets import WebSocketClientProtocol -from websockets.server import serve - - -class Connections: - def __init__(self) -> None: - self._connections: set[WebSocketClientProtocol] = set() - self._ids: dict[WebSocketClientProtocol, str] = dict() - - def join(self, websocket: WebSocketClientProtocol) -> str: - self._connections.add(websocket) - _id = str(uuid.uuid4()) - self._ids[websocket] = _id - return _id - - def leave(self, websocket: WebSocketClientProtocol) -> None: - self._connections.remove(websocket) - del self._ids[websocket] - - def get(self, id) -> WebSocketClientProtocol: - # use an iterator to stop iterating as soon as we found - return next(k for k, v in self._ids.items() if v == id) - - def get_id(self, websocket: WebSocketClientProtocol): - return self._ids[websocket] - - def get_other_peers( - self, websocket: WebSocketClientProtocol - ) -> set[WebSocketClientProtocol]: - return self._connections - {websocket} - - def get_all_peers(self) -> set[WebSocketClientProtocol]: - return self._connections - - -# Contains the list of websocket connections handled by this process. -# It's a mapping of map_id to a set of the active websocket connections -CONNECTIONS: defaultdict[int, Connections] = defaultdict(Connections) - - -class JoinRequest(BaseModel): - kind: Literal["JoinRequest"] = "JoinRequest" - token: str - - -class OperationMessage(BaseModel): - """Message sent from one peer to all the others""" - - kind: Literal["OperationMessage"] = "OperationMessage" - verb: Literal["upsert", "update", "delete"] - subject: Literal["map", "datalayer", "feature"] - metadata: Optional[dict] = None - key: Optional[str] = None - - -class PeerMessage(BaseModel): - """Message sent from a specific peer to another one""" - - kind: Literal["PeerMessage"] = "PeerMessage" - sender: str - recipient: str - # The message can be whatever the peers want. It's not checked by the server. - message: dict - - -class ServerRequest(BaseModel): - """A request towards the server""" - - kind: Literal["Server"] = "Server" - action: Literal["list-peers"] - - -class Request(RootModel): - """Any message coming from the websocket should be one of these, and will be rejected otherwise.""" - - root: Union[ServerRequest, PeerMessage, OperationMessage] = Field( - discriminator="kind" - ) - - -class JoinResponse(BaseModel): - """Server response containing the list of peers""" - - kind: Literal["JoinResponse"] = "JoinResponse" - peers: list - uuid: str - - -class ListPeersResponse(BaseModel): - kind: Literal["ListPeersResponse"] = "ListPeersResponse" - peers: list - - -async def join_and_listen( - map_id: int, permissions: list, user: str | int, websocket: WebSocketClientProtocol -): - """Join a "room" with other connected peers, and wait for messages.""" - logging.debug(f"{user} joined room #{map_id}") - connections: Connections = CONNECTIONS[map_id] - _id: str = connections.join(websocket) - - # Assign an ID to the joining peer and return it the list of connected peers. - peers: list[WebSocketClientProtocol] = [ - connections.get_id(p) for p in connections.get_all_peers() - ] - response = JoinResponse(uuid=_id, peers=peers) - await websocket.send(response.model_dump_json()) - - # Notify all other peers of the new list of connected peers. - message = ListPeersResponse(peers=peers) - websockets.broadcast( - connections.get_other_peers(websocket), message.model_dump_json() - ) - - try: - async for raw_message in websocket: - if raw_message == "ping": - await websocket.send("pong") - continue - - # recompute the peers list at the time of message-sending. - # as doing so beforehand would miss new connections - other_peers = connections.get_other_peers(websocket) - try: - incoming = Request.model_validate_json(raw_message) - except ValidationError as e: - error = f"An error occurred when receiving the following message: {raw_message!r}" - logging.error(error, e) - else: - match incoming.root: - # Broadcast all operation messages to connected peers - case OperationMessage(): - websockets.broadcast(other_peers, raw_message) - - # Send peer messages to the proper peer - case PeerMessage(recipient=_id): - peer = connections.get(_id) - if peer: - await peer.send(raw_message) - - finally: - # On disconnect, remove the connection from the pool - connections.leave(websocket) - - # TODO: refactor this in a separate method. - # Notify all other peers of the new list of connected peers. - peers = [connections.get_id(p) for p in connections.get_all_peers()] - message = ListPeersResponse(peers=peers) - websockets.broadcast( - connections.get_other_peers(websocket), message.model_dump_json() - ) - - -async def handler(websocket: WebSocketClientProtocol): - """Main WebSocket handler. - - Check if the permission is granted and let the peer enter a room. - """ - raw_message = await websocket.recv() - - # The first event should always be 'join' - message: JoinRequest = JoinRequest.model_validate_json(raw_message) - signed = TimestampSigner().unsign_object(message.token, max_age=30) - user, map_id, permissions = signed.values() - - # Check if permissions for this map have been granted by the server - if "edit" in signed["permissions"]: - await join_and_listen(map_id, permissions, user, websocket) - - -def run(host: str, port: int): - if not settings.WEBSOCKET_ENABLED: - msg = ( - "WEBSOCKET_ENABLED should be set to True to run the WebSocket Server. " - "See the documentation at " - "https://docs.umap-project.org/en/stable/config/settings/#websocket_enabled " - "for more information." - ) - print(msg) - exit(1) - - async def _serve(): - async with serve(handler, host, port): - logging.debug(f"Waiting for connections on {host}:{port}") - await asyncio.Future() # run forever - - try: - asyncio.run(_serve()) - except KeyboardInterrupt: - print("Closing WebSocket server") From a29eae138e6a455a882b6769f8f1ab7bae70f965 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Fri, 27 Dec 2024 18:09:27 +0100 Subject: [PATCH 02/18] wip(sync): websocket server with ASGI and PostgreSQL LISTEN/NOTIFY --- umap/asgi.py | 26 ++-- umap/settings/base.py | 2 +- umap/static/umap/js/modules/sync/websocket.js | 9 +- umap/sync/app.py | 45 ++++++ umap/sync/apps.py | 6 + umap/sync/consumers.py | 86 ---------- umap/sync/migrations/0001_initial.py | 23 +++ umap/sync/migrations/__init__.py | 0 umap/sync/models.py | 147 ++++++++++++++++++ 9 files changed, 241 insertions(+), 103 deletions(-) create mode 100644 umap/sync/app.py create mode 100644 umap/sync/apps.py delete mode 100644 umap/sync/consumers.py create mode 100644 umap/sync/migrations/0001_initial.py create mode 100644 umap/sync/migrations/__init__.py create mode 100644 umap/sync/models.py diff --git a/umap/asgi.py b/umap/asgi.py index 1f9d618a..47d69a93 100644 --- a/umap/asgi.py +++ b/umap/asgi.py @@ -1,22 +1,20 @@ import os -from channels.routing import ProtocolTypeRouter, URLRouter -from channels.security.websocket import AllowedHostsOriginValidator -from django.core.asgi import get_asgi_application -from django.urls import re_path - -from .sync import consumers - os.environ.setdefault("DJANGO_SETTINGS_MODULE", "umap.settings") + +from django.core.asgi import get_asgi_application + +from .sync.app import application as ws_application + # Initialize Django ASGI application early to ensure the AppRegistry # is populated before importing code that may import ORM models. django_asgi_app = get_asgi_application() -urlpatterns = (re_path(r"ws/sync/(?P\w+)/$", consumers.SyncConsumer.as_asgi()),) -application = ProtocolTypeRouter( - { - "http": django_asgi_app, - "websocket": AllowedHostsOriginValidator(URLRouter(urlpatterns)), - } -) +async def application(scope, receive, send): + if scope["type"] == "http": + await django_asgi_app(scope, receive, send) + elif scope["type"] == "websocket": + await ws_application(scope, receive, send) + else: + raise NotImplementedError(f"Unknown scope type {scope['type']}") diff --git a/umap/settings/base.py b/umap/settings/base.py index fedefe88..e89f17af 100644 --- a/umap/settings/base.py +++ b/umap/settings/base.py @@ -129,6 +129,7 @@ INSTALLED_APPS = ( "django.contrib.gis", "django_probes", "umap", + "umap.sync", "social_django", # See https://github.com/peopledoc/django-agnocomplete/commit/26eda2dfa4a2f8a805ca2ea19a0c504b9d773a1c # Django does not find the app config in the default place, so the app is not loaded @@ -343,4 +344,3 @@ WEBSOCKET_ENABLED = env.bool("WEBSOCKET_ENABLED", default=False) WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost") WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001) WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001") -CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}} diff --git a/umap/static/umap/js/modules/sync/websocket.js b/umap/static/umap/js/modules/sync/websocket.js index accdbcc3..0a80f98f 100644 --- a/umap/static/umap/js/modules/sync/websocket.js +++ b/umap/static/umap/js/modules/sync/websocket.js @@ -6,7 +6,7 @@ export class WebSocketTransport { constructor(webSocketURI, authToken, messagesReceiver) { this.receiver = messagesReceiver - this.websocket = new WebSocket(`${webSocketURI}`) + this.websocket = new WebSocket(webSocketURI) this.websocket.onopen = () => { this.send('JoinRequest', { token: authToken }) @@ -21,6 +21,10 @@ export class WebSocketTransport { } } + this.websocket.onerror = (error) => { + console.log('WS ERROR', error) + } + this.ensureOpen = setInterval(() => { if (this.websocket.readyState !== WebSocket.OPEN) { this.websocket.close() @@ -34,6 +38,7 @@ export class WebSocketTransport { // See https://making.close.com/posts/reliable-websockets/ for more details. this.pingInterval = setInterval(() => { if (this.websocket.readyState === WebSocket.OPEN) { + console.log('sending ping') this.websocket.send('ping') this.pongReceived = false setTimeout(() => { @@ -48,7 +53,6 @@ export class WebSocketTransport { } onMessage(wsMessage) { - console.log(wsMessage) if (wsMessage.data === 'pong') { this.pongReceived = true } else { @@ -64,6 +68,7 @@ export class WebSocketTransport { } close() { + console.log('Closing') this.receiver.closeRequested = true this.websocket.close() } diff --git a/umap/sync/app.py b/umap/sync/app.py new file mode 100644 index 00000000..04482d2d --- /dev/null +++ b/umap/sync/app.py @@ -0,0 +1,45 @@ +import uuid + +from django.urls.resolvers import RoutePattern + +ws_pattern = RoutePattern("/ws/sync/") + + +async def application(scope, receive, send): + from .models import Peer + + matched = ws_pattern.match(scope["path"]) + print(matched) + if not matched: + print("Wrong path") + return + _, _, kwargs = matched + + map_id = kwargs["map_id"] + room_id = f"room{map_id}" + peer = await Peer.objects.acreate(uuid=uuid.uuid4(), name="FooBar", room_id=room_id) + print(peer) + peer._send = send + while True: + event = await receive() + print("EVENT", event) + + if event["type"] == "websocket.connect": + try: + print("Let's accept") + await send({"type": "websocket.accept"}) + await peer.connect() + except ValueError: + await send({"type": "websocket.close"}) + + if event["type"] == "websocket.disconnect": + print("Closing", event) + await peer.disconnect() + print("Closed") + break + + if event["type"] == "websocket.receive": + if event["text"] == "ping": + await send({"type": "websocket.send", "text": "pong"}) + else: + await peer.receive(event["text"]) diff --git a/umap/sync/apps.py b/umap/sync/apps.py new file mode 100644 index 00000000..b4c9c694 --- /dev/null +++ b/umap/sync/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class UmapConfig(AppConfig): + name = "umap.sync" + verbose_name = "uMap Sync" diff --git a/umap/sync/consumers.py b/umap/sync/consumers.py deleted file mode 100644 index dc722279..00000000 --- a/umap/sync/consumers.py +++ /dev/null @@ -1,86 +0,0 @@ -import logging - -from channels.generic.websocket import AsyncWebsocketConsumer -from django.core.signing import TimestampSigner -from pydantic import ValidationError - -from .payloads import ( - JoinRequest, - JoinResponse, - ListPeersResponse, - OperationMessage, - PeerMessage, - Request, -) - - -class SyncConsumer(AsyncWebsocketConsumer): - @property - def peers(self): - return self.channel_layer.groups[self.map_id].keys() - - async def connect(self): - self.map_id = self.scope["url_route"]["kwargs"]["map_id"] - - # Join room group - await self.channel_layer.group_add(self.map_id, self.channel_name) - - self.is_authenticated = False - await self.accept() - - async def disconnect(self, close_code): - await self.channel_layer.group_discard(self.map_id, self.channel_name) - await self.send_peers_list() - - async def send_peers_list(self): - message = ListPeersResponse(peers=self.peers) - await self.broadcast(message.model_dump_json()) - - async def broadcast(self, message): - # Send to all channels (including sender!) - await self.channel_layer.group_send( - self.map_id, {"message": message, "type": "on_message"} - ) - - async def send_to(self, channel, message): - # Send to one given channel - await self.channel_layer.send( - channel, {"message": message, "type": "on_message"} - ) - - async def on_message(self, event): - # Send to self channel - await self.send(event["message"]) - - async def receive(self, text_data): - if not self.is_authenticated: - message = JoinRequest.model_validate_json(text_data) - signed = TimestampSigner().unsign_object(message.token, max_age=30) - user, map_id, permissions = signed.values() - if "edit" not in permissions: - return await self.disconnect() - response = JoinResponse(uuid=self.channel_name, peers=self.peers) - await self.send(response.model_dump_json()) - await self.send_peers_list() - self.is_authenticated = True - return - - if text_data == "ping": - return await self.send("pong") - - try: - incoming = Request.model_validate_json(text_data) - except ValidationError as error: - message = ( - f"An error occurred when receiving the following message: {text_data!r}" - ) - logging.error(message, error) - else: - match incoming.root: - # Broadcast all operation messages to connected peers - case OperationMessage(): - await self.broadcast(text_data) - - # Send peer messages to the proper peer - case PeerMessage(): - await self.send_to(incoming.root.recipient, text_data) diff --git a/umap/sync/migrations/0001_initial.py b/umap/sync/migrations/0001_initial.py new file mode 100644 index 00000000..a8893dfa --- /dev/null +++ b/umap/sync/migrations/0001_initial.py @@ -0,0 +1,23 @@ +# Generated by Django 5.1.4 on 2024-12-27 16:14 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Peer", + fields=[ + ( + "uuid", + models.UUIDField(primary_key=True, serialize=False, unique=True), + ), + ("name", models.CharField(max_length=200)), + ("room_id", models.CharField(max_length=200)), + ], + ), + ] diff --git a/umap/sync/migrations/__init__.py b/umap/sync/migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/umap/sync/models.py b/umap/sync/models.py new file mode 100644 index 00000000..cc1b4af7 --- /dev/null +++ b/umap/sync/models.py @@ -0,0 +1,147 @@ +import asyncio +import logging + +import psycopg +from django.core.signing import TimestampSigner +from django.db import connection, models +from psycopg import sql +from pydantic import ValidationError + +from .payloads import ( + JoinRequest, + JoinResponse, + ListPeersResponse, + OperationMessage, + PeerMessage, + Request, +) + + +class Peer(models.Model): + uuid = models.UUIDField(unique=True, primary_key=True) + name = models.CharField(max_length=200) + room_id = models.CharField(max_length=200) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.is_authenticated = False + + async def get_peers(self): + qs = Peer.objects.filter(room_id=self.room_id).values_list("uuid", flat=True) + peers = [] + async for peer in qs: + peers.append(peer) + return peers + + async def listen_public(self): + # We need a dedicated connection for the LISTEN + aconn = await psycopg.AsyncConnection.connect( + **self.connection_params, + autocommit=True, + ) + async with aconn: + async with aconn.cursor() as acursor: + await acursor.execute( + sql.SQL("LISTEN {chan}").format( + chan=sql.Identifier(str(self.room_id)) + ) + ) + print("LISTEN", self.room_id) + gen = aconn.notifies() + async for notify in gen: + await self.send(notify.payload) + + async def listen_private(self): + aconn = await psycopg.AsyncConnection.connect( + **self.connection_params, + autocommit=True, + ) + async with aconn: + async with aconn.cursor() as acursor: + await acursor.execute( + sql.SQL("LISTEN {chan}").format(chan=sql.Identifier(str(self.uuid))) + ) + print("LISTEN", self.uuid) + gen = aconn.notifies() + async for notify in gen: + await self.send(notify.payload) + + async def connect(self): + # Join room for this map + connection_params = connection.get_connection_params() + connection_params.pop("cursor_factory") + self.connection_params = connection_params + self.connection = await psycopg.AsyncConnection.connect( + **connection_params, + autocommit=True, + ) + asyncio.create_task(self.listen_public()) + asyncio.create_task(self.listen_private()) + + async def disconnect(self): + await self.adelete() + await self.send_peers_list() + + async def send_peers_list(self): + message = ListPeersResponse(peers=await self.get_peers()) + await self.broadcast(message.model_dump_json()) + + async def broadcast(self, message): + print("BROADCASTING", message) + # Send to all channels (including sender!) + async with self.connection.cursor() as cursor: + await cursor.execute( + sql.SQL("NOTIFY {chan}, {message}").format( + chan=sql.Identifier(str(self.room_id)), + message=message, + ) + ) + + async def send_to(self, peer_id, message): + print("SEND TO", peer_id, message) + # Send to one given channel + async with self.connection.cursor() as cursor: + await cursor.execute( + sql.SQL("NOTIFY {chan}, {message}").format( + chan=sql.Identifier(str(peer_id)), message=message + ) + ) + + async def receive(self, text_data): + if not self.is_authenticated: + print("AUTHENTICATING", self.uuid) + message = JoinRequest.model_validate_json(text_data) + signed = TimestampSigner().unsign_object(message.token, max_age=30) + user, map_id, permissions = signed.values() + if "edit" not in permissions: + return await self.disconnect() + await Peer.asave() + response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers()) + await self.send(response.model_dump_json()) + await self.send_peers_list() + self.is_authenticated = True + return + + if text_data == "ping": + return await self.send("pong") + + try: + incoming = Request.model_validate_json(text_data) + except ValidationError as error: + message = ( + f"An error occurred when receiving the following message: {text_data!r}" + ) + logging.error(message, error) + else: + match incoming.root: + # Broadcast all operation messages to connected peers + case OperationMessage(): + await self.broadcast(text_data) + + # Send peer messages to the proper peer + case PeerMessage(): + await self.send_to(incoming.root.recipient, text_data) + + async def send(self, text): + print("SEND", text) + await self._send({"type": "websocket.send", "text": text}) From 698c74b4275e25aeef276ddd601d4de9e163153b Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Thu, 2 Jan 2025 17:40:17 +0100 Subject: [PATCH 03/18] wip(sync): only save Peer after authentication --- umap/sync/app.py | 2 +- umap/sync/models.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/umap/sync/app.py b/umap/sync/app.py index 04482d2d..cf526722 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -17,7 +17,7 @@ async def application(scope, receive, send): map_id = kwargs["map_id"] room_id = f"room{map_id}" - peer = await Peer.objects.acreate(uuid=uuid.uuid4(), name="FooBar", room_id=room_id) + peer = Peer(uuid=uuid.uuid4(), name="FooBar", room_id=room_id) print(peer) peer._send = send while True: diff --git a/umap/sync/models.py b/umap/sync/models.py index cc1b4af7..c3bbccfe 100644 --- a/umap/sync/models.py +++ b/umap/sync/models.py @@ -115,7 +115,7 @@ class Peer(models.Model): user, map_id, permissions = signed.values() if "edit" not in permissions: return await self.disconnect() - await Peer.asave() + await self.asave() response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers()) await self.send(response.model_dump_json()) await self.send_peers_list() From 460a0c9997e0cb2b545bc92a5853fc2001560614 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Thu, 2 Jan 2025 17:43:12 +0100 Subject: [PATCH 04/18] wip(sync): only open listen connections after authentication --- umap/sync/models.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/umap/sync/models.py b/umap/sync/models.py index c3bbccfe..18a40076 100644 --- a/umap/sync/models.py +++ b/umap/sync/models.py @@ -75,6 +75,8 @@ class Peer(models.Model): **connection_params, autocommit=True, ) + + async def listen(self): asyncio.create_task(self.listen_public()) asyncio.create_task(self.listen_private()) @@ -116,6 +118,7 @@ class Peer(models.Model): if "edit" not in permissions: return await self.disconnect() await self.asave() + await self.listen() response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers()) await self.send(response.model_dump_json()) await self.send_peers_list() From ab7119e0a48724f897fe3c0d2875e33795fc661f Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Fri, 3 Jan 2025 10:56:28 +0100 Subject: [PATCH 05/18] wip(sync): use Daphne as live_server for tests Also clean dependencies. We still use the channels live server for our tests, but do not use it anymore for the actual websocket handling. --- pyproject.toml | 4 +- umap/tests/integration/conftest.py | 45 ++++++------------- umap/tests/integration/test_websocket_sync.py | 16 +++---- 3 files changed, 22 insertions(+), 43 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 94c4521a..578da9c9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ dev = [ "isort==5.13.2", ] test = [ + "daphne==4.1.2", "factory-boy==3.3.1", "playwright>=1.39", "pytest==8.3.4", @@ -70,10 +71,7 @@ s3 = [ "django-storages[s3]==1.14.4", ] sync = [ - "channels==4.2.0", - "daphne==4.1.2", "pydantic==2.10.5", - "websockets==13.1", ] [project.scripts] diff --git a/umap/tests/integration/conftest.py b/umap/tests/integration/conftest.py index 620ab5ec..d72886f8 100644 --- a/umap/tests/integration/conftest.py +++ b/umap/tests/integration/conftest.py @@ -1,13 +1,12 @@ import os import re -import subprocess -import time -from pathlib import Path import pytest -from channels.testing import ChannelsLiveServerTestCase +from daphne.testing import DaphneProcess from playwright.sync_api import expect +from umap.asgi import application + from ..base import mock_tiles @@ -68,35 +67,17 @@ def login(new_page, settings, live_server): return do_login -@pytest.fixture -def websocket_server(): - # Find the test-settings, and put them in the current environment - settings_path = (Path(__file__).parent.parent / "settings.py").absolute().as_posix() - os.environ["UMAP_SETTINGS"] = settings_path - - ds_proc = subprocess.Popen( - [ - "umap", - "run_websocket_server", - ], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) - time.sleep(2) - # Ensure it started properly before yielding - assert not ds_proc.poll(), ds_proc.stdout.read().decode("utf-8") - yield ds_proc - # Shut it down at the end of the pytest session - ds_proc.terminate() - - @pytest.fixture(scope="function") -def channels_live_server(request, settings): - server = ChannelsLiveServerTestCase() - server.serve_static = False - server._pre_setup() - settings.WEBSOCKET_FRONT_URI = f"{server.live_server_ws_url}/ws/sync/{{id}}/" +def asgi_live_server(request, settings): + request.getfixturevalue("transactional_db") + server = DaphneProcess("localhost", lambda: application) + server.start() + server.ready.wait() + port = server.port.value + settings.WEBSOCKET_FRONT_URI = f"ws://localhost:{port}/ws/sync/{{id}}/" + server.url = f"http://localhost:{port}" yield server - server._post_teardown() + server.terminate() + server.join() diff --git a/umap/tests/integration/test_websocket_sync.py b/umap/tests/integration/test_websocket_sync.py index b5e5fc16..569bca30 100644 --- a/umap/tests/integration/test_websocket_sync.py +++ b/umap/tests/integration/test_websocket_sync.py @@ -12,7 +12,7 @@ DATALAYER_UPDATE = re.compile(r".*/datalayer/update/.*") @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_markers( - new_page, live_server, channels_live_server, tilelayer + new_page, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -80,7 +80,7 @@ def test_websocket_connection_can_sync_markers( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_polygons( - context, live_server, channels_live_server, tilelayer + context, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -164,7 +164,7 @@ def test_websocket_connection_can_sync_polygons( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_map_properties( - new_page, live_server, channels_live_server, tilelayer + new_page, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -198,7 +198,7 @@ def test_websocket_connection_can_sync_map_properties( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_datalayer_properties( - new_page, live_server, channels_live_server, tilelayer + new_page, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -227,7 +227,7 @@ def test_websocket_connection_can_sync_datalayer_properties( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_cloned_polygons( - context, live_server, channels_live_server, tilelayer + context, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -290,7 +290,7 @@ def test_websocket_connection_can_sync_cloned_polygons( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_late_joining_peer( - new_page, live_server, channels_live_server, tilelayer + new_page, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -351,7 +351,7 @@ def test_websocket_connection_can_sync_late_joining_peer( @pytest.mark.xdist_group(name="websockets") -def test_should_sync_datalayers(new_page, live_server, channels_live_server, tilelayer): +def test_should_sync_datalayers(new_page, live_server, asgi_live_server, tilelayer): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True map.save() @@ -490,7 +490,7 @@ def test_should_sync_datalayers_delete( @pytest.mark.xdist_group(name="websockets") def test_create_and_sync_map( - new_page, live_server, channels_live_server, tilelayer, login, user + new_page, live_server, asgi_live_server, tilelayer, login, user ): # Create a syncable map with peerA peerA = login(user, prefix="Page A") From acb2e967b8257b152697952460a67195717b38bc Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 13 Jan 2025 17:41:39 +0100 Subject: [PATCH 06/18] wip(sync): POC of using Redis for pubsub Co-authored-by: David Larlet --- pyproject.toml | 3 + umap/settings/base.py | 3 +- umap/sync/app.py | 120 ++++++++++++++++++++- umap/sync/apps.py | 6 -- umap/sync/migrations/0001_initial.py | 23 ---- umap/sync/migrations/__init__.py | 0 umap/sync/models.py | 150 --------------------------- 7 files changed, 120 insertions(+), 185 deletions(-) delete mode 100644 umap/sync/apps.py delete mode 100644 umap/sync/migrations/0001_initial.py delete mode 100644 umap/sync/migrations/__init__.py delete mode 100644 umap/sync/models.py diff --git a/pyproject.toml b/pyproject.toml index 578da9c9..bc2370c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -102,3 +102,6 @@ format_css=true blank_line_after_tag="load,extends" line_break_after_multiline_tag=true +[lint] +# Disable autoremove of unused import. +unfixable = ["F401"] diff --git a/umap/settings/base.py b/umap/settings/base.py index e89f17af..1dc7748f 100644 --- a/umap/settings/base.py +++ b/umap/settings/base.py @@ -129,7 +129,6 @@ INSTALLED_APPS = ( "django.contrib.gis", "django_probes", "umap", - "umap.sync", "social_django", # See https://github.com/peopledoc/django-agnocomplete/commit/26eda2dfa4a2f8a805ca2ea19a0c504b9d773a1c # Django does not find the app config in the default place, so the app is not loaded @@ -344,3 +343,5 @@ WEBSOCKET_ENABLED = env.bool("WEBSOCKET_ENABLED", default=False) WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost") WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001) WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001") + +REDIS_URL = "redis://localhost:6379" diff --git a/umap/sync/app.py b/umap/sync/app.py index cf526722..18db1c86 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -1,13 +1,26 @@ +import asyncio +import logging import uuid +import redis.asyncio as redis +from django.conf import settings +from django.core.signing import TimestampSigner from django.urls.resolvers import RoutePattern +from pydantic import ValidationError + +from .payloads import ( + JoinRequest, + JoinResponse, + ListPeersResponse, + OperationMessage, + PeerMessage, + Request, +) ws_pattern = RoutePattern("/ws/sync/") async def application(scope, receive, send): - from .models import Peer - matched = ws_pattern.match(scope["path"]) print(matched) if not matched: @@ -16,8 +29,7 @@ async def application(scope, receive, send): _, _, kwargs = matched map_id = kwargs["map_id"] - room_id = f"room{map_id}" - peer = Peer(uuid=uuid.uuid4(), name="FooBar", room_id=room_id) + peer = Peer(uuid=uuid.uuid4(), map_id=map_id) print(peer) peer._send = send while True: @@ -27,8 +39,10 @@ async def application(scope, receive, send): if event["type"] == "websocket.connect": try: print("Let's accept") - await send({"type": "websocket.accept"}) await peer.connect() + print("After connect") + await send({"type": "websocket.accept"}) + print("After accept") except ValueError: await send({"type": "websocket.close"}) @@ -43,3 +57,99 @@ async def application(scope, receive, send): await send({"type": "websocket.send", "text": "pong"}) else: await peer.receive(event["text"]) + + +class Peer: + def __init__(self, uuid, map_id, username=None): + self.uuid = uuid + self.user_id = f"user:{uuid}" + self.username = username or "" + self.room_id = f"umap:{map_id}" + self.is_authenticated = False + + async def get_peers(self): + peers = await self.client.hgetall(self.room_id) + # Send only ids for now (values are client names). + return peers.keys() + + async def listen_to_channel(self, channel_name): + async def reader(pubsub): + await pubsub.subscribe(channel_name) + while True: + try: + message = await pubsub.get_message(ignore_subscribe_messages=True) + except Exception as err: + print(err) + break + if message is not None: + if message["data"].decode() == "STOP": + break + await self.send(message["data"].decode()) + + async with self.client.pubsub() as pubsub: + asyncio.create_task(reader(pubsub)) + + async def listen(self): + await self.listen_to_channel(self.room_id) + await self.listen_to_channel(self.user_id) + + async def connect(self): + self.client = redis.from_url(settings.REDIS_URL) + + async def disconnect(self): + await self.client.hdel(self.room_id, self.user_id) + await self.send_peers_list() + await self.client.aclose() + await self.client.publish(self.room_id, "STOP") + await self.client.publish(self.user_id, "STOP") + + async def send_peers_list(self): + message = ListPeersResponse(peers=await self.get_peers()) + await self.broadcast(message.model_dump_json()) + + async def broadcast(self, message): + print("BROADCASTING", message) + # Send to all channels (including sender!) + await self.client.publish(self.room_id, message) + + async def send_to(self, peer_id, message): + print("SEND TO", peer_id, message) + # Send to one given channel + await self.client.publish(peer_id, message) + + async def receive(self, text_data): + if not self.is_authenticated: + print("AUTHENTICATING", self.uuid) + message = JoinRequest.model_validate_json(text_data) + signed = TimestampSigner().unsign_object(message.token, max_age=30) + user, room_id, permissions = signed.values() + if "edit" not in permissions: + return await self.disconnect() + await self.client.hset(self.room_id, self.user_id, self.username) + await self.listen() + response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers()) + await self.send(response.model_dump_json()) + await self.send_peers_list() + self.is_authenticated = True + return + + try: + incoming = Request.model_validate_json(text_data) + except ValidationError as error: + message = ( + f"An error occurred when receiving the following message: {text_data!r}" + ) + logging.error(message, error) + else: + match incoming.root: + # Broadcast all operation messages to connected peers + case OperationMessage(): + await self.broadcast(text_data) + + # Send peer messages to the proper peer + case PeerMessage(): + await self.send_to(incoming.root.recipient, text_data) + + async def send(self, text): + print("SEND", text) + await self._send({"type": "websocket.send", "text": text}) diff --git a/umap/sync/apps.py b/umap/sync/apps.py deleted file mode 100644 index b4c9c694..00000000 --- a/umap/sync/apps.py +++ /dev/null @@ -1,6 +0,0 @@ -from django.apps import AppConfig - - -class UmapConfig(AppConfig): - name = "umap.sync" - verbose_name = "uMap Sync" diff --git a/umap/sync/migrations/0001_initial.py b/umap/sync/migrations/0001_initial.py deleted file mode 100644 index a8893dfa..00000000 --- a/umap/sync/migrations/0001_initial.py +++ /dev/null @@ -1,23 +0,0 @@ -# Generated by Django 5.1.4 on 2024-12-27 16:14 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - initial = True - - dependencies = [] - - operations = [ - migrations.CreateModel( - name="Peer", - fields=[ - ( - "uuid", - models.UUIDField(primary_key=True, serialize=False, unique=True), - ), - ("name", models.CharField(max_length=200)), - ("room_id", models.CharField(max_length=200)), - ], - ), - ] diff --git a/umap/sync/migrations/__init__.py b/umap/sync/migrations/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/umap/sync/models.py b/umap/sync/models.py deleted file mode 100644 index 18a40076..00000000 --- a/umap/sync/models.py +++ /dev/null @@ -1,150 +0,0 @@ -import asyncio -import logging - -import psycopg -from django.core.signing import TimestampSigner -from django.db import connection, models -from psycopg import sql -from pydantic import ValidationError - -from .payloads import ( - JoinRequest, - JoinResponse, - ListPeersResponse, - OperationMessage, - PeerMessage, - Request, -) - - -class Peer(models.Model): - uuid = models.UUIDField(unique=True, primary_key=True) - name = models.CharField(max_length=200) - room_id = models.CharField(max_length=200) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.is_authenticated = False - - async def get_peers(self): - qs = Peer.objects.filter(room_id=self.room_id).values_list("uuid", flat=True) - peers = [] - async for peer in qs: - peers.append(peer) - return peers - - async def listen_public(self): - # We need a dedicated connection for the LISTEN - aconn = await psycopg.AsyncConnection.connect( - **self.connection_params, - autocommit=True, - ) - async with aconn: - async with aconn.cursor() as acursor: - await acursor.execute( - sql.SQL("LISTEN {chan}").format( - chan=sql.Identifier(str(self.room_id)) - ) - ) - print("LISTEN", self.room_id) - gen = aconn.notifies() - async for notify in gen: - await self.send(notify.payload) - - async def listen_private(self): - aconn = await psycopg.AsyncConnection.connect( - **self.connection_params, - autocommit=True, - ) - async with aconn: - async with aconn.cursor() as acursor: - await acursor.execute( - sql.SQL("LISTEN {chan}").format(chan=sql.Identifier(str(self.uuid))) - ) - print("LISTEN", self.uuid) - gen = aconn.notifies() - async for notify in gen: - await self.send(notify.payload) - - async def connect(self): - # Join room for this map - connection_params = connection.get_connection_params() - connection_params.pop("cursor_factory") - self.connection_params = connection_params - self.connection = await psycopg.AsyncConnection.connect( - **connection_params, - autocommit=True, - ) - - async def listen(self): - asyncio.create_task(self.listen_public()) - asyncio.create_task(self.listen_private()) - - async def disconnect(self): - await self.adelete() - await self.send_peers_list() - - async def send_peers_list(self): - message = ListPeersResponse(peers=await self.get_peers()) - await self.broadcast(message.model_dump_json()) - - async def broadcast(self, message): - print("BROADCASTING", message) - # Send to all channels (including sender!) - async with self.connection.cursor() as cursor: - await cursor.execute( - sql.SQL("NOTIFY {chan}, {message}").format( - chan=sql.Identifier(str(self.room_id)), - message=message, - ) - ) - - async def send_to(self, peer_id, message): - print("SEND TO", peer_id, message) - # Send to one given channel - async with self.connection.cursor() as cursor: - await cursor.execute( - sql.SQL("NOTIFY {chan}, {message}").format( - chan=sql.Identifier(str(peer_id)), message=message - ) - ) - - async def receive(self, text_data): - if not self.is_authenticated: - print("AUTHENTICATING", self.uuid) - message = JoinRequest.model_validate_json(text_data) - signed = TimestampSigner().unsign_object(message.token, max_age=30) - user, map_id, permissions = signed.values() - if "edit" not in permissions: - return await self.disconnect() - await self.asave() - await self.listen() - response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers()) - await self.send(response.model_dump_json()) - await self.send_peers_list() - self.is_authenticated = True - return - - if text_data == "ping": - return await self.send("pong") - - try: - incoming = Request.model_validate_json(text_data) - except ValidationError as error: - message = ( - f"An error occurred when receiving the following message: {text_data!r}" - ) - logging.error(message, error) - else: - match incoming.root: - # Broadcast all operation messages to connected peers - case OperationMessage(): - await self.broadcast(text_data) - - # Send peer messages to the proper peer - case PeerMessage(): - await self.send_to(incoming.root.recipient, text_data) - - async def send(self, text): - print("SEND", text) - await self._send({"type": "websocket.send", "text": text}) From 36d9e9bf062e444e2efc236125a909dfee644d5a Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Thu, 16 Jan 2025 16:43:48 +0100 Subject: [PATCH 07/18] wip(sync): use the correct URL for websocket Co-authored-by: David Larlet --- umap/settings/base.py | 1 - umap/static/umap/js/modules/sync/engine.js | 4 ++- umap/sync/app.py | 26 ++++++++-------- umap/utils.py | 35 +++++++++++++++------- umap/views.py | 1 - 5 files changed, 41 insertions(+), 26 deletions(-) diff --git a/umap/settings/base.py b/umap/settings/base.py index 1dc7748f..788a03b3 100644 --- a/umap/settings/base.py +++ b/umap/settings/base.py @@ -342,6 +342,5 @@ LOGGING = { WEBSOCKET_ENABLED = env.bool("WEBSOCKET_ENABLED", default=False) WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost") WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001) -WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001") REDIS_URL = "redis://localhost:6379" diff --git a/umap/static/umap/js/modules/sync/engine.js b/umap/static/umap/js/modules/sync/engine.js index c9b32167..9928865e 100644 --- a/umap/static/umap/js/modules/sync/engine.js +++ b/umap/static/umap/js/modules/sync/engine.js @@ -76,8 +76,10 @@ export class SyncEngine { } start(authToken) { + const path = this._umap.urls.get('ws_sync', { map_id: this._umap.id }) + const protocol = window.location.protocol === 'http:' ? 'ws:' : 'wss:' this.transport = new WebSocketTransport( - Utils.template(this._umap.properties.websocketURI, { id: this._umap.id }), + `${protocol}//${window.location.host}${path}`, authToken, this ) diff --git a/umap/sync/app.py b/umap/sync/app.py index 18db1c86..3b0bea0c 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -5,7 +5,7 @@ import uuid import redis.asyncio as redis from django.conf import settings from django.core.signing import TimestampSigner -from django.urls.resolvers import RoutePattern +from django.urls import path from pydantic import ValidationError from .payloads import ( @@ -17,20 +17,19 @@ from .payloads import ( Request, ) -ws_pattern = RoutePattern("/ws/sync/") - async def application(scope, receive, send): - matched = ws_pattern.match(scope["path"]) - print(matched) - if not matched: - print("Wrong path") - return - _, _, kwargs = matched + path = scope["path"].lstrip("/") + for pattern in urlpatterns: + if matched := pattern.resolve(path): + await matched.func(scope, receive, send, **matched.kwargs) + break + else: + await send({"type": "websocket.close"}) - map_id = kwargs["map_id"] - peer = Peer(uuid=uuid.uuid4(), map_id=map_id) - print(peer) + +async def sync(scope, receive, send, **kwargs): + peer = Peer(uuid=uuid.uuid4(), map_id=kwargs["map_id"]) peer._send = send while True: event = await receive() @@ -153,3 +152,6 @@ class Peer: async def send(self, text): print("SEND", text) await self._send({"type": "websocket.send", "text": text}) + + +urlpatterns = [path("ws/sync/", name="ws_sync", view=sync)] diff --git a/umap/utils.py b/umap/utils.py index 26cf581d..561ae702 100644 --- a/umap/utils.py +++ b/umap/utils.py @@ -7,23 +7,36 @@ from django.core.serializers.json import DjangoJSONEncoder from django.urls import URLPattern, URLResolver, get_resolver -def _urls_for_js(urls=None): +def _get_url_names(module): + def _get_names(resolver): + names = [] + for pattern in resolver.url_patterns: + if getattr(pattern, "url_patterns", None): + # Do not add "admin" and other third party apps urls. + if not pattern.namespace: + names.extend(_get_names(pattern)) + elif getattr(pattern, "name", None): + names.append(pattern.name) + return names + + return _get_names(get_resolver(module)) + + +def _urls_for_js(): """ Return templated URLs prepared for javascript. """ - if urls is None: - # prevent circular import - from .urls import i18n_urls, urlpatterns - - urls = [ - url.name for url in urlpatterns + i18n_urls if getattr(url, "name", None) - ] - urls = dict(zip(urls, [get_uri_template(url) for url in urls])) + urls = {} + for module in ["umap.urls", "umap.sync.app"]: + names = _get_url_names(module) + urls.update( + dict(zip(names, [get_uri_template(url, module=module) for url in names])) + ) urls.update(getattr(settings, "UMAP_EXTRA_URLS", {})) return urls -def get_uri_template(urlname, args=None, prefix=""): +def get_uri_template(urlname, args=None, prefix="", module=None): """ Utility function to return an URI Template from a named URL in django Copied from django-digitalpaper. @@ -45,7 +58,7 @@ def get_uri_template(urlname, args=None, prefix=""): paths = template % dict([p, "{%s}" % p] for p in args) return "%s/%s" % (prefix, paths) - resolver = get_resolver(None) + resolver = get_resolver(module) parts = urlname.split(":") if len(parts) > 1 and parts[0] in resolver.namespace_dict: namespace = parts[0] diff --git a/umap/views.py b/umap/views.py index d1952405..c8c09476 100644 --- a/umap/views.py +++ b/umap/views.py @@ -609,7 +609,6 @@ class MapDetailMixin(SessionMixin): "umap_version": VERSION, "featuresHaveOwner": settings.UMAP_DEFAULT_FEATURES_HAVE_OWNERS, "websocketEnabled": settings.WEBSOCKET_ENABLED, - "websocketURI": settings.WEBSOCKET_FRONT_URI, "importers": settings.UMAP_IMPORTERS, "defaultLabelKeys": settings.UMAP_LABEL_KEYS, } From 1bf100d7a8bc439dbce43ac81aa6ada6004ab6ed Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 20 Jan 2025 14:57:13 +0100 Subject: [PATCH 08/18] wip(sync): make the client set its peer id --- umap/static/umap/js/modules/sync/engine.js | 17 +++++++------- umap/static/umap/js/modules/sync/websocket.js | 4 ++-- umap/sync/app.py | 22 +++++++++---------- umap/sync/payloads.py | 3 ++- 4 files changed, 24 insertions(+), 22 deletions(-) diff --git a/umap/static/umap/js/modules/sync/engine.js b/umap/static/umap/js/modules/sync/engine.js index 9928865e..0b883438 100644 --- a/umap/static/umap/js/modules/sync/engine.js +++ b/umap/static/umap/js/modules/sync/engine.js @@ -62,6 +62,7 @@ export class SyncEngine { this._reconnectDelay = RECONNECT_DELAY this.websocketConnected = false this.closeRequested = false + this.peerId = Utils.generateId() } async authenticate() { @@ -81,7 +82,8 @@ export class SyncEngine { this.transport = new WebSocketTransport( `${protocol}//${window.location.host}${path}`, authToken, - this + this, + this.peerId ) } @@ -127,7 +129,7 @@ export class SyncEngine { if (this.offline) return if (this.transport) { - this.transport.send('OperationMessage', { sender: this.uuid, ...message }) + this.transport.send('OperationMessage', { sender: this.peerId, ...message }) } } @@ -179,7 +181,7 @@ export class SyncEngine { * @param {Object} payload */ onOperationMessage(payload) { - if (payload.sender === this.uuid) return + if (payload.sender === this.peerId) return this._operations.storeRemoteOperations([payload]) this._applyOperation(payload) } @@ -191,9 +193,8 @@ export class SyncEngine { * @param {string} payload.uuid The server-assigned uuid for this peer * @param {string[]} payload.peers The list of peers uuids */ - onJoinResponse({ uuid, peers }) { - debug('received join response', { uuid, peers }) - this.uuid = uuid + onJoinResponse({ peer, peers }) { + debug('received join response', { peer, peers }) this.onListPeersResponse({ peers }) // Get one peer at random @@ -289,7 +290,7 @@ export class SyncEngine { sendToPeer(recipient, verb, payload) { payload.verb = verb this.transport.send('PeerMessage', { - sender: this.uuid, + sender: this.peerId, recipient: recipient, message: payload, }) @@ -301,7 +302,7 @@ export class SyncEngine { * @returns {string|bool} the selected peer uuid, or False if none was found. */ _getRandomPeer() { - const otherPeers = this.peers.filter((p) => p !== this.uuid) + const otherPeers = this.peers.filter((p) => p !== this.peerId) if (otherPeers.length > 0) { const random = Math.floor(Math.random() * otherPeers.length) return otherPeers[random] diff --git a/umap/static/umap/js/modules/sync/websocket.js b/umap/static/umap/js/modules/sync/websocket.js index 0a80f98f..991a9c4e 100644 --- a/umap/static/umap/js/modules/sync/websocket.js +++ b/umap/static/umap/js/modules/sync/websocket.js @@ -3,13 +3,13 @@ const PING_INTERVAL = 30000 const FIRST_CONNECTION_TIMEOUT = 2000 export class WebSocketTransport { - constructor(webSocketURI, authToken, messagesReceiver) { + constructor(webSocketURI, authToken, messagesReceiver, peerId) { this.receiver = messagesReceiver this.websocket = new WebSocket(webSocketURI) this.websocket.onopen = () => { - this.send('JoinRequest', { token: authToken }) + this.send('JoinRequest', { token: authToken, peer: peerId }) this.receiver.onConnection() } this.websocket.addEventListener('message', this.onMessage.bind(this)) diff --git a/umap/sync/app.py b/umap/sync/app.py index 3b0bea0c..1677c197 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -1,6 +1,5 @@ import asyncio import logging -import uuid import redis.asyncio as redis from django.conf import settings @@ -29,7 +28,8 @@ async def application(scope, receive, send): async def sync(scope, receive, send, **kwargs): - peer = Peer(uuid=uuid.uuid4(), map_id=kwargs["map_id"]) + room_id = f"umap:{kwargs['map_id']}" + peer = Peer(room_id) peer._send = send while True: event = await receive() @@ -59,11 +59,9 @@ async def sync(scope, receive, send, **kwargs): class Peer: - def __init__(self, uuid, map_id, username=None): - self.uuid = uuid - self.user_id = f"user:{uuid}" + def __init__(self, room_id, username=None): self.username = username or "" - self.room_id = f"umap:{map_id}" + self.room_id = room_id self.is_authenticated = False async def get_peers(self): @@ -90,17 +88,17 @@ class Peer: async def listen(self): await self.listen_to_channel(self.room_id) - await self.listen_to_channel(self.user_id) + await self.listen_to_channel(self.peer_id) async def connect(self): self.client = redis.from_url(settings.REDIS_URL) async def disconnect(self): - await self.client.hdel(self.room_id, self.user_id) + await self.client.hdel(self.room_id, self.peer_id) await self.send_peers_list() await self.client.aclose() await self.client.publish(self.room_id, "STOP") - await self.client.publish(self.user_id, "STOP") + await self.client.publish(self.peer_id, "STOP") async def send_peers_list(self): message = ListPeersResponse(peers=await self.get_peers()) @@ -124,9 +122,11 @@ class Peer: user, room_id, permissions = signed.values() if "edit" not in permissions: return await self.disconnect() - await self.client.hset(self.room_id, self.user_id, self.username) + self.peer_id = message.peer + print("AUTHENTICATED", self.peer_id) + await self.client.hset(self.room_id, self.peer_id, self.username) await self.listen() - response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers()) + response = JoinResponse(peer=self.peer_id, peers=await self.get_peers()) await self.send(response.model_dump_json()) await self.send_peers_list() self.is_authenticated = True diff --git a/umap/sync/payloads.py b/umap/sync/payloads.py index 6a15a3f1..9cb96a80 100644 --- a/umap/sync/payloads.py +++ b/umap/sync/payloads.py @@ -6,6 +6,7 @@ from pydantic import BaseModel, Field, RootModel class JoinRequest(BaseModel): kind: Literal["JoinRequest"] = "JoinRequest" token: str + peer: str class OperationMessage(BaseModel): @@ -39,7 +40,7 @@ class JoinResponse(BaseModel): kind: Literal["JoinResponse"] = "JoinResponse" peers: list - uuid: str + peer: str class ListPeersResponse(BaseModel): From a07ee482cec438d447321bd592edebe4f113618c Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 20 Jan 2025 18:55:42 +0100 Subject: [PATCH 09/18] wip(sync): use our async_live_server for websocket related PW tests As now both http and ws are on the same domain, let's use one test server able to serve both. Co-authored-by: David Larlet --- umap/tests/integration/conftest.py | 7 +-- umap/tests/integration/test_websocket_sync.py | 60 ++++++++----------- 2 files changed, 29 insertions(+), 38 deletions(-) diff --git a/umap/tests/integration/conftest.py b/umap/tests/integration/conftest.py index d72886f8..bbb202d1 100644 --- a/umap/tests/integration/conftest.py +++ b/umap/tests/integration/conftest.py @@ -3,6 +3,7 @@ import re import pytest from daphne.testing import DaphneProcess +from django.contrib.staticfiles.handlers import ASGIStaticFilesHandler from playwright.sync_api import expect from umap.asgi import application @@ -68,13 +69,11 @@ def login(new_page, settings, live_server): @pytest.fixture(scope="function") -def asgi_live_server(request, settings): - request.getfixturevalue("transactional_db") - server = DaphneProcess("localhost", lambda: application) +def asgi_live_server(request, live_server): + server = DaphneProcess("localhost", lambda: ASGIStaticFilesHandler(application)) server.start() server.ready.wait() port = server.port.value - settings.WEBSOCKET_FRONT_URI = f"ws://localhost:{port}/ws/sync/{{id}}/" server.url = f"http://localhost:{port}" yield server diff --git a/umap/tests/integration/test_websocket_sync.py b/umap/tests/integration/test_websocket_sync.py index 569bca30..0a2306cf 100644 --- a/umap/tests/integration/test_websocket_sync.py +++ b/umap/tests/integration/test_websocket_sync.py @@ -11,9 +11,7 @@ DATALAYER_UPDATE = re.compile(r".*/datalayer/update/.*") @pytest.mark.xdist_group(name="websockets") -def test_websocket_connection_can_sync_markers( - new_page, live_server, asgi_live_server, tilelayer -): +def test_websocket_connection_can_sync_markers(new_page, asgi_live_server, tilelayer): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True map.save() @@ -21,9 +19,9 @@ def test_websocket_connection_can_sync_markers( # Create two tabs peerA = new_page("Page A") - peerA.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerA.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") peerB = new_page("Page B") - peerB.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerB.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") a_marker_pane = peerA.locator(".leaflet-marker-pane > div") b_marker_pane = peerB.locator(".leaflet-marker-pane > div") @@ -79,9 +77,7 @@ def test_websocket_connection_can_sync_markers( @pytest.mark.xdist_group(name="websockets") -def test_websocket_connection_can_sync_polygons( - context, live_server, asgi_live_server, tilelayer -): +def test_websocket_connection_can_sync_polygons(context, asgi_live_server, tilelayer): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True map.save() @@ -89,9 +85,9 @@ def test_websocket_connection_can_sync_polygons( # Create two tabs peerA = context.new_page() - peerA.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerA.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") peerB = context.new_page() - peerB.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerB.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") b_map_el = peerB.locator("#map") @@ -164,7 +160,7 @@ def test_websocket_connection_can_sync_polygons( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_map_properties( - new_page, live_server, asgi_live_server, tilelayer + new_page, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -173,9 +169,9 @@ def test_websocket_connection_can_sync_map_properties( # Create two tabs peerA = new_page() - peerA.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerA.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") peerB = new_page() - peerB.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerB.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") # Name change is synced peerA.get_by_role("link", name="Edit map name and caption").click() @@ -198,7 +194,7 @@ def test_websocket_connection_can_sync_map_properties( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_datalayer_properties( - new_page, live_server, asgi_live_server, tilelayer + new_page, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -207,9 +203,9 @@ def test_websocket_connection_can_sync_datalayer_properties( # Create two tabs peerA = new_page() - peerA.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerA.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") peerB = new_page() - peerB.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerB.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") # Layer addition, name and type are synced peerA.get_by_role("link", name="Manage layers").click() @@ -227,7 +223,7 @@ def test_websocket_connection_can_sync_datalayer_properties( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_cloned_polygons( - context, live_server, asgi_live_server, tilelayer + context, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -236,9 +232,9 @@ def test_websocket_connection_can_sync_cloned_polygons( # Create two tabs peerA = context.new_page() - peerA.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerA.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") peerB = context.new_page() - peerB.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerB.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") b_map_el = peerB.locator("#map") @@ -290,7 +286,7 @@ def test_websocket_connection_can_sync_cloned_polygons( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_late_joining_peer( - new_page, live_server, asgi_live_server, tilelayer + new_page, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -299,7 +295,7 @@ def test_websocket_connection_can_sync_late_joining_peer( # Create first peer (A) and have it join immediately peerA = new_page("Page A") - peerA.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerA.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") # Add a marker from peer A a_create_marker = peerA.get_by_title("Draw a marker") @@ -326,7 +322,7 @@ def test_websocket_connection_can_sync_late_joining_peer( # Now create peer B and have it join peerB = new_page("Page B") - peerB.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerB.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") # Check if peer B has received all the updates b_marker_pane = peerB.locator(".leaflet-marker-pane > div") @@ -351,7 +347,7 @@ def test_websocket_connection_can_sync_late_joining_peer( @pytest.mark.xdist_group(name="websockets") -def test_should_sync_datalayers(new_page, live_server, asgi_live_server, tilelayer): +def test_should_sync_datalayers(new_page, asgi_live_server, tilelayer): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True map.save() @@ -360,9 +356,9 @@ def test_should_sync_datalayers(new_page, live_server, asgi_live_server, tilelay # Create two tabs peerA = new_page("Page A") - peerA.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerA.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") peerB = new_page("Page B") - peerB.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerB.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") # Create a new layer from peerA peerA.get_by_role("link", name="Manage layers").click() @@ -423,9 +419,7 @@ def test_should_sync_datalayers(new_page, live_server, asgi_live_server, tilelay @pytest.mark.xdist_group(name="websockets") -def test_should_sync_datalayers_delete( - new_page, live_server, websocket_server, tilelayer -): +def test_should_sync_datalayers_delete(new_page, asgi_live_server, tilelayer): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True map.save() @@ -464,9 +458,9 @@ def test_should_sync_datalayers_delete( # Create two tabs peerA = new_page("Page A") - peerA.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerA.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") peerB = new_page("Page B") - peerB.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + peerB.goto(f"{asgi_live_server.url}{map.get_absolute_url()}?edit") peerA.get_by_role("button", name="Open browser").click() expect(peerA.get_by_text("datalayer 1")).to_be_visible() @@ -489,12 +483,10 @@ def test_should_sync_datalayers_delete( @pytest.mark.xdist_group(name="websockets") -def test_create_and_sync_map( - new_page, live_server, asgi_live_server, tilelayer, login, user -): +def test_create_and_sync_map(new_page, asgi_live_server, tilelayer, login, user): # Create a syncable map with peerA peerA = login(user, prefix="Page A") - peerA.goto(f"{live_server.url}/en/map/new/") + peerA.goto(f"{asgi_live_server.url}/en/map/new/") with peerA.expect_response(re.compile("./map/create/.*")): peerA.get_by_role("button", name="Save Draft").click() peerA.get_by_role("link", name="Map advanced properties").click() From 7e42331533959df4044bbecc15ab5fc2477279f2 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 20 Jan 2025 19:04:03 +0100 Subject: [PATCH 10/18] wip(sync): add Redis to CI and configure tests settings --- .github/workflows/test-docs.yml | 4 +++- umap/tests/integration/test_websocket_sync.py | 14 ++++++++++++++ umap/tests/settings.py | 2 ++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test-docs.yml b/.github/workflows/test-docs.yml index db76112d..fe95d423 100644 --- a/.github/workflows/test-docs.yml +++ b/.github/workflows/test-docs.yml @@ -20,7 +20,9 @@ jobs: POSTGRES_PASSWORD: postgres POSTGRES_DB: postgres options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 - + redis: + image: redis + options: --health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5 strategy: fail-fast: false matrix: diff --git a/umap/tests/integration/test_websocket_sync.py b/umap/tests/integration/test_websocket_sync.py index 0a2306cf..a69e134f 100644 --- a/umap/tests/integration/test_websocket_sync.py +++ b/umap/tests/integration/test_websocket_sync.py @@ -1,6 +1,8 @@ import re import pytest +import redis +from django.conf import settings from playwright.sync_api import expect from umap.models import DataLayer, Map @@ -9,6 +11,18 @@ from ..base import DataLayerFactory, MapFactory DATALAYER_UPDATE = re.compile(r".*/datalayer/update/.*") +pytestmark = pytest.mark.django_db + + +def setup_function(): + # Sync client to prevent headache with pytest / pytest-asyncio and async + client = redis.from_url(settings.REDIS_URL) + # Make sure there are no dead peers in the Redis hash, otherwise asking for + # operations from another peer may never be answered + # FIXME this should not happen in an ideal world + assert client.connection_pool.connection_kwargs["db"] == 15 + client.flushdb() + @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_markers(new_page, asgi_live_server, tilelayer): diff --git a/umap/tests/settings.py b/umap/tests/settings.py index b776c083..41de66d3 100644 --- a/umap/tests/settings.py +++ b/umap/tests/settings.py @@ -29,3 +29,5 @@ PASSWORD_HASHERS = [ WEBSOCKET_ENABLED = True WEBSOCKET_BACK_PORT = "8010" WEBSOCKET_FRONT_URI = "ws://localhost:8010" + +REDIS_URL = "redis://localhost:6379/15" From 82342ea00f88297d0d89259cb7c805730a884f73 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 20 Jan 2025 19:13:41 +0100 Subject: [PATCH 11/18] wip(sync): try a better pattern to unsubscribe to pubsub channels When publishing a "STOP", this would unsubscribe every listener of the channel. --- umap/sync/app.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/umap/sync/app.py b/umap/sync/app.py index 1677c197..be89a4fe 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -63,6 +63,7 @@ class Peer: self.username = username or "" self.room_id = room_id self.is_authenticated = False + self._subscriptions = [] async def get_peers(self): peers = await self.client.hgetall(self.room_id) @@ -73,17 +74,20 @@ class Peer: async def reader(pubsub): await pubsub.subscribe(channel_name) while True: + if pubsub.connection is None: + # It has been unsubscribed/closed. + break try: message = await pubsub.get_message(ignore_subscribe_messages=True) except Exception as err: print(err) break if message is not None: - if message["data"].decode() == "STOP": - break await self.send(message["data"].decode()) + await asyncio.sleep(0.001) # Be nice with the server async with self.client.pubsub() as pubsub: + self._subscriptions.append(pubsub) asyncio.create_task(reader(pubsub)) async def listen(self): @@ -95,10 +99,11 @@ class Peer: async def disconnect(self): await self.client.hdel(self.room_id, self.peer_id) + for pubsub in self._subscriptions: + await pubsub.unsubscribe() + await pubsub.close() await self.send_peers_list() await self.client.aclose() - await self.client.publish(self.room_id, "STOP") - await self.client.publish(self.peer_id, "STOP") async def send_peers_list(self): message = ListPeersResponse(peers=await self.get_peers()) From ef7c769abec81ef5b136286ffa18d477de090011 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 20 Jan 2025 19:17:34 +0100 Subject: [PATCH 12/18] wip(sync): remove a bit of server prints --- umap/sync/app.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/umap/sync/app.py b/umap/sync/app.py index be89a4fe..4db280a9 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -33,22 +33,16 @@ async def sync(scope, receive, send, **kwargs): peer._send = send while True: event = await receive() - print("EVENT", event) if event["type"] == "websocket.connect": try: - print("Let's accept") await peer.connect() - print("After connect") await send({"type": "websocket.accept"}) - print("After accept") except ValueError: await send({"type": "websocket.close"}) if event["type"] == "websocket.disconnect": - print("Closing", event) await peer.disconnect() - print("Closed") break if event["type"] == "websocket.receive": @@ -121,7 +115,7 @@ class Peer: async def receive(self, text_data): if not self.is_authenticated: - print("AUTHENTICATING", self.uuid) + print("AUTHENTICATING", text_data) message = JoinRequest.model_validate_json(text_data) signed = TimestampSigner().unsign_object(message.token, max_age=30) user, room_id, permissions = signed.values() @@ -155,7 +149,7 @@ class Peer: await self.send_to(incoming.root.recipient, text_data) async def send(self, text): - print("SEND", text) + print(" FORWARDING TO", self.peer_id, text) await self._send({"type": "websocket.send", "text": text}) From 11fb29c456db82fb6c8783831847bdd9f9e8a5a3 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 20 Jan 2025 19:18:13 +0100 Subject: [PATCH 13/18] wip(sync): log but do not crash when sending fail This should be a race condition when sending to a closed websocket. Let's log to track them. --- umap/sync/app.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/umap/sync/app.py b/umap/sync/app.py index 4db280a9..6f9ede15 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -150,7 +150,11 @@ class Peer: async def send(self, text): print(" FORWARDING TO", self.peer_id, text) - await self._send({"type": "websocket.send", "text": text}) + try: + await self._send({"type": "websocket.send", "text": text}) + except Exception as err: + print("Error sending message:", text) + print(err) urlpatterns = [path("ws/sync/", name="ws_sync", view=sync)] From 0d5e3047f4b2aaa547601358fa6e3f367028aa70 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Tue, 21 Jan 2025 20:24:06 +0100 Subject: [PATCH 14/18] wip(sync): only return peers with an active connection --- umap/static/umap/js/modules/sync/engine.js | 9 ++-- umap/static/umap/js/modules/sync/websocket.js | 4 +- umap/sync/app.py | 41 ++++++++++++------- umap/sync/payloads.py | 5 ++- 4 files changed, 37 insertions(+), 22 deletions(-) diff --git a/umap/static/umap/js/modules/sync/engine.js b/umap/static/umap/js/modules/sync/engine.js index 0b883438..05994544 100644 --- a/umap/static/umap/js/modules/sync/engine.js +++ b/umap/static/umap/js/modules/sync/engine.js @@ -83,7 +83,8 @@ export class SyncEngine { `${protocol}//${window.location.host}${path}`, authToken, this, - this.peerId + this.peerId, + this._umap.properties.user?.name ) } @@ -146,7 +147,7 @@ export class SyncEngine { } getNumberOfConnectedPeers() { - if (this.peers) return this.peers.length + if (this.peers) return Object.keys(this.peers).length return 0 } @@ -215,7 +216,7 @@ export class SyncEngine { * @param {string[]} payload.peers The list of peers uuids */ onListPeersResponse({ peers }) { - debug('received peerinfo', { peers }) + debug('received peerinfo', peers) this.peers = peers this.updaters.map.update({ key: 'numberOfConnectedPeers' }) } @@ -302,7 +303,7 @@ export class SyncEngine { * @returns {string|bool} the selected peer uuid, or False if none was found. */ _getRandomPeer() { - const otherPeers = this.peers.filter((p) => p !== this.peerId) + const otherPeers = Object.keys(this.peers).filter((p) => p !== this.peerId) if (otherPeers.length > 0) { const random = Math.floor(Math.random() * otherPeers.length) return otherPeers[random] diff --git a/umap/static/umap/js/modules/sync/websocket.js b/umap/static/umap/js/modules/sync/websocket.js index 991a9c4e..5a18f880 100644 --- a/umap/static/umap/js/modules/sync/websocket.js +++ b/umap/static/umap/js/modules/sync/websocket.js @@ -3,13 +3,13 @@ const PING_INTERVAL = 30000 const FIRST_CONNECTION_TIMEOUT = 2000 export class WebSocketTransport { - constructor(webSocketURI, authToken, messagesReceiver, peerId) { + constructor(webSocketURI, authToken, messagesReceiver, peerId, username) { this.receiver = messagesReceiver this.websocket = new WebSocket(webSocketURI) this.websocket.onopen = () => { - this.send('JoinRequest', { token: authToken, peer: peerId }) + this.send('JoinRequest', { token: authToken, peer: peerId, username }) this.receiver.onConnection() } this.websocket.addEventListener('message', this.onMessage.bind(this)) diff --git a/umap/sync/app.py b/umap/sync/app.py index 6f9ede15..caaca27b 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -28,8 +28,7 @@ async def application(scope, receive, send): async def sync(scope, receive, send, **kwargs): - room_id = f"umap:{kwargs['map_id']}" - peer = Peer(room_id) + peer = Peer(kwargs["map_id"]) peer._send = send while True: event = await receive() @@ -53,16 +52,28 @@ async def sync(scope, receive, send, **kwargs): class Peer: - def __init__(self, room_id, username=None): + def __init__(self, map_id, username=None): self.username = username or "" - self.room_id = room_id + self.map_id = map_id self.is_authenticated = False self._subscriptions = [] + @property + def room_key(self): + return f"umap:{self.map_id}" + + @property + def peer_key(self): + return f"user:{self.map_id}:{self.peer_id}" + async def get_peers(self): - peers = await self.client.hgetall(self.room_id) - # Send only ids for now (values are client names). - return peers.keys() + known = await self.client.hgetall(self.room_key) + active = await self.client.pubsub_channels(f"user:{self.map_id}:*") + active = [name.split(b":")[-1] for name in active] + if self.peer_id.encode() not in active: + # Our connection may not yet be active + active.append(self.peer_id.encode()) + return {k: v for k, v in known.items() if k in active} async def listen_to_channel(self, channel_name): async def reader(pubsub): @@ -85,14 +96,14 @@ class Peer: asyncio.create_task(reader(pubsub)) async def listen(self): - await self.listen_to_channel(self.room_id) - await self.listen_to_channel(self.peer_id) + await self.listen_to_channel(self.room_key) + await self.listen_to_channel(self.peer_key) async def connect(self): self.client = redis.from_url(settings.REDIS_URL) async def disconnect(self): - await self.client.hdel(self.room_id, self.peer_id) + await self.client.hdel(self.room_key, self.peer_id) for pubsub in self._subscriptions: await pubsub.unsubscribe() await pubsub.close() @@ -106,24 +117,26 @@ class Peer: async def broadcast(self, message): print("BROADCASTING", message) # Send to all channels (including sender!) - await self.client.publish(self.room_id, message) + await self.client.publish(self.room_key, message) async def send_to(self, peer_id, message): print("SEND TO", peer_id, message) # Send to one given channel - await self.client.publish(peer_id, message) + await self.client.publish(f"user:{self.map_id}:{peer_id}", message) async def receive(self, text_data): if not self.is_authenticated: print("AUTHENTICATING", text_data) message = JoinRequest.model_validate_json(text_data) signed = TimestampSigner().unsign_object(message.token, max_age=30) - user, room_id, permissions = signed.values() + user, map_id, permissions = signed.values() + assert str(map_id) == self.map_id if "edit" not in permissions: return await self.disconnect() self.peer_id = message.peer + self.username = message.username print("AUTHENTICATED", self.peer_id) - await self.client.hset(self.room_id, self.peer_id, self.username) + await self.client.hset(self.room_key, self.peer_id, self.username) await self.listen() response = JoinResponse(peer=self.peer_id, peers=await self.get_peers()) await self.send(response.model_dump_json()) diff --git a/umap/sync/payloads.py b/umap/sync/payloads.py index 9cb96a80..9ab2bf1a 100644 --- a/umap/sync/payloads.py +++ b/umap/sync/payloads.py @@ -7,6 +7,7 @@ class JoinRequest(BaseModel): kind: Literal["JoinRequest"] = "JoinRequest" token: str peer: str + username: Optional[str] = "" class OperationMessage(BaseModel): @@ -39,10 +40,10 @@ class JoinResponse(BaseModel): """Server response containing the list of peers""" kind: Literal["JoinResponse"] = "JoinResponse" - peers: list + peers: dict peer: str class ListPeersResponse(BaseModel): kind: Literal["ListPeersResponse"] = "ListPeersResponse" - peers: list + peers: dict From 476c160fd5c0632d25ad4025afa9a15425d4e7bf Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Thu, 23 Jan 2025 17:05:13 +0100 Subject: [PATCH 15/18] wip(sync): clean stale username from redis We wanted to use the HEXPIRE command, but discovered that this command is only available since the Redis 7.4 version (the latest), and this version does not have an OSI compliant licence, so it is generally not installable through common packages managers. The OSS fork is Valkey, but it still does not have the HEXPIRE command. So we decide to clean those keys manually, and in order no do this clean task at each websocket connection, we only do it when we are the first user to connect to a given map. Co-authored-by: David Larlet --- umap/sync/app.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/umap/sync/app.py b/umap/sync/app.py index caaca27b..27c52262 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -69,12 +69,20 @@ class Peer: async def get_peers(self): known = await self.client.hgetall(self.room_key) active = await self.client.pubsub_channels(f"user:{self.map_id}:*") + if not active: + # Poor man way of deleting stale usernames from the store + # HEXPIRE command is not in the open source Redis version + await self.client.delete(self.room_key) + await self.store_username() active = [name.split(b":")[-1] for name in active] if self.peer_id.encode() not in active: # Our connection may not yet be active active.append(self.peer_id.encode()) return {k: v for k, v in known.items() if k in active} + async def store_username(self): + await self.client.hset(self.room_key, self.peer_id, self.username) + async def listen_to_channel(self, channel_name): async def reader(pubsub): await pubsub.subscribe(channel_name) @@ -136,7 +144,7 @@ class Peer: self.peer_id = message.peer self.username = message.username print("AUTHENTICATED", self.peer_id) - await self.client.hset(self.room_key, self.peer_id, self.username) + await self.store_username() await self.listen() response = JoinResponse(peer=self.peer_id, peers=await self.get_peers()) await self.send(response.model_dump_json()) From 222213ec87dcc97b433c8c8e4a45fbffe605ba70 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Thu, 23 Jan 2025 17:26:39 +0100 Subject: [PATCH 16/18] fix(sync): add python redis package Co-authored-by: David Larlet --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index bc2370c7..80077de8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,6 +72,7 @@ s3 = [ ] sync = [ "pydantic==2.10.5", + "redis==5.2.1", ] [project.scripts] From b62085b7aabd40ff37900187cff4cc49b1069662 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Thu, 23 Jan 2025 18:02:28 +0100 Subject: [PATCH 17/18] chore: add REDIS_HOST and REDIS_PORT env vars in CI --- .github/workflows/test-docs.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/test-docs.yml b/.github/workflows/test-docs.yml index fe95d423..df9b9270 100644 --- a/.github/workflows/test-docs.yml +++ b/.github/workflows/test-docs.yml @@ -50,6 +50,8 @@ jobs: DJANGO_SETTINGS_MODULE: 'umap.tests.settings' UMAP_SETTINGS: 'umap/tests/settings.py' PLAYWRIGHT_TIMEOUT: '20000' + REDIS_HOST: localhost + REDIS_PORT: 6379 lint: runs-on: ubuntu-latest steps: From 1d47bfce0a773bd7afc7e707ca08e7eb59a432d7 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Fri, 24 Jan 2025 10:13:00 +0100 Subject: [PATCH 18/18] chore: add redis port in Github workflow --- .github/workflows/test-docs.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/test-docs.yml b/.github/workflows/test-docs.yml index df9b9270..f2ce937a 100644 --- a/.github/workflows/test-docs.yml +++ b/.github/workflows/test-docs.yml @@ -23,6 +23,8 @@ jobs: redis: image: redis options: --health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5 + ports: + - 6379:6379 strategy: fail-fast: false matrix: