From 82342ea00f88297d0d89259cb7c805730a884f73 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 20 Jan 2025 19:13:41 +0100 Subject: [PATCH] wip(sync): try a better pattern to unsubscribe to pubsub channels When publishing a "STOP", this would unsubscribe every listener of the channel. --- umap/sync/app.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/umap/sync/app.py b/umap/sync/app.py index 1677c197..be89a4fe 100644 --- a/umap/sync/app.py +++ b/umap/sync/app.py @@ -63,6 +63,7 @@ class Peer: self.username = username or "" self.room_id = room_id self.is_authenticated = False + self._subscriptions = [] async def get_peers(self): peers = await self.client.hgetall(self.room_id) @@ -73,17 +74,20 @@ class Peer: async def reader(pubsub): await pubsub.subscribe(channel_name) while True: + if pubsub.connection is None: + # It has been unsubscribed/closed. + break try: message = await pubsub.get_message(ignore_subscribe_messages=True) except Exception as err: print(err) break if message is not None: - if message["data"].decode() == "STOP": - break await self.send(message["data"].decode()) + await asyncio.sleep(0.001) # Be nice with the server async with self.client.pubsub() as pubsub: + self._subscriptions.append(pubsub) asyncio.create_task(reader(pubsub)) async def listen(self): @@ -95,10 +99,11 @@ class Peer: async def disconnect(self): await self.client.hdel(self.room_id, self.peer_id) + for pubsub in self._subscriptions: + await pubsub.unsubscribe() + await pubsub.close() await self.send_peers_list() await self.client.aclose() - await self.client.publish(self.room_id, "STOP") - await self.client.publish(self.peer_id, "STOP") async def send_peers_list(self): message = ListPeersResponse(peers=await self.get_peers())