diff --git a/umap/static/umap/js/modules/sync/engine.js b/umap/static/umap/js/modules/sync/engine.js index caa36058..9a3ef611 100644 --- a/umap/static/umap/js/modules/sync/engine.js +++ b/umap/static/umap/js/modules/sync/engine.js @@ -3,6 +3,12 @@ import { HybridLogicalClock } from './hlc.js' import { DataLayerUpdater, FeatureUpdater, MapUpdater } from './updaters.js' import { WebSocketTransport } from './websocket.js' +// Start reconnecting after 2 seconds, then double the delay each time +// maxing out at 32 seconds. +const RECONNECT_DELAY = 2000; +const RECONNECT_DELAY_FACTOR = 2; +const MAX_RECONNECT_DELAY = 32000; + /** * The syncEngine exposes an API to sync messages between peers over the network. * @@ -42,7 +48,7 @@ import { WebSocketTransport } from './websocket.js' * ``` */ export class SyncEngine { - constructor(map) { + constructor(map, urls, server) { this.updaters = { map: new MapUpdater(map), feature: new FeatureUpdater(map), @@ -50,24 +56,54 @@ export class SyncEngine { } this.transport = undefined this._operations = new Operations() + this._server = server + + // Store URIs to avoid persisting the map + // mainly to ensure separation of concerns. + this._websocketTokenURI = urls.get('map_websocket_auth_token', { + map_id: map.options.umap_id, + }) + this._websocketURI = map.options.websocketURI + this._reconnectTimeout = null; + this._reconnectDelay = RECONNECT_DELAY; } - async authenticate(tokenURI, webSocketURI, server) { - const [response, _, error] = await server.get(tokenURI) + /** + * Authenticate with the server and start the transport layer. + */ + async authenticate() { + const [response, _, error] = await this._server.get(this._websocketTokenURI) if (!error) { - this.start(webSocketURI, response.token) + this.start(response.token) } } - start(webSocketURI, authToken) { - this.transport = new WebSocketTransport(webSocketURI, authToken, this) + start(authToken) { + this.transport = new WebSocketTransport(this._websocketURI, authToken, this) } stop() { - if (this.transport) this.transport.close() + if (this.transport){ + this.transport.close() + } this.transport = undefined } + onConnection() { + this._reconnectTimeout = null; + this._reconnectDelay = RECONNECT_DELAY; + } + + reconnect() { + console.log("reconnecting in ", this._reconnectDelay, " ms") + this._reconnectTimeout = setTimeout(() => { + if (this._reconnectDelay < MAX_RECONNECT_DELAY) { + this._reconnectDelay = this._reconnectDelay * RECONNECT_DELAY_FACTOR + } + this.authenticate() + }, this._reconnectDelay); + } + upsert(subject, metadata, value) { this._send({ verb: 'upsert', subject, metadata, value }) } @@ -448,3 +484,4 @@ export class Operations { function debug(...args) { console.debug('SYNC ⇆', ...args) } + diff --git a/umap/static/umap/js/modules/sync/websocket.js b/umap/static/umap/js/modules/sync/websocket.js index dc0599dd..97a34afb 100644 --- a/umap/static/umap/js/modules/sync/websocket.js +++ b/umap/static/umap/js/modules/sync/websocket.js @@ -1,15 +1,47 @@ +const PONG_TIMEOUT = 5000; +const PING_INTERVAL = 30000; + export class WebSocketTransport { constructor(webSocketURI, authToken, messagesReceiver) { + this.receiver = messagesReceiver + this.websocket = new WebSocket(webSocketURI) + this.websocket.onopen = () => { this.send('JoinRequest', { token: authToken }) + this.receiver.onConnection() } this.websocket.addEventListener('message', this.onMessage.bind(this)) - this.receiver = messagesReceiver + this.websocket.onclose = () => { + console.log("websocket closed") + this.receiver.reconnect() + } + + // To ensure the connection is still alive, we send ping and expect pong back. + // Websocket provides a `ping` method to keep the connection alive, but it's + // unfortunately not possible to access it from the WebSocket object. + // See https://making.close.com/posts/reliable-websockets/ for more details. + this.pingInterval = setInterval(() => { + if (this.websocket.readyState === WebSocket.OPEN) { + this.websocket.send('ping'); + this.pongReceived = false; + setTimeout(() => { + if (!this.pongReceived) { + console.warn('No pong received, reconnecting...'); + this.websocket.close() + clearInterval(this.pingInterval) + } + }, PONG_TIMEOUT); + } + }, PING_INTERVAL); } onMessage(wsMessage) { - this.receiver.receive(JSON.parse(wsMessage.data)) + if (wsMessage.data === 'pong') { + this.pongReceived = true; + } else { + this.receiver.receive(JSON.parse(wsMessage.data)) + } } send(kind, payload) { @@ -18,8 +50,4 @@ export class WebSocketTransport { const encoded = JSON.stringify(message) this.websocket.send(encoded) } - - close() { - this.websocket.close() - } } diff --git a/umap/static/umap/js/umap.js b/umap/static/umap/js/umap.js index 15b928e9..ccd94e15 100644 --- a/umap/static/umap/js/umap.js +++ b/umap/static/umap/js/umap.js @@ -30,8 +30,6 @@ U.Map = L.Map.extend({ includes: [ControlsMixin], initialize: async function (el, geojson) { - this.sync_engine = new U.SyncEngine(this) - this.sync = this.sync_engine.proxy(this) // Locale name (pt_PT, en_US…) // To be used for Django localization if (geojson.properties.locale) L.setLocale(geojson.properties.locale) @@ -70,6 +68,9 @@ U.Map = L.Map.extend({ this.server = new U.ServerRequest() this.request = new U.Request() + this.sync_engine = new U.SyncEngine(this, this.urls, this.server) + this.sync = this.sync_engine.proxy(this) + this.initLoader() this.name = this.options.name this.description = this.options.description @@ -209,10 +210,7 @@ U.Map = L.Map.extend({ if (this.options.syncEnabled !== true) { this.sync.stop() } else { - const ws_token_uri = this.urls.get('map_websocket_auth_token', { - map_id: this.options.umap_id, - }) - await this.sync.authenticate(ws_token_uri, this.options.websocketURI, this.server) + await this.sync.authenticate() } }, diff --git a/umap/websocket_server.py b/umap/websocket_server.py index a493ebbe..fb024c53 100644 --- a/umap/websocket_server.py +++ b/umap/websocket_server.py @@ -126,6 +126,10 @@ async def join_and_listen( try: async for raw_message in websocket: + if raw_message == "ping": + await websocket.send("pong") + continue + # recompute the peers list at the time of message-sending. # as doing so beforehand would miss new connections other_peers = connections.get_other_peers(websocket)