From 5485fb99abed3b907d54440e6e8d0320125a89a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexis=20M=C3=A9taireau?= Date: Mon, 17 Jun 2024 19:00:30 +0200 Subject: [PATCH] feat(sync): Send remote operations to peers when they join. - Operations are now stored locally in memory. All operations are tied to an HLC (Hybrid Logical Clock), making it possible to order them concistently. - Messages are handled in their `on*` methods, leading to a clearer implementation. - When a new peer joins, it asks a random peer for the list of operations, and re-apply them locally. - Messages types names have been updated to follow CamelCase, and to be similar across the client and the server. - Pass `sync=False` to `makeFeature` in the updaters, to avoid generating duplicate operations on message retrieval. --- umap/static/umap/js/modules/sync/engine.js | 397 +++++++++++++++++- umap/static/umap/js/modules/sync/hlc.js | 106 +++++ umap/static/umap/js/modules/sync/updaters.js | 8 +- umap/static/umap/js/modules/sync/websocket.js | 2 +- umap/static/umap/js/modules/utils.js | 6 +- umap/static/umap/js/umap.forms.js | 2 +- umap/static/umap/js/umap.js | 2 +- umap/static/umap/unittests/hlc.js | 158 +++++++ umap/static/umap/unittests/sync.js | 336 ++++++++++++++- umap/static/umap/unittests/utils.js | 23 + umap/tests/integration/test_datalayer.py | 1 + umap/tests/integration/test_websocket_sync.py | 62 +++ umap/tests/test_websocket_server.py | 22 + umap/views.py | 2 +- umap/websocket_server.py | 157 +++++-- 15 files changed, 1210 insertions(+), 74 deletions(-) create mode 100644 umap/static/umap/js/modules/sync/hlc.js create mode 100644 umap/static/umap/unittests/hlc.js create mode 100644 umap/tests/test_websocket_server.py diff --git a/umap/static/umap/js/modules/sync/engine.js b/umap/static/umap/js/modules/sync/engine.js index 2eb97224..6bca71fd 100644 --- a/umap/static/umap/js/modules/sync/engine.js +++ b/umap/static/umap/js/modules/sync/engine.js @@ -1,6 +1,46 @@ import { DataLayerUpdater, FeatureUpdater, MapUpdater } from './updaters.js' import { WebSocketTransport } from './websocket.js' +import { HybridLogicalClock } from './hlc.js' +import * as Utils from '../utils.js' +/** + * The syncEngine exposes an API to sync messages between peers over the network. + * + * It's taking care of initializing the `transport` layer (sending and receiving + * messages over websocket), the `operations` list (to store them locally), + * and the `updaters` to apply messages to the map. + * + * You can use the `update`, `upsert` and `delete` methods. + * + * @example + * + * ``` + * const sync = new SyncEngine(map) + * + * // Get the authentication token from the umap server + * sync.authenticate(tokenURI, webSocketURI, server) + * + * // Alternatively, start the engine manually with + * sync.start(webSocketURI, authToken) + * + * // Then use the `upsert`, `update` and `delete` methods. + * let {metadata, subject} = object.getSyncMetadata() + * sync.upsert(subject, metadata, "value") + * sync.update(subject, metadata, "key", "value") + * sync.delete(subject, metadata, "key") + * ``` + * + * A `proxy()` method is also exposed, making it easier to use without having + * to specify `subject` and `metadata` fields on each call: + * + * @example + * ``` + * // Or using the `proxy()` method: + * let syncProxy = sync.proxy(object) + * syncProxy.upsert("value") + * syncProxy.update("key", "value") + * ``` + */ export class SyncEngine { constructor(map) { this.updaters = { @@ -9,6 +49,7 @@ export class SyncEngine { datalayer: new DataLayerUpdater(map), } this.transport = undefined + this._operations = new Operations() } async authenticate(tokenURI, webSocketURI, server) { @@ -27,29 +68,6 @@ export class SyncEngine { this.transport = undefined } - _getUpdater(subject, metadata) { - if (Object.keys(this.updaters).includes(subject)) { - return this.updaters[subject] - } - throw new Error(`Unknown updater ${subject}, ${metadata}`) - } - - // This method is called by the transport layer on new messages - receive({ kind, ...payload }) { - if (kind === 'operation') { - const updater = this._getUpdater(payload.subject, payload.metadata) - updater.applyMessage(payload) - } else { - throw new Error(`Unknown dispatch kind: ${kind}`) - } - } - - _send(message) { - if (this.transport) { - this.transport.send('operation', message) - } - } - upsert(subject, metadata, value) { this._send({ verb: 'upsert', subject, metadata, value }) } @@ -62,6 +80,182 @@ export class SyncEngine { this._send({ verb: 'delete', subject, metadata, key }) } + _send(inputMessage) { + let message = this._operations.addLocal(inputMessage) + + if (this.offline) return + if (this.transport) { + this.transport.send('OperationMessage', message) + } + } + + _getUpdater(subject, metadata) { + if (Object.keys(this.updaters).includes(subject)) { + return this.updaters[subject] + } + throw new Error(`Unknown updater ${subject}, ${metadata}`) + } + + _applyOperation(operation) { + const updater = this._getUpdater(operation.subject, operation.metadata) + updater.applyMessage(operation) + } + + /** + * This is called by the transport layer on new messages, + * and dispatches the different "on*" methods. + */ + receive({ kind, ...payload }) { + if (kind === 'OperationMessage') { + this.onOperationMessage(payload) + } else if (kind === 'JoinResponse') { + this.onJoinResponse(payload) + } else if (kind === 'ListPeersResponse') { + this.onListPeersResponse(payload) + } else if (kind === 'PeerMessage') { + debug('received peermessage', payload) + if (payload.message.verb === 'ListOperationsRequest') { + this.onListOperationsRequest(payload) + } else if (payload.message.verb === 'ListOperationsResponse') { + this.onListOperationsResponse(payload) + } + } else { + throw new Error(`Received unknown message from the websocket server: ${kind}`) + } + } + + /** + * Received when an operation has been performed by another peer. + * + * Stores the passed operation locally and apply it. + * + * @param {Object} payload + */ + onOperationMessage(payload) { + this._operations.storeRemoteOperations([payload]) + this._applyOperation(payload) + } + + /** + * Received when the server acknowledges the `join` for this peer. + * + * @param {Object} payload + * @param {string} payload.uuid The server-assigned uuid for this peer + * @param {string[]} payload.peers The list of peers uuids + */ + onJoinResponse({ uuid, peers }) { + debug('received join response', { uuid, peers }) + this.uuid = uuid + this.peers = peers + + // Get one peer at random + let randomPeer = this._getRandomPeer() + + if (randomPeer) { + // Retrieve the operations which happened before join. + this.sendToPeer(randomPeer, 'ListOperationsRequest', { + lastKnownHLC: this._operations.getLastKnownHLC(), + }) + } + } + + /** + * Received when the list of peers has changed. + * + * @param {Object} payload + * @param {string[]} payload.peers The list of peers uuids + */ + onListPeersResponse({ peers }) { + debug('received peerinfo', { peers }) + this.peers = peers + } + + /** + * Received when another peer asks for the list of operations. + * + * @param {Object} payload + * @param {string} payload.sender the uuid of the requesting peer + * @param {string} payload.latestKnownHLC the latest known HLC of the requesting peer + */ + onListOperationsRequest({ sender, lastKnownHLC }) { + this.sendToPeer(sender, 'ListOperationsResponse', { + operations: this._operations.getOperationsSince(lastKnownHLC), + }) + } + + /** + * Received when another peer sends the list of operations. + * + * When receiving this message, operations are filtered and applied + * + * @param {*} operations The list of (encoded operations) + */ + onListOperationsResponse({ sender, message }) { + debug(`received operations from peer ${sender}`, message.operations) + + if (message.operations.length === 0) return + + // Get the list of stored operations before this message. + const remoteOperations = Operations.sort(message.operations) + this._operations.storeRemoteOperations(remoteOperations) + + // Sort the local operations only once, see below. + for (const remote of remoteOperations) { + if (this._operations.shouldBypassOperation(remote)) { + debug( + 'Skipping the following operation, because a newer one has been found locally', + remote + ) + } else { + this._applyOperation(remote) + } + } + + // TODO: compact the changes here? + // e.g. we might want to : + // - group cases of multiple updates + // - not apply changes where we have a more recent version (but store them nevertheless) + + // 1. Get the list of fields that changed (in the incoming operations) + // 2. For each field, get the last version + // 3. Check if we should apply the changes. + + // For each operation + // Get the updated key hlc + // If key.local_hlc > key.remote_hlc: drop + // Else: apply + } + + /** + * Send a message to another peer (via the transport layer) + * + * @param {*} recipient + * @param {*} verb + * @param {*} payload + */ + sendToPeer(recipient, verb, payload) { + payload.verb = verb + this.transport.send('PeerMessage', { + sender: this.uuid, + recipient: recipient, + message: payload, + }) + } + + /** + * Selects a peer ID at random within the known ones. + * + * @returns {string|bool} the selected peer uuid, or False if none was found. + */ + _getRandomPeer() { + let otherPeers = this.peers.filter((p) => p !== this.uuid) + if (otherPeers.length > 0) { + const random = Math.floor(Math.random() * otherPeers.length) + return otherPeers[random] + } + return false + } + /** * Create a proxy for this sync engine. * @@ -91,3 +285,160 @@ export class SyncEngine { return new Proxy(this, handler) } } + +/** + * Registry of local and remote operations, keeping a constant ordering. + */ +export class Operations { + constructor() { + this._hlc = new HybridLogicalClock() + this._operations = new Array() + } + + /** + * Tick the clock and add store the passed message in the operations list. + * + * @param {*} inputMessage + * @returns {*} clock-aware message + */ + addLocal(inputMessage) { + let message = { ...inputMessage, hlc: this._hlc.tick() } + this._operations.push(message) + return message + } + + /** + * Returns the current list of operations ordered by their HLC. + * + * This DOES NOT modify the list in place, but instead return a new copy. + * + * @returns {Array} + */ + sorted() { + return Operations.sort(this._operations) + } + + /** + * Static method to order the given list of operations by their HCL. + * + * @param {Object[]} operations + * @returns an ordered copy + */ + static sort(operations) { + const copy = [...operations] + copy.sort((a, b) => (a.hlc < b.hlc ? -1 : 1)) + return copy + } + + /** + * Store a list of remote operations locally + * + * Note that operations are not applied as part of this method. + * + * - Updates the list of operations with the remote ones. + * - Updates the clock to reflect these changes. + * + * @param {Array} remoteOperations + */ + storeRemoteOperations(remoteOperations) { + // get the highest date from the passed operations + let greatestHLC = remoteOperations + .map((op) => op.hlc) + .reduce((max, current) => (current > max ? current : max)) + + // Bump the current HLC. + this._hlc.receive(greatestHLC) + this._operations.push(...remoteOperations) + } + + /** + * Get operations that happened since a specific clock tick. + */ + getOperationsSince(hlc) { + if (!hlc) return this._operations + // first get the position of the clock that was sent + const start = this._operations.findIndex((op) => op.hlc === hlc) + this._operations.slice(start) + return this._operations.filter((op) => op.hlc > hlc) + } + + /** + * Returns the last known HLC value. + */ + getLastKnownHLC() { + return this._operations.at(-1)?.hlc + } + + /** + * Checks if a given operation should be bypassed. + * + * Note that this doesn't only check the clock, but also if the operation share + * on the same context (subject + metadata). + * + * @param {Object} remote the remote operation to compare to + * @returns bool + */ + shouldBypassOperation(remote) { + const sortedLocalOperations = this.sorted() + // No operations are stored, no need to check + if (sortedLocalOperations.length <= 0) { + debug('No operations are stored, no need to check') + return false + } + + // Latest local operation is older than the remote one + const latest = sortedLocalOperations.at(-1) + if (latest.hlc < remote.hlc) { + debug('Latest local operation is older than the remote one') + return false + } + + // Skip operations enabling the sync engine: + // If we receive something, we are already connected. + if ( + remote.hasOwnProperty('key') && + remote.key === 'options.syncEnabled' && + remote.value === true + ) { + return true + } + for (const local of sortedLocalOperations) { + if ( + local.hlc > remote.hlc && + Operations.haveSameContext(local, remote) && + // For now (and until we fix the conflict between updates and upsert) + // upsert always have priority over other operations + remote.verb !== 'upsert' + ) { + debug('this is newer:', local) + return true + } + } + return false + } + + /** + * Compares two operations to see if they share the same context. + * + * @param {Object} local + * @param {Object} remote + * @return {bool} true if the two operations share the same context. + */ + static haveSameContext(local, remote) { + const shouldCheckKey = + local.hasOwnProperty('key') && + remote.hasOwnProperty('key') && + typeof local.key !== 'undefined' && + typeof remote.key !== 'undefined' + + return ( + Utils.deepEqual(local.subject, remote.subject) && + Utils.deepEqual(local.metadata, remote.metadata) && + (!shouldCheckKey || (shouldCheckKey && local.key == remote.key)) + ) + } +} + +function debug(...args) { + console.debug('SYNC ⇆', ...args) +} diff --git a/umap/static/umap/js/modules/sync/hlc.js b/umap/static/umap/js/modules/sync/hlc.js new file mode 100644 index 00000000..f182c0fa --- /dev/null +++ b/umap/static/umap/js/modules/sync/hlc.js @@ -0,0 +1,106 @@ +import * as Utils from '../utils.js' + +/** + * This is an implementation of a Hybrid Logical Clock (HLC). + * + * There are three parts in the clock: + * + * - walltime: the relative clock of each of the peers + * - NN: a local counter that gets incremented in case of ties. + * - id: to identify the peer + * + * HLCs are used to order operations consistently in distributed systems. + */ +export class HybridLogicalClock { + constructor(walltime = Date.now(), nn = 0, id = Utils.generateId()) { + this._current = { walltime, nn, id } + } + + /** + * Return a serialized version of the current clock + */ + serialize(clock = this._current) { + const { walltime, nn, id } = clock + return `${walltime}:${nn}:${id}` + } + + /** + * Parse a serialized time and return a JS object. + * @param string raw + * @returns object + */ + parse(raw) { + let tokens = raw.split(':') + + if (tokens.length !== 3) { + throw new SyntaxError(`Unable to parse ${raw}`) + } + let [walltime, rawNN, id] = tokens + + let nn = Number.parseInt(rawNN) + if (Number.isNaN(nn)) { + nn = 0 + } + return { walltime, nn, id } + } + + /** + * Increment the current clock by one tick. + * + * - If the current time is greater than the last known tip, increment it. + * - Otherwise, increment the `nn` counter by 1. + * + * This allows each tick to be different from each other. + * + * @returns a serialized clock + */ + tick() { + // Copy the current value of the hlc to avoid concurrency issues + const current = { ...this._current } + const now = Date.now() + + let nextValue + + if (now > current.walltime) { + nextValue = { ...current, walltime: now, nn: 0 } + } else { + nextValue = { ...current, nn: current.nn + 1 } + } + + this._current = nextValue + return this.serialize(this._current) + } + + /** + * Receive a remote clock info, and update the local clock. + * + * - If the current wall time is greater than both local and remote wall time, use the local one. + * - If the current wall time is the same, increment max (local, remote) `nn` counter by 1. + * - If remote time is greater, keep the remote time and increment `nn` + * - Otherwise, keep local values and increment `nn` + * + * This allows to take into account clock drifting, when clocks on different peers are getting + * out of sync. + **/ + receive(remoteRaw) { + const local = { ...this._current } + const remote = this.parse(remoteRaw) + const now = Date.now() + + let nextValue + + if (now > local.walltime && now > remote.walltime) { + nextValue = { ...local, walltime: now } + } else if (local.walltime == remote.walltime) { + let nn = Math.max(local.nn, remote.nn) + 1 + nextValue = { ...local, nn: nn } + } else if (remote.walltime > local.walltime) { + nextValue = { ...remote, id: local.id, nn: remote.nn + 1 } + } else { + nextValue = { ...local, nn: local.nn + 1 } + } + + this._current = nextValue + return this._current + } +} diff --git a/umap/static/umap/js/modules/sync/updaters.js b/umap/static/umap/js/modules/sync/updaters.js index ca72ba0e..560b6af6 100644 --- a/umap/static/umap/js/modules/sync/updaters.js +++ b/umap/static/umap/js/modules/sync/updaters.js @@ -1,6 +1,6 @@ /** - * This file contains the updaters: classes that are able to convert messages - * received from another party (or the server) to changes on the map. + * Updaters are classes able to convert messages + * received from other peers (or from the server) to changes on the map. */ class BaseUpdater { @@ -76,7 +76,7 @@ export class FeatureUpdater extends BaseUpdater { if (feature) { feature.geometry = value.geometry } else { - datalayer.makeFeature(value) + datalayer.makeFeature(value, false) } } @@ -85,9 +85,9 @@ export class FeatureUpdater extends BaseUpdater { const feature = this.getFeatureFromMetadata(metadata) if (feature === undefined) { console.error(`Unable to find feature with id = ${metadata.id}.`) + return } if (key === 'geometry') { - const datalayer = this.getDataLayerFromID(metadata.layerId) const feature = this.getFeatureFromMetadata(metadata, value) feature.geometry = value } else { diff --git a/umap/static/umap/js/modules/sync/websocket.js b/umap/static/umap/js/modules/sync/websocket.js index 529c0b19..dc0599dd 100644 --- a/umap/static/umap/js/modules/sync/websocket.js +++ b/umap/static/umap/js/modules/sync/websocket.js @@ -2,7 +2,7 @@ export class WebSocketTransport { constructor(webSocketURI, authToken, messagesReceiver) { this.websocket = new WebSocket(webSocketURI) this.websocket.onopen = () => { - this.send('join', { token: authToken }) + this.send('JoinRequest', { token: authToken }) } this.websocket.addEventListener('message', this.onMessage.bind(this)) this.receiver = messagesReceiver diff --git a/umap/static/umap/js/modules/utils.js b/umap/static/umap/js/modules/utils.js index c159e0fd..39dff71f 100644 --- a/umap/static/umap/js/modules/utils.js +++ b/umap/static/umap/js/modules/utils.js @@ -178,7 +178,7 @@ export function toHTML(r, options) { } export function isObject(what) { - return typeof what === 'object' && what !== null + return typeof what === 'object' && what !== null && !Array.isArray(what) } export function CopyJSON(geojson) { @@ -406,3 +406,7 @@ export class WithTemplate { return this.element } } + +export function deepEqual(object1, object2){ + return JSON.stringify(object1) === JSON.stringify(object2) +} diff --git a/umap/static/umap/js/umap.forms.js b/umap/static/umap/js/umap.forms.js index ce54cfb6..92924b53 100644 --- a/umap/static/umap/js/umap.forms.js +++ b/umap/static/umap/js/umap.forms.js @@ -1133,7 +1133,7 @@ U.FormBuilder = L.FormBuilder.extend({ else schema.handler = 'IntInput' } else if (schema.choices) { const text_length = schema.choices.reduce( - (acc, [value, label]) => acc + label.length, + (acc, [_, label]) => acc + label.length, 0 ) // Try to be smart and use MultiChoice only diff --git a/umap/static/umap/js/umap.js b/umap/static/umap/js/umap.js index 51531341..1f974830 100644 --- a/umap/static/umap/js/umap.js +++ b/umap/static/umap/js/umap.js @@ -13,7 +13,7 @@ L.Map.mergeOptions({ // we cannot rely on this because of the y is overriden by Leaflet // See https://github.com/Leaflet/Leaflet/pull/9201 // And let's remove this -y when this PR is merged and released. - demoTileInfos: { s: 'a', z: 9, x: 265, y: 181, '-y': 181, r: '' }, + demoTileInfos: { 's': 'a', 'z': 9, 'x': 265, 'y': 181, '-y': 181, 'r': '' }, licences: [], licence: '', enableMarkerDraw: true, diff --git a/umap/static/umap/unittests/hlc.js b/umap/static/umap/unittests/hlc.js new file mode 100644 index 00000000..f32a56f5 --- /dev/null +++ b/umap/static/umap/unittests/hlc.js @@ -0,0 +1,158 @@ +import { describe, it } from 'mocha' +import sinon from 'sinon' + +import pkg from 'chai' +const { expect } = pkg + +import { HybridLogicalClock } from '../js/modules/sync/hlc.js' + +describe('HybridLogicalClock', () => { + let clock + + describe('#parse', () => { + it('should reject invalid values', () => { + clock = new HybridLogicalClock() + expect(() => clock.parse('invalid')).to.throw() + expect(() => clock.parse('123:456')).to.throw() + expect(() => clock.parse('123:456:789:000')).to.throw() + }) + + it('should parse correct values', () => { + clock = new HybridLogicalClock() + const result = clock.parse('1625097600000:42:abc-123') + expect(result).to.deep.equal({ + walltime: '1625097600000', + nn: 42, + id: 'abc-123', + }) + }) + + it('should default to 0 for nn if none is provided', () => { + clock = new HybridLogicalClock() + const result = clock.parse('1625097600000::abc-123') + expect(result).to.deep.equal({ + walltime: '1625097600000', + nn: 0, + id: 'abc-123', + }) + }) + }) + + describe('#serialize', () => { + it('should correctly serialize the clock', () => { + clock = new HybridLogicalClock(1625097600000, 42, 'abc-123') + expect(clock.serialize()).to.equal('1625097600000:42:abc-123') + }) + }) + + describe('#tick', () => { + it('should increment walltime when current time is greater', () => { + const now = Date.now() + clock = new HybridLogicalClock(now - 1000, 0, 'test') + const result = clock.tick() + const parsed = clock.parse(result) + expect(parsed.walltime).to.be.at.least(now.toString()) + expect(parsed.nn).to.equal(0) + }) + + it('should increment nn when current time is not greater', () => { + const now = Date.now() + clock = new HybridLogicalClock(now, 5, 'test') + sinon.useFakeTimers(now) + const result = clock.tick() + const parsed = clock.parse(result) + expect(parsed.walltime).to.equal(now.toString()) + expect(parsed.nn).to.equal(6) + sinon.restore() + }) + }) + + describe('#receive', () => { + it("should use current time when it's greater than both local and remote", () => { + const now = Date.now() + clock = new HybridLogicalClock(now - 1000, 0, 'local') + const result = clock.receive(`${now - 500}:0:remote`) + expect(result.walltime).to.be.at.least(now) + expect(result.nn).to.equal(0) + expect(result.id).to.equal('local') + }) + + it('should increment nn when local and remote times are equal', () => { + const now = Date.now() + clock = new HybridLogicalClock(now, 5, 'local') + const result = clock.receive(`${now}:7:remote`) + expect(result.walltime).to.equal(now) + expect(result.nn).to.equal(8) + expect(result.id).to.equal('local') + }) + + it('should use remote time and increment nn when remote time is greater', () => { + const now = Date.now() + clock = new HybridLogicalClock(now - 1000, 5, 'local') + const result = clock.receive(`${now}:7:remote`) + expect(result.walltime).to.be.least(now.toString()) + expect(result.nn).to.equal(8) + expect(result.id).to.equal('local') + }) + + it('should increment local nn when local time is greater', () => { + const now = Date.now() + clock = new HybridLogicalClock(now, 5, 'local') + const result = clock.receive(`${now - 1000}:7:remote`) + expect(result.walltime).to.be.least(now) + expect(result.nn).to.equal(6) + expect(result.id).to.equal('local') + }) + }) + + it('should maintain causal order across multiple operations', () => { + const hlc = new HybridLogicalClock() + + // Simulate a sequence of events + const event1 = hlc.tick() + + // Simulate some time passing + const clock = sinon.useFakeTimers(Date.now() + 100) + + const event2 = hlc.tick() + + // Simulate receiving a message from another node + const remoteEvent = hlc.receive(`${Date.now() - 50}:5:remote-id`) + + const event3 = hlc.tick() + + // Advance time significantly + clock.tick(1000) + + const event4 = hlc.tick() + + // Clean up the fake timer + clock.restore() + + // Parse all events + const parsedEvent1 = hlc.parse(event1) + const parsedEvent2 = hlc.parse(event2) + const parsedEvent3 = hlc.parse(event3) + const parsedEvent4 = hlc.parse(event4) + + // Assertions to ensure causal order is maintained + expect(parsedEvent2.walltime).to.be.greaterThan(parsedEvent1.walltime) + expect(parsedEvent3.walltime).to.equal(parsedEvent2.walltime) + expect(parsedEvent3.nn).to.be.greaterThan(parsedEvent2.nn) + expect(parsedEvent4.walltime).to.be.greaterThan(parsedEvent3.walltime) + + // Check that all events have the same id + const uniqueIds = new Set([ + parsedEvent1.id, + parsedEvent2.id, + parsedEvent3.id, + parsedEvent4.id, + ]) + expect(uniqueIds.size).to.equal(1) + + // Ensure we can compare events as strings and maintain the same order + const events = [event1, event2, event3, event4] + const sortedEvents = [...events].sort() + expect(sortedEvents).to.deep.equal(events) + }) +}) diff --git a/umap/static/umap/unittests/sync.js b/umap/static/umap/unittests/sync.js index e20a5153..36d22b10 100644 --- a/umap/static/umap/unittests/sync.js +++ b/umap/static/umap/unittests/sync.js @@ -5,10 +5,10 @@ import pkg from 'chai' const { expect } = pkg import { MapUpdater } from '../js/modules/sync/updaters.js' -import { SyncEngine } from '../js/modules/sync/engine.js' +import { SyncEngine, Operations } from '../js/modules/sync/engine.js' describe('SyncEngine', () => { - it('should initialize methods even before start', function () { + it('should initialize methods even before start', () => { const engine = new SyncEngine({}) engine.upsert() engine.update() @@ -16,8 +16,8 @@ describe('SyncEngine', () => { }) }) -describe('#dispatch', function () { - it('should raise an error on unknown updater', function () { +describe('#dispatch', () => { + it('should raise an error on unknown updater', () => { const dispatcher = new SyncEngine({}) expect(() => { dispatcher.dispatch({ @@ -27,7 +27,7 @@ describe('#dispatch', function () { }) }).to.throw(Error) }) - it('should produce an error on malformated messages', function () { + it('should produce an error on malformated messages', () => { const dispatcher = new SyncEngine({}) expect(() => { dispatcher.dispatch({ @@ -36,7 +36,7 @@ describe('#dispatch', function () { }) }).to.throw(Error) }) - it('should raise an unknown operations', function () { + it('should raise an unknown operations', () => { const dispatcher = new SyncEngine({}) expect(() => { dispatcher.dispatch({ @@ -47,55 +47,55 @@ describe('#dispatch', function () { }) describe('Updaters', () => { - describe('BaseUpdater', function () { + describe('BaseUpdater', () => { let updater let map let obj - this.beforeEach(function () { + beforeEach(() => { map = {} updater = new MapUpdater(map) obj = {} }) - it('should be able to set object properties', function () { + it('should be able to set object properties', () => { let obj = {} updater.updateObjectValue(obj, 'foo', 'foo') expect(obj).deep.equal({ foo: 'foo' }) }) - it('should be able to set object properties recursively on existing objects', function () { + it('should be able to set object properties recursively on existing objects', () => { let obj = { foo: {} } updater.updateObjectValue(obj, 'foo.bar', 'foo') expect(obj).deep.equal({ foo: { bar: 'foo' } }) }) - it('should be able to set object properties recursively on deep objects', function () { + it('should be able to set object properties recursively on deep objects', () => { let obj = { foo: { bar: { baz: {} } } } updater.updateObjectValue(obj, 'foo.bar.baz.test', 'value') expect(obj).deep.equal({ foo: { bar: { baz: { test: 'value' } } } }) }) - it('should be able to replace object properties recursively on deep objects', function () { + it('should be able to replace object properties recursively on deep objects', () => { let obj = { foo: { bar: { baz: { test: 'test' } } } } updater.updateObjectValue(obj, 'foo.bar.baz.test', 'value') expect(obj).deep.equal({ foo: { bar: { baz: { test: 'value' } } } }) }) - it('should not set object properties recursively on non-existing objects', function () { + it('should not set object properties recursively on non-existing objects', () => { let obj = { foo: {} } updater.updateObjectValue(obj, 'bar.bar', 'value') expect(obj).deep.equal({ foo: {} }) }) - it('should delete keys for undefined values', function () { + it('should delete keys for undefined values', () => { let obj = { foo: 'foo' } updater.updateObjectValue(obj, 'foo', undefined) expect(obj).deep.equal({}) }) - it('should delete keys for undefined values, recursively', function () { + it('should delete keys for undefined values, recursively', () => { let obj = { foo: { bar: 'bar' } } updater.updateObjectValue(obj, 'foo.bar', undefined) @@ -103,3 +103,309 @@ describe('Updaters', () => { }) }) }) + +describe('Operations', () => { + describe('haveSameContext', () => { + const createOperation = (overrides = {}) => ({ + subject: 'feature', + metadata: { + id: 'UxNjQ', + layerId: '606d26bd-230f-4d3e-a2a7-0c3caed71548', + featureType: 'marker', + }, + ...overrides, + }) + + it('should check if subject and metadata are the same', () => { + const op1 = createOperation() + const op2 = createOperation() + const op3 = createOperation({ + subject: 'datalayer', + metadata: { id: '606d26bd-230f-4d3e-a2a7-0c3caed71548' }, + }) + + expect(Operations.haveSameContext(op1, op2)).to.be.true + expect(Operations.haveSameContext(op1, op3)).to.be.false + expect(Operations.haveSameContext(op2, op3)).to.be.false + }) + + it('should check if the key matches if there is any provided', () => { + const op1 = createOperation({ key: 'properties.name' }) + const op2 = createOperation({ key: 'properties.name' }) + const op3 = createOperation({ key: 'geometry' }) + const op4 = createOperation() + + expect(Operations.haveSameContext(op1, op2)).to.be.true + expect(Operations.haveSameContext(op1, op3)).to.be.false + expect(Operations.haveSameContext(op1, op4)).to.be.true + expect(Operations.haveSameContext(op4, createOperation())).to.be.true + }) + + it('should use deep equality for subject and metadata', () => { + const op1 = createOperation({ metadata: { nested: { value: 1 } } }) + const op2 = createOperation({ metadata: { nested: { value: 1 } } }) + const op3 = createOperation({ metadata: { nested: { value: 2 } } }) + + expect(Operations.haveSameContext(op1, op2)).to.be.true + expect(Operations.haveSameContext(op1, op3)).to.be.false + }) + }) + + describe('sort', () => { + it('should sort operations by timestamp', () => { + const operations = [ + { hlc: '1727193550:44:id1' }, + { hlc: '1727193549:42:id1' }, + { hlc: '1727193551:43:id1' }, + ] + const sorted = Operations.sort(operations) + expect(sorted).to.deep.equal([ + { hlc: '1727193549:42:id1' }, + { hlc: '1727193550:44:id1' }, + { hlc: '1727193551:43:id1' }, + ]) + }) + + it('should sort operations by NN when timestamp is the same', () => { + const operations = [ + { hlc: '1727193549:42:id1' }, + { hlc: '1727193549:44:id1' }, + { hlc: '1727193549:43:id1' }, + ] + const sorted = Operations.sort(operations) + expect(sorted).to.deep.equal([ + { hlc: '1727193549:42:id1' }, + { hlc: '1727193549:43:id1' }, + { hlc: '1727193549:44:id1' }, + ]) + }) + + it('should sort operations by id if other fields are equal', () => { + const operations = [ + { hlc: '1727193549:42:id3' }, + { hlc: '1727193549:42:id2' }, + { hlc: '1727193549:42:id1' }, + ] + const sorted = Operations.sort(operations) + expect(sorted).to.deep.equal([ + { hlc: '1727193549:42:id1' }, + { hlc: '1727193549:42:id2' }, + { hlc: '1727193549:42:id3' }, + ]) + }) + }) + + describe('addLocal', () => { + it('should add a local operation with a new hlc', () => { + const ops = new Operations() + const inputMessage = { verb: 'update', subject: 'test' } + const result = ops.addLocal(inputMessage) + expect(result).to.have.property('hlc') + expect(result.hlc).to.match(/^\d+:\d+:[^:]+$/) + expect(result).to.include(inputMessage) + }) + }) + + describe('sorted', () => { + it('should return sorted operations', () => { + const ops = new Operations() + ops._operations = [{ hlc: '1727193549:43:id1' }, { hlc: '1727193549:42:id1' }] + const sorted = ops.sorted() + expect(sorted[0].hlc).to.equal('1727193549:42:id1') + expect(sorted[1].hlc).to.equal('1727193549:43:id1') + }) + }) + + describe('shouldBypassOperation', () => { + let ops + + beforeEach(() => { + ops = new Operations() + }) + + const createOperation = (overrides = {}) => ({ + verb: 'update', + subject: 'feature', + metadata: { + id: 'UxNjQ', + layerId: '606d26bd-230f-4d3e-a2a7-0c3caed71548', + featureType: 'marker', + }, + key: 'properties.name', + value: 'default', + hlc: '0000000000000:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65', + ...overrides, + }) + + const createUpsertOperation = (overrides = {}) => + createOperation({ + verb: 'upsert', + key: undefined, + value: { + type: 'Feature', + geometry: { + coordinates: [0.439453, 48.04871], + type: 'Point', + }, + properties: {}, + id: 'UxNjQ', + }, + ...overrides, + }) + + it('should return false if no local operation is newer', () => { + const remote = createUpsertOperation({ hlc: '1727184449050:44:id2' }) + ops._operations = [ + createOperation({ + hlc: '1727184449010:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65', + }), + createUpsertOperation({ + hlc: '1727184449020:0:b4a221a0-7b62-4588-a6af-041b041006dc', + }), + ] + + const result = ops.shouldBypassOperation(remote) + expect(result).to.be.false + }) + + it('should return true if a similar "delete" operation is newer', () => { + const remote = createOperation({ + verb: 'delete', + metadata: { id: 'M1NTA', layerId: '1234', featureType: 'marker' }, + hlc: '1:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53', + }) + + ops._operations = [ + createOperation({ + verb: 'delete', + metadata: { id: 'M1NTA', layerId: '1234', featureType: 'marker' }, + hlc: '2:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53', + }), + ] + + const result = ops.shouldBypassOperation(remote) + expect(result).to.be.true + }) + + describe('update', () => { + it('should check for related updates', () => { + ops._operations = [ + createOperation({ + value: 'y', + hlc: '1:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65', + }), + createOperation({ + value: 'youpi', + hlc: '9:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65', + }), + ] + + const remoteOperation = createOperation({ + value: 'something else', + hlc: '0:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65', + }) + + const result = ops.shouldBypassOperation(remoteOperation) + expect(result).to.be.true + }) + + it('should check for related deletes', () => { + ops._operations = [ + { + verb: 'delete', + subject: 'feature', + metadata: { + id: 'M1NTA', + layerId: '123', + featureType: 'marker', + }, + hlc: '1727196583562:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53', + key: undefined, + }, + ] + + const remoteOperation = createOperation({ + metadata: { id: 'M1NTA', layerId: '123', featureType: 'marker' }, + key: 'geometry', + value: { coordinates: [2.944336, 47.070122], type: 'Point' }, + hlc: '0:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53', + }) + + const result = ops.shouldBypassOperation(remoteOperation) + expect(result).to.be.true + }) + }) + + describe('upsert', () => { + it('should take precedence over updates (even if fresher)', () => { + ops._operations = [ + createOperation({ + value: 'youpi', + hlc: '1000000000000:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65', + }), + ] + + const remoteOperation = createUpsertOperation({ + hlc: '0000000000000:0:b4a221a0-7b62-4588-a6af-041b041006dc', + }) + + const result = ops.shouldBypassOperation(remoteOperation) + expect(result).to.be.false + }) + }) + + describe('delete', () => { + it('should check for the same delete', () => { + ops._operations = [ + createOperation({ + verb: 'delete', + metadata: { id: 'I3MDg', layerId: null, featureType: 'polygon' }, + key: undefined, + hlc: '1:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53', + }), + ] + + const remoteOperation = createOperation({ + verb: 'delete', + metadata: { id: 'I3MDg', layerId: null, featureType: 'polygon' }, + key: undefined, + hlc: '0:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53', + }) + + const result = ops.shouldBypassOperation(remoteOperation) + expect(result).to.be.true + }) + }) + }) + describe('storeRemoteOperations', () => { + it('should store remote operations and update the local HLC', () => { + const ops = new Operations() + const remoteOps = [{ hlc: '1727193549:42:id2' }, { hlc: '1727193549:43:id2' }] + ops.storeRemoteOperations(remoteOps) + expect(ops._operations).to.deep.equal(remoteOps) + }) + }) + + describe('getOperationsSince', () => { + it('should return operations since a given HLC', () => { + const ops = new Operations() + ops._operations = [ + { hlc: '1727193549:42:id1' }, + { hlc: '1727193549:43:id1' }, + { hlc: '1727193549:44:id1' }, + ] + const result = ops.getOperationsSince('1727193549:42:id1') + expect(result).to.deep.equal([ + { hlc: '1727193549:43:id1' }, + { hlc: '1727193549:44:id1' }, + ]) + }) + + it('should return all operations if no HLC is provided', () => { + const ops = new Operations() + ops._operations = [{ hlc: '1727193549:42:id1' }, { hlc: '1727193549:43:id1' }] + const result = ops.getOperationsSince() + expect(result).to.deep.equal(ops._operations) + }) + }) +}) diff --git a/umap/static/umap/unittests/utils.js b/umap/static/umap/unittests/utils.js index 958ccfa1..abcf15c1 100644 --- a/umap/static/umap/unittests/utils.js +++ b/umap/static/umap/unittests/utils.js @@ -779,4 +779,27 @@ describe('Utils', () => { ) }) }) + + describe('#isObject', () => { + it('should return true for objects', () => { + assert.equal(Utils.isObject({}), true) + assert.equal(Utils.isObject({ foo: 'bar' }), true) + }) + + it('should return false for Array', () => { + assert.equal(Utils.isObject([]), false) + }) + + it('should return false on null', () => { + assert.equal(Utils.isObject(null), false) + }) + + it('should return false on undefined', () => { + assert.equal(Utils.isObject(undefined), false) + }) + + it('should return false on string', () => { + assert.equal(Utils.isObject(''), false) + }) + }) }) diff --git a/umap/tests/integration/test_datalayer.py b/umap/tests/integration/test_datalayer.py index cb311071..e5161f2f 100644 --- a/umap/tests/integration/test_datalayer.py +++ b/umap/tests/integration/test_datalayer.py @@ -1,4 +1,5 @@ import json + import re import pytest diff --git a/umap/tests/integration/test_websocket_sync.py b/umap/tests/integration/test_websocket_sync.py index dfa86c3a..10f979f7 100644 --- a/umap/tests/integration/test_websocket_sync.py +++ b/umap/tests/integration/test_websocket_sync.py @@ -281,3 +281,65 @@ def test_websocket_connection_can_sync_cloned_polygons( peerB.get_by_role("button", name="Save").click() expect(peerB.locator("path")).to_have_count(2) + + +@pytest.mark.xdist_group(name="websockets") +def test_websocket_connection_can_sync_late_joining_peer( + new_page, live_server, websocket_server, tilelayer +): + map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) + map.settings["properties"]["syncEnabled"] = True + map.save() + DataLayerFactory(map=map, data={}) + + # Create first peer (A) and have it join immediately + peerA = new_page("Page A") + peerA.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + + # Add a marker from peer A + a_create_marker = peerA.get_by_title("Draw a marker") + expect(a_create_marker).to_be_visible() + a_create_marker.click() + + a_map_el = peerA.locator("#map") + a_map_el.click(position={"x": 220, "y": 220}) + peerA.locator("body").type("First marker") + peerA.locator("body").press("Escape") + + # Add a polygon from peer A + create_polygon = peerA.locator(".leaflet-control-toolbar ").get_by_title( + "Draw a polygon" + ) + create_polygon.click() + + a_map_el.click(position={"x": 200, "y": 200}) + a_map_el.click(position={"x": 100, "y": 200}) + a_map_el.click(position={"x": 100, "y": 100}) + a_map_el.click(position={"x": 200, "y": 100}) + a_map_el.click(position={"x": 200, "y": 100}) + peerA.keyboard.press("Escape") + + # Now create peer B and have it join + peerB = new_page("Page B") + peerB.goto(f"{live_server.url}{map.get_absolute_url()}?edit") + + # Check if peer B has received all the updates + b_marker_pane = peerB.locator(".leaflet-marker-pane > div") + b_polygons = peerB.locator(".leaflet-overlay-pane path[fill='DarkBlue']") + + expect(b_marker_pane).to_have_count(1) + expect(b_polygons).to_have_count(1) + + # Verify marker properties + peerB.locator(".leaflet-marker-icon").first.click() + peerB.get_by_role("link", name="Toggle edit mode (⇧+Click)").click() + expect(peerB.locator('input[name="name"]')).to_have_value("First marker") + + # Verify polygon exists (we've already checked the count) + b_polygon = peerB.locator("path") + expect(b_polygon).to_be_visible() + + # Optional: Verify polygon properties if you have any specific ones set + + # Clean up: close edit mode + peerB.locator("body").press("Escape") diff --git a/umap/tests/test_websocket_server.py b/umap/tests/test_websocket_server.py new file mode 100644 index 00000000..62bc93e9 --- /dev/null +++ b/umap/tests/test_websocket_server.py @@ -0,0 +1,22 @@ +from umap.websocket_server import OperationMessage, PeerMessage, Request, ServerRequest + + +def test_messages_are_parsed_correctly(): + server = Request.model_validate(dict(kind="Server", action="list-peers")).root + assert type(server) is ServerRequest + + operation = Request.model_validate( + dict( + kind="OperationMessage", + verb="upsert", + subject="map", + metadata={}, + key="key", + ) + ).root + assert type(operation) is OperationMessage + + peer_message = Request.model_validate( + dict(kind="PeerMessage", sender="Alice", recipient="Bob", message={}) + ).root + assert type(peer_message) is PeerMessage diff --git a/umap/views.py b/umap/views.py index ed1be9ca..a85d4b7a 100644 --- a/umap/views.py +++ b/umap/views.py @@ -1359,7 +1359,7 @@ def logout(request): class LoginPopupEnd(TemplateView): """ - End of a loggin process in popup. + End of a login process in popup. Basically close the popup. """ diff --git a/umap/websocket_server.py b/umap/websocket_server.py index 3ba81394..a493ebbe 100644 --- a/umap/websocket_server.py +++ b/umap/websocket_server.py @@ -1,70 +1,173 @@ #!/usr/bin/env python import asyncio +import logging +import uuid from collections import defaultdict -from typing import Literal, Optional +from typing import Literal, Optional, Union import websockets from django.conf import settings from django.core.signing import TimestampSigner -from pydantic import BaseModel, ValidationError +from pydantic import BaseModel, Field, RootModel, ValidationError from websockets import WebSocketClientProtocol from websockets.server import serve -from umap.models import Map, User # NOQA + +class Connections: + def __init__(self) -> None: + self._connections: set[WebSocketClientProtocol] = set() + self._ids: dict[WebSocketClientProtocol, str] = dict() + + def join(self, websocket: WebSocketClientProtocol) -> str: + self._connections.add(websocket) + _id = str(uuid.uuid4()) + self._ids[websocket] = _id + return _id + + def leave(self, websocket: WebSocketClientProtocol) -> None: + self._connections.remove(websocket) + del self._ids[websocket] + + def get(self, id) -> WebSocketClientProtocol: + # use an iterator to stop iterating as soon as we found + return next(k for k, v in self._ids.items() if v == id) + + def get_id(self, websocket: WebSocketClientProtocol): + return self._ids[websocket] + + def get_other_peers( + self, websocket: WebSocketClientProtocol + ) -> set[WebSocketClientProtocol]: + return self._connections - {websocket} + + def get_all_peers(self) -> set[WebSocketClientProtocol]: + return self._connections + # Contains the list of websocket connections handled by this process. # It's a mapping of map_id to a set of the active websocket connections -CONNECTIONS = defaultdict(set) +CONNECTIONS: defaultdict[int, Connections] = defaultdict(Connections) -class JoinMessage(BaseModel): - kind: str = "join" +class JoinRequest(BaseModel): + kind: Literal["JoinRequest"] = "JoinRequest" token: str class OperationMessage(BaseModel): - kind: str = "operation" - verb: str = Literal["upsert", "update", "delete"] - subject: str = Literal["map", "layer", "feature"] + """Message sent from one peer to all the others""" + + kind: Literal["OperationMessage"] = "OperationMessage" + verb: Literal["upsert", "update", "delete"] + subject: Literal["map", "datalayer", "feature"] metadata: Optional[dict] = None key: Optional[str] = None +class PeerMessage(BaseModel): + """Message sent from a specific peer to another one""" + + kind: Literal["PeerMessage"] = "PeerMessage" + sender: str + recipient: str + # The message can be whatever the peers want. It's not checked by the server. + message: dict + + +class ServerRequest(BaseModel): + """A request towards the server""" + + kind: Literal["Server"] = "Server" + action: Literal["list-peers"] + + +class Request(RootModel): + """Any message coming from the websocket should be one of these, and will be rejected otherwise.""" + + root: Union[ServerRequest, PeerMessage, OperationMessage] = Field( + discriminator="kind" + ) + + +class JoinResponse(BaseModel): + """Server response containing the list of peers""" + + kind: Literal["JoinResponse"] = "JoinResponse" + peers: list + uuid: str + + +class ListPeersResponse(BaseModel): + kind: Literal["ListPeersResponse"] = "ListPeersResponse" + peers: list + + async def join_and_listen( map_id: int, permissions: list, user: str | int, websocket: WebSocketClientProtocol ): - """Join a "room" whith other connected peers. + """Join a "room" with other connected peers, and wait for messages.""" + logging.debug(f"{user} joined room #{map_id}") + connections: Connections = CONNECTIONS[map_id] + _id: str = connections.join(websocket) + + # Assign an ID to the joining peer and return it the list of connected peers. + peers: list[WebSocketClientProtocol] = [ + connections.get_id(p) for p in connections.get_all_peers() + ] + response = JoinResponse(uuid=_id, peers=peers) + await websocket.send(response.model_dump_json()) + + # Notify all other peers of the new list of connected peers. + message = ListPeersResponse(peers=peers) + websockets.broadcast( + connections.get_other_peers(websocket), message.model_dump_json() + ) - New messages will be broadcasted to other connected peers. - """ - print(f"{user} joined room #{map_id}") - CONNECTIONS[map_id].add(websocket) try: async for raw_message in websocket: - # recompute the peers-list at the time of message-sending. + # recompute the peers list at the time of message-sending. # as doing so beforehand would miss new connections - peers = CONNECTIONS[map_id] - {websocket} - # Only relay valid "operation" messages + other_peers = connections.get_other_peers(websocket) try: - OperationMessage.model_validate_json(raw_message) - websockets.broadcast(peers, raw_message) + incoming = Request.model_validate_json(raw_message) except ValidationError as e: - error = f"An error occurred when receiving this message: {raw_message}" - print(error, e) + error = f"An error occurred when receiving the following message: {raw_message!r}" + logging.error(error, e) + else: + match incoming.root: + # Broadcast all operation messages to connected peers + case OperationMessage(): + websockets.broadcast(other_peers, raw_message) + + # Send peer messages to the proper peer + case PeerMessage(recipient=_id): + peer = connections.get(_id) + if peer: + await peer.send(raw_message) + finally: - CONNECTIONS[map_id].remove(websocket) + # On disconnect, remove the connection from the pool + connections.leave(websocket) + + # TODO: refactor this in a separate method. + # Notify all other peers of the new list of connected peers. + peers = [connections.get_id(p) for p in connections.get_all_peers()] + message = ListPeersResponse(peers=peers) + websockets.broadcast( + connections.get_other_peers(websocket), message.model_dump_json() + ) -async def handler(websocket): +async def handler(websocket: WebSocketClientProtocol): """Main WebSocket handler. - If permissions are granted, let the peer enter a room. + Check if the permission is granted and let the peer enter a room. """ raw_message = await websocket.recv() # The first event should always be 'join' - message: JoinMessage = JoinMessage.model_validate_json(raw_message) + message: JoinRequest = JoinRequest.model_validate_json(raw_message) signed = TimestampSigner().unsign_object(message.token, max_age=30) user, map_id, permissions = signed.values() @@ -73,7 +176,7 @@ async def handler(websocket): await join_and_listen(map_id, permissions, user, websocket) -def run(host, port): +def run(host: str, port: int): if not settings.WEBSOCKET_ENABLED: msg = ( "WEBSOCKET_ENABLED should be set to True to run the WebSocket Server. " @@ -86,7 +189,7 @@ def run(host, port): async def _serve(): async with serve(handler, host, port): - print(f"Waiting for connections on {host}:{port}") + logging.debug(f"Waiting for connections on {host}:{port}") await asyncio.Future() # run forever asyncio.run(_serve())