Skip to content

Commit

Permalink
CSC POC ontop of Parser
Browse files Browse the repository at this point in the history
  • Loading branch information
sjpotter committed Oct 31, 2024
1 parent 4708736 commit 33fb92a
Show file tree
Hide file tree
Showing 16 changed files with 273 additions and 54 deletions.
3 changes: 3 additions & 0 deletions packages/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import RedisSentinel from './lib/sentinel';
export { RedisSentinelOptions, RedisSentinelType } from './lib/sentinel/types';
export const createSentinel = RedisSentinel.create;

import { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache';

Check failure on line 23 in packages/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (20, 7.4.0-v1)

Cannot find module './lib/client/cache' or its corresponding type declarations.

Check failure on line 23 in packages/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (18, 6.2.6-v17)

Cannot find module './lib/client/cache' or its corresponding type declarations.

Check failure on line 23 in packages/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (18, 7.2.0-v13)

Cannot find module './lib/client/cache' or its corresponding type declarations.

Check failure on line 23 in packages/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (20, 6.2.6-v17)

Cannot find module './lib/client/cache' or its corresponding type declarations.

Check failure on line 23 in packages/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (20, 7.2.0-v13)

Cannot find module './lib/client/cache' or its corresponding type declarations.

Check failure on line 23 in packages/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (18, 7.4.0-v1)

Cannot find module './lib/client/cache' or its corresponding type declarations.
export { BasicClientSideCache, BasicPooledClientSideCache };

// export { GeoReplyWith } from './lib/commands/generic-transformers';

// export { SetOptions } from './lib/commands/SET';
Expand Down
21 changes: 20 additions & 1 deletion packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ export default class RedisCommandsQueue {
return this.#pubSub.isActive;
}

#invalidateCallback?: (key: RedisArgument | null) => unknown;

constructor(
respVersion: RespVersions,
maxLength: number | null | undefined,
Expand Down Expand Up @@ -109,13 +111,30 @@ export default class RedisCommandsQueue {
onErrorReply: err => this.#onErrorReply(err),
onPush: push => {
if (!this.#onPush(push)) {

switch (push[0].toString()) {
case "invalidate": {
if (this.#invalidateCallback) {
if (push[1] !== null) {
for (const key of push[1]) {
this.#invalidateCallback(key);
}
} else {
this.#invalidateCallback(null);
}
}
break;
}
}
}
},
getTypeMapping: () => this.#getTypeMapping()
});
}

setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
this.#invalidateCallback = callback;
}

addCommand<T>(
args: ReadonlyArray<RedisArgument>,
options?: CommandOptions
Expand Down
62 changes: 50 additions & 12 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
import { RedisPoolOptions, RedisClientPool } from './pool';
import { RedisVariadicArgument, parseArgs, pushVariadicArguments } from '../commands/generic-transformers';
import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } from './cache';

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (20, 7.4.0-v1)

Cannot find module './cache' or its corresponding type declarations.

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (20, 7.4.0-v1)

Cannot find module './cache' or its corresponding type declarations.

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (18, 6.2.6-v17)

Cannot find module './cache' or its corresponding type declarations.

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (18, 6.2.6-v17)

Cannot find module './cache' or its corresponding type declarations.

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (18, 7.2.0-v13)

Cannot find module './cache' or its corresponding type declarations.

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (18, 7.2.0-v13)

Cannot find module './cache' or its corresponding type declarations.

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (20, 6.2.6-v17)

Cannot find module './cache' or its corresponding type declarations.

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (20, 6.2.6-v17)

Cannot find module './cache' or its corresponding type declarations.

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (20, 7.2.0-v13)

Cannot find module './cache' or its corresponding type declarations.

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (20, 7.2.0-v13)

Cannot find module './cache' or its corresponding type declarations.

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (18, 7.4.0-v1)

Cannot find module './cache' or its corresponding type declarations.

Check failure on line 18 in packages/client/lib/client/index.ts

View workflow job for this annotation

GitHub Actions / tests (18, 7.4.0-v1)

Cannot find module './cache' or its corresponding type declarations.
import { BasicCommandParser, CommandParser } from './parser';

export interface RedisClientOptions<
Expand Down Expand Up @@ -72,6 +73,10 @@ export interface RedisClientOptions<
* TODO
*/
commandOptions?: CommandOptions<TYPE_MAPPING>;
/**
* TODO
*/
clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig;
}

type WithCommands<
Expand Down Expand Up @@ -280,10 +285,12 @@ export default class RedisClient<
#monitorCallback?: MonitorCallback<TYPE_MAPPING>;
private _self = this;
private _commandOptions?: CommandOptions<TYPE_MAPPING>;

#dirtyWatch?: string;
#epoch: number;
#watchEpoch?: number;

#clientSideCache?: ClientSideCacheProvider;

get options(): RedisClientOptions<M, F, S, RESP> | undefined {
return this._self.#options;
}
Expand All @@ -300,6 +307,11 @@ export default class RedisClient<
return this._self.#queue.isPubSubActive;
}

get socketEpoch() {
return this._self.#socket.socketEpoch;
}


get isWatching() {
return this._self.#watchEpoch !== undefined;
}
Expand All @@ -310,10 +322,20 @@ export default class RedisClient<

constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
super();

this.#options = this.#initiateOptions(options);
this.#queue = this.#initiateQueue();
this.#socket = this.#initiateSocket();
this.#epoch = 0;

