Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for cache tag invalidation #1169

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import { advanceTimeBy, mockDate, unmockDate } from '../__mocks__/date';

// run test suite against each implementation of KeyValueCache
describe.each([
['Memcached Connector', new MemcachedKeyValueCache('mockhostname')],
['Redis Connector', new RedisKeyValueCache({ host: 'mockhostname' })],
['Memcached Connector', new MemcachedKeyValueCache('localhost:11211')],
['Redis Connector', new RedisKeyValueCache({ host: 'localhost' })],
])('%s', (_, keyValueCache) => {
beforeAll(() => {
mockDate();
});

beforeEach(() => {
keyValueCache.flush();
beforeEach(async () => {
await keyValueCache.flush();
});

afterAll(() => {
Expand Down
40 changes: 30 additions & 10 deletions packages/apollo-datasource-rest/src/connectors/memcached.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ import { KeyValueCache } from '../KeyValueCache';
import Memcached from 'memcached';
import { promisify } from 'util';

// Store a global version number (like a logical clock) in the key value store
const VERSION_KEY = 'apollo-server-cache:version';

export default class MemcachedKeyValueCache implements KeyValueCache {
readonly client;
readonly defaultSetOptions = {
ttl: 300,
tags: [],
};
// might have to deal with issues of overflow, use BigInt?
private logicalClock = 0;

constructor(serverLocation: Memcached.Location, options?: Memcached.options) {
this.client = new Memcached(serverLocation, options);
// promisify client calls for convenience
this.client.get = promisify(this.client.get).bind(this.client);
this.client.getMulti = promisify(this.client.getMulti).bind(this.client);
this.client.incr = promisify(this.client.incr).bind(this.client);
this.client.set = promisify(this.client.set).bind(this.client);
this.client.flush = promisify(this.client.flush).bind(this.client);
}
Expand All @@ -27,8 +29,17 @@ export default class MemcachedKeyValueCache implements KeyValueCache {
): Promise<void> {
const { ttl, tags } = Object.assign({}, this.defaultSetOptions, options);

// get and incr version number
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I'd like to avoid is paying the price for keeping a version (or versions) when a particular entry isn't using cache tags. In that case, I don't think we'll want to read (let alone increment) the version at all.

let version = await this.client.get(VERSION_KEY);
if (!version) {
// initialize version number
version = 1;
await this.client.set(VERSION_KEY, version + 1, 0);
} else {
await this.client.incr(VERSION_KEY, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused by the need to increment here, I would expect that to happen only on invalidation. What is the reasoning behind this?

}

// augment data with tags and version
const version = ++this.logicalClock;
const payload = {
v: version,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, this wasn't really what I had in mind, and I don't think this will work as is. If we keep a logical clock in memory, that will be per process. So it seems there is no way to keep this working correctly across multiple server instances talking to the same store.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea was that we would store <tag>: <version> pairs in cache entries, and also keep separate <tag>: <version> entries. We could then use store primitives to safely increment the version for a tag entry without the need for a global clock.

Copy link
Contributor Author

@clarencenpy clarencenpy Jun 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that it would be easy to move the logical clock into the key-value store as an entry of its own, once we think about multiple server processes. It seems to me like an easier thing to keep track of? Refer to this commit

Copy link
Contributor Author

@clarencenpy clarencenpy Jun 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this boils down to a tradeoff between managing versions for every tag (more complex code, and reading all tag versions for each set operation), vs managing a global version number (will we pay in performance?). Would be happy to learn about other alternatives that I am missing!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, interesting! I'm worried keeping a global version instead of a version per tag will become a bottleneck and/or source of concurrency issues, but other people may have a better idea of the impact. I suspect that will depend on the characteristics of the store.

On Memcache, one thing to take into consideration is that the global version key will only be stored on a single node, so that will receive a lot of requests. And if it goes down, you basically lose the ability to validate all data (although the same is true for a subset of the data for per-tag version keys).

d: data,
Expand All @@ -47,10 +58,12 @@ export default class MemcachedKeyValueCache implements KeyValueCache {

// check "timestamp" at which tags have been invalidated
const tags = payload.t;
const versions = await this.client.getMulti(tags);
for (const tag in versions) {
if (versions[tag] !== undefined && versions[tag] > payload.v) {
return; // tag has been invalidated, therefore cache entry not valid
if (tags.length !== 0) {
const versions = await this.client.getMulti(tags);
for (const tag in versions) {
if (versions[tag] !== undefined && versions[tag] > payload.v) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd need a way to deal with overflow here, because currently once we wrap around entries will never be valid again.

One benefit of using per-tag versions is that we don't have to depend on ordering. All we need to check is if versions match, and if they do not the entry is invalid. So (I think?) that gets around having to deal with overflow.

return; // tag has been invalidated, therefore cache entry not valid
}
}
}

Expand All @@ -60,11 +73,18 @@ export default class MemcachedKeyValueCache implements KeyValueCache {

async invalidate(tags: string[]): Promise<void> {
// set the invalidation "timestamp" using logical clock for every tag
const version = ++this.logicalClock;
let version = await this.client.get(VERSION_KEY);
Copy link
Contributor

@martijnwalraven martijnwalraven Jun 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can do this in a more efficient way, because checking to see whether the key exists first on every increment seems expensive (and may also run into concurrency issues). Not sure if this is part of the underlying protocol and whether our client supports it, but it seems libmemcached has a memcached_increment_with_initial function.

if (!version) {
// initialize version number
version = 1;
await this.client.set(VERSION_KEY, version + 1, 0);
} else {
await this.client.incr(VERSION_KEY, 1);
}

const operations: any[] = [];
for (const tag of tags) {
// what should be a good ttl to set here?
operations.push([tag, version, undefined]);
operations.push([tag, version, 0]); // setting ttl to 0, never expire
}
await Promise.all(operations.map(op => this.client.set(...op)));
}
Expand Down
39 changes: 31 additions & 8 deletions packages/apollo-datasource-rest/src/connectors/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ import { KeyValueCache } from '../KeyValueCache';
import Redis from 'redis';
import { promisify } from 'util';

const VERSION_KEY = 'apollo-server-cache:version';

export default class RedisKeyValueCache implements KeyValueCache {
readonly client;
readonly defaultSetOptions = {
ttl: 300,
tags: [],
};
private logicalClock = 0;

constructor(options: Redis.ClientOpts) {
this.client = Redis.createClient(options);
// promisify client calls for convenience
this.client.get = promisify(this.client.get).bind(this.client);
this.client.mget = promisify(this.client.mget).bind(this.client);
this.client.incr = promisify(this.client.incr).bind(this.client);
this.client.set = promisify(this.client.set).bind(this.client);
this.client.flushdb = promisify(this.client.flushdb).bind(this.client);
this.client.quit = promisify(this.client.quit).bind(this.client);
Expand All @@ -27,8 +29,17 @@ export default class RedisKeyValueCache implements KeyValueCache {
): Promise<void> {
const { ttl, tags } = Object.assign({}, this.defaultSetOptions, options);

// get and incr version number
let version = await this.client.get(VERSION_KEY);
if (!version) {
// initialize version number
version = 1;
await this.client.set(VERSION_KEY, version + 1);
} else {
await this.client.incr(VERSION_KEY);
}

// augment data with tags and version
const version = ++this.logicalClock;
const payload = {
v: version,
d: data,
Expand All @@ -49,10 +60,13 @@ export default class RedisKeyValueCache implements KeyValueCache {
const payload = JSON.parse(data);
// check "timestamp" at which tags have been invalidated
const tags = payload.t;
const versions = await this.client.mget(tags);
for (const tag in versions) {
if (versions[tag] !== undefined && versions[tag] > payload.v) {
return; // tag has been invalidated, therefore cache entry not valid

if (tags.length !== 0) {
const versions = await this.client.mget(tags);
for (const tag in versions) {
if (versions[tag] !== undefined && versions[tag] > payload.v) {
return; // tag has been invalidated, therefore cache entry not valid
}
}
}

Expand All @@ -62,11 +76,20 @@ export default class RedisKeyValueCache implements KeyValueCache {

async invalidate(tags: string[]): Promise<void> {
// set the invalidation "timestamp" using logical clock for every tag
const version = ++this.logicalClock;
let version = await this.client.get(VERSION_KEY);
if (!version) {
// initialize version number
version = 1;
await this.client.set(VERSION_KEY, version + 1);
} else {
await this.client.incr(VERSION_KEY);
}

const operations: any[] = [];
for (const tag of tags) {
operations.push([tag, version, undefined]);
operations.push([tag, version]); // tags should not be set to expire
}
// wait for all operations to finish
await Promise.all(operations.map(op => this.client.set(...op)));
}

Expand Down