Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement deduplication options #11

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,47 @@ this.link = ApolloLink.from([
new HttpLink({ uri: URI_TO_YOUR_GRAPHQL_SERVER }),
]);
```

### Deduplicate

It's possible to deduplicate operations while the queue is closed.
This for example allows to only save the newest changes without the steps between.

```js
import { ApolloLink } from 'apollo-link';
import { BatchHttpLink } from 'apollo-link-batch-http';
import QueueLink from 'apollo-link-queue';

const offlineLink = new QueueLink({
/**
* Decides which entry to keep in the queue in case of duplicate entries.
* Possible values:
* - last: removes existing duplicate
* - first: ignores new duplicate
* - all: doesn't deduplicate
*
* Defaults to 'all'.
*/
keepPolicy: 'last',

/**
* Specifies which entries are considered duplicates
*
* Defaults to comparing operation operationA.toKey() === operationB.toKey()
* https://www.apollographql.com/docs/link/overview.html
*/
isDuplicate: (a, b) => (a.operationName === 'save' &&
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a consumer I would find it useful to specify the keepPolicy per-query using a context option.

Also useful would be specifying a dedupeKey, defaulting to ${operationName}:${variables.id}. dedupeKey would be used to calculate which operations keepPolicy should be applied to, sort of like a named queue. I hope that makes sense!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is a good idea to default to ${operationName}:${variables.id} as this wouldn't fit most usecases.

  • There isn't always an variable id.
  • Maybe it's called differently.
  • We for example use input types which would break this default es well.

The default should always work and if you have a more specific need you can overwrite the default.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, by default the dedupeKey should be undefined and operations shouldn't be deduped in the queue. What I meant to say is the recommended dedupeKey would be ${operationName}:${variables.id}.

The use case I have in mind is editing multiple text documents and saving the edits while offline. I want to only save the final revision, which fits with keepPolicy: 'last'. However I want to keep the last operation for all different saved documents, so you would calculate which to keep with a dedupeKey: 'saveDocument:${id}'. Then when the queue opens it has exactly one saveDocument operation per unique document saved.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Yeah I'm doing exactly the same thing in our project. I'm just arguing that this shouldn't be the default.

a.operationName === b.operationName && a.variables.id === b.variables.id)
});

