Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for cache tag invalidation #1169

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/apollo-datasource-rest/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.idea
5 changes: 4 additions & 1 deletion packages/apollo-datasource-rest/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
"dependencies": {
"apollo-server-env": "2.0.0-beta.7",
"http-cache-semantics": "^4.0.0",
"lru-cache": "^4.1.3"
"memcached": "^2.2.2",
"redis": "^2.8.0"
},
"devDependencies": {
"@types/jest": "^23.0.0",
"@types/lru-cache": "^4.1.1",
"jest": "^23.1.0",
"memcached-mock": "^0.1.0",
"redis-mock": "^0.26.0",
"ts-jest": "^22.4.6"
},
"jest": {
Expand Down
142 changes: 142 additions & 0 deletions packages/apollo-datasource-rest/src/__tests__/connectors.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import {} from 'jest'; // makes my editor happy

// use mock implementations for underlying databases
jest.mock('memcached', () => require('memcached-mock'));
jest.mock('redis', () => require('redis-mock'));
jest.useFakeTimers(); // mocks out setTimeout that is used in redis-mock

import MemcachedKeyValueCache from '../connectors/memcached';
import RedisKeyValueCache from '../connectors/redis';
import { advanceTimeBy, mockDate, unmockDate } from '../__mocks__/date';

// run test suite against each implementation of KeyValueCache
describe.each([
['Memcached Connector', new MemcachedKeyValueCache('localhost:11211')],
['Redis Connector', new RedisKeyValueCache({ host: 'localhost' })],
])('%s', (_, keyValueCache) => {
beforeAll(() => {
mockDate();
});

beforeEach(async () => {
await keyValueCache.flush();
});

afterAll(() => {
unmockDate();
keyValueCache.close();
});

it('can do a basic get and set', async () => {
await keyValueCache.set('hello', 'world');
expect(await keyValueCache.get('hello')).toBe('world');
expect(await keyValueCache.get('missing')).not.toBeDefined();
});

it('can set same key multiple times', async () => {
await keyValueCache.set('hello', 'world');
await keyValueCache.set('hello', 'world');
expect(await keyValueCache.get('hello')).toBe('world');
expect(await keyValueCache.get('missing')).not.toBeDefined();
});

it('can expire keys based on ttl', async () => {
await keyValueCache.set('short', 's', { ttl: 1 });
await keyValueCache.set('long', 'l', { ttl: 5 });
expect(await keyValueCache.get('short')).toBe('s');
expect(await keyValueCache.get('long')).toBe('l');
advanceTimeBy(1000);
jest.advanceTimersByTime(1000);
expect(await keyValueCache.get('short')).not.toBeDefined();
expect(await keyValueCache.get('long')).toBe('l');
advanceTimeBy(4000);
jest.advanceTimersByTime(4000);
expect(await keyValueCache.get('short')).not.toBeDefined();
expect(await keyValueCache.get('long')).not.toBeDefined();
});

it('can set tags', async () => {
await keyValueCache.set('tagged', 'data', {
ttl: 1,
tags: ['tag1', 'tag2'],
});
expect(await keyValueCache.get('tagged')).toBe('data');
});

it('can invalidate tags', async () => {
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag1', 'tag2'],
});
await keyValueCache.set('key2', 'v2', {
ttl: 10,
tags: ['tag2', 'tag3'],
});

expect(await keyValueCache.get('key1')).toBe('v1');
expect(await keyValueCache.get('key2')).toBe('v2');
await keyValueCache.invalidate(['tag1']);
expect(await keyValueCache.get('key1')).not.toBeDefined();
expect(await keyValueCache.get('key2')).toBe('v2');
await keyValueCache.invalidate(['tag3']);
expect(await keyValueCache.get('key1')).not.toBeDefined();
expect(await keyValueCache.get('key2')).not.toBeDefined();
});

it('can invalidate tag for multiple keys', async () => {
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag1', 'tag2'],
});
await keyValueCache.set('key2', 'v2', {
ttl: 10,
tags: ['tag2', 'tag3'],
});

expect(await keyValueCache.get('key1')).toBe('v1');
expect(await keyValueCache.get('key2')).toBe('v2');
await keyValueCache.invalidate(['tag2']);
expect(await keyValueCache.get('key1')).not.toBeDefined();
expect(await keyValueCache.get('key2')).not.toBeDefined();
});

it('can reset tags', async () => {
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag1', 'tag2'],
});
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag2', 'tag3'],
});

await keyValueCache.invalidate(['tag1']);
expect(await keyValueCache.get('key1')).toBe('v1');
await keyValueCache.invalidate(['tag3']);
expect(await keyValueCache.get('key1')).not.toBeDefined();
});

it('can invalidate tags before they have been set', async () => {
await keyValueCache.invalidate(['tag1']);
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag1', 'tag2'],
});
expect(await keyValueCache.get('key1')).toBe('v1');
});

it('can invalidate tags after keys expire', async () => {
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag1'],
});
advanceTimeBy(5000);
jest.advanceTimersByTime(5000);
expect(await keyValueCache.get('key1')).toBe('v1');
advanceTimeBy(5000);
jest.advanceTimersByTime(5000);
// key has expired
await keyValueCache.invalidate(['tag1']);
expect(await keyValueCache.get('key1')).not.toBeDefined();
});
});
99 changes: 99 additions & 0 deletions packages/apollo-datasource-rest/src/connectors/memcached.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { KeyValueCache } from '../KeyValueCache';
import Memcached from 'memcached';
import { promisify } from 'util';

// Store a global version number (like a logical clock) in the key value store
const VERSION_KEY = 'apollo-server-cache:version';

