feat(sync): Send remote operations to peers when they join.

- Operations are now stored locally in memory. All operations are tied
  to an HLC (Hybrid Logical Clock), making it possible to order them
  concistently.

- Messages are handled in their `on*` methods, leading to a clearer
  implementation.

- When a new peer joins, it asks a random peer for the list of
  operations, and re-apply them locally.

- Messages types names have been updated to follow CamelCase, and to be
  similar across the client and the server.

- Pass `sync=False` to `makeFeature` in the updaters, to avoid
  generating duplicate operations on message retrieval.
This commit is contained in:
Alexis Métaireau 2024-06-17 19:00:30 +02:00
parent 5836ae859a
commit 5485fb99ab
15 changed files with 1210 additions and 74 deletions

View file

@ -1,6 +1,46 @@
import { DataLayerUpdater, FeatureUpdater, MapUpdater } from './updaters.js'
import { WebSocketTransport } from './websocket.js'
import { HybridLogicalClock } from './hlc.js'
import * as Utils from '../utils.js'
/**
* The syncEngine exposes an API to sync messages between peers over the network.
*
* It's taking care of initializing the `transport` layer (sending and receiving
* messages over websocket), the `operations` list (to store them locally),
* and the `updaters` to apply messages to the map.
*
* You can use the `update`, `upsert` and `delete` methods.
*
* @example
*
* ```
* const sync = new SyncEngine(map)
*
* // Get the authentication token from the umap server
* sync.authenticate(tokenURI, webSocketURI, server)
*
* // Alternatively, start the engine manually with
* sync.start(webSocketURI, authToken)
*
* // Then use the `upsert`, `update` and `delete` methods.
* let {metadata, subject} = object.getSyncMetadata()
* sync.upsert(subject, metadata, "value")
* sync.update(subject, metadata, "key", "value")
* sync.delete(subject, metadata, "key")
* ```
*
* A `proxy()` method is also exposed, making it easier to use without having
* to specify `subject` and `metadata` fields on each call:
*
* @example
* ```
* // Or using the `proxy()` method:
* let syncProxy = sync.proxy(object)
* syncProxy.upsert("value")
* syncProxy.update("key", "value")
* ```
*/
export class SyncEngine {
constructor(map) {
this.updaters = {
@ -9,6 +49,7 @@ export class SyncEngine {
datalayer: new DataLayerUpdater(map),
}
this.transport = undefined
this._operations = new Operations()
}
async authenticate(tokenURI, webSocketURI, server) {
@ -27,29 +68,6 @@ export class SyncEngine {
this.transport = undefined
}
_getUpdater(subject, metadata) {
if (Object.keys(this.updaters).includes(subject)) {
return this.updaters[subject]
}
throw new Error(`Unknown updater ${subject}, ${metadata}`)
}
// This method is called by the transport layer on new messages
receive({ kind, ...payload }) {
if (kind === 'operation') {
const updater = this._getUpdater(payload.subject, payload.metadata)
updater.applyMessage(payload)
} else {
throw new Error(`Unknown dispatch kind: ${kind}`)
}
}
_send(message) {
if (this.transport) {
this.transport.send('operation', message)
}
}
upsert(subject, metadata, value) {
this._send({ verb: 'upsert', subject, metadata, value })
}
@ -62,6 +80,182 @@ export class SyncEngine {
this._send({ verb: 'delete', subject, metadata, key })
}
_send(inputMessage) {
let message = this._operations.addLocal(inputMessage)
if (this.offline) return
if (this.transport) {
this.transport.send('OperationMessage', message)
}
}
_getUpdater(subject, metadata) {
if (Object.keys(this.updaters).includes(subject)) {
return this.updaters[subject]
}
throw new Error(`Unknown updater ${subject}, ${metadata}`)
}
_applyOperation(operation) {
const updater = this._getUpdater(operation.subject, operation.metadata)
updater.applyMessage(operation)
}
/**
* This is called by the transport layer on new messages,
* and dispatches the different "on*" methods.
*/
receive({ kind, ...payload }) {
if (kind === 'OperationMessage') {
this.onOperationMessage(payload)
} else if (kind === 'JoinResponse') {
this.onJoinResponse(payload)
} else if (kind === 'ListPeersResponse') {
this.onListPeersResponse(payload)
} else if (kind === 'PeerMessage') {
debug('received peermessage', payload)
if (payload.message.verb === 'ListOperationsRequest') {
this.onListOperationsRequest(payload)
} else if (payload.message.verb === 'ListOperationsResponse') {
this.onListOperationsResponse(payload)
}
} else {
throw new Error(`Received unknown message from the websocket server: ${kind}`)
}
}
/**
* Received when an operation has been performed by another peer.
*
* Stores the passed operation locally and apply it.
*
* @param {Object} payload
*/
onOperationMessage(payload) {
this._operations.storeRemoteOperations([payload])
this._applyOperation(payload)
}
/**
* Received when the server acknowledges the `join` for this peer.
*
* @param {Object} payload
* @param {string} payload.uuid The server-assigned uuid for this peer
* @param {string[]} payload.peers The list of peers uuids
*/
onJoinResponse({ uuid, peers }) {
debug('received join response', { uuid, peers })
this.uuid = uuid
this.peers = peers
// Get one peer at random
let randomPeer = this._getRandomPeer()
if (randomPeer) {
// Retrieve the operations which happened before join.
this.sendToPeer(randomPeer, 'ListOperationsRequest', {
lastKnownHLC: this._operations.getLastKnownHLC(),
})
}
}
/**
* Received when the list of peers has changed.
*
* @param {Object} payload
* @param {string[]} payload.peers The list of peers uuids
*/
onListPeersResponse({ peers }) {
debug('received peerinfo', { peers })
this.peers = peers
}
/**
* Received when another peer asks for the list of operations.
*
* @param {Object} payload
* @param {string} payload.sender the uuid of the requesting peer
* @param {string} payload.latestKnownHLC the latest known HLC of the requesting peer
*/
onListOperationsRequest({ sender, lastKnownHLC }) {
this.sendToPeer(sender, 'ListOperationsResponse', {
operations: this._operations.getOperationsSince(lastKnownHLC),
})
}
/**
* Received when another peer sends the list of operations.
*
* When receiving this message, operations are filtered and applied
*
* @param {*} operations The list of (encoded operations)
*/
onListOperationsResponse({ sender, message }) {
debug(`received operations from peer ${sender}`, message.operations)
if (message.operations.length === 0) return
// Get the list of stored operations before this message.
const remoteOperations = Operations.sort(message.operations)
this._operations.storeRemoteOperations(remoteOperations)
// Sort the local operations only once, see below.
for (const remote of remoteOperations) {
if (this._operations.shouldBypassOperation(remote)) {
debug(
'Skipping the following operation, because a newer one has been found locally',
remote
)
} else {
this._applyOperation(remote)
}
}
// TODO: compact the changes here?
// e.g. we might want to :
// - group cases of multiple updates
// - not apply changes where we have a more recent version (but store them nevertheless)
// 1. Get the list of fields that changed (in the incoming operations)
// 2. For each field, get the last version
// 3. Check if we should apply the changes.
// For each operation
// Get the updated key hlc
// If key.local_hlc > key.remote_hlc: drop
// Else: apply
}
/**
* Send a message to another peer (via the transport layer)
*
* @param {*} recipient
* @param {*} verb
* @param {*} payload
*/
sendToPeer(recipient, verb, payload) {
payload.verb = verb
this.transport.send('PeerMessage', {
sender: this.uuid,
recipient: recipient,
message: payload,
})
}
/**
* Selects a peer ID at random within the known ones.
*
* @returns {string|bool} the selected peer uuid, or False if none was found.
*/
_getRandomPeer() {
let otherPeers = this.peers.filter((p) => p !== this.uuid)
if (otherPeers.length > 0) {
const random = Math.floor(Math.random() * otherPeers.length)
return otherPeers[random]
}
return false
}
/**
* Create a proxy for this sync engine.
*
@ -91,3 +285,160 @@ export class SyncEngine {
return new Proxy(this, handler)
}
}
/**
* Registry of local and remote operations, keeping a constant ordering.
*/
export class Operations {
constructor() {
this._hlc = new HybridLogicalClock()
this._operations = new Array()
}
/**
* Tick the clock and add store the passed message in the operations list.
*
* @param {*} inputMessage
* @returns {*} clock-aware message
*/
addLocal(inputMessage) {
let message = { ...inputMessage, hlc: this._hlc.tick() }
this._operations.push(message)
return message
}
/**
* Returns the current list of operations ordered by their HLC.
*
* This DOES NOT modify the list in place, but instead return a new copy.
*
* @returns {Array}
*/
sorted() {
return Operations.sort(this._operations)
}
/**
* Static method to order the given list of operations by their HCL.
*
* @param {Object[]} operations
* @returns an ordered copy
*/
static sort(operations) {
const copy = [...operations]
copy.sort((a, b) => (a.hlc < b.hlc ? -1 : 1))
return copy
}
/**
* Store a list of remote operations locally
*
* Note that operations are not applied as part of this method.
*
* - Updates the list of operations with the remote ones.
* - Updates the clock to reflect these changes.
*
* @param {Array} remoteOperations
*/
storeRemoteOperations(remoteOperations) {
// get the highest date from the passed operations
let greatestHLC = remoteOperations
.map((op) => op.hlc)
.reduce((max, current) => (current > max ? current : max))
// Bump the current HLC.
this._hlc.receive(greatestHLC)
this._operations.push(...remoteOperations)
}
/**
* Get operations that happened since a specific clock tick.
*/
getOperationsSince(hlc) {
if (!hlc) return this._operations
// first get the position of the clock that was sent
const start = this._operations.findIndex((op) => op.hlc === hlc)
this._operations.slice(start)
return this._operations.filter((op) => op.hlc > hlc)
}
/**
* Returns the last known HLC value.
*/
getLastKnownHLC() {
return this._operations.at(-1)?.hlc
}
/**
* Checks if a given operation should be bypassed.
*
* Note that this doesn't only check the clock, but also if the operation share
* on the same context (subject + metadata).
*
* @param {Object} remote the remote operation to compare to
* @returns bool
*/
shouldBypassOperation(remote) {
const sortedLocalOperations = this.sorted()
// No operations are stored, no need to check
if (sortedLocalOperations.length <= 0) {
debug('No operations are stored, no need to check')
return false
}
// Latest local operation is older than the remote one
const latest = sortedLocalOperations.at(-1)
if (latest.hlc < remote.hlc) {
debug('Latest local operation is older than the remote one')
return false
}
// Skip operations enabling the sync engine:
// If we receive something, we are already connected.
if (
remote.hasOwnProperty('key') &&
remote.key === 'options.syncEnabled' &&
remote.value === true
) {
return true
}
for (const local of sortedLocalOperations) {
if (
local.hlc > remote.hlc &&
Operations.haveSameContext(local, remote) &&
// For now (and until we fix the conflict between updates and upsert)
// upsert always have priority over other operations
remote.verb !== 'upsert'
) {
debug('this is newer:', local)
return true
}
}
return false
}
/**
* Compares two operations to see if they share the same context.
*
* @param {Object} local
* @param {Object} remote
* @return {bool} true if the two operations share the same context.
*/
static haveSameContext(local, remote) {
const shouldCheckKey =
local.hasOwnProperty('key') &&
remote.hasOwnProperty('key') &&
typeof local.key !== 'undefined' &&
typeof remote.key !== 'undefined'
return (
Utils.deepEqual(local.subject, remote.subject) &&
Utils.deepEqual(local.metadata, remote.metadata) &&
(!shouldCheckKey || (shouldCheckKey && local.key == remote.key))
)
}
}
function debug(...args) {
console.debug('SYNC ⇆', ...args)
}

View file

@ -0,0 +1,106 @@
import * as Utils from '../utils.js'
/**
* This is an implementation of a Hybrid Logical Clock (HLC).
*
* There are three parts in the clock:
*
* - walltime: the relative clock of each of the peers
* - NN: a local counter that gets incremented in case of ties.
* - id: to identify the peer
*
* HLCs are used to order operations consistently in distributed systems.
*/
export class HybridLogicalClock {
constructor(walltime = Date.now(), nn = 0, id = Utils.generateId()) {
this._current = { walltime, nn, id }
}
/**
* Return a serialized version of the current clock
*/
serialize(clock = this._current) {
const { walltime, nn, id } = clock
return `${walltime}:${nn}:${id}`
}
/**
* Parse a serialized time and return a JS object.
* @param string raw
* @returns object
*/
parse(raw) {
let tokens = raw.split(':')
if (tokens.length !== 3) {
throw new SyntaxError(`Unable to parse ${raw}`)
}
let [walltime, rawNN, id] = tokens
let nn = Number.parseInt(rawNN)
if (Number.isNaN(nn)) {
nn = 0
}
return { walltime, nn, id }
}
/**
* Increment the current clock by one tick.
*
* - If the current time is greater than the last known tip, increment it.
* - Otherwise, increment the `nn` counter by 1.
*
* This allows each tick to be different from each other.
*
* @returns a serialized clock
*/
tick() {
// Copy the current value of the hlc to avoid concurrency issues
const current = { ...this._current }
const now = Date.now()
let nextValue
if (now > current.walltime) {
nextValue = { ...current, walltime: now, nn: 0 }
} else {
nextValue = { ...current, nn: current.nn + 1 }
}
this._current = nextValue
return this.serialize(this._current)
}
/**
* Receive a remote clock info, and update the local clock.
*
* - If the current wall time is greater than both local and remote wall time, use the local one.
* - If the current wall time is the same, increment max (local, remote) `nn` counter by 1.
* - If remote time is greater, keep the remote time and increment `nn`
* - Otherwise, keep local values and increment `nn`
*
* This allows to take into account clock drifting, when clocks on different peers are getting
* out of sync.
**/
receive(remoteRaw) {
const local = { ...this._current }
const remote = this.parse(remoteRaw)
const now = Date.now()
let nextValue
if (now > local.walltime && now > remote.walltime) {
nextValue = { ...local, walltime: now }
} else if (local.walltime == remote.walltime) {
let nn = Math.max(local.nn, remote.nn) + 1
nextValue = { ...local, nn: nn }
} else if (remote.walltime > local.walltime) {
nextValue = { ...remote, id: local.id, nn: remote.nn + 1 }
} else {
nextValue = { ...local, nn: local.nn + 1 }
}
this._current = nextValue
return this._current
}
}

View file

@ -1,6 +1,6 @@
/**
* This file contains the updaters: classes that are able to convert messages
* received from another party (or the server) to changes on the map.
* Updaters are classes able to convert messages
* received from other peers (or from the server) to changes on the map.
*/
class BaseUpdater {
@ -76,7 +76,7 @@ export class FeatureUpdater extends BaseUpdater {
if (feature) {
feature.geometry = value.geometry
} else {
datalayer.makeFeature(value)
datalayer.makeFeature(value, false)
}
}
@ -85,9 +85,9 @@ export class FeatureUpdater extends BaseUpdater {
const feature = this.getFeatureFromMetadata(metadata)
if (feature === undefined) {
console.error(`Unable to find feature with id = ${metadata.id}.`)
return
}
if (key === 'geometry') {
const datalayer = this.getDataLayerFromID(metadata.layerId)
const feature = this.getFeatureFromMetadata(metadata, value)
feature.geometry = value
} else {

View file

@ -2,7 +2,7 @@ export class WebSocketTransport {
constructor(webSocketURI, authToken, messagesReceiver) {
this.websocket = new WebSocket(webSocketURI)
this.websocket.onopen = () => {
this.send('join', { token: authToken })
this.send('JoinRequest', { token: authToken })
}
this.websocket.addEventListener('message', this.onMessage.bind(this))
this.receiver = messagesReceiver

View file

@ -178,7 +178,7 @@ export function toHTML(r, options) {
}
export function isObject(what) {
return typeof what === 'object' && what !== null
return typeof what === 'object' && what !== null && !Array.isArray(what)
}
export function CopyJSON(geojson) {
@ -406,3 +406,7 @@ export class WithTemplate {
return this.element
}
}
export function deepEqual(object1, object2){
return JSON.stringify(object1) === JSON.stringify(object2)
}

View file

@ -1133,7 +1133,7 @@ U.FormBuilder = L.FormBuilder.extend({
else schema.handler = 'IntInput'
} else if (schema.choices) {
const text_length = schema.choices.reduce(
(acc, [value, label]) => acc + label.length,
(acc, [_, label]) => acc + label.length,
0
)
// Try to be smart and use MultiChoice only

View file

@ -13,7 +13,7 @@ L.Map.mergeOptions({
// we cannot rely on this because of the y is overriden by Leaflet
// See https://github.com/Leaflet/Leaflet/pull/9201
// And let's remove this -y when this PR is merged and released.
demoTileInfos: { s: 'a', z: 9, x: 265, y: 181, '-y': 181, r: '' },
demoTileInfos: { 's': 'a', 'z': 9, 'x': 265, 'y': 181, '-y': 181, 'r': '' },
licences: [],
licence: '',
enableMarkerDraw: true,

View file

@ -0,0 +1,158 @@
import { describe, it } from 'mocha'
import sinon from 'sinon'
import pkg from 'chai'
const { expect } = pkg
import { HybridLogicalClock } from '../js/modules/sync/hlc.js'
describe('HybridLogicalClock', () => {
let clock
describe('#parse', () => {
it('should reject invalid values', () => {
clock = new HybridLogicalClock()
expect(() => clock.parse('invalid')).to.throw()
expect(() => clock.parse('123:456')).to.throw()
expect(() => clock.parse('123:456:789:000')).to.throw()
})
it('should parse correct values', () => {
clock = new HybridLogicalClock()
const result = clock.parse('1625097600000:42:abc-123')
expect(result).to.deep.equal({
walltime: '1625097600000',
nn: 42,
id: 'abc-123',
})
})
it('should default to 0 for nn if none is provided', () => {
clock = new HybridLogicalClock()
const result = clock.parse('1625097600000::abc-123')
expect(result).to.deep.equal({
walltime: '1625097600000',
nn: 0,
id: 'abc-123',
})
})
})
describe('#serialize', () => {
it('should correctly serialize the clock', () => {
clock = new HybridLogicalClock(1625097600000, 42, 'abc-123')
expect(clock.serialize()).to.equal('1625097600000:42:abc-123')
})
})
describe('#tick', () => {
it('should increment walltime when current time is greater', () => {
const now = Date.now()
clock = new HybridLogicalClock(now - 1000, 0, 'test')
const result = clock.tick()
const parsed = clock.parse(result)
expect(parsed.walltime).to.be.at.least(now.toString())
expect(parsed.nn).to.equal(0)
})
it('should increment nn when current time is not greater', () => {
const now = Date.now()
clock = new HybridLogicalClock(now, 5, 'test')
sinon.useFakeTimers(now)
const result = clock.tick()
const parsed = clock.parse(result)
expect(parsed.walltime).to.equal(now.toString())
expect(parsed.nn).to.equal(6)
sinon.restore()
})
})
describe('#receive', () => {
it("should use current time when it's greater than both local and remote", () => {
const now = Date.now()
clock = new HybridLogicalClock(now - 1000, 0, 'local')
const result = clock.receive(`${now - 500}:0:remote`)
expect(result.walltime).to.be.at.least(now)
expect(result.nn).to.equal(0)
expect(result.id).to.equal('local')
})
it('should increment nn when local and remote times are equal', () => {
const now = Date.now()
clock = new HybridLogicalClock(now, 5, 'local')
const result = clock.receive(`${now}:7:remote`)
expect(result.walltime).to.equal(now)
expect(result.nn).to.equal(8)
expect(result.id).to.equal('local')
})
it('should use remote time and increment nn when remote time is greater', () => {
const now = Date.now()
clock = new HybridLogicalClock(now - 1000, 5, 'local')
const result = clock.receive(`${now}:7:remote`)
expect(result.walltime).to.be.least(now.toString())
expect(result.nn).to.equal(8)
expect(result.id).to.equal('local')
})
it('should increment local nn when local time is greater', () => {
const now = Date.now()
clock = new HybridLogicalClock(now, 5, 'local')
const result = clock.receive(`${now - 1000}:7:remote`)
expect(result.walltime).to.be.least(now)
expect(result.nn).to.equal(6)
expect(result.id).to.equal('local')
})
})
it('should maintain causal order across multiple operations', () => {
const hlc = new HybridLogicalClock()
// Simulate a sequence of events
const event1 = hlc.tick()
// Simulate some time passing
const clock = sinon.useFakeTimers(Date.now() + 100)
const event2 = hlc.tick()
// Simulate receiving a message from another node
const remoteEvent = hlc.receive(`${Date.now() - 50}:5:remote-id`)
const event3 = hlc.tick()
// Advance time significantly
clock.tick(1000)
const event4 = hlc.tick()
// Clean up the fake timer
clock.restore()
// Parse all events
const parsedEvent1 = hlc.parse(event1)
const parsedEvent2 = hlc.parse(event2)
const parsedEvent3 = hlc.parse(event3)
const parsedEvent4 = hlc.parse(event4)
// Assertions to ensure causal order is maintained
expect(parsedEvent2.walltime).to.be.greaterThan(parsedEvent1.walltime)
expect(parsedEvent3.walltime).to.equal(parsedEvent2.walltime)
expect(parsedEvent3.nn).to.be.greaterThan(parsedEvent2.nn)
expect(parsedEvent4.walltime).to.be.greaterThan(parsedEvent3.walltime)
// Check that all events have the same id
const uniqueIds = new Set([
parsedEvent1.id,
parsedEvent2.id,
parsedEvent3.id,
parsedEvent4.id,
])
expect(uniqueIds.size).to.equal(1)
// Ensure we can compare events as strings and maintain the same order
const events = [event1, event2, event3, event4]
const sortedEvents = [...events].sort()
expect(sortedEvents).to.deep.equal(events)
})
})

View file

@ -5,10 +5,10 @@ import pkg from 'chai'
const { expect } = pkg
import { MapUpdater } from '../js/modules/sync/updaters.js'
import { SyncEngine } from '../js/modules/sync/engine.js'
import { SyncEngine, Operations } from '../js/modules/sync/engine.js'
describe('SyncEngine', () => {
it('should initialize methods even before start', function () {
it('should initialize methods even before start', () => {
const engine = new SyncEngine({})
engine.upsert()
engine.update()
@ -16,8 +16,8 @@ describe('SyncEngine', () => {
})
})
describe('#dispatch', function () {
it('should raise an error on unknown updater', function () {
describe('#dispatch', () => {
it('should raise an error on unknown updater', () => {
const dispatcher = new SyncEngine({})
expect(() => {
dispatcher.dispatch({
@ -27,7 +27,7 @@ describe('#dispatch', function () {
})
}).to.throw(Error)
})
it('should produce an error on malformated messages', function () {
it('should produce an error on malformated messages', () => {
const dispatcher = new SyncEngine({})
expect(() => {
dispatcher.dispatch({
@ -36,7 +36,7 @@ describe('#dispatch', function () {
})
}).to.throw(Error)
})
it('should raise an unknown operations', function () {
it('should raise an unknown operations', () => {
const dispatcher = new SyncEngine({})
expect(() => {
dispatcher.dispatch({
@ -47,55 +47,55 @@ describe('#dispatch', function () {
})
describe('Updaters', () => {
describe('BaseUpdater', function () {
describe('BaseUpdater', () => {
let updater
let map
let obj
this.beforeEach(function () {
beforeEach(() => {
map = {}
updater = new MapUpdater(map)
obj = {}
})
it('should be able to set object properties', function () {
it('should be able to set object properties', () => {
let obj = {}
updater.updateObjectValue(obj, 'foo', 'foo')
expect(obj).deep.equal({ foo: 'foo' })
})
it('should be able to set object properties recursively on existing objects', function () {
it('should be able to set object properties recursively on existing objects', () => {
let obj = { foo: {} }
updater.updateObjectValue(obj, 'foo.bar', 'foo')
expect(obj).deep.equal({ foo: { bar: 'foo' } })
})
it('should be able to set object properties recursively on deep objects', function () {
it('should be able to set object properties recursively on deep objects', () => {
let obj = { foo: { bar: { baz: {} } } }
updater.updateObjectValue(obj, 'foo.bar.baz.test', 'value')
expect(obj).deep.equal({ foo: { bar: { baz: { test: 'value' } } } })
})
it('should be able to replace object properties recursively on deep objects', function () {
it('should be able to replace object properties recursively on deep objects', () => {
let obj = { foo: { bar: { baz: { test: 'test' } } } }
updater.updateObjectValue(obj, 'foo.bar.baz.test', 'value')
expect(obj).deep.equal({ foo: { bar: { baz: { test: 'value' } } } })
})
it('should not set object properties recursively on non-existing objects', function () {
it('should not set object properties recursively on non-existing objects', () => {
let obj = { foo: {} }
updater.updateObjectValue(obj, 'bar.bar', 'value')
expect(obj).deep.equal({ foo: {} })
})
it('should delete keys for undefined values', function () {
it('should delete keys for undefined values', () => {
let obj = { foo: 'foo' }
updater.updateObjectValue(obj, 'foo', undefined)
expect(obj).deep.equal({})
})
it('should delete keys for undefined values, recursively', function () {
it('should delete keys for undefined values, recursively', () => {
let obj = { foo: { bar: 'bar' } }
updater.updateObjectValue(obj, 'foo.bar', undefined)
@ -103,3 +103,309 @@ describe('Updaters', () => {
})
})
})
describe('Operations', () => {
describe('haveSameContext', () => {
const createOperation = (overrides = {}) => ({
subject: 'feature',
metadata: {
id: 'UxNjQ',
layerId: '606d26bd-230f-4d3e-a2a7-0c3caed71548',
featureType: 'marker',
},
...overrides,
})
it('should check if subject and metadata are the same', () => {
const op1 = createOperation()
const op2 = createOperation()
const op3 = createOperation({
subject: 'datalayer',
metadata: { id: '606d26bd-230f-4d3e-a2a7-0c3caed71548' },
})
expect(Operations.haveSameContext(op1, op2)).to.be.true
expect(Operations.haveSameContext(op1, op3)).to.be.false
expect(Operations.haveSameContext(op2, op3)).to.be.false
})
it('should check if the key matches if there is any provided', () => {
const op1 = createOperation({ key: 'properties.name' })
const op2 = createOperation({ key: 'properties.name' })
const op3 = createOperation({ key: 'geometry' })
const op4 = createOperation()
expect(Operations.haveSameContext(op1, op2)).to.be.true
expect(Operations.haveSameContext(op1, op3)).to.be.false
expect(Operations.haveSameContext(op1, op4)).to.be.true
expect(Operations.haveSameContext(op4, createOperation())).to.be.true
})
it('should use deep equality for subject and metadata', () => {
const op1 = createOperation({ metadata: { nested: { value: 1 } } })
const op2 = createOperation({ metadata: { nested: { value: 1 } } })
const op3 = createOperation({ metadata: { nested: { value: 2 } } })
expect(Operations.haveSameContext(op1, op2)).to.be.true
expect(Operations.haveSameContext(op1, op3)).to.be.false
})
})
describe('sort', () => {
it('should sort operations by timestamp', () => {
const operations = [
{ hlc: '1727193550:44:id1' },
{ hlc: '1727193549:42:id1' },
{ hlc: '1727193551:43:id1' },
]
const sorted = Operations.sort(operations)
expect(sorted).to.deep.equal([
{ hlc: '1727193549:42:id1' },
{ hlc: '1727193550:44:id1' },
{ hlc: '1727193551:43:id1' },
])
})
it('should sort operations by NN when timestamp is the same', () => {
const operations = [
{ hlc: '1727193549:42:id1' },
{ hlc: '1727193549:44:id1' },
{ hlc: '1727193549:43:id1' },
]
const sorted = Operations.sort(operations)
expect(sorted).to.deep.equal([
{ hlc: '1727193549:42:id1' },
{ hlc: '1727193549:43:id1' },
{ hlc: '1727193549:44:id1' },
])
})
it('should sort operations by id if other fields are equal', () => {
const operations = [
{ hlc: '1727193549:42:id3' },
{ hlc: '1727193549:42:id2' },
{ hlc: '1727193549:42:id1' },
]
const sorted = Operations.sort(operations)
expect(sorted).to.deep.equal([
{ hlc: '1727193549:42:id1' },
{ hlc: '1727193549:42:id2' },
{ hlc: '1727193549:42:id3' },
])
})
})
describe('addLocal', () => {
it('should add a local operation with a new hlc', () => {
const ops = new Operations()
const inputMessage = { verb: 'update', subject: 'test' }
const result = ops.addLocal(inputMessage)
expect(result).to.have.property('hlc')
expect(result.hlc).to.match(/^\d+:\d+:[^:]+$/)
expect(result).to.include(inputMessage)
})
})
describe('sorted', () => {
it('should return sorted operations', () => {
const ops = new Operations()
ops._operations = [{ hlc: '1727193549:43:id1' }, { hlc: '1727193549:42:id1' }]
const sorted = ops.sorted()
expect(sorted[0].hlc).to.equal('1727193549:42:id1')
expect(sorted[1].hlc).to.equal('1727193549:43:id1')
})
})
describe('shouldBypassOperation', () => {
let ops
beforeEach(() => {
ops = new Operations()
})
const createOperation = (overrides = {}) => ({
verb: 'update',
subject: 'feature',
metadata: {
id: 'UxNjQ',
layerId: '606d26bd-230f-4d3e-a2a7-0c3caed71548',
featureType: 'marker',
},
key: 'properties.name',
value: 'default',
hlc: '0000000000000:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65',
...overrides,
})
const createUpsertOperation = (overrides = {}) =>
createOperation({
verb: 'upsert',
key: undefined,
value: {
type: 'Feature',
geometry: {
coordinates: [0.439453, 48.04871],
type: 'Point',
},
properties: {},
id: 'UxNjQ',
},
...overrides,
})
it('should return false if no local operation is newer', () => {
const remote = createUpsertOperation({ hlc: '1727184449050:44:id2' })
ops._operations = [
createOperation({
hlc: '1727184449010:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65',
}),
createUpsertOperation({
hlc: '1727184449020:0:b4a221a0-7b62-4588-a6af-041b041006dc',
}),
]
const result = ops.shouldBypassOperation(remote)
expect(result).to.be.false
})
it('should return true if a similar "delete" operation is newer', () => {
const remote = createOperation({
verb: 'delete',
metadata: { id: 'M1NTA', layerId: '1234', featureType: 'marker' },
hlc: '1:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53',
})
ops._operations = [
createOperation({
verb: 'delete',
metadata: { id: 'M1NTA', layerId: '1234', featureType: 'marker' },
hlc: '2:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53',
}),
]
const result = ops.shouldBypassOperation(remote)
expect(result).to.be.true
})
describe('update', () => {
it('should check for related updates', () => {
ops._operations = [
createOperation({
value: 'y',
hlc: '1:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65',
}),
createOperation({
value: 'youpi',
hlc: '9:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65',
}),
]
const remoteOperation = createOperation({
value: 'something else',
hlc: '0:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65',
})
const result = ops.shouldBypassOperation(remoteOperation)
expect(result).to.be.true
})
it('should check for related deletes', () => {
ops._operations = [
{
verb: 'delete',
subject: 'feature',
metadata: {
id: 'M1NTA',
layerId: '123',
featureType: 'marker',
},
hlc: '1727196583562:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53',
key: undefined,
},
]
const remoteOperation = createOperation({
metadata: { id: 'M1NTA', layerId: '123', featureType: 'marker' },
key: 'geometry',
value: { coordinates: [2.944336, 47.070122], type: 'Point' },
hlc: '0:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53',
})
const result = ops.shouldBypassOperation(remoteOperation)
expect(result).to.be.true
})
})
describe('upsert', () => {
it('should take precedence over updates (even if fresher)', () => {
ops._operations = [
createOperation({
value: 'youpi',
hlc: '1000000000000:0:f4df51cc-7617-4bd4-8bd2-599cdf17da65',
}),
]
const remoteOperation = createUpsertOperation({
hlc: '0000000000000:0:b4a221a0-7b62-4588-a6af-041b041006dc',
})
const result = ops.shouldBypassOperation(remoteOperation)
expect(result).to.be.false
})
})
describe('delete', () => {
it('should check for the same delete', () => {
ops._operations = [
createOperation({
verb: 'delete',
metadata: { id: 'I3MDg', layerId: null, featureType: 'polygon' },
key: undefined,
hlc: '1:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53',
}),
]
const remoteOperation = createOperation({
verb: 'delete',
metadata: { id: 'I3MDg', layerId: null, featureType: 'polygon' },
key: undefined,
hlc: '0:0:3f45b56f-f750-4b50-90d7-9ecce4b0cf53',
})
const result = ops.shouldBypassOperation(remoteOperation)
expect(result).to.be.true
})
})
})
describe('storeRemoteOperations', () => {
it('should store remote operations and update the local HLC', () => {
const ops = new Operations()
const remoteOps = [{ hlc: '1727193549:42:id2' }, { hlc: '1727193549:43:id2' }]
ops.storeRemoteOperations(remoteOps)
expect(ops._operations).to.deep.equal(remoteOps)
})
})
describe('getOperationsSince', () => {
it('should return operations since a given HLC', () => {
const ops = new Operations()
ops._operations = [
{ hlc: '1727193549:42:id1' },
{ hlc: '1727193549:43:id1' },
{ hlc: '1727193549:44:id1' },
]
const result = ops.getOperationsSince('1727193549:42:id1')
expect(result).to.deep.equal([
{ hlc: '1727193549:43:id1' },
{ hlc: '1727193549:44:id1' },
])
})
it('should return all operations if no HLC is provided', () => {
const ops = new Operations()
ops._operations = [{ hlc: '1727193549:42:id1' }, { hlc: '1727193549:43:id1' }]
const result = ops.getOperationsSince()
expect(result).to.deep.equal(ops._operations)
})
})
})

View file

@ -779,4 +779,27 @@ describe('Utils', () => {
)
})
})
describe('#isObject', () => {
it('should return true for objects', () => {
assert.equal(Utils.isObject({}), true)
assert.equal(Utils.isObject({ foo: 'bar' }), true)
})
it('should return false for Array', () => {
assert.equal(Utils.isObject([]), false)
})
it('should return false on null', () => {
assert.equal(Utils.isObject(null), false)
})
it('should return false on undefined', () => {
assert.equal(Utils.isObject(undefined), false)
})
it('should return false on string', () => {
assert.equal(Utils.isObject(''), false)
})
})
})

View file

@ -1,4 +1,5 @@
import json
import re
import pytest

View file

@ -281,3 +281,65 @@ def test_websocket_connection_can_sync_cloned_polygons(
peerB.get_by_role("button", name="Save").click()
expect(peerB.locator("path")).to_have_count(2)
@pytest.mark.xdist_group(name="websockets")
def test_websocket_connection_can_sync_late_joining_peer(
new_page, live_server, websocket_server, tilelayer
):
map = MapFactory(name="sync", edit_status=Map.ANONYMOUS)
map.settings["properties"]["syncEnabled"] = True
map.save()
DataLayerFactory(map=map, data={})
# Create first peer (A) and have it join immediately
peerA = new_page("Page A")
peerA.goto(f"{live_server.url}{map.get_absolute_url()}?edit")
# Add a marker from peer A
a_create_marker = peerA.get_by_title("Draw a marker")
expect(a_create_marker).to_be_visible()
a_create_marker.click()
a_map_el = peerA.locator("#map")
a_map_el.click(position={"x": 220, "y": 220})
peerA.locator("body").type("First marker")
peerA.locator("body").press("Escape")
# Add a polygon from peer A
create_polygon = peerA.locator(".leaflet-control-toolbar ").get_by_title(
"Draw a polygon"
)
create_polygon.click()
a_map_el.click(position={"x": 200, "y": 200})
a_map_el.click(position={"x": 100, "y": 200})
a_map_el.click(position={"x": 100, "y": 100})
a_map_el.click(position={"x": 200, "y": 100})
a_map_el.click(position={"x": 200, "y": 100})
peerA.keyboard.press("Escape")
# Now create peer B and have it join
peerB = new_page("Page B")
peerB.goto(f"{live_server.url}{map.get_absolute_url()}?edit")
# Check if peer B has received all the updates
b_marker_pane = peerB.locator(".leaflet-marker-pane > div")
b_polygons = peerB.locator(".leaflet-overlay-pane path[fill='DarkBlue']")
expect(b_marker_pane).to_have_count(1)
expect(b_polygons).to_have_count(1)
# Verify marker properties
peerB.locator(".leaflet-marker-icon").first.click()
peerB.get_by_role("link", name="Toggle edit mode (⇧+Click)").click()
expect(peerB.locator('input[name="name"]')).to_have_value("First marker")
# Verify polygon exists (we've already checked the count)
b_polygon = peerB.locator("path")
expect(b_polygon).to_be_visible()
# Optional: Verify polygon properties if you have any specific ones set
# Clean up: close edit mode
peerB.locator("body").press("Escape")

View file

@ -0,0 +1,22 @@
from umap.websocket_server import OperationMessage, PeerMessage, Request, ServerRequest
def test_messages_are_parsed_correctly():
server = Request.model_validate(dict(kind="Server", action="list-peers")).root
assert type(server) is ServerRequest
operation = Request.model_validate(
dict(
kind="OperationMessage",
verb="upsert",
subject="map",
metadata={},
key="key",
)
).root
assert type(operation) is OperationMessage
peer_message = Request.model_validate(
dict(kind="PeerMessage", sender="Alice", recipient="Bob", message={})
).root
assert type(peer_message) is PeerMessage

View file

@ -1359,7 +1359,7 @@ def logout(request):
class LoginPopupEnd(TemplateView):
"""
End of a loggin process in popup.
End of a login process in popup.
Basically close the popup.
"""

View file

@ -1,70 +1,173 @@
#!/usr/bin/env python
import asyncio
import logging
import uuid
from collections import defaultdict
from typing import Literal, Optional
from typing import Literal, Optional, Union
import websockets
from django.conf import settings
from django.core.signing import TimestampSigner
from pydantic import BaseModel, ValidationError
from pydantic import BaseModel, Field, RootModel, ValidationError
from websockets import WebSocketClientProtocol
from websockets.server import serve
from umap.models import Map, User # NOQA
class Connections:
def __init__(self) -> None:
self._connections: set[WebSocketClientProtocol] = set()
self._ids: dict[WebSocketClientProtocol, str] = dict()
def join(self, websocket: WebSocketClientProtocol) -> str:
self._connections.add(websocket)
_id = str(uuid.uuid4())
self._ids[websocket] = _id
return _id
def leave(self, websocket: WebSocketClientProtocol) -> None:
self._connections.remove(websocket)
del self._ids[websocket]
def get(self, id) -> WebSocketClientProtocol:
# use an iterator to stop iterating as soon as we found
return next(k for k, v in self._ids.items() if v == id)
def get_id(self, websocket: WebSocketClientProtocol):
return self._ids[websocket]
def get_other_peers(
self, websocket: WebSocketClientProtocol
) -> set[WebSocketClientProtocol]:
return self._connections - {websocket}
def get_all_peers(self) -> set[WebSocketClientProtocol]:
return self._connections
# Contains the list of websocket connections handled by this process.
# It's a mapping of map_id to a set of the active websocket connections
CONNECTIONS = defaultdict(set)
CONNECTIONS: defaultdict[int, Connections] = defaultdict(Connections)
class JoinMessage(BaseModel):
kind: str = "join"
class JoinRequest(BaseModel):
kind: Literal["JoinRequest"] = "JoinRequest"
token: str
class OperationMessage(BaseModel):
kind: str = "operation"
verb: str = Literal["upsert", "update", "delete"]
subject: str = Literal["map", "layer", "feature"]
"""Message sent from one peer to all the others"""
kind: Literal["OperationMessage"] = "OperationMessage"
verb: Literal["upsert", "update", "delete"]
subject: Literal["map", "datalayer", "feature"]
metadata: Optional[dict] = None
key: Optional[str] = None
class PeerMessage(BaseModel):
"""Message sent from a specific peer to another one"""
kind: Literal["PeerMessage"] = "PeerMessage"
sender: str
recipient: str
# The message can be whatever the peers want. It's not checked by the server.
message: dict
class ServerRequest(BaseModel):
"""A request towards the server"""
kind: Literal["Server"] = "Server"
action: Literal["list-peers"]
class Request(RootModel):
"""Any message coming from the websocket should be one of these, and will be rejected otherwise."""
root: Union[ServerRequest, PeerMessage, OperationMessage] = Field(
discriminator="kind"
)
class JoinResponse(BaseModel):
"""Server response containing the list of peers"""
kind: Literal["JoinResponse"] = "JoinResponse"
peers: list
uuid: str
class ListPeersResponse(BaseModel):
kind: Literal["ListPeersResponse"] = "ListPeersResponse"
peers: list
async def join_and_listen(
map_id: int, permissions: list, user: str | int, websocket: WebSocketClientProtocol
):
"""Join a "room" whith other connected peers.
"""Join a "room" with other connected peers, and wait for messages."""
logging.debug(f"{user} joined room #{map_id}")
connections: Connections = CONNECTIONS[map_id]
_id: str = connections.join(websocket)
# Assign an ID to the joining peer and return it the list of connected peers.
peers: list[WebSocketClientProtocol] = [
connections.get_id(p) for p in connections.get_all_peers()
]
response = JoinResponse(uuid=_id, peers=peers)
await websocket.send(response.model_dump_json())
# Notify all other peers of the new list of connected peers.
message = ListPeersResponse(peers=peers)
websockets.broadcast(
connections.get_other_peers(websocket), message.model_dump_json()
)
New messages will be broadcasted to other connected peers.
"""
print(f"{user} joined room #{map_id}")
CONNECTIONS[map_id].add(websocket)
try:
async for raw_message in websocket:
# recompute the peers-list at the time of message-sending.
# recompute the peers list at the time of message-sending.
# as doing so beforehand would miss new connections
peers = CONNECTIONS[map_id] - {websocket}
# Only relay valid "operation" messages
other_peers = connections.get_other_peers(websocket)
try:
OperationMessage.model_validate_json(raw_message)
websockets.broadcast(peers, raw_message)
incoming = Request.model_validate_json(raw_message)
except ValidationError as e:
error = f"An error occurred when receiving this message: {raw_message}"
print(error, e)
error = f"An error occurred when receiving the following message: {raw_message!r}"
logging.error(error, e)
else:
match incoming.root:
# Broadcast all operation messages to connected peers
case OperationMessage():
websockets.broadcast(other_peers, raw_message)
# Send peer messages to the proper peer
case PeerMessage(recipient=_id):
peer = connections.get(_id)
if peer:
await peer.send(raw_message)
finally:
CONNECTIONS[map_id].remove(websocket)
# On disconnect, remove the connection from the pool
connections.leave(websocket)
# TODO: refactor this in a separate method.
# Notify all other peers of the new list of connected peers.
peers = [connections.get_id(p) for p in connections.get_all_peers()]
message = ListPeersResponse(peers=peers)
websockets.broadcast(
connections.get_other_peers(websocket), message.model_dump_json()
)
async def handler(websocket):
async def handler(websocket: WebSocketClientProtocol):
"""Main WebSocket handler.
If permissions are granted, let the peer enter a room.
Check if the permission is granted and let the peer enter a room.
"""
raw_message = await websocket.recv()
# The first event should always be 'join'
message: JoinMessage = JoinMessage.model_validate_json(raw_message)
message: JoinRequest = JoinRequest.model_validate_json(raw_message)
signed = TimestampSigner().unsign_object(message.token, max_age=30)
user, map_id, permissions = signed.values()
@ -73,7 +176,7 @@ async def handler(websocket):
await join_and_listen(map_id, permissions, user, websocket)
def run(host, port):
def run(host: str, port: int):
if not settings.WEBSOCKET_ENABLED:
msg = (
"WEBSOCKET_ENABLED should be set to True to run the WebSocket Server. "
@ -86,7 +189,7 @@ def run(host, port):
async def _serve():
async with serve(handler, host, port):
print(f"Waiting for connections on {host}:{port}")
logging.debug(f"Waiting for connections on {host}:{port}")
await asyncio.Future() # run forever
asyncio.run(_serve())