this.link = ApolloLink.from([
new RetryLink({
attempts: {
// We don't want to retry deduped operations
retryIf: (error, _operation) => !!error && error.name !== 'DedupedByQueueError'
}
}),
offlineLink,
new BatchHttpLink({ uri: URI_TO_YOUR_GRAPHQL_SERVER }),
]);
```
87 changes: 84 additions & 3 deletions src/QueueLink.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import QueueLink from './QueueLink';
import QueueLink, {
DedupedByQueueError
} from './QueueLink';
import {
assertObservableSequence,
executeMultiple,
TestLink,
} from './TestUtils';
import {
Expand All @@ -21,13 +24,26 @@ describe('OnOffLink', () => {
},
};

const testResponse2 = {
data: {
hello2: 'World',
},
};

const op: GraphQLRequest = {
query: gql`{ hello }`,
query: gql`query hello { hello }`,
context: {
testResponse,
},
};

const op2: GraphQLRequest = {
query: gql`query hello2 { hello }`,
context: {
testResponse: testResponse2,
},
};

beforeEach(() => {
jest.useFakeTimers();
testLink = new TestLink();
Expand All @@ -51,7 +67,7 @@ describe('OnOffLink', () => {
});
it('skips the queue when asked to', () => {
const opWithSkipQueue: GraphQLRequest = {
query: gql`{ hello }`,
query: gql`query hello { hello }`,
context: {
skipQueue: true,
},
Expand Down Expand Up @@ -94,6 +110,7 @@ describe('OnOffLink', () => {
expect(testLink.operations.length).toBe(0);
sub.unsubscribe();
});

it('releases held requests when you open it', () => {
onOffLink.close();
return assertObservableSequence(
Expand All @@ -105,6 +122,70 @@ describe('OnOffLink', () => {
() => {
expect(testLink.operations.length).toBe(0);
onOffLink.open();
expect(testLink.operations.length).toBe(1);
jest.runAllTimers();
},
);
});

it('releases held deduplicated requests when you open it (last)', () => {
const dedupOnOffLink = new QueueLink({keepPolicy: "last"});
const myLink = ApolloLink.from([dedupOnOffLink, testLink]);
dedupOnOffLink.close();
return assertObservableSequence(
executeMultiple(myLink, op, op2, op),
[
{ type: 'error', value: new DedupedByQueueError()},
{ type: 'next', value: testResponse2 },
{ type: 'next', value: testResponse },
{ type: 'complete' },
],
() => {
expect(testLink.operations.length).toBe(0);
dedupOnOffLink.open();
expect(testLink.operations.length).toBe(2);
jest.runAllTimers();
},
);
});

it('releases held deduplicated requests when you open it (first)', () => {
const dedupOnOffLink = new QueueLink({keepPolicy: "first"});
const myLink = ApolloLink.from([dedupOnOffLink, testLink]);
dedupOnOffLink.close();
return assertObservableSequence(
executeMultiple(myLink, op, op2, op),
[
{ type: 'error', value: new DedupedByQueueError()},
{ type: 'next', value: testResponse },
{ type: 'next', value: testResponse2 },
{ type: 'complete' },
],
() => {
expect(testLink.operations.length).toBe(0);
dedupOnOffLink.open();
expect(testLink.operations.length).toBe(2);
jest.runAllTimers();
},
);
});

it('releases held deduplicated requests when you open it (all)', () => {
const dedupOnOffLink = new QueueLink({keepPolicy: "all"});
const myLink = ApolloLink.from([dedupOnOffLink, testLink]);
dedupOnOffLink.close();
return assertObservableSequence(
executeMultiple(myLink, op, op2, op),
[
{ type: 'next', value: testResponse },
{ type: 'next', value: testResponse2 },
{ type: 'next', value: testResponse },
{ type: 'complete' },
],
() => {
expect(testLink.operations.length).toBe(0);
dedupOnOffLink.open();
expect(testLink.operations.length).toBe(3);
jest.runAllTimers();
},
);
Expand Down
73 changes: 72 additions & 1 deletion src/QueueLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,57 @@ interface OperationQueueEntry {
subscription?: { unsubscribe: () => void };
}

export type KeepPolicy =
| 'first'
| 'last'
| 'all';

export namespace QueueLink {
export interface Options {
/**
* Decides which entry to keep in the queue in case of duplicate entries.
*
* Defaults to 'all'.
*/
keepPolicy?: KeepPolicy;

/**
* Specifies which entries are considered duplicates
*
* Defaults to comparing operation operationA.toKey() === operationB.toKey()
* https://www.apollographql.com/docs/link/overview.html
*/
isDuplicate?: (operationA: Operation, operationB: Operation) => boolean;
}
}

const defaultOptions: QueueLink.Options = {
keepPolicy: 'all',
isDuplicate: (a: Operation, b: Operation) => a.toKey() === b.toKey()
};

export class DedupedByQueueError extends Error {
constructor() {
super('Operation got deduplicated by apollo-link-queue.');
Object.defineProperty(this, 'name', { value: 'DedupedByQueueError' });
}
}

export default class QueueLink extends ApolloLink {
private opQueue: OperationQueueEntry[] = [];
private isOpen: boolean = true;
private readonly keepPolicy: KeepPolicy;
private readonly isDuplicate: (operationA: Operation, operationB: Operation) => boolean;

constructor(options: QueueLink.Options = defaultOptions) {
super();
const {
keepPolicy = defaultOptions.keepPolicy,
isDuplicate = defaultOptions.isDuplicate
} = options;
this.keepPolicy = keepPolicy;
this.isDuplicate = isDuplicate;
}

public open() {
this.isOpen = true;
Expand Down Expand Up @@ -49,6 +97,29 @@ export default class QueueLink extends ApolloLink {
}

private enqueue(entry: OperationQueueEntry) {
this.opQueue.push(entry);
const isDuplicate = ({operation}: OperationQueueEntry) => this.isDuplicate(operation, entry.operation);
switch (this.keepPolicy) {
case "first":
const alreadyInQueue = this.opQueue.some(isDuplicate);
if (alreadyInQueue) {
// if there is already a duplicate entry the new one gets ignored
entry.observer.error(new DedupedByQueueError());
} else {
this.opQueue.push(entry);
}
break;
case "last":
const index = this.opQueue.findIndex(isDuplicate);
if (index !== -1) {
// if there is already a duplicate entry it gets removed
const [duplicate] = this.opQueue.splice(index, 1);
duplicate.observer.error(new DedupedByQueueError());
}
this.opQueue.push(entry);
break;
case "all":
this.opQueue.push(entry);
break;
}
}
}
23 changes: 21 additions & 2 deletions src/TestUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ import {
ApolloLink,
Operation,
Observable,
execute,
} from 'apollo-link';
import {
ExecutionResult,
} from 'graphql';
import {
GraphQLRequest
} from 'apollo-link/src/types';

export class TestLink extends ApolloLink {
public operations: Operation[];
Expand Down Expand Up @@ -39,7 +43,7 @@ export interface Unsubscribable {
}

export const assertObservableSequence = (
observable: Observable<ExecutionResult>,
observable: Observable<ExecutionResult | Error>,
sequence: ObservableValue[],
initializer: (sub: Unsubscribable) => void = () => undefined,
): Promise<boolean | Error> => {
Expand All @@ -50,7 +54,8 @@ export const assertObservableSequence = (
return new Promise((resolve, reject) => {
const sub = observable.subscribe({
next: (value) => {
expect({ type: 'next', value }).toEqual(sequence[index]);
const type = value instanceof Error ? 'error' : 'next';
expect({ type, value }).toEqual(sequence[index]);
index++;
if (index === sequence.length) {
resolve(true);
Expand All @@ -76,3 +81,17 @@ export const assertObservableSequence = (
initializer(sub);
});
};

export function executeMultiple(link: ApolloLink, ...operations: GraphQLRequest[]) {
return Observable.from(operations.map(op => execute(link, op)))
.flatMap(o => new Observable(sub => {
o.subscribe({
next: (v: any) => sub.next(v),
error: (e: any) => {
sub.next(e);
sub.complete();
},
complete: () => sub.complete()
})
}));
}