mirror of
https://github.com/umap-project/umap.git
synced 2025-04-28 19:42:36 +02:00
wip(sync): only return peers with an active connection
This commit is contained in:
parent
11fb29c456
commit
0d5e3047f4
4 changed files with 37 additions and 22 deletions
|
@ -83,7 +83,8 @@ export class SyncEngine {
|
||||||
`${protocol}//${window.location.host}${path}`,
|
`${protocol}//${window.location.host}${path}`,
|
||||||
authToken,
|
authToken,
|
||||||
this,
|
this,
|
||||||
this.peerId
|
this.peerId,
|
||||||
|
this._umap.properties.user?.name
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,7 +147,7 @@ export class SyncEngine {
|
||||||
}
|
}
|
||||||
|
|
||||||
getNumberOfConnectedPeers() {
|
getNumberOfConnectedPeers() {
|
||||||
if (this.peers) return this.peers.length
|
if (this.peers) return Object.keys(this.peers).length
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,7 +216,7 @@ export class SyncEngine {
|
||||||
* @param {string[]} payload.peers The list of peers uuids
|
* @param {string[]} payload.peers The list of peers uuids
|
||||||
*/
|
*/
|
||||||
onListPeersResponse({ peers }) {
|
onListPeersResponse({ peers }) {
|
||||||
debug('received peerinfo', { peers })
|
debug('received peerinfo', peers)
|
||||||
this.peers = peers
|
this.peers = peers
|
||||||
this.updaters.map.update({ key: 'numberOfConnectedPeers' })
|
this.updaters.map.update({ key: 'numberOfConnectedPeers' })
|
||||||
}
|
}
|
||||||
|
@ -302,7 +303,7 @@ export class SyncEngine {
|
||||||
* @returns {string|bool} the selected peer uuid, or False if none was found.
|
* @returns {string|bool} the selected peer uuid, or False if none was found.
|
||||||
*/
|
*/
|
||||||
_getRandomPeer() {
|
_getRandomPeer() {
|
||||||
const otherPeers = this.peers.filter((p) => p !== this.peerId)
|
const otherPeers = Object.keys(this.peers).filter((p) => p !== this.peerId)
|
||||||
if (otherPeers.length > 0) {
|
if (otherPeers.length > 0) {
|
||||||
const random = Math.floor(Math.random() * otherPeers.length)
|
const random = Math.floor(Math.random() * otherPeers.length)
|
||||||
return otherPeers[random]
|
return otherPeers[random]
|
||||||
|
|
|
@ -3,13 +3,13 @@ const PING_INTERVAL = 30000
|
||||||
const FIRST_CONNECTION_TIMEOUT = 2000
|
const FIRST_CONNECTION_TIMEOUT = 2000
|
||||||
|
|
||||||
export class WebSocketTransport {
|
export class WebSocketTransport {
|
||||||
constructor(webSocketURI, authToken, messagesReceiver, peerId) {
|
constructor(webSocketURI, authToken, messagesReceiver, peerId, username) {
|
||||||
this.receiver = messagesReceiver
|
this.receiver = messagesReceiver
|
||||||
|
|
||||||
this.websocket = new WebSocket(webSocketURI)
|
this.websocket = new WebSocket(webSocketURI)
|
||||||
|
|
||||||
this.websocket.onopen = () => {
|
this.websocket.onopen = () => {
|
||||||
this.send('JoinRequest', { token: authToken, peer: peerId })
|
this.send('JoinRequest', { token: authToken, peer: peerId, username })
|
||||||
this.receiver.onConnection()
|
this.receiver.onConnection()
|
||||||
}
|
}
|
||||||
this.websocket.addEventListener('message', this.onMessage.bind(this))
|
this.websocket.addEventListener('message', this.onMessage.bind(this))
|
||||||
|
|
|
@ -28,8 +28,7 @@ async def application(scope, receive, send):
|
||||||
|
|
||||||
|
|
||||||
async def sync(scope, receive, send, **kwargs):
|
async def sync(scope, receive, send, **kwargs):
|
||||||
room_id = f"umap:{kwargs['map_id']}"
|
peer = Peer(kwargs["map_id"])
|
||||||
peer = Peer(room_id)
|
|
||||||
peer._send = send
|
peer._send = send
|
||||||
while True:
|
while True:
|
||||||
event = await receive()
|
event = await receive()
|
||||||
|
@ -53,16 +52,28 @@ async def sync(scope, receive, send, **kwargs):
|
||||||
|
|
||||||
|
|
||||||
class Peer:
|
class Peer:
|
||||||
def __init__(self, room_id, username=None):
|
def __init__(self, map_id, username=None):
|
||||||
self.username = username or ""
|
self.username = username or ""
|
||||||
self.room_id = room_id
|
self.map_id = map_id
|
||||||
self.is_authenticated = False
|
self.is_authenticated = False
|
||||||
self._subscriptions = []
|
self._subscriptions = []
|
||||||
|
|
||||||
|
@property
|
||||||
|
def room_key(self):
|
||||||
|
return f"umap:{self.map_id}"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def peer_key(self):
|
||||||
|
return f"user:{self.map_id}:{self.peer_id}"
|
||||||
|
|
||||||
async def get_peers(self):
|
async def get_peers(self):
|
||||||
peers = await self.client.hgetall(self.room_id)
|
known = await self.client.hgetall(self.room_key)
|
||||||
# Send only ids for now (values are client names).
|
active = await self.client.pubsub_channels(f"user:{self.map_id}:*")
|
||||||
return peers.keys()
|
active = [name.split(b":")[-1] for name in active]
|
||||||
|
if self.peer_id.encode() not in active:
|
||||||
|
# Our connection may not yet be active
|
||||||
|
active.append(self.peer_id.encode())
|
||||||
|
return {k: v for k, v in known.items() if k in active}
|
||||||
|
|
||||||
async def listen_to_channel(self, channel_name):
|
async def listen_to_channel(self, channel_name):
|
||||||
async def reader(pubsub):
|
async def reader(pubsub):
|
||||||
|
@ -85,14 +96,14 @@ class Peer:
|
||||||
asyncio.create_task(reader(pubsub))
|
asyncio.create_task(reader(pubsub))
|
||||||
|
|
||||||
async def listen(self):
|
async def listen(self):
|
||||||
await self.listen_to_channel(self.room_id)
|
await self.listen_to_channel(self.room_key)
|
||||||
await self.listen_to_channel(self.peer_id)
|
await self.listen_to_channel(self.peer_key)
|
||||||
|
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
self.client = redis.from_url(settings.REDIS_URL)
|
self.client = redis.from_url(settings.REDIS_URL)
|
||||||
|
|
||||||
async def disconnect(self):
|
async def disconnect(self):
|
||||||
await self.client.hdel(self.room_id, self.peer_id)
|
await self.client.hdel(self.room_key, self.peer_id)
|
||||||
for pubsub in self._subscriptions:
|
for pubsub in self._subscriptions:
|
||||||
await pubsub.unsubscribe()
|
await pubsub.unsubscribe()
|
||||||
await pubsub.close()
|
await pubsub.close()
|
||||||
|
@ -106,24 +117,26 @@ class Peer:
|
||||||
async def broadcast(self, message):
|
async def broadcast(self, message):
|
||||||
print("BROADCASTING", message)
|
print("BROADCASTING", message)
|
||||||
# Send to all channels (including sender!)
|
# Send to all channels (including sender!)
|
||||||
await self.client.publish(self.room_id, message)
|
await self.client.publish(self.room_key, message)
|
||||||
|
|
||||||
async def send_to(self, peer_id, message):
|
async def send_to(self, peer_id, message):
|
||||||
print("SEND TO", peer_id, message)
|
print("SEND TO", peer_id, message)
|
||||||
# Send to one given channel
|
# Send to one given channel
|
||||||
await self.client.publish(peer_id, message)
|
await self.client.publish(f"user:{self.map_id}:{peer_id}", message)
|
||||||
|
|
||||||
async def receive(self, text_data):
|
async def receive(self, text_data):
|
||||||
if not self.is_authenticated:
|
if not self.is_authenticated:
|
||||||
print("AUTHENTICATING", text_data)
|
print("AUTHENTICATING", text_data)
|
||||||
message = JoinRequest.model_validate_json(text_data)
|
message = JoinRequest.model_validate_json(text_data)
|
||||||
signed = TimestampSigner().unsign_object(message.token, max_age=30)
|
signed = TimestampSigner().unsign_object(message.token, max_age=30)
|
||||||
user, room_id, permissions = signed.values()
|
user, map_id, permissions = signed.values()
|
||||||
|
assert str(map_id) == self.map_id
|
||||||
if "edit" not in permissions:
|
if "edit" not in permissions:
|
||||||
return await self.disconnect()
|
return await self.disconnect()
|
||||||
self.peer_id = message.peer
|
self.peer_id = message.peer
|
||||||
|
self.username = message.username
|
||||||
print("AUTHENTICATED", self.peer_id)
|
print("AUTHENTICATED", self.peer_id)
|
||||||
await self.client.hset(self.room_id, self.peer_id, self.username)
|
await self.client.hset(self.room_key, self.peer_id, self.username)
|
||||||
await self.listen()
|
await self.listen()
|
||||||
response = JoinResponse(peer=self.peer_id, peers=await self.get_peers())
|
response = JoinResponse(peer=self.peer_id, peers=await self.get_peers())
|
||||||
await self.send(response.model_dump_json())
|
await self.send(response.model_dump_json())
|
||||||
|
|
|
@ -7,6 +7,7 @@ class JoinRequest(BaseModel):
|
||||||
kind: Literal["JoinRequest"] = "JoinRequest"
|
kind: Literal["JoinRequest"] = "JoinRequest"
|
||||||
token: str
|
token: str
|
||||||
peer: str
|
peer: str
|
||||||
|
username: Optional[str] = ""
|
||||||
|
|
||||||
|
|
||||||
class OperationMessage(BaseModel):
|
class OperationMessage(BaseModel):
|
||||||
|
@ -39,10 +40,10 @@ class JoinResponse(BaseModel):
|
||||||
"""Server response containing the list of peers"""
|
"""Server response containing the list of peers"""
|
||||||
|
|
||||||
kind: Literal["JoinResponse"] = "JoinResponse"
|
kind: Literal["JoinResponse"] = "JoinResponse"
|
||||||
peers: list
|
peers: dict
|
||||||
peer: str
|
peer: str
|
||||||
|
|
||||||
|
|
||||||
class ListPeersResponse(BaseModel):
|
class ListPeersResponse(BaseModel):
|
||||||
kind: Literal["ListPeersResponse"] = "ListPeersResponse"
|
kind: Literal["ListPeersResponse"] = "ListPeersResponse"
|
||||||
peers: list
|
peers: dict
|
||||||
|
|
Loading…
Reference in a new issue