wip(sync): try a better pattern to unsubscribe to pubsub channels

When publishing a "STOP", this would unsubscribe every listener of the
channel.
This commit is contained in:
Yohan Boniface 2025-01-20 19:13:41 +01:00
parent 7e42331533
commit 82342ea00f

View file

@ -63,6 +63,7 @@ class Peer:
self.username = username or ""
self.room_id = room_id
self.is_authenticated = False
self._subscriptions = []
async def get_peers(self):
peers = await self.client.hgetall(self.room_id)
@ -73,17 +74,20 @@ class Peer:
async def reader(pubsub):
await pubsub.subscribe(channel_name)
while True:
if pubsub.connection is None:
# It has been unsubscribed/closed.
break
try:
message = await pubsub.get_message(ignore_subscribe_messages=True)
except Exception as err:
print(err)
break
if message is not None:
if message["data"].decode() == "STOP":
break
await self.send(message["data"].decode())
await asyncio.sleep(0.001) # Be nice with the server
async with self.client.pubsub() as pubsub:
self._subscriptions.append(pubsub)
asyncio.create_task(reader(pubsub))
async def listen(self):
@ -95,10 +99,11 @@ class Peer:
async def disconnect(self):
await self.client.hdel(self.room_id, self.peer_id)
for pubsub in self._subscriptions:
await pubsub.unsubscribe()
await pubsub.close()
await self.send_peers_list()
await self.client.aclose()
await self.client.publish(self.room_id, "STOP")
await self.client.publish(self.peer_id, "STOP")
async def send_peers_list(self):
message = ListPeersResponse(peers=await self.get_peers())