diff --git a/umap/static/umap/css/bar.css b/umap/static/umap/css/bar.css index eb22f16e..510ab3c4 100644 --- a/umap/static/umap/css/bar.css +++ b/umap/static/umap/css/bar.css @@ -32,6 +32,10 @@ background-color: var(--color-lightCyan); color: var(--color-dark); } +.dark .off.connected-peers { + background-color: var(--color-lightGray); + color: var(--color-darkGray); +} .leaflet-container .edit-cancel, .leaflet-container .edit-disable, diff --git a/umap/static/umap/js/modules/data/features.js b/umap/static/umap/js/modules/data/features.js index ac26dde6..0035f530 100644 --- a/umap/static/umap/js/modules/data/features.js +++ b/umap/static/umap/js/modules/data/features.js @@ -20,7 +20,7 @@ import loadPopup from '../rendering/popup.js' class Feature { constructor(umap, datalayer, geojson = {}, id = null) { this._umap = umap - this.sync = umap.sync_engine.proxy(this) + this.sync = umap.syncEngine.proxy(this) this._marked_for_deletion = false this._isDirty = false this._ui = null diff --git a/umap/static/umap/js/modules/data/layer.js b/umap/static/umap/js/modules/data/layer.js index 734aaca1..ce1f83a2 100644 --- a/umap/static/umap/js/modules/data/layer.js +++ b/umap/static/umap/js/modules/data/layer.js @@ -41,7 +41,7 @@ export class DataLayer extends ServerStored { constructor(umap, leafletMap, data = {}) { super() this._umap = umap - this.sync = umap.sync_engine.proxy(this) + this.sync = umap.syncEngine.proxy(this) this._index = Array() this._features = {} this._geojson = null diff --git a/umap/static/umap/js/modules/sync/engine.js b/umap/static/umap/js/modules/sync/engine.js index caa36058..9ffd3dcf 100644 --- a/umap/static/umap/js/modules/sync/engine.js +++ b/umap/static/umap/js/modules/sync/engine.js @@ -3,6 +3,12 @@ import { HybridLogicalClock } from './hlc.js' import { DataLayerUpdater, FeatureUpdater, MapUpdater } from './updaters.js' import { WebSocketTransport } from './websocket.js' +// Start reconnecting after 2 seconds, then double the delay each time +// maxing out at 32 seconds. +const RECONNECT_DELAY = 2000 +const RECONNECT_DELAY_FACTOR = 2 +const MAX_RECONNECT_DELAY = 32000 + /** * The syncEngine exposes an API to sync messages between peers over the network. * @@ -42,32 +48,65 @@ import { WebSocketTransport } from './websocket.js' * ``` */ export class SyncEngine { - constructor(map) { + constructor(umap) { + this._umap = umap this.updaters = { - map: new MapUpdater(map), - feature: new FeatureUpdater(map), - datalayer: new DataLayerUpdater(map), + map: new MapUpdater(umap), + feature: new FeatureUpdater(umap), + datalayer: new DataLayerUpdater(umap), } this.transport = undefined this._operations = new Operations() + + this._reconnectTimeout = null + this._reconnectDelay = RECONNECT_DELAY + this.websocketConnected = false } - async authenticate(tokenURI, webSocketURI, server) { - const [response, _, error] = await server.get(tokenURI) + async authenticate() { + const websocketTokenURI = this._umap.urls.get('map_websocket_auth_token', { + map_id: this._umap.id, + }) + + const [response, _, error] = await this._umap.server.get(websocketTokenURI) if (!error) { - this.start(webSocketURI, response.token) + this.start(response.token) } } - start(webSocketURI, authToken) { - this.transport = new WebSocketTransport(webSocketURI, authToken, this) + start(authToken) { + this.transport = new WebSocketTransport( + this._umap.properties.websocketURI, + authToken, + this + ) } stop() { - if (this.transport) this.transport.close() + if (this.transport) { + this.transport.close() + } this.transport = undefined } + onConnection() { + this._reconnectTimeout = null + this._reconnectDelay = RECONNECT_DELAY + this.websocketConnected = true + this.updaters.map.update({ key: 'numberOfConnectedPeers' }) + } + + reconnect() { + this.websocketConnected = false + this.updaters.map.update({ key: 'numberOfConnectedPeers' }) + + this._reconnectTimeout = setTimeout(() => { + if (this._reconnectDelay < MAX_RECONNECT_DELAY) { + this._reconnectDelay = this._reconnectDelay * RECONNECT_DELAY_FACTOR + } + this.authenticate() + }, this._reconnectDelay) + } upsert(subject, metadata, value) { this._send({ verb: 'upsert', subject, metadata, value }) } diff --git a/umap/static/umap/js/modules/sync/websocket.js b/umap/static/umap/js/modules/sync/websocket.js index dc0599dd..ce346ad7 100644 --- a/umap/static/umap/js/modules/sync/websocket.js +++ b/umap/static/umap/js/modules/sync/websocket.js @@ -1,15 +1,59 @@ +const PONG_TIMEOUT = 5000 +const PING_INTERVAL = 30000 +const FIRST_CONNECTION_TIMEOUT = 2000 + export class WebSocketTransport { constructor(webSocketURI, authToken, messagesReceiver) { + this.receiver = messagesReceiver + this.closeRequested = false + this.websocket = new WebSocket(webSocketURI) + this.websocket.onopen = () => { this.send('JoinRequest', { token: authToken }) + this.receiver.onConnection() } this.websocket.addEventListener('message', this.onMessage.bind(this)) - this.receiver = messagesReceiver + this.websocket.onclose = () => { + console.log('websocket closed') + if (!this.closeRequested) { + console.log('Not requested, reconnecting...') + this.receiver.reconnect() + } + } + + this.ensureOpen = setInterval(() => { + if (this.websocket.readyState !== WebSocket.OPEN) { + this.websocket.close() + clearInterval(this.ensureOpen) + } + }, FIRST_CONNECTION_TIMEOUT) + + // To ensure the connection is still alive, we send ping and expect pong back. + // Websocket provides a `ping` method to keep the connection alive, but it's + // unfortunately not possible to access it from the WebSocket object. + // See https://making.close.com/posts/reliable-websockets/ for more details. + this.pingInterval = setInterval(() => { + if (this.websocket.readyState === WebSocket.OPEN) { + this.websocket.send('ping') + this.pongReceived = false + setTimeout(() => { + if (!this.pongReceived) { + console.warn('No pong received, reconnecting...') + this.websocket.close() + clearInterval(this.pingInterval) + } + }, PONG_TIMEOUT) + } + }, PING_INTERVAL) } onMessage(wsMessage) { - this.receiver.receive(JSON.parse(wsMessage.data)) + if (wsMessage.data === 'pong') { + this.pongReceived = true + } else { + this.receiver.receive(JSON.parse(wsMessage.data)) + } } send(kind, payload) { @@ -20,6 +64,7 @@ export class WebSocketTransport { } close() { + this.closeRequested = true this.websocket.close() } } diff --git a/umap/static/umap/js/modules/umap.js b/umap/static/umap/js/modules/umap.js index 258b45db..3c163741 100644 --- a/umap/static/umap/js/modules/umap.js +++ b/umap/static/umap/js/modules/umap.js @@ -61,8 +61,6 @@ export default class Umap extends ServerStored { ) this.searchParams = new URLSearchParams(window.location.search) - this.sync_engine = new SyncEngine(this) - this.sync = this.sync_engine.proxy(this) // Locale name (pt_PT, en_US…) // To be used for Django localization if (geojson.properties.locale) setLocale(geojson.properties.locale) @@ -124,6 +122,9 @@ export default class Umap extends ServerStored { this.share = new Share(this) this.rules = new Rules(this) + this.syncEngine = new SyncEngine(this) + this.sync = this.syncEngine.proxy(this) + if (this.hasEditMode()) { this.editPanel = new EditPanel(this, this._leafletMap) this.fullPanel = new FullPanel(this, this._leafletMap) @@ -1257,18 +1258,13 @@ export default class Umap extends ServerStored { } async initSyncEngine() { + // this.properties.websocketEnabled is set by the server admin if (this.properties.websocketEnabled === false) return + // this.properties.syncEnabled is set by the user in the map settings if (this.properties.syncEnabled !== true) { this.sync.stop() } else { - const ws_token_uri = this.urls.get('map_websocket_auth_token', { - map_id: this.id, - }) - await this.sync.authenticate( - ws_token_uri, - this.properties.websocketURI, - this.server - ) + await this.sync.authenticate() } } @@ -1343,7 +1339,12 @@ export default class Umap extends ServerStored { }, numberOfConnectedPeers: () => { Utils.eachElement('.connected-peers span', (el) => { - el.textContent = this.sync.getNumberOfConnectedPeers() + if (this.sync.websocketConnected) { + el.textContent = this.sync.getNumberOfConnectedPeers() + } else { + el.textContent = translate('Disconnected') + } + el.parentElement.classList.toggle('off', !this.sync.websocketConnected) }) }, } diff --git a/umap/static/umap/unittests/sync.js b/umap/static/umap/unittests/sync.js index 36d22b10..c4e34b4a 100644 --- a/umap/static/umap/unittests/sync.js +++ b/umap/static/umap/unittests/sync.js @@ -8,8 +8,11 @@ import { MapUpdater } from '../js/modules/sync/updaters.js' import { SyncEngine, Operations } from '../js/modules/sync/engine.js' describe('SyncEngine', () => { + const websocketTokenURI = 'http://localhost:8000/api/v1/maps/1/websocket_auth_token/' + const websocketURI = 'ws://localhost:8000/ws/maps/1/' + it('should initialize methods even before start', () => { - const engine = new SyncEngine({}) + const engine = new SyncEngine({}, websocketTokenURI, websocketURI) engine.upsert() engine.update() engine.delete() diff --git a/umap/websocket_server.py b/umap/websocket_server.py index a493ebbe..6483d648 100644 --- a/umap/websocket_server.py +++ b/umap/websocket_server.py @@ -126,6 +126,10 @@ async def join_and_listen( try: async for raw_message in websocket: + if raw_message == "ping": + await websocket.send("pong") + continue + # recompute the peers list at the time of message-sending. # as doing so beforehand would miss new connections other_peers = connections.get_other_peers(websocket) @@ -192,4 +196,7 @@ def run(host: str, port: int): logging.debug(f"Waiting for connections on {host}:{port}") await asyncio.Future() # run forever - asyncio.run(_serve()) + try: + asyncio.run(_serve()) + except KeyboardInterrupt: + print("Closing WebSocket server")