diff --git a/src/client.ts b/src/client.ts index e7f9c81..ca8eca6 100644 --- a/src/client.ts +++ b/src/client.ts @@ -3,6 +3,7 @@ import { createWebsocketClient } from './socketAdapter/websocketClient'; import { parsePath } from './parsePath'; import { createStore, Store } from './store'; import { isObject, joinPath, mergeDiff, traverseData } from './utils'; +import { createUpdateBatcher } from './updateBatcher'; type Unsubscriber = () => void; @@ -22,10 +23,12 @@ export function SocketDBClient({ url, store = createStore(), socketClient, + updateInterval = 50, }: { url?: string; store?: Store; socketClient?: SocketClient; + updateInterval?: number; } = {}) { if (!url && !socketClient) url = @@ -37,6 +40,10 @@ export function SocketDBClient({ const subscribedPaths: string[] = []; const updateListener: UpdateListener = {}; + const queueUpdate = createUpdateBatcher((diff) => { + socketClient.send('update', { data: diff }); + }, updateInterval); + let connectionLost = false; socketClient.onConnect(() => { if (connectionLost) { @@ -180,8 +187,7 @@ export function SocketDBClient({ set(value) { if (!connectionLost) { const diff = store.put(creatUpdate(path, value)); - if (diff && Object.keys(diff).length > 0) - socketClient.send('update', { data: diff }); + if (diff && Object.keys(diff).length > 0) queueUpdate(diff); notifySubscriber(diff); } return this; diff --git a/src/server.ts b/src/server.ts index f8008ea..5116187 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,6 +1,7 @@ import { SocketServer } from './socketAdapter/socketServer'; import { createWebsocketServer } from './socketAdapter/websocketServer'; import { createStore, Store } from './store'; +import { createUpdateBatcher } from './updateBatcher'; import { isObject, joinPath, mergeDiff, traverseData } from './utils'; type Subscribtions = { @@ -25,24 +26,11 @@ export function SocketDBServer({ } = {}): SocketDB { let subscriber: Subscribtions = {}; - let queuedUpdate: any; - let pendingUpdate = null; - - function queueUpdate(diff: any) { - if (pendingUpdate) { - mergeDiff(diff, queuedUpdate); - } else { - queuedUpdate = diff; - pendingUpdate = setTimeout(() => { - pendingUpdate = null; - notifySubscibers(queuedUpdate); - }, updateInterval); - } - } + const queue = createUpdateBatcher(notifySubscibers, updateInterval); function update(data: any) { const diff = store.put(data); - queueUpdate(diff); + queue(diff); } function notifySubscibers(diff: any) { diff --git a/src/updateBatcher.ts b/src/updateBatcher.ts new file mode 100644 index 0000000..2b6b80f --- /dev/null +++ b/src/updateBatcher.ts @@ -0,0 +1,27 @@ +import { mergeDiff } from './utils'; + +type Queue = (diff: any) => void; + +export function createUpdateBatcher( + flush: (diff: any) => void, + updateInterval: number +): Queue { + if (!updateInterval) return flush; + + let queuedUpdate: any; + let pendingUpdate = null; + + // WARNING: queue function has sideeffects, + // as diff is merged in place + return (diff: any) => { + if (pendingUpdate) { + mergeDiff(diff, queuedUpdate); + } else { + queuedUpdate = diff; + pendingUpdate = setTimeout(() => { + pendingUpdate = null; + flush(queuedUpdate); + }, updateInterval); + } + }; +} diff --git a/test/client.test.ts b/test/client.test.ts index 0f349c4..3267347 100644 --- a/test/client.test.ts +++ b/test/client.test.ts @@ -26,6 +26,38 @@ test('emits update object for path', (done) => { client.get('players').get('1').get('name').set('Patrick'); }); +test('should batch updates', (done) => { + let sendCount = 0; + const socketClient: SocketClient = { + onConnect() {}, + onDisconnect() {}, + off() {}, + on() {}, + send(event, { data }) { + expect(event).toEqual('update'); + expect(data).toEqual({ + players: { + 1: { + name: 'Star', + hp: 100, + }, + }, + }); + sendCount++; + }, + }; + const client = SocketDBClient({ socketClient, updateInterval: 10 }); + + client.get('players').get('1').get('name').set('Patrick'); + client.get('players').get('1').get('name').set('Star'); + client.get('players').get('1').get('hp').set(100); + + setTimeout(() => { + expect(sendCount).toBe(1); + done(); + }, 50); +}); + test('merges data on update', (done) => { const store = createStore(); diff --git a/test/server.test.ts b/test/server.test.ts index 01f0149..0a42734 100644 --- a/test/server.test.ts +++ b/test/server.test.ts @@ -365,4 +365,47 @@ test('only send data if client is subscribed', (done) => { }, 100); }); -// TODO: should not notify socket about own update +test('should batch updates', (done) => { + const store = createStore(); + let connect: (client: Socket, id: string) => void; + const { addListener, removeListener, notify } = createEventBroker(); + + const socketServer: SocketServer = { + onConnection(callback) { + connect = callback; + }, + }; + SocketDBServer({ store, socketServer, updateInterval: 10 }); + + let receivedCount = 0; + connect( + { + onDisconnect() {}, + on: addListener, + off: removeListener, + send(event, { data }) { + if (receivedCount === 0) { + expect(data).toEqual(null); + } + if (receivedCount === 1) { + expect(data).toBe('b'); + } + receivedCount++; + }, + }, + '1' + ); + + notify('subscribe', { path: 'player' }); + + notify('update', { data: { player: 'a' } }); + notify('update', { data: { player: 'b' } }); + + setTimeout(() => { + // first: null, second: 'b' + expect(receivedCount).toBe(2); + done(); + }, 50); +}); + +// TODO: should not notify client about its own update