Skip to content

Commit

Permalink
use @heyputer/kv.js for in-memory cache, simplify and test
Browse files Browse the repository at this point in the history
  • Loading branch information
fquffio committed Feb 13, 2025
1 parent 99aa0ae commit e7760c7
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 150 deletions.
5 changes: 5 additions & 0 deletions .changeset/dry-lions-occur.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@chialab/sveltekit-utils': minor
---

Use `@heyputer/kv.js` for in-memory cache, simplify code and increase test coverage, add `clearPattern()` method to base cache interface.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"lint-fix-all": "yarn run eslint-fix && yarn run prettier-fix"
},
"dependencies": {
"@heyputer/kv.js": "^0.1.9",
"cookie": "^1.0.2",
"pino": "^9.5.0",
"redis": "^4.7.0",
Expand Down
14 changes: 11 additions & 3 deletions src/lib/server/cache/base.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import type { JitterFn, JitterMode } from '../../utils/misc.js';
import type { StorageReadWriter } from '../storage.js';

/**
* Base class for caching.
*/
export abstract class BaseCache<V> {
export abstract class BaseCache<V> implements StorageReadWriter<V> {
/**
* Read an item from the cache, if present.
*
Expand Down Expand Up @@ -47,6 +48,13 @@ export abstract class BaseCache<V> {
*/
public abstract clear(prefix?: string): Promise<void>;

/**
* Flush cache removing all items matching a pattern.
*
* @param pattern Pattern to clear. May include the wildcard `*`.
*/
public abstract clearPattern(pattern: string): Promise<void>;

/**
* Read or set an item in the cache.
*
Expand Down Expand Up @@ -74,12 +82,12 @@ export abstract class BaseCache<V> {
jitter?: JitterMode | JitterFn | undefined,
): Promise<V | undefined> {
const cached = await this.get(key);
if (typeof cached !== 'undefined') {
if (cached !== undefined) {
return cached;
}

const value = await callback();
if (typeof value !== 'undefined') {
if (value !== undefined) {
await this.set(key, value, ttl, jitter);
}

Expand Down
141 changes: 26 additions & 115 deletions src/lib/server/cache/in-memory.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import kvjs from '@heyputer/kv.js';
import { createJitter, JitterMode, type JitterFn } from '../../utils/misc.js';
import { addPrefix, stripPrefix } from '../../utils/string.js';
import { BaseCache } from './base.js';

type ValueWrapper<T> = { value: T; expire: number | undefined };

type InMemoryCacheOptions = {
maxItems?: number;
keyPrefix?: string;
defaultTTL?: number;
defaultJitter?: JitterMode | JitterFn;
Expand All @@ -13,17 +12,17 @@ type InMemoryCacheOptions = {
/** Simple cache with TTL and cap to maximum items stored. */
export class InMemoryCache<V> extends BaseCache<V> {
readonly #options: InMemoryCacheOptions;
readonly #inner: Map<string, ValueWrapper<V>> = new Map();
readonly #inner: kvjs;

public static init<V>(options: InMemoryCacheOptions): InMemoryCache<V> {
return new this<V>(options);
}

private constructor(options: InMemoryCacheOptions, map?: Map<string, ValueWrapper<V>>) {
private constructor(options: InMemoryCacheOptions, store?: kvjs) {
super();

this.#options = Object.freeze({ ...options });
this.#inner = map ?? new Map();
this.#inner = store ?? new kvjs();
}

public child<V2 extends V>(
Expand All @@ -34,24 +33,14 @@ export class InMemoryCache<V> extends BaseCache<V> {
{
...this.#options,
...options,
keyPrefix: this.#key(keyPrefix),
keyPrefix: addPrefix(this.#options.keyPrefix, keyPrefix),
},
this.#inner as Map<`${typeof keyPrefix}${string}`, ValueWrapper<V2>>,
this.#inner,
);
}

public async get(key: string): Promise<V | undefined> {
const cached = this.#inner.get(this.#key(key));
if (typeof cached === 'undefined') {
return undefined;
}
if (!InMemoryCache.#isValid(cached)) {
this.#inner.delete(this.#key(key));

return undefined;
}

return cached.value;
return this.#inner.get(addPrefix(this.#options.keyPrefix, key)) as V | undefined;
}

public async set(
Expand All @@ -61,112 +50,34 @@ export class InMemoryCache<V> extends BaseCache<V> {
jitter?: JitterMode | JitterFn | undefined,
): Promise<void> {
ttl ??= this.#options.defaultTTL;
const jitterFn = createJitter(jitter ?? this.#options.defaultJitter ?? JitterMode.None);
this.#inner.set(this.#key(key), {
value,
expire: typeof ttl === 'number' ? Date.now() + jitterFn(ttl * 1000) : undefined,
});

this.#housekeeping();
}
if (ttl === undefined) {
this.#inner.set(addPrefix(this.#options.keyPrefix, key), value);

public async delete(key: string): Promise<void> {
this.#inner.delete(this.#key(key));
}

public async *keys(prefix?: string): AsyncGenerator<string, void, never> {
for (const key of this.#inner.keys()) {
const strippedKey = this.#stripPrefix(key);
if (typeof strippedKey !== 'undefined' && strippedKey.startsWith(prefix ?? '')) {
yield strippedKey;
}
return;
}
}

public async clear(prefix?: string): Promise<void> {
if (typeof prefix !== 'undefined') {
this.#processBatch(this.#inner.entries(), (key) => key.startsWith(prefix));
} else if (typeof this.#options.keyPrefix !== 'undefined') {
this.#processBatch(this.#inner.entries(), () => true);
} else {
this.#inner.clear();
}
}

/**
* Check if a cached value has expired.
*
* @param cached Cached value.
* @param now Point-in-time to evaluate expiration against.
*/
static #isValid(cached: ValueWrapper<unknown>, now?: number): boolean {
if (typeof cached.expire !== 'number') {
return true;
}

return cached.expire >= (now ?? Date.now());
}

/**
* Run housekeeping tasks on cache instance.
*
* Expired items will be removed, and if the cache is over capacity,
* excess items will be randomly evicted in an attempt to cut size down.
*
* @param batchSize Number of cache items to evaluate at every tick. Keep this low to avoid locking for too long.
*/
#housekeeping(batchSize = 1000): void {
const now = Date.now();
const dropProbability =
typeof this.#options.maxItems !== 'undefined'
? Math.max(
0,
[...this.#inner.keys()].filter((key) => typeof this.#stripPrefix(key) !== 'undefined').length / // Number of items in this cache.
this.#options.maxItems -
1,
)
: 0;

setImmediate(() =>
this.#processBatch(
this.#inner.entries(),
(_, cached) => !InMemoryCache.#isValid(cached, now) || (dropProbability > 0 && Math.random() < dropProbability),
batchSize,
),
this.#inner.setex(
addPrefix(this.#options.keyPrefix, key),
value,
createJitter(jitter ?? this.#options.defaultJitter ?? JitterMode.None)(ttl * 1000),
);
}

#key(key: string | undefined) {
return (this.#options.keyPrefix ?? '') + (key ?? '');
public async delete(key: string): Promise<void> {
this.#inner.del(addPrefix(this.#options.keyPrefix, key));
}

#stripPrefix(key: string) {
const prefix = this.#options.keyPrefix ?? '';
if (!key.startsWith(prefix)) {
return undefined;
}

return key.substring(prefix.length);
public async *keys(prefix?: string): AsyncGenerator<string, void, never> {
yield* this.#inner
.keys(addPrefix(this.#options.keyPrefix, `${prefix ?? ''}*`))
.map((key) => stripPrefix(this.#options.keyPrefix, key)!);
}

#processBatch(
iterator: IterableIterator<[string, ValueWrapper<V>]>,
filter: (key: string, cached: ValueWrapper<V>) => boolean,
batchSize = 1000,
): void {
for (let i = 0; i < batchSize; i++) {
const next = iterator.next();
if (next.done === true) {
return;
}

const [key, cached] = next.value;
const strippedKey = this.#stripPrefix(key);
if (typeof strippedKey !== 'undefined' && filter(strippedKey, cached)) {
this.#inner.delete(key);
}
}
public clear(prefix?: string): Promise<void> {
return this.clearPattern((prefix ?? '') + '*');
}

setImmediate(() => this.#processBatch(iterator, filter, batchSize));
public async clearPattern(pattern: string): Promise<void> {
this.#inner.del(...this.#inner.keys(addPrefix(this.#options.keyPrefix, pattern)));
}
}
45 changes: 14 additions & 31 deletions src/lib/server/cache/redis.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import {
createClient,
createCluster,
type RedisClientType,
type RedisClientOptions,
type RedisClusterType,
type RedisClientType,
type RedisClusterOptions,
type RedisClusterType,
type RedisDefaultModules,
} from 'redis';
import { logger } from '../../logger.js';
import { BaseCache } from './base.js';
import { createJitter, JitterMode, type JitterFn } from '../../utils/misc.js';
import { addPrefix, stripPrefix } from '../../utils/string.js';
import { BaseCache } from './base.js';

type RedisCacheOptions = {
keyPrefix?: string;
Expand Down Expand Up @@ -60,7 +61,7 @@ export class RedisCache<V> extends BaseCache<V | Jsonify<V>> {
}

async #connect(): Promise<RedisClientOrCluster> {
if (typeof this.#connectPromise === 'undefined') {
if (this.#connectPromise === undefined) {
this.#connectPromise = this.#client.connect();
}

Expand All @@ -77,15 +78,15 @@ export class RedisCache<V> extends BaseCache<V | Jsonify<V>> {
{
...this.#options,
...options,
keyPrefix: this.#key(keyPrefix),
keyPrefix: addPrefix(this.#options.keyPrefix, keyPrefix),
},
this.#client.duplicate(),
);
}

public async get(key: string): Promise<Jsonify<V> | undefined> {
const client = await this.#connect();
const val = await client.get(this.#key(key));
const val = await client.get(addPrefix(this.#options.keyPrefix, key));
if (val === null) {
return undefined;
}
Expand Down Expand Up @@ -114,11 +115,11 @@ export class RedisCache<V> extends BaseCache<V | Jsonify<V>> {
const client = await this.#connect();
const val = JSON.stringify(value);
try {
if (typeof ttl === 'undefined') {
await client.set(this.#key(key), val);
if (ttl === undefined) {
await client.set(addPrefix(this.#options.keyPrefix, key), val);
} else {
const jitterFn = createJitter(jitter ?? this.#options.defaultJitter ?? JitterMode.None);
await client.setEx(this.#key(key), Math.round(jitterFn(ttl)), val);
await client.setEx(addPrefix(this.#options.keyPrefix, key), Math.round(jitterFn(ttl)), val);
}
} catch (err) {
logger.error({ key, err }, 'Got error while trying to set cache key');
Expand All @@ -128,18 +129,18 @@ export class RedisCache<V> extends BaseCache<V | Jsonify<V>> {
public async delete(key: string): Promise<void> {
const client = await this.#connect();

await client.del(this.#key(key));
await client.del(addPrefix(this.#options.keyPrefix, key));
}

public async *keys(prefix?: string): AsyncGenerator<string, void, never> {
const matchFilter = this.#key(`${prefix ?? ''}*`);
const matchFilter = addPrefix(this.#options.keyPrefix, `${prefix ?? ''}*`);
const client = await this.#connect();
const clients = 'masters' in client ? client.masters.map(({ client }) => client!) : [client];
for (const clientPromise of clients) {
const client = await clientPromise;

for await (const key of client.scanIterator({ MATCH: matchFilter })) {
yield this.#stripPrefix(key)!;
yield stripPrefix(this.#options.keyPrefix, key)!;
}
}
}
Expand All @@ -152,7 +153,7 @@ export class RedisCache<V> extends BaseCache<V | Jsonify<V>> {
const scanNode = async (client: RedisClientType) => {
let cursor = 0;
do {
const res = await client.scan(cursor, { MATCH: this.#key(pattern) });
const res = await client.scan(cursor, { MATCH: addPrefix(this.#options.keyPrefix, pattern) });
cursor = res.cursor;
if (res.keys.length > 0) {
await client.del(res.keys);
Expand All @@ -165,22 +166,4 @@ export class RedisCache<V> extends BaseCache<V | Jsonify<V>> {
'masters' in client ? client.masters.map(async ({ client }) => scanNode(await client!)) : [scanNode(client)],
);
}

/**
* Return key including prefix.
*
* @param key Key without prefix.
*/
#key(key: string): string {
return (this.#options.keyPrefix ?? '') + key;
}

#stripPrefix(key: string) {
const prefix = this.#options.keyPrefix ?? '';
if (!key.startsWith(prefix)) {
return undefined;
}

return key.substring(prefix.length);
}
}
Loading

0 comments on commit e7760c7

Please sign in to comment.