wip(sync): POC of using Redis for pubsub

Co-authored-by: David Larlet <david@larlet.fr>
This commit is contained in:
Yohan Boniface 2025-01-13 17:41:39 +01:00
parent ab7119e0a4
commit acb2e967b8
7 changed files with 120 additions and 185 deletions

View file

@ -102,3 +102,6 @@ format_css=true
blank_line_after_tag="load,extends" blank_line_after_tag="load,extends"
line_break_after_multiline_tag=true line_break_after_multiline_tag=true
[lint]
# Disable autoremove of unused import.
unfixable = ["F401"]

View file

@ -129,7 +129,6 @@ INSTALLED_APPS = (
"django.contrib.gis", "django.contrib.gis",
"django_probes", "django_probes",
"umap", "umap",
"umap.sync",
"social_django", "social_django",
# See https://github.com/peopledoc/django-agnocomplete/commit/26eda2dfa4a2f8a805ca2ea19a0c504b9d773a1c # See https://github.com/peopledoc/django-agnocomplete/commit/26eda2dfa4a2f8a805ca2ea19a0c504b9d773a1c
# Django does not find the app config in the default place, so the app is not loaded # Django does not find the app config in the default place, so the app is not loaded
@ -344,3 +343,5 @@ WEBSOCKET_ENABLED = env.bool("WEBSOCKET_ENABLED", default=False)
WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost") WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost")
WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001) WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001)
WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001") WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001")
REDIS_URL = "redis://localhost:6379"

View file

@ -1,13 +1,26 @@
import asyncio
import logging
import uuid import uuid
import redis.asyncio as redis
from django.conf import settings
from django.core.signing import TimestampSigner
from django.urls.resolvers import RoutePattern from django.urls.resolvers import RoutePattern
from pydantic import ValidationError
from .payloads import (
JoinRequest,
JoinResponse,
ListPeersResponse,
OperationMessage,
PeerMessage,
Request,
)
ws_pattern = RoutePattern("/ws/sync/<str:map_id>") ws_pattern = RoutePattern("/ws/sync/<str:map_id>")
async def application(scope, receive, send): async def application(scope, receive, send):
from .models import Peer
matched = ws_pattern.match(scope["path"]) matched = ws_pattern.match(scope["path"])
print(matched) print(matched)
if not matched: if not matched:
@ -16,8 +29,7 @@ async def application(scope, receive, send):
_, _, kwargs = matched _, _, kwargs = matched
map_id = kwargs["map_id"] map_id = kwargs["map_id"]
room_id = f"room{map_id}" peer = Peer(uuid=uuid.uuid4(), map_id=map_id)
peer = Peer(uuid=uuid.uuid4(), name="FooBar", room_id=room_id)
print(peer) print(peer)
peer._send = send peer._send = send
while True: while True:
@ -27,8 +39,10 @@ async def application(scope, receive, send):
if event["type"] == "websocket.connect": if event["type"] == "websocket.connect":
try: try:
print("Let's accept") print("Let's accept")
await send({"type": "websocket.accept"})
await peer.connect() await peer.connect()
print("After connect")
await send({"type": "websocket.accept"})
print("After accept")
except ValueError: except ValueError:
await send({"type": "websocket.close"}) await send({"type": "websocket.close"})
@ -43,3 +57,99 @@ async def application(scope, receive, send):
await send({"type": "websocket.send", "text": "pong"}) await send({"type": "websocket.send", "text": "pong"})
else: else:
await peer.receive(event["text"]) await peer.receive(event["text"])
class Peer:
def __init__(self, uuid, map_id, username=None):
self.uuid = uuid
self.user_id = f"user:{uuid}"
self.username = username or ""
self.room_id = f"umap:{map_id}"
self.is_authenticated = False
async def get_peers(self):
peers = await self.client.hgetall(self.room_id)
# Send only ids for now (values are client names).
return peers.keys()
async def listen_to_channel(self, channel_name):
async def reader(pubsub):
await pubsub.subscribe(channel_name)
while True:
try:
message = await pubsub.get_message(ignore_subscribe_messages=True)
except Exception as err:
print(err)
break
if message is not None:
if message["data"].decode() == "STOP":
break
await self.send(message["data"].decode())
async with self.client.pubsub() as pubsub:
asyncio.create_task(reader(pubsub))
async def listen(self):
await self.listen_to_channel(self.room_id)
await self.listen_to_channel(self.user_id)
async def connect(self):
self.client = redis.from_url(settings.REDIS_URL)
async def disconnect(self):
await self.client.hdel(self.room_id, self.user_id)
await self.send_peers_list()
await self.client.aclose()
await self.client.publish(self.room_id, "STOP")
await self.client.publish(self.user_id, "STOP")
async def send_peers_list(self):
message = ListPeersResponse(peers=await self.get_peers())
await self.broadcast(message.model_dump_json())
async def broadcast(self, message):
print("BROADCASTING", message)
# Send to all channels (including sender!)
await self.client.publish(self.room_id, message)
async def send_to(self, peer_id, message):
print("SEND TO", peer_id, message)
# Send to one given channel
await self.client.publish(peer_id, message)
async def receive(self, text_data):
if not self.is_authenticated:
print("AUTHENTICATING", self.uuid)
message = JoinRequest.model_validate_json(text_data)
signed = TimestampSigner().unsign_object(message.token, max_age=30)
user, room_id, permissions = signed.values()
if "edit" not in permissions:
return await self.disconnect()
await self.client.hset(self.room_id, self.user_id, self.username)
await self.listen()
response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers())
await self.send(response.model_dump_json())
await self.send_peers_list()
self.is_authenticated = True
return
try:
incoming = Request.model_validate_json(text_data)
except ValidationError as error:
message = (
f"An error occurred when receiving the following message: {text_data!r}"
)
logging.error(message, error)
else:
match incoming.root:
# Broadcast all operation messages to connected peers
case OperationMessage():
await self.broadcast(text_data)
# Send peer messages to the proper peer
case PeerMessage():
await self.send_to(incoming.root.recipient, text_data)
async def send(self, text):
print("SEND", text)
await self._send({"type": "websocket.send", "text": text})

