feat(sync): Reconnect the websocket on failures.

When the websocket is disconnected, try to redo an authentication
roundtrip. This commit does the following changes:

- Change the way the SyncEngine is instanciated, passing it a server
  object and the urls.
- Add a ping/pong mechanism. This is required because otherwise we have
  no certainty that the connection is still alive.
- Try to reconnect when the connection didn't work out, increasing the
  wait time a bit more each time.
This commit is contained in:
Alexis Métaireau 2024-10-24 09:48:56 +02:00
parent bcea419bfd
commit 6643fe0a2b
No known key found for this signature in database
GPG key ID: 1C21B876828E5FF2
4 changed files with 86 additions and 19 deletions

View file

@ -3,6 +3,12 @@ import { HybridLogicalClock } from './hlc.js'
import { DataLayerUpdater, FeatureUpdater, MapUpdater } from './updaters.js'
import { WebSocketTransport } from './websocket.js'
// Start reconnecting after 2 seconds, then double the delay each time
// maxing out at 32 seconds.
const RECONNECT_DELAY = 2000;
const RECONNECT_DELAY_FACTOR = 2;
const MAX_RECONNECT_DELAY = 32000;
/**
* The syncEngine exposes an API to sync messages between peers over the network.
*
@ -42,7 +48,7 @@ import { WebSocketTransport } from './websocket.js'
* ```
*/
export class SyncEngine {
constructor(map) {
constructor(map, urls, server) {
this.updaters = {
map: new MapUpdater(map),
feature: new FeatureUpdater(map),
@ -50,24 +56,54 @@ export class SyncEngine {
}
this.transport = undefined
this._operations = new Operations()
this._server = server
// Store URIs to avoid persisting the map
// mainly to ensure separation of concerns.
this._websocketTokenURI = urls.get('map_websocket_auth_token', {
map_id: map.options.umap_id,
})
this._websocketURI = map.options.websocketURI
this._reconnectTimeout = null;
this._reconnectDelay = RECONNECT_DELAY;
}
async authenticate(tokenURI, webSocketURI, server) {
const [response, _, error] = await server.get(tokenURI)
/**
* Authenticate with the server and start the transport layer.
*/
async authenticate() {
const [response, _, error] = await this._server.get(this._websocketTokenURI)
if (!error) {
this.start(webSocketURI, response.token)
this.start(response.token)
}
}
start(webSocketURI, authToken) {
this.transport = new WebSocketTransport(webSocketURI, authToken, this)
start(authToken) {
this.transport = new WebSocketTransport(this._websocketURI, authToken, this)
}
stop() {
if (this.transport) this.transport.close()
if (this.transport){
this.transport.close()
}
this.transport = undefined
}
onConnection() {
this._reconnectTimeout = null;
this._reconnectDelay = RECONNECT_DELAY;
}
reconnect() {
console.log("reconnecting in ", this._reconnectDelay, " ms")
this._reconnectTimeout = setTimeout(() => {
if (this._reconnectDelay < MAX_RECONNECT_DELAY) {
this._reconnectDelay = this._reconnectDelay * RECONNECT_DELAY_FACTOR
}
this.authenticate()
}, this._reconnectDelay);
}
upsert(subject, metadata, value) {
this._send({ verb: 'upsert', subject, metadata, value })
}
@ -448,3 +484,4 @@ export class Operations {
function debug(...args) {
console.debug('SYNC ⇆', ...args)
}

View file

@ -1,16 +1,48 @@
const PONG_TIMEOUT = 5000;
const PING_INTERVAL = 30000;
export class WebSocketTransport {
constructor(webSocketURI, authToken, messagesReceiver) {
this.receiver = messagesReceiver
this.websocket = new WebSocket(webSocketURI)
this.websocket.onopen = () => {
this.send('JoinRequest', { token: authToken })
this.receiver.onConnection()
}
this.websocket.addEventListener('message', this.onMessage.bind(this))
this.receiver = messagesReceiver
this.websocket.onclose = () => {
console.log("websocket closed")
this.receiver.reconnect()
}
// To ensure the connection is still alive, we send ping and expect pong back.
// Websocket provides a `ping` method to keep the connection alive, but it's
// unfortunately not possible to access it from the WebSocket object.
// See https://making.close.com/posts/reliable-websockets/ for more details.
this.pingInterval = setInterval(() => {
if (this.websocket.readyState === WebSocket.OPEN) {
this.websocket.send('ping');
this.pongReceived = false;
setTimeout(() => {
if (!this.pongReceived) {
console.warn('No pong received, reconnecting...');
this.websocket.close()
clearInterval(this.pingInterval)
}
}, PONG_TIMEOUT);
}
}, PING_INTERVAL);
}
onMessage(wsMessage) {
if (wsMessage.data === 'pong') {
this.pongReceived = true;
} else {
this.receiver.receive(JSON.parse(wsMessage.data))
}
}
send(kind, payload) {
const message = { ...payload }
@ -18,8 +50,4 @@ export class WebSocketTransport {
const encoded = JSON.stringify(message)
this.websocket.send(encoded)
}
close() {
this.websocket.close()
}
}

View file

@ -30,8 +30,6 @@ U.Map = L.Map.extend({
includes: [ControlsMixin],
initialize: async function (el, geojson) {
this.sync_engine = new U.SyncEngine(this)
this.sync = this.sync_engine.proxy(this)
// Locale name (pt_PT, en_US…)
// To be used for Django localization
if (geojson.properties.locale) L.setLocale(geojson.properties.locale)
@ -70,6 +68,9 @@ U.Map = L.Map.extend({
this.server = new U.ServerRequest()
this.request = new U.Request()
this.sync_engine = new U.SyncEngine(this, this.urls, this.server)
this.sync = this.sync_engine.proxy(this)
this.initLoader()
this.name = this.options.name
this.description = this.options.description
@ -209,10 +210,7 @@ U.Map = L.Map.extend({
if (this.options.syncEnabled !== true) {
this.sync.stop()
} else {
const ws_token_uri = this.urls.get('map_websocket_auth_token', {
map_id: this.options.umap_id,
})
await this.sync.authenticate(ws_token_uri, this.options.websocketURI, this.server)
await this.sync.authenticate()
}
},

View file

@ -126,6 +126,10 @@ async def join_and_listen(
try:
async for raw_message in websocket:
if raw_message == "ping":
await websocket.send("pong")
continue
# recompute the peers list at the time of message-sending.
# as doing so beforehand would miss new connections
other_peers = connections.get_other_peers(websocket)