mirror of
https://github.com/umap-project/umap.git
synced 2025-04-28 19:42:36 +02:00
wip(sync): websocket server with ASGI and PostgreSQL LISTEN/NOTIFY
This commit is contained in:
parent
31546d6ff4
commit
a29eae138e
9 changed files with 241 additions and 103 deletions
26
umap/asgi.py
26
umap/asgi.py
|
@ -1,22 +1,20 @@
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from channels.routing import ProtocolTypeRouter, URLRouter
|
|
||||||
from channels.security.websocket import AllowedHostsOriginValidator
|
|
||||||
from django.core.asgi import get_asgi_application
|
|
||||||
from django.urls import re_path
|
|
||||||
|
|
||||||
from .sync import consumers
|
|
||||||
|
|
||||||
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "umap.settings")
|
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "umap.settings")
|
||||||
|
|
||||||
|
from django.core.asgi import get_asgi_application
|
||||||
|
|
||||||
|
from .sync.app import application as ws_application
|
||||||
|
|
||||||
# Initialize Django ASGI application early to ensure the AppRegistry
|
# Initialize Django ASGI application early to ensure the AppRegistry
|
||||||
# is populated before importing code that may import ORM models.
|
# is populated before importing code that may import ORM models.
|
||||||
django_asgi_app = get_asgi_application()
|
django_asgi_app = get_asgi_application()
|
||||||
|
|
||||||
urlpatterns = (re_path(r"ws/sync/(?P<map_id>\w+)/$", consumers.SyncConsumer.as_asgi()),)
|
|
||||||
|
|
||||||
application = ProtocolTypeRouter(
|
async def application(scope, receive, send):
|
||||||
{
|
if scope["type"] == "http":
|
||||||
"http": django_asgi_app,
|
await django_asgi_app(scope, receive, send)
|
||||||
"websocket": AllowedHostsOriginValidator(URLRouter(urlpatterns)),
|
elif scope["type"] == "websocket":
|
||||||
}
|
await ws_application(scope, receive, send)
|
||||||
)
|
else:
|
||||||
|
raise NotImplementedError(f"Unknown scope type {scope['type']}")
|
||||||
|
|
|
@ -129,6 +129,7 @@ INSTALLED_APPS = (
|
||||||
"django.contrib.gis",
|
"django.contrib.gis",
|
||||||
"django_probes",
|
"django_probes",
|
||||||
"umap",
|
"umap",
|
||||||
|
"umap.sync",
|
||||||
"social_django",
|
"social_django",
|
||||||
# See https://github.com/peopledoc/django-agnocomplete/commit/26eda2dfa4a2f8a805ca2ea19a0c504b9d773a1c
|
# See https://github.com/peopledoc/django-agnocomplete/commit/26eda2dfa4a2f8a805ca2ea19a0c504b9d773a1c
|
||||||
# Django does not find the app config in the default place, so the app is not loaded
|
# Django does not find the app config in the default place, so the app is not loaded
|
||||||
|
@ -343,4 +344,3 @@ WEBSOCKET_ENABLED = env.bool("WEBSOCKET_ENABLED", default=False)
|
||||||
WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost")
|
WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost")
|
||||||
WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001)
|
WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001)
|
||||||
WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001")
|
WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001")
|
||||||
CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}}
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ export class WebSocketTransport {
|
||||||
constructor(webSocketURI, authToken, messagesReceiver) {
|
constructor(webSocketURI, authToken, messagesReceiver) {
|
||||||
this.receiver = messagesReceiver
|
this.receiver = messagesReceiver
|
||||||
|
|
||||||
this.websocket = new WebSocket(`${webSocketURI}`)
|
this.websocket = new WebSocket(webSocketURI)
|
||||||
|
|
||||||
this.websocket.onopen = () => {
|
this.websocket.onopen = () => {
|
||||||
this.send('JoinRequest', { token: authToken })
|
this.send('JoinRequest', { token: authToken })
|
||||||
|
@ -21,6 +21,10 @@ export class WebSocketTransport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.websocket.onerror = (error) => {
|
||||||
|
console.log('WS ERROR', error)
|
||||||
|
}
|
||||||
|
|
||||||
this.ensureOpen = setInterval(() => {
|
this.ensureOpen = setInterval(() => {
|
||||||
if (this.websocket.readyState !== WebSocket.OPEN) {
|
if (this.websocket.readyState !== WebSocket.OPEN) {
|
||||||
this.websocket.close()
|
this.websocket.close()
|
||||||
|
@ -34,6 +38,7 @@ export class WebSocketTransport {
|
||||||
// See https://making.close.com/posts/reliable-websockets/ for more details.
|
// See https://making.close.com/posts/reliable-websockets/ for more details.
|
||||||
this.pingInterval = setInterval(() => {
|
this.pingInterval = setInterval(() => {
|
||||||
if (this.websocket.readyState === WebSocket.OPEN) {
|
if (this.websocket.readyState === WebSocket.OPEN) {
|
||||||
|
console.log('sending ping')
|
||||||
this.websocket.send('ping')
|
this.websocket.send('ping')
|
||||||
this.pongReceived = false
|
this.pongReceived = false
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
|
@ -48,7 +53,6 @@ export class WebSocketTransport {
|
||||||
}
|
}
|
||||||
|
|
||||||
onMessage(wsMessage) {
|
onMessage(wsMessage) {
|
||||||
console.log(wsMessage)
|
|
||||||
if (wsMessage.data === 'pong') {
|
if (wsMessage.data === 'pong') {
|
||||||
this.pongReceived = true
|
this.pongReceived = true
|
||||||
} else {
|
} else {
|
||||||
|
@ -64,6 +68,7 @@ export class WebSocketTransport {
|
||||||
}
|
}
|
||||||
|
|
||||||
close() {
|
close() {
|
||||||
|
console.log('Closing')
|
||||||
this.receiver.closeRequested = true
|
this.receiver.closeRequested = true
|
||||||
this.websocket.close()
|
this.websocket.close()
|
||||||
}
|
}
|
||||||
|
|
45
umap/sync/app.py
Normal file
45
umap/sync/app.py
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from django.urls.resolvers import RoutePattern
|
||||||
|
|
||||||
|
ws_pattern = RoutePattern("/ws/sync/<str:map_id>")
|
||||||
|
|
||||||
|
|
||||||
|
async def application(scope, receive, send):
|
||||||
|
from .models import Peer
|
||||||
|
|
||||||
|
matched = ws_pattern.match(scope["path"])
|
||||||
|
print(matched)
|
||||||
|
if not matched:
|
||||||
|
print("Wrong path")
|
||||||
|
return
|
||||||
|
_, _, kwargs = matched
|
||||||
|
|
||||||
|
map_id = kwargs["map_id"]
|
||||||
|
room_id = f"room{map_id}"
|
||||||
|
peer = await Peer.objects.acreate(uuid=uuid.uuid4(), name="FooBar", room_id=room_id)
|
||||||
|
print(peer)
|
||||||
|
peer._send = send
|
||||||
|
while True:
|
||||||
|
event = await receive()
|
||||||
|
print("EVENT", event)
|
||||||
|
|
||||||
|
if event["type"] == "websocket.connect":
|
||||||
|
try:
|
||||||
|
print("Let's accept")
|
||||||
|
await send({"type": "websocket.accept"})
|
||||||
|
await peer.connect()
|
||||||
|
except ValueError:
|
||||||
|
await send({"type": "websocket.close"})
|
||||||
|
|
||||||
|
if event["type"] == "websocket.disconnect":
|
||||||
|
print("Closing", event)
|
||||||
|
await peer.disconnect()
|
||||||
|
print("Closed")
|
||||||
|
break
|
||||||
|
|
||||||
|
if event["type"] == "websocket.receive":
|
||||||
|
if event["text"] == "ping":
|
||||||
|
await send({"type": "websocket.send", "text": "pong"})
|
||||||
|
else:
|
||||||
|
await peer.receive(event["text"])
|
6
umap/sync/apps.py
Normal file
6
umap/sync/apps.py
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
from django.apps import AppConfig
|
||||||
|
|
||||||
|
|
||||||
|
class UmapConfig(AppConfig):
|
||||||
|
name = "umap.sync"
|
||||||
|
verbose_name = "uMap Sync"
|
|
@ -1,86 +0,0 @@
|
||||||
import logging
|
|
||||||
|
|
||||||
from channels.generic.websocket import AsyncWebsocketConsumer
|
|
||||||
from django.core.signing import TimestampSigner
|
|
||||||
from pydantic import ValidationError
|
|
||||||
|
|
||||||
from .payloads import (
|
|
||||||
JoinRequest,
|
|
||||||
JoinResponse,
|
|
||||||
ListPeersResponse,
|
|
||||||
OperationMessage,
|
|
||||||
PeerMessage,
|
|
||||||
Request,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class SyncConsumer(AsyncWebsocketConsumer):
|
|
||||||
@property
|
|
||||||
def peers(self):
|
|
||||||
return self.channel_layer.groups[self.map_id].keys()
|
|
||||||
|
|
||||||
async def connect(self):
|
|
||||||
self.map_id = self.scope["url_route"]["kwargs"]["map_id"]
|
|
||||||
|
|
||||||
# Join room group
|
|
||||||
await self.channel_layer.group_add(self.map_id, self.channel_name)
|
|
||||||
|
|
||||||
self.is_authenticated = False
|
|
||||||
await self.accept()
|
|
||||||
|
|
||||||
async def disconnect(self, close_code):
|
|
||||||
await self.channel_layer.group_discard(self.map_id, self.channel_name)
|
|
||||||
await self.send_peers_list()
|
|
||||||
|
|
||||||
async def send_peers_list(self):
|
|
||||||
message = ListPeersResponse(peers=self.peers)
|
|
||||||
await self.broadcast(message.model_dump_json())
|
|
||||||
|
|
||||||
async def broadcast(self, message):
|
|
||||||
# Send to all channels (including sender!)
|
|
||||||
await self.channel_layer.group_send(
|
|
||||||
self.map_id, {"message": message, "type": "on_message"}
|
|
||||||
)
|
|
||||||
|
|
||||||
async def send_to(self, channel, message):
|
|
||||||
# Send to one given channel
|
|
||||||
await self.channel_layer.send(
|
|
||||||
channel, {"message": message, "type": "on_message"}
|
|
||||||
)
|
|
||||||
|
|
||||||
async def on_message(self, event):
|
|
||||||
# Send to self channel
|
|
||||||
await self.send(event["message"])
|
|
||||||
|
|
||||||
async def receive(self, text_data):
|
|
||||||
if not self.is_authenticated:
|
|
||||||
message = JoinRequest.model_validate_json(text_data)
|
|
||||||
signed = TimestampSigner().unsign_object(message.token, max_age=30)
|
|
||||||
user, map_id, permissions = signed.values()
|
|
||||||
if "edit" not in permissions:
|
|
||||||
return await self.disconnect()
|
|
||||||
response = JoinResponse(uuid=self.channel_name, peers=self.peers)
|
|
||||||
await self.send(response.model_dump_json())
|
|
||||||
await self.send_peers_list()
|
|
||||||
self.is_authenticated = True
|
|
||||||
return
|
|
||||||
|
|
||||||
if text_data == "ping":
|
|
||||||
return await self.send("pong")
|
|
||||||
|
|
||||||
try:
|
|
||||||
incoming = Request.model_validate_json(text_data)
|
|
||||||
except ValidationError as error:
|
|
||||||
message = (
|
|
||||||
f"An error occurred when receiving the following message: {text_data!r}"
|
|
||||||
)
|
|
||||||
logging.error(message, error)
|
|
||||||
else:
|
|
||||||
match incoming.root:
|
|
||||||
# Broadcast all operation messages to connected peers
|
|
||||||
case OperationMessage():
|
|
||||||
await self.broadcast(text_data)
|
|
||||||
|
|
||||||
# Send peer messages to the proper peer
|
|
||||||
case PeerMessage():
|
|
||||||
await self.send_to(incoming.root.recipient, text_data)
|
|
23
umap/sync/migrations/0001_initial.py
Normal file
23
umap/sync/migrations/0001_initial.py
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
# Generated by Django 5.1.4 on 2024-12-27 16:14
|
||||||
|
|
||||||
|
from django.db import migrations, models
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
initial = True
|
||||||
|
|
||||||
|
dependencies = []
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.CreateModel(
|
||||||
|
name="Peer",
|
||||||
|
fields=[
|
||||||
|
(
|
||||||
|
"uuid",
|
||||||
|
models.UUIDField(primary_key=True, serialize=False, unique=True),
|
||||||
|
),
|
||||||
|
("name", models.CharField(max_length=200)),
|
||||||
|
("room_id", models.CharField(max_length=200)),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
]
|
0
umap/sync/migrations/__init__.py
Normal file
0
umap/sync/migrations/__init__.py
Normal file
147
umap/sync/models.py
Normal file
147
umap/sync/models.py
Normal file
|
@ -0,0 +1,147 @@
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import psycopg
|
||||||
|
from django.core.signing import TimestampSigner
|
||||||
|
from django.db import connection, models
|
||||||
|
from psycopg import sql
|
||||||
|
from pydantic import ValidationError
|
||||||
|
|
||||||
|
from .payloads import (
|
||||||
|
JoinRequest,
|
||||||
|
JoinResponse,
|
||||||
|
ListPeersResponse,
|
||||||
|
OperationMessage,
|
||||||
|
PeerMessage,
|
||||||
|
Request,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Peer(models.Model):
|
||||||
|
uuid = models.UUIDField(unique=True, primary_key=True)
|
||||||
|
name = models.CharField(max_length=200)
|
||||||
|
room_id = models.CharField(max_length=200)
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.is_authenticated = False
|
||||||
|
|
||||||
|
async def get_peers(self):
|
||||||
|
qs = Peer.objects.filter(room_id=self.room_id).values_list("uuid", flat=True)
|
||||||
|
peers = []
|
||||||
|
async for peer in qs:
|
||||||
|
peers.append(peer)
|
||||||
|
return peers
|
||||||
|
|
||||||
|
async def listen_public(self):
|
||||||
|
# We need a dedicated connection for the LISTEN
|
||||||
|
aconn = await psycopg.AsyncConnection.connect(
|
||||||
|
**self.connection_params,
|
||||||
|
autocommit=True,
|
||||||
|
)
|
||||||
|
async with aconn:
|
||||||
|
async with aconn.cursor() as acursor:
|
||||||
|
await acursor.execute(
|
||||||
|
sql.SQL("LISTEN {chan}").format(
|
||||||
|
chan=sql.Identifier(str(self.room_id))
|
||||||
|
)
|
||||||
|
)
|
||||||
|
print("LISTEN", self.room_id)
|
||||||
|
gen = aconn.notifies()
|
||||||
|
async for notify in gen:
|
||||||
|
await self.send(notify.payload)
|
||||||
|
|
||||||
|
async def listen_private(self):
|
||||||
|
aconn = await psycopg.AsyncConnection.connect(
|
||||||
|
**self.connection_params,
|
||||||
|
autocommit=True,
|
||||||
|
)
|
||||||
|
async with aconn:
|
||||||
|
async with aconn.cursor() as acursor:
|
||||||
|
await acursor.execute(
|
||||||
|
sql.SQL("LISTEN {chan}").format(chan=sql.Identifier(str(self.uuid)))
|
||||||
|
)
|
||||||
|
print("LISTEN", self.uuid)
|
||||||
|
gen = aconn.notifies()
|
||||||
|
async for notify in gen:
|
||||||
|
await self.send(notify.payload)
|
||||||
|
|
||||||
|
async def connect(self):
|
||||||
|
# Join room for this map
|
||||||
|
connection_params = connection.get_connection_params()
|
||||||
|
connection_params.pop("cursor_factory")
|
||||||
|
self.connection_params = connection_params
|
||||||
|
self.connection = await psycopg.AsyncConnection.connect(
|
||||||
|
**connection_params,
|
||||||
|
autocommit=True,
|
||||||
|
)
|
||||||
|
asyncio.create_task(self.listen_public())
|
||||||
|
asyncio.create_task(self.listen_private())
|
||||||
|
|
||||||
|
async def disconnect(self):
|
||||||
|
await self.adelete()
|
||||||
|
await self.send_peers_list()
|
||||||
|
|
||||||
|
async def send_peers_list(self):
|
||||||
|
message = ListPeersResponse(peers=await self.get_peers())
|
||||||
|
await self.broadcast(message.model_dump_json())
|
||||||
|
|
||||||
|
async def broadcast(self, message):
|
||||||
|
print("BROADCASTING", message)
|
||||||
|
# Send to all channels (including sender!)
|
||||||
|
async with self.connection.cursor() as cursor:
|
||||||
|
await cursor.execute(
|
||||||
|
sql.SQL("NOTIFY {chan}, {message}").format(
|
||||||
|
chan=sql.Identifier(str(self.room_id)),
|
||||||
|
message=message,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
async def send_to(self, peer_id, message):
|
||||||
|
print("SEND TO", peer_id, message)
|
||||||
|
# Send to one given channel
|
||||||
|
async with self.connection.cursor() as cursor:
|
||||||
|
await cursor.execute(
|
||||||
|
sql.SQL("NOTIFY {chan}, {message}").format(
|
||||||
|
chan=sql.Identifier(str(peer_id)), message=message
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
async def receive(self, text_data):
|
||||||
|
if not self.is_authenticated:
|
||||||
|
print("AUTHENTICATING", self.uuid)
|
||||||
|
message = JoinRequest.model_validate_json(text_data)
|
||||||
|
signed = TimestampSigner().unsign_object(message.token, max_age=30)
|
||||||
|
user, map_id, permissions = signed.values()
|
||||||
|
if "edit" not in permissions:
|
||||||
|
return await self.disconnect()
|
||||||
|
await Peer.asave()
|
||||||
|
response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers())
|
||||||
|
await self.send(response.model_dump_json())
|
||||||
|
await self.send_peers_list()
|
||||||
|
self.is_authenticated = True
|
||||||
|
return
|
||||||
|
|
||||||
|
if text_data == "ping":
|
||||||
|
return await self.send("pong")
|
||||||
|
|
||||||
|
try:
|
||||||
|
incoming = Request.model_validate_json(text_data)
|
||||||
|
except ValidationError as error:
|
||||||
|
message = (
|
||||||
|
f"An error occurred when receiving the following message: {text_data!r}"
|
||||||
|
)
|
||||||
|
logging.error(message, error)
|
||||||
|
else:
|
||||||
|
match incoming.root:
|
||||||
|
# Broadcast all operation messages to connected peers
|
||||||
|
case OperationMessage():
|
||||||
|
await self.broadcast(text_data)
|
||||||
|
|
||||||
|
# Send peer messages to the proper peer
|
||||||
|
case PeerMessage():
|
||||||
|
await self.send_to(incoming.root.recipient, text_data)
|
||||||
|
|
||||||
|
async def send(self, text):
|
||||||
|
print("SEND", text)
|
||||||
|
await self._send({"type": "websocket.send", "text": text})
|
Loading…
Reference in a new issue