export default class MemcachedKeyValueCache implements KeyValueCache {
readonly client;
readonly defaultSetOptions = {
ttl: 300,
tags: [],
};

constructor(serverLocation: Memcached.Location, options?: Memcached.options) {
this.client = new Memcached(serverLocation, options);
// promisify client calls for convenience
this.client.get = promisify(this.client.get).bind(this.client);
this.client.getMulti = promisify(this.client.getMulti).bind(this.client);
this.client.incr = promisify(this.client.incr).bind(this.client);
this.client.set = promisify(this.client.set).bind(this.client);
this.client.flush = promisify(this.client.flush).bind(this.client);
}

async set(
key: string,
data: string,
options?: { ttl?: number; tags?: string[] },
): Promise<void> {
const { ttl, tags } = Object.assign({}, this.defaultSetOptions, options);

// get and incr version number
let version = await this.client.get(VERSION_KEY);
if (!version) {
// initialize version number
version = 1;
await this.client.set(VERSION_KEY, version + 1, 0);
} else {
await this.client.incr(VERSION_KEY, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused by the need to increment here, I would expect that to happen only on invalidation. What is the reasoning behind this?

}

// augment data with tags and version
const payload = {
v: version,
d: data,
t: tags,
};

await this.client.set(key, JSON.stringify(payload), ttl);
Copy link
Contributor Author

@clarencenpy clarencenpy Jun 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of serializing tag metadata together with the key, we could also store metadata in separate namespaced keys:
set('key1', 'value1', ['tag1', 'tag2']) gets stored like this:

key1: value1
meta-v-key1: <version number> 
meta-t-key1: 'tag1||tag2'

}

async get(key: string): Promise<string | undefined> {
const data = await this.client.get(key);
if (!data) return;

// deserialize data
const payload = JSON.parse(data);

// check "timestamp" at which tags have been invalidated
const tags = payload.t;
if (tags.length !== 0) {
const versions = await this.client.getMulti(tags);
for (const tag in versions) {
if (versions[tag] !== undefined && versions[tag] > payload.v) {
return; // tag has been invalidated, therefore cache entry not valid
}
}
}

// all version numbers up to date
return payload.d;
}

async invalidate(tags: string[]): Promise<void> {
// set the invalidation "timestamp" using logical clock for every tag
let version = await this.client.get(VERSION_KEY);
if (!version) {
// initialize version number
version = 1;
await this.client.set(VERSION_KEY, version + 1, 0);
} else {
await this.client.incr(VERSION_KEY, 1);
}

const operations: any[] = [];
for (const tag of tags) {
operations.push([tag, version, 0]); // setting ttl to 0, never expire
}
await Promise.all(operations.map(op => this.client.set(...op)));
}

async flush(): Promise<void> {
await this.client.flush();
}

async close(): Promise<void> {
this.client.end();
}
}
104 changes: 104 additions & 0 deletions packages/apollo-datasource-rest/src/connectors/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { KeyValueCache } from '../KeyValueCache';
import Redis from 'redis';
import { promisify } from 'util';

const VERSION_KEY = 'apollo-server-cache:version';

export default class RedisKeyValueCache implements KeyValueCache {
readonly client;
readonly defaultSetOptions = {
ttl: 300,
tags: [],
};

constructor(options: Redis.ClientOpts) {
this.client = Redis.createClient(options);
// promisify client calls for convenience
this.client.get = promisify(this.client.get).bind(this.client);
this.client.mget = promisify(this.client.mget).bind(this.client);
this.client.incr = promisify(this.client.incr).bind(this.client);
this.client.set = promisify(this.client.set).bind(this.client);
this.client.flushdb = promisify(this.client.flushdb).bind(this.client);
this.client.quit = promisify(this.client.quit).bind(this.client);
}

async set(
key: string,
data: string,
options?: { ttl?: number; tags?: string[] },
): Promise<void> {
const { ttl, tags } = Object.assign({}, this.defaultSetOptions, options);

// get and incr version number
let version = await this.client.get(VERSION_KEY);
if (!version) {
// initialize version number
version = 1;
await this.client.set(VERSION_KEY, version + 1);
} else {
await this.client.incr(VERSION_KEY);
}

// augment data with tags and version
const payload = {
v: version,
d: data,
t: tags,
};

await this.client.set(key, JSON.stringify(payload), 'EX', ttl);
}

async get(key: string): Promise<string | undefined> {
const data = await this.client.get(key);
// reply is null if key is not found
if (data === null) {
return;
}

// deserialize data
const payload = JSON.parse(data);
// check "timestamp" at which tags have been invalidated
const tags = payload.t;

if (tags.length !== 0) {
const versions = await this.client.mget(tags);
for (const tag in versions) {
if (versions[tag] !== undefined && versions[tag] > payload.v) {
return; // tag has been invalidated, therefore cache entry not valid
}
}
}

// all version numbers up to date
return payload.d;
}

async invalidate(tags: string[]): Promise<void> {
// set the invalidation "timestamp" using logical clock for every tag
let version = await this.client.get(VERSION_KEY);
if (!version) {
// initialize version number
version = 1;
await this.client.set(VERSION_KEY, version + 1);
} else {
await this.client.incr(VERSION_KEY);
}

const operations: any[] = [];
for (const tag of tags) {
operations.push([tag, version]); // tags should not be set to expire
}
// wait for all operations to finish
await Promise.all(operations.map(op => this.client.set(...op)));
}

async flush(): Promise<void> {
await this.client.flushdb();
}

async close(): Promise<void> {
await this.client.quit();
return;
}
}