Skip to content

Commit

Permalink
feat(client): add client side update batching
Browse files Browse the repository at this point in the history
the client now batches and sends updates in intervals (default: every 50ms). can be disabled by
setting updateInterval to 0
  • Loading branch information
TimoBechtel committed Jan 21, 2021
1 parent 184b7d0 commit 720c33a
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 18 deletions.
10 changes: 8 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { createWebsocketClient } from './socketAdapter/websocketClient';
import { parsePath } from './parsePath';
import { createStore, Store } from './store';
import { isObject, joinPath, mergeDiff, traverseData } from './utils';
import { createUpdateBatcher } from './updateBatcher';

type Unsubscriber = () => void;

Expand All @@ -22,10 +23,12 @@ export function SocketDBClient({
url,
store = createStore(),
socketClient,
updateInterval = 50,
}: {
url?: string;
store?: Store;
socketClient?: SocketClient;
updateInterval?: number;
} = {}) {
if (!url && !socketClient)
url =
Expand All @@ -37,6 +40,10 @@ export function SocketDBClient({
const subscribedPaths: string[] = [];
const updateListener: UpdateListener = {};

const queueUpdate = createUpdateBatcher((diff) => {
socketClient.send('update', { data: diff });
}, updateInterval);

let connectionLost = false;
socketClient.onConnect(() => {
if (connectionLost) {
Expand Down Expand Up @@ -180,8 +187,7 @@ export function SocketDBClient({
set(value) {
if (!connectionLost) {
const diff = store.put(creatUpdate(path, value));
if (diff && Object.keys(diff).length > 0)
socketClient.send('update', { data: diff });
if (diff && Object.keys(diff).length > 0) queueUpdate(diff);
notifySubscriber(diff);
}
return this;
Expand Down
18 changes: 3 additions & 15 deletions src/server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { SocketServer } from './socketAdapter/socketServer';
import { createWebsocketServer } from './socketAdapter/websocketServer';
import { createStore, Store } from './store';
import { createUpdateBatcher } from './updateBatcher';
import { isObject, joinPath, mergeDiff, traverseData } from './utils';

type Subscribtions = {
Expand All @@ -25,24 +26,11 @@ export function SocketDBServer({
} = {}): SocketDB {
let subscriber: Subscribtions = {};

let queuedUpdate: any;
let pendingUpdate = null;

function queueUpdate(diff: any) {
if (pendingUpdate) {
mergeDiff(diff, queuedUpdate);
} else {
queuedUpdate = diff;
pendingUpdate = setTimeout(() => {
pendingUpdate = null;
notifySubscibers(queuedUpdate);
}, updateInterval);
}
}
const queue = createUpdateBatcher(notifySubscibers, updateInterval);

function update(data: any) {
const diff = store.put(data);
queueUpdate(diff);
queue(diff);
}

function notifySubscibers(diff: any) {
Expand Down
27 changes: 27 additions & 0 deletions src/updateBatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { mergeDiff } from './utils';

type Queue = (diff: any) => void;

export function createUpdateBatcher(
flush: (diff: any) => void,
updateInterval: number
): Queue {
if (!updateInterval) return flush;

let queuedUpdate: any;
let pendingUpdate = null;

// WARNING: queue function has sideeffects,
// as diff is merged in place
return (diff: any) => {
if (pendingUpdate) {
mergeDiff(diff, queuedUpdate);
} else {
queuedUpdate = diff;
pendingUpdate = setTimeout(() => {
pendingUpdate = null;
flush(queuedUpdate);
}, updateInterval);
}
};
}
32 changes: 32 additions & 0 deletions test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,38 @@ test('emits update object for path', (done) => {
client.get('players').get('1').get('name').set('Patrick');
});

test('should batch updates', (done) => {
let sendCount = 0;
const socketClient: SocketClient = {
onConnect() {},
onDisconnect() {},
off() {},
on() {},
send(event, { data }) {
expect(event).toEqual('update');
expect(data).toEqual({
players: {
1: {
name: 'Star',
hp: 100,
},
},
});
sendCount++;
},
};
const client = SocketDBClient({ socketClient, updateInterval: 10 });

client.get('players').get('1').get('name').set('Patrick');
client.get('players').get('1').get('name').set('Star');
client.get('players').get('1').get('hp').set(100);

setTimeout(() => {
expect(sendCount).toBe(1);
done();
}, 50);
});

test('merges data on update', (done) => {
const store = createStore();

Expand Down
45 changes: 44 additions & 1 deletion test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,47 @@ test('only send data if client is subscribed', (done) => {
}, 100);
});

// TODO: should not notify socket about own update
test('should batch updates', (done) => {
const store = createStore();
let connect: (client: Socket, id: string) => void;
const { addListener, removeListener, notify } = createEventBroker();

const socketServer: SocketServer = {
onConnection(callback) {
connect = callback;
},
};
SocketDBServer({ store, socketServer, updateInterval: 10 });

let receivedCount = 0;
connect(
{
onDisconnect() {},
on: addListener,
off: removeListener,
send(event, { data }) {
if (receivedCount === 0) {
expect(data).toEqual(null);
}
if (receivedCount === 1) {
expect(data).toBe('b');
}
receivedCount++;
},
},
'1'
);

notify('subscribe', { path: 'player' });

notify('update', { data: { player: 'a' } });
notify('update', { data: { player: 'b' } });

setTimeout(() => {
// first: null, second: 'b'
expect(receivedCount).toBe(2);
done();
}, 50);
});

// TODO: should not notify client about its own update

0 comments on commit 720c33a

Please sign in to comment.