From 0d5e3047f4b2aaa547601358fa6e3f367028aa70 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Tue, 21 Jan 2025 20:24:06 +0100 Subject: [PATCH] wip(sync): only return peers with an active connection --- umap/static/umap/js/modules/sync/engine.js | 9 ++-- umap/static/umap/js/modules/sync/websocket.js | 4 +- umap/sync/app.py | 41 ++++++++++++------- umap/sync/payloads.py | 5 ++- 4 files changed, 37 insertions(+), 22 deletions(-) diff --git a/umap/static/umap/js/modules/sync/engine.js b/umap/static/umap/js/modules/sync/engine.js index 0b883438..05994544 100644 --- a/umap/static/umap/js/modules/sync/engine.js +++ b/umap/static/umap/js/modules/sync/engine.js @@ -83,7 +83,8 @@ export class SyncEngine { `${protocol}//${window.location.host}${path}`, authToken, this, - this.peerId + this.peerId, + this._umap.properties.user?.name ) } @@ -146,7 +147,7 @@ export class SyncEngine { } getNumberOfConnectedPeers() { - if (this.peers) return this.peers.length + if (this.peers) return Object.keys(this.peers).length return 0 } @@ -215,7 +216,7 @@ export class SyncEngine { * @param {string[]} payload.peers The list of peers uuids */ onListPeersResponse({ peers }) { - debug('received peerinfo', { peers }) + debug('received peerinfo', peers) this.peers = peers this.updaters.map.update({ key: 'numberOfConnectedPeers' }) } @@ -302,7 +303,7 @@ export class SyncEngine { * @returns {string|bool} the selected peer uuid, or False if none was found. */ _getRandomPeer() { - const otherPeers = this.peers.filter((p) => p !== this.peerId) + const otherPeers = Object.keys(this.peers).filter((p) => p !== this.peerId) if (otherPeers.length > 0) { const random = Math.floor(Math.random() * otherPeers.length) return otherPeers[random] diff --git a/umap/static/umap/js/modules/sync/websocket.js b/umap/static/umap/js/modules/sync/websocket.js index 991a9c4e..5a18f880 100644 --- a/umap/static/umap/js/modules/sync/websocket.js +++ b/umap/static/umap/js/modules/sync/websocket.js @@ -3,13 +3,13 @@ const PING_INTERVAL = 30000 const FIRST_CONNECTION_TIMEOUT = 2000 export class WebSocketTransport { - constructor(webSocketURI, authToken, messagesReceiver, peerId) { + constructor(webSocketURI, authToken, messagesReceiver, peerId, username) { this.receiver = messagesReceiver this.websocket = new WebSocket(webSocketURI) this.websocket.onopen = () => { - this.send('JoinRequest', { token: authToken, peer: peerId }) + this.send('JoinRequest', { token: authToken, peer: peerId, username }) this.receiver.onConnection() } this.websocket.addEventListener('message', this.onMessage.bind(this)) diff --git a/umap/sync/app.py b/umap/sync/app.py index 6f9ede15..caaca27b 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -28,8 +28,7 @@ async def application(scope, receive, send): async def sync(scope, receive, send, **kwargs): - room_id = f"umap:{kwargs['map_id']}" - peer = Peer(room_id) + peer = Peer(kwargs["map_id"]) peer._send = send while True: event = await receive() @@ -53,16 +52,28 @@ async def sync(scope, receive, send, **kwargs): class Peer: - def __init__(self, room_id, username=None): + def __init__(self, map_id, username=None): self.username = username or "" - self.room_id = room_id + self.map_id = map_id self.is_authenticated = False self._subscriptions = [] + @property + def room_key(self): + return f"umap:{self.map_id}" + + @property + def peer_key(self): + return f"user:{self.map_id}:{self.peer_id}" + async def get_peers(self): - peers = await self.client.hgetall(self.room_id) - # Send only ids for now (values are client names). - return peers.keys() + known = await self.client.hgetall(self.room_key) + active = await self.client.pubsub_channels(f"user:{self.map_id}:*") + active = [name.split(b":")[-1] for name in active] + if self.peer_id.encode() not in active: + # Our connection may not yet be active + active.append(self.peer_id.encode()) + return {k: v for k, v in known.items() if k in active} async def listen_to_channel(self, channel_name): async def reader(pubsub): @@ -85,14 +96,14 @@ class Peer: asyncio.create_task(reader(pubsub)) async def listen(self): - await self.listen_to_channel(self.room_id) - await self.listen_to_channel(self.peer_id) + await self.listen_to_channel(self.room_key) + await self.listen_to_channel(self.peer_key) async def connect(self): self.client = redis.from_url(settings.REDIS_URL) async def disconnect(self): - await self.client.hdel(self.room_id, self.peer_id) + await self.client.hdel(self.room_key, self.peer_id) for pubsub in self._subscriptions: await pubsub.unsubscribe() await pubsub.close() @@ -106,24 +117,26 @@ class Peer: async def broadcast(self, message): print("BROADCASTING", message) # Send to all channels (including sender!) - await self.client.publish(self.room_id, message) + await self.client.publish(self.room_key, message) async def send_to(self, peer_id, message): print("SEND TO", peer_id, message) # Send to one given channel - await self.client.publish(peer_id, message) + await self.client.publish(f"user:{self.map_id}:{peer_id}", message) async def receive(self, text_data): if not self.is_authenticated: print("AUTHENTICATING", text_data) message = JoinRequest.model_validate_json(text_data) signed = TimestampSigner().unsign_object(message.token, max_age=30) - user, room_id, permissions = signed.values() + user, map_id, permissions = signed.values() + assert str(map_id) == self.map_id if "edit" not in permissions: return await self.disconnect() self.peer_id = message.peer + self.username = message.username print("AUTHENTICATED", self.peer_id) - await self.client.hset(self.room_id, self.peer_id, self.username) + await self.client.hset(self.room_key, self.peer_id, self.username) await self.listen() response = JoinResponse(peer=self.peer_id, peers=await self.get_peers()) await self.send(response.model_dump_json()) diff --git a/umap/sync/payloads.py b/umap/sync/payloads.py index 9cb96a80..9ab2bf1a 100644 --- a/umap/sync/payloads.py +++ b/umap/sync/payloads.py @@ -7,6 +7,7 @@ class JoinRequest(BaseModel): kind: Literal["JoinRequest"] = "JoinRequest" token: str peer: str + username: Optional[str] = "" class OperationMessage(BaseModel): @@ -39,10 +40,10 @@ class JoinResponse(BaseModel): """Server response containing the list of peers""" kind: Literal["JoinResponse"] = "JoinResponse" - peers: list + peers: dict peer: str class ListPeersResponse(BaseModel): kind: Literal["ListPeersResponse"] = "ListPeersResponse" - peers: list + peers: dict