refactoring(sync): Simplify the design of the sync engine.

Removes the concept of message dispatcher and message sender, all of
it can be done by the syncengine class, making it easier to grasp.
This commit is contained in:
Alexis Métaireau 2024-06-10 19:29:56 +02:00
parent 7660153b3b
commit b94995120a
4 changed files with 72 additions and 103 deletions

View file

@ -3,15 +3,12 @@ import { MapUpdater, DataLayerUpdater, FeatureUpdater } from './updaters.js'
export class SyncEngine { export class SyncEngine {
constructor(map) { constructor(map) {
this.map = map this.updaters = {
this.dispatcher = new MessagesDispatcher(this.map) map: new MapUpdater(map),
this._initialize() feature: new FeatureUpdater(map),
datalayer: new DataLayerUpdater(map),
} }
_initialize() {
this.transport = undefined this.transport = undefined
const noop = () => {}
// by default, all operations do nothing, until the engine is started.
this.upsert = this.update = this.delete = noop
} }
async authenticate(tokenURI, webSocketURI, server) { async authenticate(tokenURI, webSocketURI, server) {
@ -22,16 +19,47 @@ export class SyncEngine {
} }
start(webSocketURI, authToken) { start(webSocketURI, authToken) {
this.transport = new WebSocketTransport(webSocketURI, authToken, this.dispatcher) this.transport = new WebSocketTransport(webSocketURI, authToken, this)
this.sender = new MessagesSender(this.transport)
this.upsert = this.sender.upsert.bind(this.sender)
this.update = this.sender.update.bind(this.sender)
this.delete = this.sender.delete.bind(this.sender)
} }
stop() { stop() {
if (this.transport) this.transport.close() if (this.transport) this.transport.close()
this._initialize() this.transport = undefined
}
_getUpdater(subject, metadata) {
if (Object.keys(this.updaters).includes(subject)) {
return this.updaters[subject]
}
throw new Error(`Unknown updater ${subject}, ${metadata}`)
}
// This method is called by the transport layer on new messages
receive({ kind, ...payload }) {
if (kind == 'operation') {
let updater = this._getUpdater(payload.subject, payload.metadata)
updater.applyMessage(payload)
} else {
throw new Error(`Unknown dispatch kind: ${kind}`)
}
}
_send(message) {
if (this.transport) {
this.transport.send('operation', message)
}
}
upsert(subject, metadata, value) {
this._send({ verb: 'upsert', subject, metadata, value })
}
update(subject, metadata, key, value) {
this._send({ verb: 'update', subject, metadata, key, value })
}
delete(subject, metadata, key) {
this._send({ verb: 'delete', subject, metadata, key })
} }
/** /**
@ -63,60 +91,3 @@ export class SyncEngine {
return new Proxy(this, handler) return new Proxy(this, handler)
} }
} }
export class MessagesDispatcher {
constructor(map) {
this.map = map
this.updaters = {
map: new MapUpdater(this.map),
feature: new FeatureUpdater(this.map),
datalayer: new DataLayerUpdater(this.map),
}
}
getUpdater(subject, metadata) {
if (Object.keys(this.updaters).includes(subject)) {
return this.updaters[subject]
}
throw new Error(`Unknown updater ${subject}, ${metadata}`)
}
dispatch({ kind, ...payload }) {
if (kind == 'operation') {
let updater = this.getUpdater(payload.subject, payload.metadata)
updater.applyMessage(payload)
} else {
throw new Error(`Unknown dispatch kind: ${kind}`)
}
}
}
/**
* Send messages to other connected peers, using the specified transport.
*
* - `subject` is the type of object this is referering to (map, feature, layer)
* - `metadata` contains information about the object we're refering to (id, layerId for instance)
* - `key` and
* - `value` are the keys and values that are being modified.
*/
export class MessagesSender {
constructor(transport) {
this._transport = transport
}
send(message) {
this._transport.send('operation', message)
}
upsert(subject, metadata, value) {
this.send({ verb: 'upsert', subject, metadata, value })
}
update(subject, metadata, key, value) {
this.send({ verb: 'update', subject, metadata, key, value })
}
delete(subject, metadata, key) {
this.send({ verb: 'delete', subject, metadata, key })
}
}

View file

@ -15,7 +15,7 @@ class BaseUpdater {
// Reduce the current list of attributes, // Reduce the current list of attributes,
// to find the object to set the property onto // to find the object to set the property onto
const objectToSet = parts.reduce((currentObj, part) => { const objectToSet = parts.reduce((currentObj, part) => {
if (part in currentObj) return currentObj[part] if (currentObj !== undefined && part in currentObj) return currentObj[part]
}, obj) }, obj)
// In case the given path doesn't exist, stop here // In case the given path doesn't exist, stop here

View file

@ -9,7 +9,7 @@ export class WebSocketTransport {
} }
onMessage(wsMessage) { onMessage(wsMessage) {
this.receiver.dispatch(JSON.parse(wsMessage.data)) this.receiver.receive(JSON.parse(wsMessage.data))
} }
send(kind, payload) { send(kind, payload) {

View file

@ -5,7 +5,7 @@ import pkg from 'chai'
const { expect } = pkg const { expect } = pkg
import { MapUpdater } from '../js/modules/sync/updaters.js' import { MapUpdater } from '../js/modules/sync/updaters.js'
import { MessagesDispatcher, SyncEngine } from '../js/modules/sync/engine.js' import { SyncEngine } from '../js/modules/sync/engine.js'
describe('SyncEngine', () => { describe('SyncEngine', () => {
it('should initialize methods even before start', function () { it('should initialize methods even before start', function () {
@ -16,10 +16,9 @@ describe('SyncEngine', () => {
}) })
}) })
describe('MessageDispatcher', () => { describe('#dispatch', function () {
describe('#dispatch', function () {
it('should raise an error on unknown updater', function () { it('should raise an error on unknown updater', function () {
const dispatcher = new MessagesDispatcher({}) const dispatcher = new SyncEngine({})
expect(() => { expect(() => {
dispatcher.dispatch({ dispatcher.dispatch({
kind: 'operation', kind: 'operation',
@ -29,7 +28,7 @@ describe('MessageDispatcher', () => {
}).to.throw(Error) }).to.throw(Error)
}) })
it('should produce an error on malformated messages', function () { it('should produce an error on malformated messages', function () {
const dispatcher = new MessagesDispatcher({}) const dispatcher = new SyncEngine({})
expect(() => { expect(() => {
dispatcher.dispatch({ dispatcher.dispatch({
yeah: 'yeah', yeah: 'yeah',
@ -38,14 +37,13 @@ describe('MessageDispatcher', () => {
}).to.throw(Error) }).to.throw(Error)
}) })
it('should raise an unknown operations', function () { it('should raise an unknown operations', function () {
const dispatcher = new MessagesDispatcher({}) const dispatcher = new SyncEngine({})
expect(() => { expect(() => {
dispatcher.dispatch({ dispatcher.dispatch({
kind: 'something-else', kind: 'something-else',
}) })
}).to.throw(Error) }).to.throw(Error)
}) })
})
}) })
describe('Updaters', () => { describe('Updaters', () => {