From 00f9082ac8b54acdc8105c63d5bac8ddcb5376e6 Mon Sep 17 00:00:00 2001 From: Jonas Date: Wed, 1 Aug 2018 15:58:48 +0200 Subject: [PATCH] cancel duplicate operations with an error; allows the user to react to / clean up after a deduped operation --- README.md | 6 ++++++ src/QueueLink.test.ts | 6 +++++- src/QueueLink.ts | 11 +++++++++-- src/TestUtils.ts | 28 ++++++++++++++-------------- 4 files changed, 34 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index c4e0005..ef21285 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,12 @@ const offlineLink = new QueueLink({ }); this.link = ApolloLink.from([ + new RetryLink({ + attempts: { + // We don't want to retry deduped operations + retryIf: (error, _operation) => !!error && error.name !== 'DedupedByQueueError' + } + }), offlineLink, new BatchHttpLink({ uri: URI_TO_YOUR_GRAPHQL_SERVER }), ]); diff --git a/src/QueueLink.test.ts b/src/QueueLink.test.ts index 8ba50e1..b78a78f 100644 --- a/src/QueueLink.test.ts +++ b/src/QueueLink.test.ts @@ -1,4 +1,6 @@ -import QueueLink from './QueueLink'; +import QueueLink, { + DedupedByQueueError +} from './QueueLink'; import { assertObservableSequence, executeMultiple, @@ -133,6 +135,7 @@ describe('OnOffLink', () => { return assertObservableSequence( executeMultiple(myLink, op, op2, op), [ + { type: 'error', value: new DedupedByQueueError()}, { type: 'next', value: testResponse2 }, { type: 'next', value: testResponse }, { type: 'complete' }, @@ -153,6 +156,7 @@ describe('OnOffLink', () => { return assertObservableSequence( executeMultiple(myLink, op, op2, op), [ + { type: 'error', value: new DedupedByQueueError()}, { type: 'next', value: testResponse }, { type: 'next', value: testResponse2 }, { type: 'complete' }, diff --git a/src/QueueLink.ts b/src/QueueLink.ts index 1ce6c94..ca8c103 100644 --- a/src/QueueLink.ts +++ b/src/QueueLink.ts @@ -43,6 +43,13 @@ const defaultOptions: QueueLink.Options = { isDuplicate: (a: Operation, b: Operation) => a.toKey() === b.toKey() }; +export class DedupedByQueueError extends Error { + constructor() { + super('Operation got deduplicated by apollo-link-queue.'); + Object.defineProperty(this, 'name', { value: 'DedupedByQueueError' }); + } +} + export default class QueueLink extends ApolloLink { private opQueue: OperationQueueEntry[] = []; private isOpen: boolean = true; @@ -96,7 +103,7 @@ export default class QueueLink extends ApolloLink { const alreadyInQueue = this.opQueue.some(isDuplicate); if (alreadyInQueue) { // if there is already a duplicate entry the new one gets ignored - entry.observer.complete() + entry.observer.error(new DedupedByQueueError()); } else { this.opQueue.push(entry); } @@ -106,7 +113,7 @@ export default class QueueLink extends ApolloLink { if (index !== -1) { // if there is already a duplicate entry it gets removed const [duplicate] = this.opQueue.splice(index, 1); - duplicate.observer.complete(); + duplicate.observer.error(new DedupedByQueueError()); } this.opQueue.push(entry); break; diff --git a/src/TestUtils.ts b/src/TestUtils.ts index 519f006..5c1db1f 100644 --- a/src/TestUtils.ts +++ b/src/TestUtils.ts @@ -43,7 +43,7 @@ export interface Unsubscribable { } export const assertObservableSequence = ( - observable: Observable, + observable: Observable, sequence: ObservableValue[], initializer: (sub: Unsubscribable) => void = () => undefined, ): Promise => { @@ -54,7 +54,8 @@ export const assertObservableSequence = ( return new Promise((resolve, reject) => { const sub = observable.subscribe({ next: (value) => { - expect({ type: 'next', value }).toEqual(sequence[index]); + const type = value instanceof Error ? 'error' : 'next'; + expect({ type, value }).toEqual(sequence[index]); index++; if (index === sequence.length) { resolve(true); @@ -82,16 +83,15 @@ export const assertObservableSequence = ( }; export function executeMultiple(link: ApolloLink, ...operations: GraphQLRequest[]) { - return new Observable(sub => { - let i = 0; - const s = { - next: (v: any) => sub.next(v), - error: (e: any) => sub.error(e), - complete() { - i++; - if (i === operations.length) sub.complete() - } - }; - operations.forEach(op => execute(link, op).subscribe(s)) - }); + return Observable.from(operations.map(op => execute(link, op))) + .flatMap(o => new Observable(sub => { + o.subscribe({ + next: (v: any) => sub.next(v), + error: (e: any) => { + sub.next(e); + sub.complete(); + }, + complete: () => sub.complete() + }) + })); }