mirror of
https://github.com/umap-project/umap.git
synced 2025-04-29 11:52:38 +02:00
wip(sync): make the client set its peer id
This commit is contained in:
parent
36d9e9bf06
commit
1bf100d7a8
4 changed files with 24 additions and 22 deletions
|
@ -62,6 +62,7 @@ export class SyncEngine {
|
||||||
this._reconnectDelay = RECONNECT_DELAY
|
this._reconnectDelay = RECONNECT_DELAY
|
||||||
this.websocketConnected = false
|
this.websocketConnected = false
|
||||||
this.closeRequested = false
|
this.closeRequested = false
|
||||||
|
this.peerId = Utils.generateId()
|
||||||
}
|
}
|
||||||
|
|
||||||
async authenticate() {
|
async authenticate() {
|
||||||
|
@ -81,7 +82,8 @@ export class SyncEngine {
|
||||||
this.transport = new WebSocketTransport(
|
this.transport = new WebSocketTransport(
|
||||||
`${protocol}//${window.location.host}${path}`,
|
`${protocol}//${window.location.host}${path}`,
|
||||||
authToken,
|
authToken,
|
||||||
this
|
this,
|
||||||
|
this.peerId
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,7 +129,7 @@ export class SyncEngine {
|
||||||
|
|
||||||
if (this.offline) return
|
if (this.offline) return
|
||||||
if (this.transport) {
|
if (this.transport) {
|
||||||
this.transport.send('OperationMessage', { sender: this.uuid, ...message })
|
this.transport.send('OperationMessage', { sender: this.peerId, ...message })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,7 +181,7 @@ export class SyncEngine {
|
||||||
* @param {Object} payload
|
* @param {Object} payload
|
||||||
*/
|
*/
|
||||||
onOperationMessage(payload) {
|
onOperationMessage(payload) {
|
||||||
if (payload.sender === this.uuid) return
|
if (payload.sender === this.peerId) return
|
||||||
this._operations.storeRemoteOperations([payload])
|
this._operations.storeRemoteOperations([payload])
|
||||||
this._applyOperation(payload)
|
this._applyOperation(payload)
|
||||||
}
|
}
|
||||||
|
@ -191,9 +193,8 @@ export class SyncEngine {
|
||||||
* @param {string} payload.uuid The server-assigned uuid for this peer
|
* @param {string} payload.uuid The server-assigned uuid for this peer
|
||||||
* @param {string[]} payload.peers The list of peers uuids
|
* @param {string[]} payload.peers The list of peers uuids
|
||||||
*/
|
*/
|
||||||
onJoinResponse({ uuid, peers }) {
|
onJoinResponse({ peer, peers }) {
|
||||||
debug('received join response', { uuid, peers })
|
debug('received join response', { peer, peers })
|
||||||
this.uuid = uuid
|
|
||||||
this.onListPeersResponse({ peers })
|
this.onListPeersResponse({ peers })
|
||||||
|
|
||||||
// Get one peer at random
|
// Get one peer at random
|
||||||
|
@ -289,7 +290,7 @@ export class SyncEngine {
|
||||||
sendToPeer(recipient, verb, payload) {
|
sendToPeer(recipient, verb, payload) {
|
||||||
payload.verb = verb
|
payload.verb = verb
|
||||||
this.transport.send('PeerMessage', {
|
this.transport.send('PeerMessage', {
|
||||||
sender: this.uuid,
|
sender: this.peerId,
|
||||||
recipient: recipient,
|
recipient: recipient,
|
||||||
message: payload,
|
message: payload,
|
||||||
})
|
})
|
||||||
|
@ -301,7 +302,7 @@ export class SyncEngine {
|
||||||
* @returns {string|bool} the selected peer uuid, or False if none was found.
|
* @returns {string|bool} the selected peer uuid, or False if none was found.
|
||||||
*/
|
*/
|
||||||
_getRandomPeer() {
|
_getRandomPeer() {
|
||||||
const otherPeers = this.peers.filter((p) => p !== this.uuid)
|
const otherPeers = this.peers.filter((p) => p !== this.peerId)
|
||||||
if (otherPeers.length > 0) {
|
if (otherPeers.length > 0) {
|
||||||
const random = Math.floor(Math.random() * otherPeers.length)
|
const random = Math.floor(Math.random() * otherPeers.length)
|
||||||
return otherPeers[random]
|
return otherPeers[random]
|
||||||
|
|
|
@ -3,13 +3,13 @@ const PING_INTERVAL = 30000
|
||||||
const FIRST_CONNECTION_TIMEOUT = 2000
|
const FIRST_CONNECTION_TIMEOUT = 2000
|
||||||
|
|
||||||
export class WebSocketTransport {
|
export class WebSocketTransport {
|
||||||
constructor(webSocketURI, authToken, messagesReceiver) {
|
constructor(webSocketURI, authToken, messagesReceiver, peerId) {
|
||||||
this.receiver = messagesReceiver
|
this.receiver = messagesReceiver
|
||||||
|
|
||||||
this.websocket = new WebSocket(webSocketURI)
|
this.websocket = new WebSocket(webSocketURI)
|
||||||
|
|
||||||
this.websocket.onopen = () => {
|
this.websocket.onopen = () => {
|
||||||
this.send('JoinRequest', { token: authToken })
|
this.send('JoinRequest', { token: authToken, peer: peerId })
|
||||||
this.receiver.onConnection()
|
this.receiver.onConnection()
|
||||||
}
|
}
|
||||||
this.websocket.addEventListener('message', this.onMessage.bind(this))
|
this.websocket.addEventListener('message', this.onMessage.bind(this))
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import uuid
|
|
||||||
|
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
@ -29,7 +28,8 @@ async def application(scope, receive, send):
|
||||||
|
|
||||||
|
|
||||||
async def sync(scope, receive, send, **kwargs):
|
async def sync(scope, receive, send, **kwargs):
|
||||||
peer = Peer(uuid=uuid.uuid4(), map_id=kwargs["map_id"])
|
room_id = f"umap:{kwargs['map_id']}"
|
||||||
|
peer = Peer(room_id)
|
||||||
peer._send = send
|
peer._send = send
|
||||||
while True:
|
while True:
|
||||||
event = await receive()
|
event = await receive()
|
||||||
|
@ -59,11 +59,9 @@ async def sync(scope, receive, send, **kwargs):
|
||||||
|
|
||||||
|
|
||||||
class Peer:
|
class Peer:
|
||||||
def __init__(self, uuid, map_id, username=None):
|
def __init__(self, room_id, username=None):
|
||||||
self.uuid = uuid
|
|
||||||
self.user_id = f"user:{uuid}"
|
|
||||||
self.username = username or ""
|
self.username = username or ""
|
||||||
self.room_id = f"umap:{map_id}"
|
self.room_id = room_id
|
||||||
self.is_authenticated = False
|
self.is_authenticated = False
|
||||||
|
|
||||||
async def get_peers(self):
|
async def get_peers(self):
|
||||||
|
@ -90,17 +88,17 @@ class Peer:
|
||||||
|
|
||||||
async def listen(self):
|
async def listen(self):
|
||||||
await self.listen_to_channel(self.room_id)
|
await self.listen_to_channel(self.room_id)
|
||||||
await self.listen_to_channel(self.user_id)
|
await self.listen_to_channel(self.peer_id)
|
||||||
|
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
self.client = redis.from_url(settings.REDIS_URL)
|
self.client = redis.from_url(settings.REDIS_URL)
|
||||||
|
|
||||||
async def disconnect(self):
|
async def disconnect(self):
|
||||||
await self.client.hdel(self.room_id, self.user_id)
|
await self.client.hdel(self.room_id, self.peer_id)
|
||||||
await self.send_peers_list()
|
await self.send_peers_list()
|
||||||
await self.client.aclose()
|
await self.client.aclose()
|
||||||
await self.client.publish(self.room_id, "STOP")
|
await self.client.publish(self.room_id, "STOP")
|
||||||
await self.client.publish(self.user_id, "STOP")
|
await self.client.publish(self.peer_id, "STOP")
|
||||||
|
|
||||||
async def send_peers_list(self):
|
async def send_peers_list(self):
|
||||||
message = ListPeersResponse(peers=await self.get_peers())
|
message = ListPeersResponse(peers=await self.get_peers())
|
||||||
|
@ -124,9 +122,11 @@ class Peer:
|
||||||
user, room_id, permissions = signed.values()
|
user, room_id, permissions = signed.values()
|
||||||
if "edit" not in permissions:
|
if "edit" not in permissions:
|
||||||
return await self.disconnect()
|
return await self.disconnect()
|
||||||
await self.client.hset(self.room_id, self.user_id, self.username)
|
self.peer_id = message.peer
|
||||||
|
print("AUTHENTICATED", self.peer_id)
|
||||||
|
await self.client.hset(self.room_id, self.peer_id, self.username)
|
||||||
await self.listen()
|
await self.listen()
|
||||||
response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers())
|
response = JoinResponse(peer=self.peer_id, peers=await self.get_peers())
|
||||||
await self.send(response.model_dump_json())
|
await self.send(response.model_dump_json())
|
||||||
await self.send_peers_list()
|
await self.send_peers_list()
|
||||||
self.is_authenticated = True
|
self.is_authenticated = True
|
||||||
|
|
|
@ -6,6 +6,7 @@ from pydantic import BaseModel, Field, RootModel
|
||||||
class JoinRequest(BaseModel):
|
class JoinRequest(BaseModel):
|
||||||
kind: Literal["JoinRequest"] = "JoinRequest"
|
kind: Literal["JoinRequest"] = "JoinRequest"
|
||||||
token: str
|
token: str
|
||||||
|
peer: str
|
||||||
|
|
||||||
|
|
||||||
class OperationMessage(BaseModel):
|
class OperationMessage(BaseModel):
|
||||||
|
@ -39,7 +40,7 @@ class JoinResponse(BaseModel):
|
||||||
|
|
||||||
kind: Literal["JoinResponse"] = "JoinResponse"
|
kind: Literal["JoinResponse"] = "JoinResponse"
|
||||||
peers: list
|
peers: list
|
||||||
uuid: str
|
peer: str
|
||||||
|
|
||||||
|
|
||||||
class ListPeersResponse(BaseModel):
|
class ListPeersResponse(BaseModel):
|
||||||
|
|
Loading…
Reference in a new issue