Skip to content

Commit

Permalink
refactor: prefix socket events with a context
Browse files Browse the repository at this point in the history
adds a prefix to all socketevents to provide a context. this is a preparation to add more event
types in the future, like "error" or "emit", besides "data" events

BREAKING CHANGE: this breaks compatibility with previous versions. make sure to upgrade both client
and server
  • Loading branch information
TimoBechtel committed May 15, 2022
1 parent ddaab01 commit bb89f10
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 148 deletions.
99 changes: 55 additions & 44 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createHooks, Hook } from 'krog';
import { createBatchedClient } from './batchedSocketEvents';
import { DATA_CONTEXT, SOCKET_EVENTS } from './constants';
import {
isNode,
KeyValue,
Expand Down Expand Up @@ -101,68 +102,78 @@ export function SocketDBClient<Schema extends SchemaDefinition = any>({
const queueUpdate = createUpdateBatcher((diff) => {
// not deep cloning diff (for perf.), because we do not need to
// as diff will be cleared after sending update (see queue)
socketEvents.queue('update', { data: diff });
socketEvents.queue(SOCKET_EVENTS.data.clientUpdate, { data: diff });
}, updateInterval);

const subscriptions = {
update: createSubscriptionManager<Node>({
createPathSubscription(path, notify) {
if (isWildcardPath(path)) {
socketEvents.subscribe(path, ({ data: keys }: { data: string[] }) => {
notify(() => {
const update: { [key: string]: any } = {};
keys.forEach((key) => (update[key] = null));
return nodeify(update);
});
socketEvents.subscribe(
`${DATA_CONTEXT}:${path}`,
({ data: keys }: { data: string[] }) => {
notify(() => {
const update: { [key: string]: any } = {};
keys.forEach((key) => (update[key] = null));
return nodeify(update);
});
}
);
socketEvents.queue(SOCKET_EVENTS.data.requestKeysSubscription, {
path: trimWildcard(path),
});
socketEvents.queue('subscribeKeys', { path: trimWildcard(path) });
} else {
socketEvents.subscribe(path, ({ data }: { data: BatchedUpdate }) => {
data.delete?.forEach((path) => {
store.del(path);
subscriptions.update.notify(path, (path) => store.get(path), {
recursiveDown: true,
recursiveUp: true,
socketEvents.subscribe(
`${DATA_CONTEXT}:${path}`,
({ data }: { data: BatchedUpdate }) => {
data.delete?.forEach((path) => {
store.del(path);
subscriptions.update.notify(path, (path) => store.get(path), {
recursiveDown: true,
recursiveUp: true,
});
});
});
if (data.change) {
const update = creatUpdate(path, data.change);
store.put(update);
traverseNode(update, (path, data) => {
subscriptions.update.notify(path, () =>
deepClone(store.get(path))
);
if (isObject(data.value)) {
// with child paths, we need to notify all listeners for the wildcard path
subscriptions.update.notify(joinPath(path, '*'), () =>
store.get(path)
if (data.change) {
const update = creatUpdate(path, data.change);
store.put(update);
traverseNode(update, (path, data) => {
subscriptions.update.notify(path, () =>
deepClone(store.get(path))
);
} else {
// notify all subscribers of all child paths that their data has been deleted
// this has the issue, that it notifies even if data has not changed (meaning it was already null)
// examples when this happens:
// - path '/a' is updated with value '1', path '/a/b' is subscribed -> will be notified with value 'null'
// - path '/a' is deleted (set to null), path '/a/b' is subscribed -> will be notified with value 'null'
subscriptions.update.notify(path, nodeify(null), {
excludeSelf: true,
recursiveDown: true,
});
}
});
if (isObject(data.value)) {
// with child paths, we need to notify all listeners for the wildcard path
subscriptions.update.notify(joinPath(path, '*'), () =>
store.get(path)
);
} else {
// notify all subscribers of all child paths that their data has been deleted
// this has the issue, that it notifies even if data has not changed (meaning it was already null)
// examples when this happens:
// - path '/a' is updated with value '1', path '/a/b' is subscribed -> will be notified with value 'null'
// - path '/a' is deleted (set to null), path '/a/b' is subscribed -> will be notified with value 'null'
subscriptions.update.notify(path, nodeify(null), {
excludeSelf: true,
recursiveDown: true,
});
}
});
}
}
});
socketEvents.queue('subscribe', { path });
);
socketEvents.queue(SOCKET_EVENTS.data.requestSubscription, { path });
}
},
destroySubscription(path) {
socketEvents.unsubscribe(path);
socketEvents.queue('unsubscribe', { path });
socketEvents.unsubscribe(`${DATA_CONTEXT}:${path}`);
socketEvents.queue(SOCKET_EVENTS.data.requestUnsubscription, { path });
},
restoreSubscription(path) {
if (isWildcardPath(path)) {
socketEvents.queue('subscribeKeys', { path: trimWildcard(path) });
socketEvents.queue(SOCKET_EVENTS.data.requestKeysSubscription, {
path: trimWildcard(path),
});
} else {
socketEvents.queue('subscribe', { path });
socketEvents.queue(SOCKET_EVENTS.data.requestSubscription, { path });
}
},
}),
Expand Down
10 changes: 10 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export const DATA_CONTEXT = 'data' as const;

export const SOCKET_EVENTS = {
data: {
clientUpdate: `${DATA_CONTEXT}:update`,
requestSubscription: `${DATA_CONTEXT}:subscribe`,
requestKeysSubscription: `${DATA_CONTEXT}:subscribeKeys`,
requestUnsubscription: `${DATA_CONTEXT}:unsubscribe`,
},
} as const;
87 changes: 52 additions & 35 deletions src/server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createHooks, Hook } from 'krog';
import { createBatchedClient } from './batchedSocketEvents';
import { DATA_CONTEXT, SOCKET_EVENTS } from './constants';
import { Node, traverseNode } from './node';
import { joinPath } from './path';
import { Plugin } from './plugin';
Expand Down Expand Up @@ -199,43 +200,59 @@ export function SocketDBServer({
{ asRef: true }
);
});
socketEvents.subscribe('update', ({ data }: { data: BatchedUpdate }) => {
data.delete?.forEach((path) => del(path, clientContext));
if (data.change) update(data.change, clientContext);
});
socketEvents.subscribe('subscribe', ({ path, once }) => {
socketEvents.queue(path, {
// deepClone to only send the current snapshot, as data might change while queued
data: { change: deepClone(store.get(path)) },
});
if (once) return;
addSubscriber(id, path, (data) => {
socketEvents.queue(path, { data });
});
});
socketEvents.subscribe('unsubscribe', ({ path }) => {
removeSubscriber(id, path);
});
socketEvents.subscribe('subscribeKeys', ({ path }) => {
const data = store.get(path);
const wildcardPath = joinPath(path, '*');
let keys: string[] = [];
if (isObject(data.value)) {
keys = Object.keys(data.value);
// destructure keys to only send the current keys, as they might change while queued
socketEvents.queue(wildcardPath, { data: [...keys] });
socketEvents.subscribe(
SOCKET_EVENTS.data.clientUpdate,
({ data }: { data: BatchedUpdate }) => {
data.delete?.forEach((path) => del(path, clientContext));
if (data.change) update(data.change, clientContext);
}
);
socketEvents.subscribe(
SOCKET_EVENTS.data.requestSubscription,
({ path, once }) => {
socketEvents.queue(`${DATA_CONTEXT}:${path}`, {
// deepClone to only send the current snapshot, as data might change while queued
data: { change: deepClone(store.get(path)) },
});
if (once) return;
addSubscriber(id, path, (data) => {
socketEvents.queue(`${DATA_CONTEXT}:${path}`, { data });
});
}
addSubscriber(id + 'wildcard', path, (data: BatchedUpdate) => {
if (data.change && isObject(data.change.value)) {
const newKeys = Object.keys(data.change.value).filter(
(key) => !keys.includes(key)
);
if (newKeys.length > 0)
socketEvents.queue(wildcardPath, { data: newKeys });
keys = [...keys, ...newKeys];
);
socketEvents.subscribe(
SOCKET_EVENTS.data.requestUnsubscription,
({ path }) => {
removeSubscriber(id, path);
}
);
socketEvents.subscribe(
SOCKET_EVENTS.data.requestKeysSubscription,
({ path }) => {
const data = store.get(path);
const wildcardPath = joinPath(path, '*');
let keys: string[] = [];
if (isObject(data.value)) {
keys = Object.keys(data.value);
// destructure keys to only send the current keys, as they might change while queued
socketEvents.queue(`${DATA_CONTEXT}:${wildcardPath}`, {
data: [...keys],
});
}
});
});
addSubscriber(id + 'wildcard', path, (data: BatchedUpdate) => {
if (data.change && isObject(data.change.value)) {
const newKeys = Object.keys(data.change.value).filter(
(key) => !keys.includes(key)
);
if (newKeys.length > 0)
socketEvents.queue(`${DATA_CONTEXT}:${wildcardPath}`, {
data: newKeys,
});
keys = [...keys, ...newKeys];
}
});
}
);
});
return api;
}
Loading

0 comments on commit bb89f10

Please sign in to comment.