-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for cache tag invalidation #1169
Closed
Closed
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
74deb28
Enable declarationMap in tsconfig.json
martijnwalraven c7da09b
Add apollo-server-caching package and improve typings
martijnwalraven 081abe1
Remove superfluous test steps
martijnwalraven 121bf56
add KeyValueCache implementations for Memcached and Redis
clarencenpy fae6330
promisify client calls so we can use await
clarencenpy ba72648
mocked out all side effects so tests run instantly
clarencenpy 508892c
simplified async code
clarencenpy d0f19e4
test that keys are expired correctly
clarencenpy c2373aa
Merge branch 'server-2.0/caching-connectors' of /Users/clarencenpy/Ap…
clarencenpy 408edba
add support for cache tag invalidation
clarencenpy d48e745
Merge branch 'server-2.0/caching' into server-2.0/caching-connectors
clarencenpy e9407cc
moved cache connectors to `apollo-datasource-rest`
clarencenpy ba12f29
remove unnecessary types
clarencenpy 1c92359
store version key or logical clock in key-value store
clarencenpy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
.idea |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
142 changes: 142 additions & 0 deletions
142
packages/apollo-datasource-rest/src/__tests__/connectors.test.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
import {} from 'jest'; // makes my editor happy | ||
|
||
// use mock implementations for underlying databases | ||
jest.mock('memcached', () => require('memcached-mock')); | ||
jest.mock('redis', () => require('redis-mock')); | ||
jest.useFakeTimers(); // mocks out setTimeout that is used in redis-mock | ||
|
||
import MemcachedKeyValueCache from '../connectors/memcached'; | ||
import RedisKeyValueCache from '../connectors/redis'; | ||
import { advanceTimeBy, mockDate, unmockDate } from '../__mocks__/date'; | ||
|
||
// run test suite against each implementation of KeyValueCache | ||
describe.each([ | ||
['Memcached Connector', new MemcachedKeyValueCache('localhost:11211')], | ||
['Redis Connector', new RedisKeyValueCache({ host: 'localhost' })], | ||
])('%s', (_, keyValueCache) => { | ||
beforeAll(() => { | ||
mockDate(); | ||
}); | ||
|
||
beforeEach(async () => { | ||
await keyValueCache.flush(); | ||
}); | ||
|
||
afterAll(() => { | ||
unmockDate(); | ||
keyValueCache.close(); | ||
}); | ||
|
||
it('can do a basic get and set', async () => { | ||
await keyValueCache.set('hello', 'world'); | ||
expect(await keyValueCache.get('hello')).toBe('world'); | ||
expect(await keyValueCache.get('missing')).not.toBeDefined(); | ||
}); | ||
|
||
it('can set same key multiple times', async () => { | ||
await keyValueCache.set('hello', 'world'); | ||
await keyValueCache.set('hello', 'world'); | ||
expect(await keyValueCache.get('hello')).toBe('world'); | ||
expect(await keyValueCache.get('missing')).not.toBeDefined(); | ||
}); | ||
|
||
it('can expire keys based on ttl', async () => { | ||
await keyValueCache.set('short', 's', { ttl: 1 }); | ||
await keyValueCache.set('long', 'l', { ttl: 5 }); | ||
expect(await keyValueCache.get('short')).toBe('s'); | ||
expect(await keyValueCache.get('long')).toBe('l'); | ||
advanceTimeBy(1000); | ||
jest.advanceTimersByTime(1000); | ||
expect(await keyValueCache.get('short')).not.toBeDefined(); | ||
expect(await keyValueCache.get('long')).toBe('l'); | ||
advanceTimeBy(4000); | ||
jest.advanceTimersByTime(4000); | ||
expect(await keyValueCache.get('short')).not.toBeDefined(); | ||
expect(await keyValueCache.get('long')).not.toBeDefined(); | ||
}); | ||
|
||
it('can set tags', async () => { | ||
await keyValueCache.set('tagged', 'data', { | ||
ttl: 1, | ||
tags: ['tag1', 'tag2'], | ||
}); | ||
expect(await keyValueCache.get('tagged')).toBe('data'); | ||
}); | ||
|
||
it('can invalidate tags', async () => { | ||
await keyValueCache.set('key1', 'v1', { | ||
ttl: 10, | ||
tags: ['tag1', 'tag2'], | ||
}); | ||
await keyValueCache.set('key2', 'v2', { | ||
ttl: 10, | ||
tags: ['tag2', 'tag3'], | ||
}); | ||
|
||
expect(await keyValueCache.get('key1')).toBe('v1'); | ||
expect(await keyValueCache.get('key2')).toBe('v2'); | ||
await keyValueCache.invalidate(['tag1']); | ||
expect(await keyValueCache.get('key1')).not.toBeDefined(); | ||
expect(await keyValueCache.get('key2')).toBe('v2'); | ||
await keyValueCache.invalidate(['tag3']); | ||
expect(await keyValueCache.get('key1')).not.toBeDefined(); | ||
expect(await keyValueCache.get('key2')).not.toBeDefined(); | ||
}); | ||
|
||
it('can invalidate tag for multiple keys', async () => { | ||
await keyValueCache.set('key1', 'v1', { | ||
ttl: 10, | ||
tags: ['tag1', 'tag2'], | ||
}); | ||
await keyValueCache.set('key2', 'v2', { | ||
ttl: 10, | ||
tags: ['tag2', 'tag3'], | ||
}); | ||
|
||
expect(await keyValueCache.get('key1')).toBe('v1'); | ||
expect(await keyValueCache.get('key2')).toBe('v2'); | ||
await keyValueCache.invalidate(['tag2']); | ||
expect(await keyValueCache.get('key1')).not.toBeDefined(); | ||
expect(await keyValueCache.get('key2')).not.toBeDefined(); | ||
}); | ||
|
||
it('can reset tags', async () => { | ||
await keyValueCache.set('key1', 'v1', { | ||
ttl: 10, | ||
tags: ['tag1', 'tag2'], | ||
}); | ||
await keyValueCache.set('key1', 'v1', { | ||
ttl: 10, | ||
tags: ['tag2', 'tag3'], | ||
}); | ||
|
||
await keyValueCache.invalidate(['tag1']); | ||
expect(await keyValueCache.get('key1')).toBe('v1'); | ||
await keyValueCache.invalidate(['tag3']); | ||
expect(await keyValueCache.get('key1')).not.toBeDefined(); | ||
}); | ||
|
||
it('can invalidate tags before they have been set', async () => { | ||
await keyValueCache.invalidate(['tag1']); | ||
await keyValueCache.set('key1', 'v1', { | ||
ttl: 10, | ||
tags: ['tag1', 'tag2'], | ||
}); | ||
expect(await keyValueCache.get('key1')).toBe('v1'); | ||
}); | ||
|
||
it('can invalidate tags after keys expire', async () => { | ||
await keyValueCache.set('key1', 'v1', { | ||
ttl: 10, | ||
tags: ['tag1'], | ||
}); | ||
advanceTimeBy(5000); | ||
jest.advanceTimersByTime(5000); | ||
expect(await keyValueCache.get('key1')).toBe('v1'); | ||
advanceTimeBy(5000); | ||
jest.advanceTimersByTime(5000); | ||
// key has expired | ||
await keyValueCache.invalidate(['tag1']); | ||
expect(await keyValueCache.get('key1')).not.toBeDefined(); | ||
}); | ||
}); |
99 changes: 99 additions & 0 deletions
99
packages/apollo-datasource-rest/src/connectors/memcached.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
import { KeyValueCache } from '../KeyValueCache'; | ||
import Memcached from 'memcached'; | ||
import { promisify } from 'util'; | ||
|
||
// Store a global version number (like a logical clock) in the key value store | ||
const VERSION_KEY = 'apollo-server-cache:version'; | ||
|
||
export default class MemcachedKeyValueCache implements KeyValueCache { | ||
readonly client; | ||
readonly defaultSetOptions = { | ||
ttl: 300, | ||
tags: [], | ||
}; | ||
|
||
constructor(serverLocation: Memcached.Location, options?: Memcached.options) { | ||
this.client = new Memcached(serverLocation, options); | ||
// promisify client calls for convenience | ||
this.client.get = promisify(this.client.get).bind(this.client); | ||
this.client.getMulti = promisify(this.client.getMulti).bind(this.client); | ||
this.client.incr = promisify(this.client.incr).bind(this.client); | ||
this.client.set = promisify(this.client.set).bind(this.client); | ||
this.client.flush = promisify(this.client.flush).bind(this.client); | ||
} | ||
|
||
async set( | ||
key: string, | ||
data: string, | ||
options?: { ttl?: number; tags?: string[] }, | ||
): Promise<void> { | ||
const { ttl, tags } = Object.assign({}, this.defaultSetOptions, options); | ||
|
||
// get and incr version number | ||
let version = await this.client.get(VERSION_KEY); | ||
if (!version) { | ||
// initialize version number | ||
version = 1; | ||
await this.client.set(VERSION_KEY, version + 1, 0); | ||
} else { | ||
await this.client.incr(VERSION_KEY, 1); | ||
} | ||
|
||
// augment data with tags and version | ||
const payload = { | ||
v: version, | ||
d: data, | ||
t: tags, | ||
}; | ||
|
||
await this.client.set(key, JSON.stringify(payload), ttl); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of serializing tag metadata together with the key, we could also store metadata in separate namespaced keys:
|
||
} | ||
|
||
async get(key: string): Promise<string | undefined> { | ||
const data = await this.client.get(key); | ||
if (!data) return; | ||
|
||
// deserialize data | ||
const payload = JSON.parse(data); | ||
|
||
// check "timestamp" at which tags have been invalidated | ||
const tags = payload.t; | ||
if (tags.length !== 0) { | ||
const versions = await this.client.getMulti(tags); | ||
for (const tag in versions) { | ||
if (versions[tag] !== undefined && versions[tag] > payload.v) { | ||
return; // tag has been invalidated, therefore cache entry not valid | ||
} | ||
} | ||
} | ||
|
||
// all version numbers up to date | ||
return payload.d; | ||
} | ||
|
||
async invalidate(tags: string[]): Promise<void> { | ||
// set the invalidation "timestamp" using logical clock for every tag | ||
let version = await this.client.get(VERSION_KEY); | ||
if (!version) { | ||
// initialize version number | ||
version = 1; | ||
await this.client.set(VERSION_KEY, version + 1, 0); | ||
} else { | ||
await this.client.incr(VERSION_KEY, 1); | ||
} | ||
|
||
const operations: any[] = []; | ||
for (const tag of tags) { | ||
operations.push([tag, version, 0]); // setting ttl to 0, never expire | ||
} | ||
await Promise.all(operations.map(op => this.client.set(...op))); | ||
} | ||
|
||
async flush(): Promise<void> { | ||
await this.client.flush(); | ||
} | ||
|
||
async close(): Promise<void> { | ||
this.client.end(); | ||
} | ||
} |
104 changes: 104 additions & 0 deletions
104
packages/apollo-datasource-rest/src/connectors/redis.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
import { KeyValueCache } from '../KeyValueCache'; | ||
import Redis from 'redis'; | ||
import { promisify } from 'util'; | ||
|
||
const VERSION_KEY = 'apollo-server-cache:version'; | ||
|
||
export default class RedisKeyValueCache implements KeyValueCache { | ||
readonly client; | ||
readonly defaultSetOptions = { | ||
ttl: 300, | ||
tags: [], | ||
}; | ||
|
||
constructor(options: Redis.ClientOpts) { | ||
this.client = Redis.createClient(options); | ||
// promisify client calls for convenience | ||
this.client.get = promisify(this.client.get).bind(this.client); | ||
this.client.mget = promisify(this.client.mget).bind(this.client); | ||
this.client.incr = promisify(this.client.incr).bind(this.client); | ||
this.client.set = promisify(this.client.set).bind(this.client); | ||
this.client.flushdb = promisify(this.client.flushdb).bind(this.client); | ||
this.client.quit = promisify(this.client.quit).bind(this.client); | ||
} | ||
|
||
async set( | ||
key: string, | ||
data: string, | ||
options?: { ttl?: number; tags?: string[] }, | ||
): Promise<void> { | ||
const { ttl, tags } = Object.assign({}, this.defaultSetOptions, options); | ||
|
||
// get and incr version number | ||
let version = await this.client.get(VERSION_KEY); | ||
if (!version) { | ||
// initialize version number | ||
version = 1; | ||
await this.client.set(VERSION_KEY, version + 1); | ||
} else { | ||
await this.client.incr(VERSION_KEY); | ||
} | ||
|
||
// augment data with tags and version | ||
const payload = { | ||
v: version, | ||
d: data, | ||
t: tags, | ||
}; | ||
|
||
await this.client.set(key, JSON.stringify(payload), 'EX', ttl); | ||
} | ||
|
||
async get(key: string): Promise<string | undefined> { | ||
const data = await this.client.get(key); | ||
// reply is null if key is not found | ||
if (data === null) { | ||
return; | ||
} | ||
|
||
// deserialize data | ||
const payload = JSON.parse(data); | ||
// check "timestamp" at which tags have been invalidated | ||
const tags = payload.t; | ||
|
||
if (tags.length !== 0) { | ||
const versions = await this.client.mget(tags); | ||
for (const tag in versions) { | ||
if (versions[tag] !== undefined && versions[tag] > payload.v) { | ||
return; // tag has been invalidated, therefore cache entry not valid | ||
} | ||
} | ||
} | ||
|
||
// all version numbers up to date | ||
return payload.d; | ||
} | ||
|
||
async invalidate(tags: string[]): Promise<void> { | ||
// set the invalidation "timestamp" using logical clock for every tag | ||
let version = await this.client.get(VERSION_KEY); | ||
if (!version) { | ||
// initialize version number | ||
version = 1; | ||
await this.client.set(VERSION_KEY, version + 1); | ||
} else { | ||
await this.client.incr(VERSION_KEY); | ||
} | ||
|
||
const operations: any[] = []; | ||
for (const tag of tags) { | ||
operations.push([tag, version]); // tags should not be set to expire | ||
} | ||
// wait for all operations to finish | ||
await Promise.all(operations.map(op => this.client.set(...op))); | ||
} | ||
|
||
async flush(): Promise<void> { | ||
await this.client.flushdb(); | ||
} | ||
|
||
async close(): Promise<void> { | ||
await this.client.quit(); | ||
return; | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little confused by the need to increment here, I would expect that to happen only on invalidation. What is the reasoning behind this?