feat: reconnect websocket on disconnection (#2389)

This is a port of this PR: #2235

(But it was easier to copy-paste than rebase, given the split of umap.js
and co.)
This commit is contained in:
Yohan Boniface 2024-12-19 17:38:02 +01:00 committed by GitHub
commit 650110fe8a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 126 additions and 27 deletions

View file

@ -32,6 +32,10 @@
background-color: var(--color-lightCyan);
color: var(--color-dark);
}
.dark .off.connected-peers {
background-color: var(--color-lightGray);
color: var(--color-darkGray);
}
.leaflet-container .edit-cancel,
.leaflet-container .edit-disable,

View file

@ -20,7 +20,7 @@ import loadPopup from '../rendering/popup.js'
class Feature {
constructor(umap, datalayer, geojson = {}, id = null) {
this._umap = umap
this.sync = umap.sync_engine.proxy(this)
this.sync = umap.syncEngine.proxy(this)
this._marked_for_deletion = false
this._isDirty = false
this._ui = null

View file

@ -41,7 +41,7 @@ export class DataLayer extends ServerStored {
constructor(umap, leafletMap, data = {}) {
super()
this._umap = umap
this.sync = umap.sync_engine.proxy(this)
this.sync = umap.syncEngine.proxy(this)
this._index = Array()
this._features = {}
this._geojson = null

View file

@ -3,6 +3,12 @@ import { HybridLogicalClock } from './hlc.js'
import { DataLayerUpdater, FeatureUpdater, MapUpdater } from './updaters.js'
import { WebSocketTransport } from './websocket.js'
// Start reconnecting after 2 seconds, then double the delay each time
// maxing out at 32 seconds.
const RECONNECT_DELAY = 2000
const RECONNECT_DELAY_FACTOR = 2
const MAX_RECONNECT_DELAY = 32000
/**
* The syncEngine exposes an API to sync messages between peers over the network.
*
@ -42,32 +48,65 @@ import { WebSocketTransport } from './websocket.js'
* ```
*/
export class SyncEngine {
constructor(map) {
constructor(umap) {
this._umap = umap
this.updaters = {
map: new MapUpdater(map),
feature: new FeatureUpdater(map),
datalayer: new DataLayerUpdater(map),
map: new MapUpdater(umap),
feature: new FeatureUpdater(umap),
datalayer: new DataLayerUpdater(umap),
}
this.transport = undefined
this._operations = new Operations()
this._reconnectTimeout = null
this._reconnectDelay = RECONNECT_DELAY
this.websocketConnected = false
}
async authenticate(tokenURI, webSocketURI, server) {
const [response, _, error] = await server.get(tokenURI)
async authenticate() {
const websocketTokenURI = this._umap.urls.get('map_websocket_auth_token', {
map_id: this._umap.id,
})
const [response, _, error] = await this._umap.server.get(websocketTokenURI)
if (!error) {
this.start(webSocketURI, response.token)
this.start(response.token)
}
}
start(webSocketURI, authToken) {
this.transport = new WebSocketTransport(webSocketURI, authToken, this)
start(authToken) {
this.transport = new WebSocketTransport(
this._umap.properties.websocketURI,
authToken,
this
)
}
stop() {
if (this.transport) this.transport.close()
if (this.transport) {
this.transport.close()
}
this.transport = undefined
}
onConnection() {
this._reconnectTimeout = null
this._reconnectDelay = RECONNECT_DELAY
this.websocketConnected = true
this.updaters.map.update({ key: 'numberOfConnectedPeers' })
}
reconnect() {
this.websocketConnected = false
this.updaters.map.update({ key: 'numberOfConnectedPeers' })
this._reconnectTimeout = setTimeout(() => {
if (this._reconnectDelay < MAX_RECONNECT_DELAY) {
this._reconnectDelay = this._reconnectDelay * RECONNECT_DELAY_FACTOR
}
this.authenticate()
}, this._reconnectDelay)
}
upsert(subject, metadata, value) {
this._send({ verb: 'upsert', subject, metadata, value })
}

View file

@ -1,15 +1,59 @@
const PONG_TIMEOUT = 5000
const PING_INTERVAL = 30000
const FIRST_CONNECTION_TIMEOUT = 2000
export class WebSocketTransport {
constructor(webSocketURI, authToken, messagesReceiver) {
this.receiver = messagesReceiver
this.closeRequested = false
this.websocket = new WebSocket(webSocketURI)
this.websocket.onopen = () => {
this.send('JoinRequest', { token: authToken })
this.receiver.onConnection()
}
this.websocket.addEventListener('message', this.onMessage.bind(this))
this.receiver = messagesReceiver
this.websocket.onclose = () => {
console.log('websocket closed')
if (!this.closeRequested) {
console.log('Not requested, reconnecting...')
this.receiver.reconnect()
}
}
this.ensureOpen = setInterval(() => {
if (this.websocket.readyState !== WebSocket.OPEN) {
this.websocket.close()
clearInterval(this.ensureOpen)
}
}, FIRST_CONNECTION_TIMEOUT)
// To ensure the connection is still alive, we send ping and expect pong back.
// Websocket provides a `ping` method to keep the connection alive, but it's
// unfortunately not possible to access it from the WebSocket object.
// See https://making.close.com/posts/reliable-websockets/ for more details.
this.pingInterval = setInterval(() => {
if (this.websocket.readyState === WebSocket.OPEN) {
this.websocket.send('ping')
this.pongReceived = false
setTimeout(() => {
if (!this.pongReceived) {
console.warn('No pong received, reconnecting...')
this.websocket.close()
clearInterval(this.pingInterval)
}
}, PONG_TIMEOUT)
}
}, PING_INTERVAL)
}
onMessage(wsMessage) {
this.receiver.receive(JSON.parse(wsMessage.data))
if (wsMessage.data === 'pong') {
this.pongReceived = true
} else {
this.receiver.receive(JSON.parse(wsMessage.data))
}
}
send(kind, payload) {
@ -20,6 +64,7 @@ export class WebSocketTransport {
}
close() {
this.closeRequested = true
this.websocket.close()
}
}

View file

@ -61,8 +61,6 @@ export default class Umap extends ServerStored {
)
this.searchParams = new URLSearchParams(window.location.search)
this.sync_engine = new SyncEngine(this)
this.sync = this.sync_engine.proxy(this)
// Locale name (pt_PT, en_US…)
// To be used for Django localization
if (geojson.properties.locale) setLocale(geojson.properties.locale)
@ -124,6 +122,9 @@ export default class Umap extends ServerStored {
this.share = new Share(this)
this.rules = new Rules(this)
this.syncEngine = new SyncEngine(this)
this.sync = this.syncEngine.proxy(this)
if (this.hasEditMode()) {
this.editPanel = new EditPanel(this, this._leafletMap)
this.fullPanel = new FullPanel(this, this._leafletMap)
@ -1257,18 +1258,13 @@ export default class Umap extends ServerStored {
}
async initSyncEngine() {
// this.properties.websocketEnabled is set by the server admin
if (this.properties.websocketEnabled === false) return
// this.properties.syncEnabled is set by the user in the map settings
if (this.properties.syncEnabled !== true) {
this.sync.stop()
} else {
const ws_token_uri = this.urls.get('map_websocket_auth_token', {
map_id: this.id,
})
await this.sync.authenticate(
ws_token_uri,
this.properties.websocketURI,
this.server
)
await this.sync.authenticate()
}
}
@ -1343,7 +1339,12 @@ export default class Umap extends ServerStored {
},
numberOfConnectedPeers: () => {
Utils.eachElement('.connected-peers span', (el) => {
el.textContent = this.sync.getNumberOfConnectedPeers()
if (this.sync.websocketConnected) {
el.textContent = this.sync.getNumberOfConnectedPeers()
} else {
el.textContent = translate('Disconnected')
}
el.parentElement.classList.toggle('off', !this.sync.websocketConnected)
})
},
'properties.starred': () => {

View file

@ -8,8 +8,11 @@ import { MapUpdater } from '../js/modules/sync/updaters.js'
import { SyncEngine, Operations } from '../js/modules/sync/engine.js'
describe('SyncEngine', () => {
const websocketTokenURI = 'http://localhost:8000/api/v1/maps/1/websocket_auth_token/'
const websocketURI = 'ws://localhost:8000/ws/maps/1/'
it('should initialize methods even before start', () => {
const engine = new SyncEngine({})
const engine = new SyncEngine({}, websocketTokenURI, websocketURI)
engine.upsert()
engine.update()
engine.delete()

View file

@ -126,6 +126,10 @@ async def join_and_listen(
try:
async for raw_message in websocket:
if raw_message == "ping":
await websocket.send("pong")
continue
# recompute the peers list at the time of message-sending.
# as doing so beforehand would miss new connections
other_peers = connections.get_other_peers(websocket)
@ -192,4 +196,7 @@ def run(host: str, port: int):
logging.debug(f"Waiting for connections on {host}:{port}")
await asyncio.Future() # run forever
asyncio.run(_serve())
try:
asyncio.run(_serve())
except KeyboardInterrupt:
print("Closing WebSocket server")