View file

@ -1,6 +0,0 @@
from django.apps import AppConfig
class UmapConfig(AppConfig):
name = "umap.sync"
verbose_name = "uMap Sync"

View file

@ -1,23 +0,0 @@
# Generated by Django 5.1.4 on 2024-12-27 16:14
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = []
operations = [
migrations.CreateModel(
name="Peer",
fields=[
(
"uuid",
models.UUIDField(primary_key=True, serialize=False, unique=True),
),
("name", models.CharField(max_length=200)),
("room_id", models.CharField(max_length=200)),
],
),
]

View file

@ -1,150 +0,0 @@
import asyncio
import logging
import psycopg
from django.core.signing import TimestampSigner
from django.db import connection, models
from psycopg import sql
from pydantic import ValidationError
from .payloads import (
JoinRequest,
JoinResponse,
ListPeersResponse,
OperationMessage,
PeerMessage,
Request,
)
class Peer(models.Model):
uuid = models.UUIDField(unique=True, primary_key=True)
name = models.CharField(max_length=200)
room_id = models.CharField(max_length=200)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.is_authenticated = False
async def get_peers(self):
qs = Peer.objects.filter(room_id=self.room_id).values_list("uuid", flat=True)
peers = []
async for peer in qs:
peers.append(peer)
return peers
async def listen_public(self):
# We need a dedicated connection for the LISTEN
aconn = await psycopg.AsyncConnection.connect(
**self.connection_params,
autocommit=True,
)
async with aconn:
async with aconn.cursor() as acursor:
await acursor.execute(
sql.SQL("LISTEN {chan}").format(
chan=sql.Identifier(str(self.room_id))
)
)
print("LISTEN", self.room_id)
gen = aconn.notifies()
async for notify in gen:
await self.send(notify.payload)
async def listen_private(self):
aconn = await psycopg.AsyncConnection.connect(
**self.connection_params,
autocommit=True,
)
async with aconn:
async with aconn.cursor() as acursor:
await acursor.execute(
sql.SQL("LISTEN {chan}").format(chan=sql.Identifier(str(self.uuid)))
)
print("LISTEN", self.uuid)
gen = aconn.notifies()
async for notify in gen:
await self.send(notify.payload)
async def connect(self):
# Join room for this map
connection_params = connection.get_connection_params()
connection_params.pop("cursor_factory")
self.connection_params = connection_params
self.connection = await psycopg.AsyncConnection.connect(
**connection_params,
autocommit=True,
)
async def listen(self):
asyncio.create_task(self.listen_public())
asyncio.create_task(self.listen_private())
async def disconnect(self):
await self.adelete()
await self.send_peers_list()
async def send_peers_list(self):
message = ListPeersResponse(peers=await self.get_peers())
await self.broadcast(message.model_dump_json())
async def broadcast(self, message):
print("BROADCASTING", message)
# Send to all channels (including sender!)
async with self.connection.cursor() as cursor:
await cursor.execute(
sql.SQL("NOTIFY {chan}, {message}").format(
chan=sql.Identifier(str(self.room_id)),
message=message,
)
)
async def send_to(self, peer_id, message):
print("SEND TO", peer_id, message)
# Send to one given channel
async with self.connection.cursor() as cursor:
await cursor.execute(
sql.SQL("NOTIFY {chan}, {message}").format(
chan=sql.Identifier(str(peer_id)), message=message
)
)
async def receive(self, text_data):
if not self.is_authenticated:
print("AUTHENTICATING", self.uuid)
message = JoinRequest.model_validate_json(text_data)
signed = TimestampSigner().unsign_object(message.token, max_age=30)
user, map_id, permissions = signed.values()
if "edit" not in permissions:
return await self.disconnect()
await self.asave()
await self.listen()
response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers())
await self.send(response.model_dump_json())
await self.send_peers_list()
self.is_authenticated = True
return
if text_data == "ping":
return await self.send("pong")
try:
incoming = Request.model_validate_json(text_data)
except ValidationError as error:
message = (
f"An error occurred when receiving the following message: {text_data!r}"
)
logging.error(message, error)
else:
match incoming.root:
# Broadcast all operation messages to connected peers
case OperationMessage():
await self.broadcast(text_data)
# Send peer messages to the proper peer
case PeerMessage():
await self.send_to(incoming.root.recipient, text_data)
async def send(self, text):
print("SEND", text)
await self._send({"type": "websocket.send", "text": text})