From c510673d127d02005045b504a7c2d9bed34fe8dc Mon Sep 17 00:00:00 2001 From: Jonas Date: Wed, 11 Jul 2018 10:30:42 +0200 Subject: [PATCH 1/3] implement deduplication options --- README.md | 38 +++++++++++++++++++++ src/QueueLink.test.ts | 79 ++++++++++++++++++++++++++++++++++++++++++- src/QueueLink.ts | 66 +++++++++++++++++++++++++++++++++++- src/TestUtils.ts | 19 +++++++++++ 4 files changed, 200 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 24cb7f8..c4e0005 100644 --- a/README.md +++ b/README.md @@ -51,3 +51,41 @@ this.link = ApolloLink.from([ new HttpLink({ uri: URI_TO_YOUR_GRAPHQL_SERVER }), ]); ``` + +### Deduplicate + +It's possible to deduplicate operations while the queue is closed. +This for example allows to only save the newest changes without the steps between. + +```js +import { ApolloLink } from 'apollo-link'; +import { BatchHttpLink } from 'apollo-link-batch-http'; +import QueueLink from 'apollo-link-queue'; + +const offlineLink = new QueueLink({ + /** + * Decides which entry to keep in the queue in case of duplicate entries. + * Possible values: + * - last: removes existing duplicate + * - first: ignores new duplicate + * - all: doesn't deduplicate + * + * Defaults to 'all'. + */ + keepPolicy: 'last', + + /** + * Specifies which entries are considered duplicates + * + * Defaults to comparing operation operationA.toKey() === operationB.toKey() + * https://www.apollographql.com/docs/link/overview.html + */ + isDuplicate: (a, b) => (a.operationName === 'save' && + a.operationName === b.operationName && a.variables.id === b.variables.id) +}); + +this.link = ApolloLink.from([ + offlineLink, + new BatchHttpLink({ uri: URI_TO_YOUR_GRAPHQL_SERVER }), +]); +``` diff --git a/src/QueueLink.test.ts b/src/QueueLink.test.ts index a97af1e..8ba50e1 100644 --- a/src/QueueLink.test.ts +++ b/src/QueueLink.test.ts @@ -1,6 +1,7 @@ import QueueLink from './QueueLink'; import { assertObservableSequence, + executeMultiple, TestLink, } from './TestUtils'; import { @@ -21,13 +22,26 @@ describe('OnOffLink', () => { }, }; + const testResponse2 = { + data: { + hello2: 'World', + }, + }; + const op: GraphQLRequest = { - query: gql`{ hello }`, + query: gql`query hello { hello }`, context: { testResponse, }, }; + const op2: GraphQLRequest = { + query: gql`query hello2 { hello }`, + context: { + testResponse: testResponse2, + }, + }; + beforeEach(() => { jest.useFakeTimers(); testLink = new TestLink(); @@ -94,6 +108,7 @@ describe('OnOffLink', () => { expect(testLink.operations.length).toBe(0); sub.unsubscribe(); }); + it('releases held requests when you open it', () => { onOffLink.close(); return assertObservableSequence( @@ -105,6 +120,68 @@ describe('OnOffLink', () => { () => { expect(testLink.operations.length).toBe(0); onOffLink.open(); + expect(testLink.operations.length).toBe(1); + jest.runAllTimers(); + }, + ); + }); + + it('releases held deduplicated requests when you open it (last)', () => { + const dedupOnOffLink = new QueueLink({keepPolicy: "last"}); + const myLink = ApolloLink.from([dedupOnOffLink, testLink]); + dedupOnOffLink.close(); + return assertObservableSequence( + executeMultiple(myLink, op, op2, op), + [ + { type: 'next', value: testResponse2 }, + { type: 'next', value: testResponse }, + { type: 'complete' }, + ], + () => { + expect(testLink.operations.length).toBe(0); + dedupOnOffLink.open(); + expect(testLink.operations.length).toBe(2); + jest.runAllTimers(); + }, + ); + }); + + it('releases held deduplicated requests when you open it (first)', () => { + const dedupOnOffLink = new QueueLink({keepPolicy: "first"}); + const myLink = ApolloLink.from([dedupOnOffLink, testLink]); + dedupOnOffLink.close(); + return assertObservableSequence( + executeMultiple(myLink, op, op2, op), + [ + { type: 'next', value: testResponse }, + { type: 'next', value: testResponse2 }, + { type: 'complete' }, + ], + () => { + expect(testLink.operations.length).toBe(0); + dedupOnOffLink.open(); + expect(testLink.operations.length).toBe(2); + jest.runAllTimers(); + }, + ); + }); + + it('releases held deduplicated requests when you open it (all)', () => { + const dedupOnOffLink = new QueueLink({keepPolicy: "all"}); + const myLink = ApolloLink.from([dedupOnOffLink, testLink]); + dedupOnOffLink.close(); + return assertObservableSequence( + executeMultiple(myLink, op, op2, op), + [ + { type: 'next', value: testResponse }, + { type: 'next', value: testResponse2 }, + { type: 'next', value: testResponse }, + { type: 'complete' }, + ], + () => { + expect(testLink.operations.length).toBe(0); + dedupOnOffLink.open(); + expect(testLink.operations.length).toBe(3); jest.runAllTimers(); }, ); diff --git a/src/QueueLink.ts b/src/QueueLink.ts index 4819e0c..1ce6c94 100644 --- a/src/QueueLink.ts +++ b/src/QueueLink.ts @@ -14,9 +14,50 @@ interface OperationQueueEntry { subscription?: { unsubscribe: () => void }; } +export type KeepPolicy = + | 'first' + | 'last' + | 'all'; + +export namespace QueueLink { + export interface Options { + /** + * Decides which entry to keep in the queue in case of duplicate entries. + * + * Defaults to 'all'. + */ + keepPolicy?: KeepPolicy; + + /** + * Specifies which entries are considered duplicates + * + * Defaults to comparing operation operationA.toKey() === operationB.toKey() + * https://www.apollographql.com/docs/link/overview.html + */ + isDuplicate?: (operationA: Operation, operationB: Operation) => boolean; + } +} + +const defaultOptions: QueueLink.Options = { + keepPolicy: 'all', + isDuplicate: (a: Operation, b: Operation) => a.toKey() === b.toKey() +}; + export default class QueueLink extends ApolloLink { private opQueue: OperationQueueEntry[] = []; private isOpen: boolean = true; + private readonly keepPolicy: KeepPolicy; + private readonly isDuplicate: (operationA: Operation, operationB: Operation) => boolean; + + constructor(options: QueueLink.Options = defaultOptions) { + super(); + const { + keepPolicy = defaultOptions.keepPolicy, + isDuplicate = defaultOptions.isDuplicate + } = options; + this.keepPolicy = keepPolicy; + this.isDuplicate = isDuplicate; + } public open() { this.isOpen = true; @@ -49,6 +90,29 @@ export default class QueueLink extends ApolloLink { } private enqueue(entry: OperationQueueEntry) { - this.opQueue.push(entry); + const isDuplicate = ({operation}: OperationQueueEntry) => this.isDuplicate(operation, entry.operation); + switch (this.keepPolicy) { + case "first": + const alreadyInQueue = this.opQueue.some(isDuplicate); + if (alreadyInQueue) { + // if there is already a duplicate entry the new one gets ignored + entry.observer.complete() + } else { + this.opQueue.push(entry); + } + break; + case "last": + const index = this.opQueue.findIndex(isDuplicate); + if (index !== -1) { + // if there is already a duplicate entry it gets removed + const [duplicate] = this.opQueue.splice(index, 1); + duplicate.observer.complete(); + } + this.opQueue.push(entry); + break; + case "all": + this.opQueue.push(entry); + break; + } } } diff --git a/src/TestUtils.ts b/src/TestUtils.ts index 55feb8a..519f006 100644 --- a/src/TestUtils.ts +++ b/src/TestUtils.ts @@ -2,10 +2,14 @@ import { ApolloLink, Operation, Observable, + execute, } from 'apollo-link'; import { ExecutionResult, } from 'graphql'; +import { + GraphQLRequest +} from 'apollo-link/src/types'; export class TestLink extends ApolloLink { public operations: Operation[]; @@ -76,3 +80,18 @@ export const assertObservableSequence = ( initializer(sub); }); }; + +export function executeMultiple(link: ApolloLink, ...operations: GraphQLRequest[]) { + return new Observable(sub => { + let i = 0; + const s = { + next: (v: any) => sub.next(v), + error: (e: any) => sub.error(e), + complete() { + i++; + if (i === operations.length) sub.complete() + } + }; + operations.forEach(op => execute(link, op).subscribe(s)) + }); +} From 00f9082ac8b54acdc8105c63d5bac8ddcb5376e6 Mon Sep 17 00:00:00 2001 From: Jonas Date: Wed, 1 Aug 2018 15:58:48 +0200 Subject: [PATCH 2/3] cancel duplicate operations with an error; allows the user to react to / clean up after a deduped operation --- README.md | 6 ++++++ src/QueueLink.test.ts | 6 +++++- src/QueueLink.ts | 11 +++++++++-- src/TestUtils.ts | 28 ++++++++++++++-------------- 4 files changed, 34 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index c4e0005..ef21285 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,12 @@ const offlineLink = new QueueLink({ }); this.link = ApolloLink.from([ + new RetryLink({ + attempts: { + // We don't want to retry deduped operations + retryIf: (error, _operation) => !!error && error.name !== 'DedupedByQueueError' + } + }), offlineLink, new BatchHttpLink({ uri: URI_TO_YOUR_GRAPHQL_SERVER }), ]); diff --git a/src/QueueLink.test.ts b/src/QueueLink.test.ts index 8ba50e1..b78a78f 100644 --- a/src/QueueLink.test.ts +++ b/src/QueueLink.test.ts @@ -1,4 +1,6 @@ -import QueueLink from './QueueLink'; +import QueueLink, { + DedupedByQueueError +} from './QueueLink'; import { assertObservableSequence, executeMultiple, @@ -133,6 +135,7 @@ describe('OnOffLink', () => { return assertObservableSequence( executeMultiple(myLink, op, op2, op), [ + { type: 'error', value: new DedupedByQueueError()}, { type: 'next', value: testResponse2 }, { type: 'next', value: testResponse }, { type: 'complete' }, @@ -153,6 +156,7 @@ describe('OnOffLink', () => { return assertObservableSequence( executeMultiple(myLink, op, op2, op), [ + { type: 'error', value: new DedupedByQueueError()}, { type: 'next', value: testResponse }, { type: 'next', value: testResponse2 }, { type: 'complete' }, diff --git a/src/QueueLink.ts b/src/QueueLink.ts index 1ce6c94..ca8c103 100644 --- a/src/QueueLink.ts +++ b/src/QueueLink.ts @@ -43,6 +43,13 @@ const defaultOptions: QueueLink.Options = { isDuplicate: (a: Operation, b: Operation) => a.toKey() === b.toKey() }; +export class DedupedByQueueError extends Error { + constructor() { + super('Operation got deduplicated by apollo-link-queue.'); + Object.defineProperty(this, 'name', { value: 'DedupedByQueueError' }); + } +} + export default class QueueLink extends ApolloLink { private opQueue: OperationQueueEntry[] = []; private isOpen: boolean = true; @@ -96,7 +103,7 @@ export default class QueueLink extends ApolloLink { const alreadyInQueue = this.opQueue.some(isDuplicate); if (alreadyInQueue) { // if there is already a duplicate entry the new one gets ignored - entry.observer.complete() + entry.observer.error(new DedupedByQueueError()); } else { this.opQueue.push(entry); } @@ -106,7 +113,7 @@ export default class QueueLink extends ApolloLink { if (index !== -1) { // if there is already a duplicate entry it gets removed const [duplicate] = this.opQueue.splice(index, 1); - duplicate.observer.complete(); + duplicate.observer.error(new DedupedByQueueError()); } this.opQueue.push(entry); break; diff --git a/src/TestUtils.ts b/src/TestUtils.ts index 519f006..5c1db1f 100644 --- a/src/TestUtils.ts +++ b/src/TestUtils.ts @@ -43,7 +43,7 @@ export interface Unsubscribable { } export const assertObservableSequence = ( - observable: Observable, + observable: Observable, sequence: ObservableValue[], initializer: (sub: Unsubscribable) => void = () => undefined, ): Promise => { @@ -54,7 +54,8 @@ export const assertObservableSequence = ( return new Promise((resolve, reject) => { const sub = observable.subscribe({ next: (value) => { - expect({ type: 'next', value }).toEqual(sequence[index]); + const type = value instanceof Error ? 'error' : 'next'; + expect({ type, value }).toEqual(sequence[index]); index++; if (index === sequence.length) { resolve(true); @@ -82,16 +83,15 @@ export const assertObservableSequence = ( }; export function executeMultiple(link: ApolloLink, ...operations: GraphQLRequest[]) { - return new Observable(sub => { - let i = 0; - const s = { - next: (v: any) => sub.next(v), - error: (e: any) => sub.error(e), - complete() { - i++; - if (i === operations.length) sub.complete() - } - }; - operations.forEach(op => execute(link, op).subscribe(s)) - }); + return Observable.from(operations.map(op => execute(link, op))) + .flatMap(o => new Observable(sub => { + o.subscribe({ + next: (v: any) => sub.next(v), + error: (e: any) => { + sub.next(e); + sub.complete(); + }, + complete: () => sub.complete() + }) + })); } From 1985858e65c52e070ef4baef107b9445ca03a72f Mon Sep 17 00:00:00 2001 From: Jonas Date: Wed, 1 Aug 2018 16:30:38 +0200 Subject: [PATCH 3/3] fix skip test --- src/QueueLink.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/QueueLink.test.ts b/src/QueueLink.test.ts index b78a78f..5b4294e 100644 --- a/src/QueueLink.test.ts +++ b/src/QueueLink.test.ts @@ -67,7 +67,7 @@ describe('OnOffLink', () => { }); it('skips the queue when asked to', () => { const opWithSkipQueue: GraphQLRequest = { - query: gql`{ hello }`, + query: gql`query hello { hello }`, context: { skipQueue: true, },