if (options?.clientSideCache) {
if (options.clientSideCache instanceof ClientSideCacheProvider) {
this.#clientSideCache = options.clientSideCache;
} else {
const cscConfig = options.clientSideCache;
this.#clientSideCache = new BasicClientSideCache(cscConfig);
}
this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache));
}
}

#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined {
Expand Down Expand Up @@ -347,7 +369,6 @@ export default class RedisClient<

#handshake(selectedDB: number) {
const commands = [];

if (this.#options?.RESP) {
const hello: HelloOptions = {};

Expand Down Expand Up @@ -392,6 +413,13 @@ export default class RedisClient<
);
}

if (this.#clientSideCache) {
const tracking = this.#clientSideCache.trackingOn();
if (tracking) {
commands.push(tracking);
}
}

return commands;
}

Expand Down Expand Up @@ -445,6 +473,7 @@ export default class RedisClient<
})
.on('error', err => {
this.emit('error', err);
this.#clientSideCache?.onError();
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
this.#queue.flushWaitingForReply(err);
} else {
Expand All @@ -453,7 +482,6 @@ export default class RedisClient<
})
.on('connect', () => this.emit('connect'))
.on('ready', () => {
this.#epoch++;
this.emit('ready');
this.#setPingTimer();
this.#maybeScheduleWrite();
Expand Down Expand Up @@ -581,13 +609,21 @@ export default class RedisClient<
commandOptions: CommandOptions<TYPE_MAPPING> | undefined,
transformReply: TransformReply | undefined,
) {
const reply = await this.sendCommand(parser.redisArgs, commandOptions);
const csc = this._self.#clientSideCache;
const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions;

if (transformReply) {
return transformReply(reply, parser.preserve, commandOptions?.typeMapping);
}
const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) };

if (csc && command.CACHEABLE && defaultTypeMapping) {
return await csc.handleCache(this._self, parser as BasicCommandParser, fn, transformReply, commandOptions?.typeMapping);
} else {
const reply = await fn();

return reply;
if (transformReply) {
return transformReply(reply, parser.preserve, commandOptions?.typeMapping);
}
return reply;
}
}

/**
Expand Down Expand Up @@ -752,7 +788,7 @@ export default class RedisClient<
const reply = await this._self.sendCommand(
pushVariadicArguments(['WATCH'], key)
);
this._self.#watchEpoch ??= this._self.#epoch;
this._self.#watchEpoch ??= this._self.socketEpoch;
return reply as unknown as ReplyWithTypeMapping<SimpleStringReply<'OK'>, TYPE_MAPPING>;
}

Expand Down Expand Up @@ -819,7 +855,7 @@ export default class RedisClient<
}

const chainId = Symbol('Pipeline Chain'),
promise = Promise.all(
promise = Promise.allSettled(
commands.map(({ args }) => this._self.#queue.addCommand(args, {
chainId,
typeMapping: this._commandOptions?.typeMapping
Expand Down Expand Up @@ -855,7 +891,7 @@ export default class RedisClient<
throw new WatchError(dirtyWatch);
}

if (watchEpoch && watchEpoch !== this._self.#epoch) {
if (watchEpoch && watchEpoch !== this._self.socketEpoch) {
throw new WatchError('Client reconnected after WATCH');
}

Expand Down Expand Up @@ -1075,6 +1111,7 @@ export default class RedisClient<
return new Promise<void>(resolve => {
clearTimeout(this._self.#pingTimer);
this._self.#socket.close();
this._self.#clientSideCache?.onClose();

if (this._self.#queue.isEmpty()) {
this._self.#socket.destroySocket();
Expand All @@ -1099,6 +1136,7 @@ export default class RedisClient<
clearTimeout(this._self.#pingTimer);
this._self.#queue.flushAll(new DisconnectsClientError());
this._self.#socket.destroy();
this._self.#clientSideCache?.onClose();
}

ref() {
Expand Down
10 changes: 9 additions & 1 deletion packages/client/lib/client/linked-list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ export class DoublyLinkedList<T> {
export interface SinglyLinkedNode<T> {
value: T;
next: SinglyLinkedNode<T> | undefined;
removed: boolean;
}

export class SinglyLinkedList<T> {
Expand All @@ -140,7 +141,8 @@ export class SinglyLinkedList<T> {

const node = {
value,
next: undefined
next: undefined,
removed: false
};

if (this.#head === undefined) {
Expand All @@ -151,6 +153,9 @@ export class SinglyLinkedList<T> {
}

remove(node: SinglyLinkedNode<T>, parent: SinglyLinkedNode<T> | undefined) {
if (node.removed) {
throw new Error("node already removed");
}
--this.#length;

if (this.#head === node) {
Expand All @@ -165,6 +170,8 @@ export class SinglyLinkedList<T> {
} else {
parent!.next = node.next;
}

node.removed = true;
}

shift() {
Expand All @@ -177,6 +184,7 @@ export class SinglyLinkedList<T> {
this.#head = node.next;
}

node.removed = true;
return node.value;
}

Expand Down
5 changes: 5 additions & 0 deletions packages/client/lib/client/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ export class BasicCommandParser implements CommandParser {
return this.#keys[0];
}

get cacheKey() {
let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_');
return cacheKey + '_' + this.#redisArgs.join('_');
}

push(...arg: Array<RedisArgument>) {
this.#redisArgs.push(...arg);
};
Expand Down
Loading

0 comments on commit 33fb92a

Please sign in to comment.