Skip to content

Commit

Permalink
feat: batch all socketevents
Browse files Browse the repository at this point in the history
batches all socket events, this includes subscribe, unsubscribe, subscribeKeys calls. this way
nested subscribe calls (e.g. "on" inside "each") will now only emit a single socket package (with a
list of multiple events)

BREAKING CHANGE: this breaks compatibility with previous versions. make sure to upgrade both client and server
  • Loading branch information
TimoBechtel committed May 15, 2022
1 parent 8659c99 commit 8a06672
Show file tree
Hide file tree
Showing 9 changed files with 742 additions and 700 deletions.
59 changes: 59 additions & 0 deletions src/batchedSocketEvents.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { createQueue } from './queue';
import { createEventBroker } from './socketAdapter/eventBroker';
import { Socket } from './socketAdapter/socket';

type GenericEvents = {
[key: string]: any;
};

export type GenericQueuedEvent<Events extends GenericEvents> = {
event: keyof Events;
data: Events[keyof Events];
};

export function createBatchedClient<Events extends GenericEvents>(
connection: Socket,
interval?: number
) {
type QueuedEvent = GenericQueuedEvent<Events>;

const events = createEventBroker<QueuedEvent['data']>();

const queue = createQueue<QueuedEvent, QueuedEvent[]>({
batch(current, update) {
return [...current, update];
},
createState() {
return [];
},
flush(events) {
connection.send('data', { events });
},
updateInterval: interval,
});

connection.on('data', (data: { events: QueuedEvent[] }) => {
data?.events?.forEach((event) => {
events.notify(event.event as string, event.data);
});
});

return {
queue<K extends keyof Events>(event: K, data: Events[K]) {
queue({ event, data });
},
subscribe<K extends keyof Events>(
event: K,
callback: (data: Events[K]) => void
) {
events.addListener(event as string, callback as any);
return () => events.removeListener(event as string, callback as any);
},
unsubscribe<K extends keyof Events>(
event: K,
callback?: (data: Events[K]) => void
) {
events.removeListener(event as string, callback as any);
},
};
}
31 changes: 18 additions & 13 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createHooks, Hook } from 'krog';
import { createBatchedClient } from './batchedSocketEvents';
import {
isNode,
KeyValue,
Expand Down Expand Up @@ -85,31 +86,35 @@ export function SocketDBClient<Schema extends SchemaDefinition = any>({
window.location.hostname
}:${window.location.port}`
: 'ws://localhost:8080');
let socketClient: SocketClient =
let connection: SocketClient =
_socketClient || createWebsocketClient({ url });

const socketEvents = createBatchedClient(connection, updateInterval);

const hooks = createHooks<ClientHooks>();

registerPlugins(plugins);

const queueUpdate = createUpdateBatcher((diff) => {
socketClient.send('update', { data: diff });
// not deep cloning diff (for perf.), because we do not need to
// as diff will be cleared after sending update (see queue)
socketEvents.queue('update', { data: diff });
}, updateInterval);

const subscriptions = {
update: createSubscriptionManager<Node>({
createPathSubscription(path, notify) {
if (isWildcardPath(path)) {
socketClient.on(path, ({ data: keys }: { data: string[] }) => {
socketEvents.subscribe(path, ({ data: keys }: { data: string[] }) => {
notify(() => {
const update: { [key: string]: any } = {};
keys.forEach((key) => (update[key] = null));
return nodeify(update);
});
});
socketClient.send('subscribeKeys', { path: trimWildcard(path) });
socketEvents.queue('subscribeKeys', { path: trimWildcard(path) });
} else {
socketClient.on(path, ({ data }: { data: BatchedUpdate }) => {
socketEvents.subscribe(path, ({ data }: { data: BatchedUpdate }) => {
data.delete?.forEach((path) => {
store.del(path);
subscriptions.update.notify(path, (path) => store.get(path), {
Expand Down Expand Up @@ -143,25 +148,25 @@ export function SocketDBClient<Schema extends SchemaDefinition = any>({
});
}
});
socketClient.send('subscribe', { path });
socketEvents.queue('subscribe', { path });
}
},
destroySubscription(path) {
socketClient.off(path);
socketClient.send('unsubscribe', { path });
socketEvents.unsubscribe(path);
socketEvents.queue('unsubscribe', { path });
},
restoreSubscription(path) {
if (isWildcardPath(path)) {
socketClient.send('subscribeKeys', { path: trimWildcard(path) });
socketEvents.queue('subscribeKeys', { path: trimWildcard(path) });
} else {
socketClient.send('subscribe', { path });
socketEvents.queue('subscribe', { path });
}
},
}),
};

let connectionLost = false;
socketClient.onConnect(() => {
connection.onConnect(() => {
if (connectionLost) {
// reattach subscriptions on every reconnect
subscriptions.update.resubscribe();
Expand All @@ -172,7 +177,7 @@ export function SocketDBClient<Schema extends SchemaDefinition = any>({
hooks.call('client:firstConnect');
}
});
socketClient.onDisconnect(() => {
connection.onDisconnect(() => {
hooks.call('client:disconnect');
connectionLost = true;
});
Expand Down Expand Up @@ -274,7 +279,7 @@ export function SocketDBClient<Schema extends SchemaDefinition = any>({
return {
...get(''),
disconnect() {
socketClient.close();
connection.close();
},
};
}
Expand Down
33 changes: 33 additions & 0 deletions src/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
export type Queue<T> = (data: T) => void;

export function createQueue<QueuedData, BatchedData>({
batch,
createState,
flush,
updateInterval,
}: {
createState: () => BatchedData;
batch: (current: BatchedData, update: QueuedData) => BatchedData;
flush: (batchedData: BatchedData) => void;
updateInterval?: number;
}): Queue<QueuedData> {
if (!updateInterval)
return (update) => {
flush(batch(createState(), update));
};

let current = createState();
let pendingUpdate: ReturnType<typeof setTimeout> | null = null;

return (update) => {
if (!pendingUpdate) {
current = createState();
pendingUpdate = setTimeout(() => {
pendingUpdate = null;
flush(current);
}, updateInterval);
}

current = batch(current, update);
};
}
27 changes: 17 additions & 10 deletions src/server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createHooks, Hook } from 'krog';
import { createBatchedClient } from './batchedSocketEvents';
import { Node, traverseNode } from './node';
import { joinPath } from './path';
import { Plugin } from './plugin';
Expand Down Expand Up @@ -174,8 +175,9 @@ export function SocketDBServer({
}
}

socketServer.onConnection((client, id, context = {}) => {
socketServer.onConnection((connection, id, context = {}) => {
const clientContext = { id, context };
const socketEvents = createBatchedClient(connection, updateInterval);
hooks.call(
'server:clientConnect',
{
Expand All @@ -185,7 +187,7 @@ export function SocketDBServer({
{ asRef: true }
);

client.onDisconnect(() => {
connection.onDisconnect(() => {
delete subscriber[id];
delete subscriber[id + 'wildcard']; // this should be handled in a cleaner way
hooks.call(
Expand All @@ -194,34 +196,39 @@ export function SocketDBServer({
{ asRef: true }
);
});
client.on('update', ({ data }: { data: BatchedUpdate }) => {
socketEvents.subscribe('update', ({ data }: { data: BatchedUpdate }) => {
data.delete?.forEach((path) => del(path, clientContext));
if (data.change) update(data.change, clientContext);
});
client.on('subscribe', ({ path, once }) => {
client.send(path, { data: { change: store.get(path) } });
socketEvents.subscribe('subscribe', ({ path, once }) => {
socketEvents.queue(path, {
// deepClone to only send the current snapshot, as data might change while queued
data: { change: deepClone(store.get(path)) },
});
if (once) return;
addSubscriber(id, path, (data) => {
client.send(path, { data });
socketEvents.queue(path, { data });
});
});
client.on('unsubscribe', ({ path }) => {
socketEvents.subscribe('unsubscribe', ({ path }) => {
removeSubscriber(id, path);
});
client.on('subscribeKeys', ({ path }) => {
socketEvents.subscribe('subscribeKeys', ({ path }) => {
const data = store.get(path);
const wildcardPath = joinPath(path, '*');
let keys: string[] = [];
if (isObject(data.value)) {
keys = Object.keys(data.value);
client.send(wildcardPath, { data: keys });
// destructure keys to only send the current keys, as they might change while queued
socketEvents.queue(wildcardPath, { data: [...keys] });
}
addSubscriber(id + 'wildcard', path, (data: BatchedUpdate) => {
if (data.change && isObject(data.change.value)) {
const newKeys = Object.keys(data.change.value).filter(
(key) => !keys.includes(key)
);
if (newKeys.length > 0) client.send(wildcardPath, { data: newKeys });
if (newKeys.length > 0)
socketEvents.queue(wildcardPath, { data: newKeys });
keys = [...keys, ...newKeys];
}
});
Expand Down
67 changes: 30 additions & 37 deletions src/updateBatcher.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { Node } from './node';
import { createStore } from './store';
import { createQueue } from './queue';
import { createStore, Store } from './store';
import { isObject } from './utils';

type Queue = (update: Change | Deletion) => void;

export type BatchedUpdate = {
delete?: string[];
change?: Node;
Expand All @@ -20,42 +19,36 @@ type Deletion = {
};

export function createUpdateBatcher(
flush: (batchedUpdate: BatchedUpdate) => void,
flushUpdate: (batchedUpdate: BatchedUpdate) => void,
updateInterval: number
): Queue {
if (!updateInterval)
return (update) => {
) {
return createQueue<
Change | Deletion,
{ diff: Store; deletions: Set<string> }
>({
createState() {
return {
deletions: new Set(),
diff: createStore(),
};
},
batch(current, update) {
if (update.type === 'delete') {
flush({ delete: [update.path] });
current.diff.del(update.path);
current.deletions.add(update.path);
} else if (update.type === 'change') {
flush({ change: update.data });
current.diff.put(update.data);
}
};

let diff = createStore();
let deletions: Set<string> = new Set();
let pendingUpdate: NodeJS.Timeout | null = null;

return (update: Change | Deletion) => {
if (!pendingUpdate) {
diff = createStore();
deletions = new Set();
pendingUpdate = setTimeout(() => {
pendingUpdate = null;
const update: BatchedUpdate = {};
if (deletions.size > 0) update.delete = Array.from(deletions);
const node = diff.get();
if (isObject(node.value) && Object.keys(node.value).length > 0)
update.change = node;
flush(update);
}, updateInterval);
}

if (update.type === 'delete') {
diff.del(update.path);
deletions.add(update.path);
} else if (update.type === 'change') {
diff.put(update.data);
}
};
return current;
},
flush({ deletions, diff }) {
const update: BatchedUpdate = {};
if (deletions.size > 0) update.delete = Array.from(deletions);
const node = diff.get();
if (isObject(node.value) && Object.keys(node.value).length > 0)
update.change = node;
flushUpdate(update);
},
updateInterval,
});
}
Loading

0 comments on commit 8a06672

Please sign in to comment.