diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index af42dcd908..a7ac62b41f 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -77,6 +77,24 @@ async function complete(document: DocumentNode, rootValue: unknown = {}) { return result; } +async function completeAsync( + document: DocumentNode, + numCalls: number, + rootValue: unknown = {}, +) { + const result = await execute({ schema, document, rootValue }); + + assert(isAsyncIterable(result)); + + const iterator = result[Symbol.asyncIterator](); + + const promises = []; + for (let i = 0; i < numCalls; i++) { + promises.push(iterator.next()); + } + return Promise.all(promises); +} + function createResolvablePromise(): [Promise, (value?: T) => void] { let resolveFn; const promise = new Promise((resolve) => { @@ -566,6 +584,51 @@ describe('Execute: stream directive', () => { }, }); }); + it('Can handle concurrent calls to .next() without waiting', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 2) { + name + id + } + } + `); + const result = await completeAsync(document, 4, { + async *friendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve(friends[1]); + yield await Promise.resolve(friends[2]); + }, + }); + expectJSON(result).toDeepEqual([ + { + done: false, + value: { + data: { + friendList: [ + { name: 'Luke', id: '1' }, + { name: 'Han', id: '2' }, + ], + }, + hasNext: true, + }, + }, + { + done: false, + value: { + incremental: [ + { + items: [{ name: 'Leia', id: '3' }], + path: ['friendList', 2], + }, + ], + hasNext: true, + }, + }, + { done: false, value: { hasNext: false } }, + { done: true, value: undefined }, + ]); + }); it('Handles error thrown in async iterable before initialCount is reached', async () => { const document = parse(` query { diff --git a/src/execution/execute.ts b/src/execution/execute.ts index f85a159bab..37f1c1227c 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1823,7 +1823,16 @@ function yieldSubsequentPayloads( }, ); + if (exeContext.subsequentPayloads.length === 0) { + // a different call to next has exhausted all payloads + return { value: undefined, done: true }; + } const index = exeContext.subsequentPayloads.indexOf(asyncPayloadRecord); + if (index === -1) { + // a different call to next has consumed this payload + return race(); + } + exeContext.subsequentPayloads.splice(index, 1); const incrementalResult: IncrementalResult = {};