mirror of
https://github.com/umap-project/umap.git
synced 2025-04-28 19:42:36 +02:00
feat: reconnect websocket on disconnection
This is a port of this PR: #2235 (But it was easier to copy-paste than rebase, given the split of umap.js and co.) Co-authored-by: Alexis Métaireau <alexis@notmyidea.org> Co-authored-by: David Larlet <david@larlet.fr>
This commit is contained in:
parent
24511d796d
commit
56f2d3d2f9
8 changed files with 126 additions and 27 deletions
|
@ -32,6 +32,10 @@
|
|||
background-color: var(--color-lightCyan);
|
||||
color: var(--color-dark);
|
||||
}
|
||||
.dark .off.connected-peers {
|
||||
background-color: var(--color-lightGray);
|
||||
color: var(--color-darkGray);
|
||||
}
|
||||
|
||||
.leaflet-container .edit-cancel,
|
||||
.leaflet-container .edit-disable,
|
||||
|
|
|
@ -20,7 +20,7 @@ import loadPopup from '../rendering/popup.js'
|
|||
class Feature {
|
||||
constructor(umap, datalayer, geojson = {}, id = null) {
|
||||
this._umap = umap
|
||||
this.sync = umap.sync_engine.proxy(this)
|
||||
this.sync = umap.syncEngine.proxy(this)
|
||||
this._marked_for_deletion = false
|
||||
this._isDirty = false
|
||||
this._ui = null
|
||||
|
|
|
@ -41,7 +41,7 @@ export class DataLayer extends ServerStored {
|
|||
constructor(umap, leafletMap, data = {}) {
|
||||
super()
|
||||
this._umap = umap
|
||||
this.sync = umap.sync_engine.proxy(this)
|
||||
this.sync = umap.syncEngine.proxy(this)
|
||||
this._index = Array()
|
||||
this._features = {}
|
||||
this._geojson = null
|
||||
|
|
|
@ -3,6 +3,12 @@ import { HybridLogicalClock } from './hlc.js'
|
|||
import { DataLayerUpdater, FeatureUpdater, MapUpdater } from './updaters.js'
|
||||
import { WebSocketTransport } from './websocket.js'
|
||||
|
||||
// Start reconnecting after 2 seconds, then double the delay each time
|
||||
// maxing out at 32 seconds.
|
||||
const RECONNECT_DELAY = 2000
|
||||
const RECONNECT_DELAY_FACTOR = 2
|
||||
const MAX_RECONNECT_DELAY = 32000
|
||||
|
||||
/**
|
||||
* The syncEngine exposes an API to sync messages between peers over the network.
|
||||
*
|
||||
|
@ -42,32 +48,65 @@ import { WebSocketTransport } from './websocket.js'
|
|||
* ```
|
||||
*/
|
||||
export class SyncEngine {
|
||||
constructor(map) {
|
||||
constructor(umap) {
|
||||
this._umap = umap
|
||||
this.updaters = {
|
||||
map: new MapUpdater(map),
|
||||
feature: new FeatureUpdater(map),
|
||||
datalayer: new DataLayerUpdater(map),
|
||||
map: new MapUpdater(umap),
|
||||
feature: new FeatureUpdater(umap),
|
||||
datalayer: new DataLayerUpdater(umap),
|
||||
}
|
||||
this.transport = undefined
|
||||
this._operations = new Operations()
|
||||
|
||||
this._reconnectTimeout = null
|
||||
this._reconnectDelay = RECONNECT_DELAY
|
||||
this.websocketConnected = false
|
||||
}
|
||||
|
||||
async authenticate(tokenURI, webSocketURI, server) {
|
||||
const [response, _, error] = await server.get(tokenURI)
|
||||
async authenticate() {
|
||||
const websocketTokenURI = this._umap.urls.get('map_websocket_auth_token', {
|
||||
map_id: this._umap.id,
|
||||
})
|
||||
|
||||
const [response, _, error] = await this._umap.server.get(websocketTokenURI)
|
||||
if (!error) {
|
||||
this.start(webSocketURI, response.token)
|
||||
this.start(response.token)
|
||||
}
|
||||
}
|
||||
|
||||
start(webSocketURI, authToken) {
|
||||
this.transport = new WebSocketTransport(webSocketURI, authToken, this)
|
||||
start(authToken) {
|
||||
this.transport = new WebSocketTransport(
|
||||
this._umap.properties.websocketURI,
|
||||
authToken,
|
||||
this
|
||||
)
|
||||
}
|
||||
|
||||
stop() {
|
||||
if (this.transport) this.transport.close()
|
||||
if (this.transport) {
|
||||
this.transport.close()
|
||||
}
|
||||
this.transport = undefined
|
||||
}
|
||||
|
||||
onConnection() {
|
||||
this._reconnectTimeout = null
|
||||
this._reconnectDelay = RECONNECT_DELAY
|
||||
this.websocketConnected = true
|
||||
this.updaters.map.update({ key: 'numberOfConnectedPeers' })
|
||||
}
|
||||
|
||||
reconnect() {
|
||||
this.websocketConnected = false
|
||||
this.updaters.map.update({ key: 'numberOfConnectedPeers' })
|
||||
|
||||
this._reconnectTimeout = setTimeout(() => {
|
||||
if (this._reconnectDelay < MAX_RECONNECT_DELAY) {
|
||||
this._reconnectDelay = this._reconnectDelay * RECONNECT_DELAY_FACTOR
|
||||
}
|
||||
this.authenticate()
|
||||
}, this._reconnectDelay)
|
||||
}
|
||||
upsert(subject, metadata, value) {
|
||||
this._send({ verb: 'upsert', subject, metadata, value })
|
||||
}
|
||||
|
|
|
@ -1,15 +1,59 @@
|
|||
const PONG_TIMEOUT = 5000
|
||||
const PING_INTERVAL = 30000
|
||||
const FIRST_CONNECTION_TIMEOUT = 2000
|
||||
|
||||
export class WebSocketTransport {
|
||||
constructor(webSocketURI, authToken, messagesReceiver) {
|
||||
this.receiver = messagesReceiver
|
||||
this.closeRequested = false
|
||||
|
||||
this.websocket = new WebSocket(webSocketURI)
|
||||
|
||||
this.websocket.onopen = () => {
|
||||
this.send('JoinRequest', { token: authToken })
|
||||
this.receiver.onConnection()
|
||||
}
|
||||
this.websocket.addEventListener('message', this.onMessage.bind(this))
|
||||
this.receiver = messagesReceiver
|
||||
this.websocket.onclose = () => {
|
||||
console.log('websocket closed')
|
||||
if (!this.closeRequested) {
|
||||
console.log('Not requested, reconnecting...')
|
||||
this.receiver.reconnect()
|
||||
}
|
||||
}
|
||||
|
||||
this.ensureOpen = setInterval(() => {
|
||||
if (this.websocket.readyState !== WebSocket.OPEN) {
|
||||
this.websocket.close()
|
||||
clearInterval(this.ensureOpen)
|
||||
}
|
||||
}, FIRST_CONNECTION_TIMEOUT)
|
||||
|
||||
// To ensure the connection is still alive, we send ping and expect pong back.
|
||||
// Websocket provides a `ping` method to keep the connection alive, but it's
|
||||
// unfortunately not possible to access it from the WebSocket object.
|
||||
// See https://making.close.com/posts/reliable-websockets/ for more details.
|
||||
this.pingInterval = setInterval(() => {
|
||||
if (this.websocket.readyState === WebSocket.OPEN) {
|
||||
this.websocket.send('ping')
|
||||
this.pongReceived = false
|
||||
setTimeout(() => {
|
||||
if (!this.pongReceived) {
|
||||
console.warn('No pong received, reconnecting...')
|
||||
this.websocket.close()
|
||||
clearInterval(this.pingInterval)
|
||||
}
|
||||
}, PONG_TIMEOUT)
|
||||
}
|
||||
}, PING_INTERVAL)
|
||||
}
|
||||
|
||||
onMessage(wsMessage) {
|
||||
this.receiver.receive(JSON.parse(wsMessage.data))
|
||||
if (wsMessage.data === 'pong') {
|
||||
this.pongReceived = true
|
||||
} else {
|
||||
this.receiver.receive(JSON.parse(wsMessage.data))
|
||||
}
|
||||
}
|
||||
|
||||
send(kind, payload) {
|
||||
|
@ -20,6 +64,7 @@ export class WebSocketTransport {
|
|||
}
|
||||
|
||||
close() {
|
||||
this.closeRequested = true
|
||||
this.websocket.close()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,8 +61,6 @@ export default class Umap extends ServerStored {
|
|||
)
|
||||
this.searchParams = new URLSearchParams(window.location.search)
|
||||
|
||||
this.sync_engine = new SyncEngine(this)
|
||||
this.sync = this.sync_engine.proxy(this)
|
||||
// Locale name (pt_PT, en_US…)
|
||||
// To be used for Django localization
|
||||
if (geojson.properties.locale) setLocale(geojson.properties.locale)
|
||||
|
@ -124,6 +122,9 @@ export default class Umap extends ServerStored {
|
|||
this.share = new Share(this)
|
||||
this.rules = new Rules(this)
|
||||
|
||||
this.syncEngine = new SyncEngine(this)
|
||||
this.sync = this.syncEngine.proxy(this)
|
||||
|
||||
if (this.hasEditMode()) {
|
||||
this.editPanel = new EditPanel(this, this._leafletMap)
|
||||
this.fullPanel = new FullPanel(this, this._leafletMap)
|
||||
|
@ -1257,18 +1258,13 @@ export default class Umap extends ServerStored {
|
|||
}
|
||||
|
||||
async initSyncEngine() {
|
||||
// this.properties.websocketEnabled is set by the server admin
|
||||
if (this.properties.websocketEnabled === false) return
|
||||
// this.properties.syncEnabled is set by the user in the map settings
|
||||
if (this.properties.syncEnabled !== true) {
|
||||
this.sync.stop()
|
||||
} else {
|
||||
const ws_token_uri = this.urls.get('map_websocket_auth_token', {
|
||||
map_id: this.id,
|
||||
})
|
||||
await this.sync.authenticate(
|
||||
ws_token_uri,
|
||||
this.properties.websocketURI,
|
||||
this.server
|
||||
)
|
||||
await this.sync.authenticate()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1343,7 +1339,12 @@ export default class Umap extends ServerStored {
|
|||
},
|
||||
numberOfConnectedPeers: () => {
|
||||
Utils.eachElement('.connected-peers span', (el) => {
|
||||
el.textContent = this.sync.getNumberOfConnectedPeers()
|
||||
if (this.sync.websocketConnected) {
|
||||
el.textContent = this.sync.getNumberOfConnectedPeers()
|
||||
} else {
|
||||
el.textContent = translate('Disconnected')
|
||||
}
|
||||
el.parentElement.classList.toggle('off', !this.sync.websocketConnected)
|
||||
})
|
||||
},
|
||||
}
|
||||
|
|
|
@ -8,8 +8,11 @@ import { MapUpdater } from '../js/modules/sync/updaters.js'
|
|||
import { SyncEngine, Operations } from '../js/modules/sync/engine.js'
|
||||
|
||||
describe('SyncEngine', () => {
|
||||
const websocketTokenURI = 'http://localhost:8000/api/v1/maps/1/websocket_auth_token/'
|
||||
const websocketURI = 'ws://localhost:8000/ws/maps/1/'
|
||||
|
||||
it('should initialize methods even before start', () => {
|
||||
const engine = new SyncEngine({})
|
||||
const engine = new SyncEngine({}, websocketTokenURI, websocketURI)
|
||||
engine.upsert()
|
||||
engine.update()
|
||||
engine.delete()
|
||||
|
|
|
@ -126,6 +126,10 @@ async def join_and_listen(
|
|||
|
||||
try:
|
||||
async for raw_message in websocket:
|
||||
if raw_message == "ping":
|
||||
await websocket.send("pong")
|
||||
continue
|
||||
|
||||
# recompute the peers list at the time of message-sending.
|
||||
# as doing so beforehand would miss new connections
|
||||
other_peers = connections.get_other_peers(websocket)
|
||||
|
@ -192,4 +196,7 @@ def run(host: str, port: int):
|
|||
logging.debug(f"Waiting for connections on {host}:{port}")
|
||||
await asyncio.Future() # run forever
|
||||
|
||||
asyncio.run(_serve())
|
||||
try:
|
||||
asyncio.run(_serve())
|
||||
except KeyboardInterrupt:
|
||||
print("Closing WebSocket server")
|
||||
|
|
Loading…
Reference in a new issue