From cd9c8c69c02f759830a24ae6a5e15f865b0475c7 Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Tue, 10 Nov 2020 10:26:19 -0500 Subject: [PATCH] Return underlying AsyncIterators when execute result is returned (#2843) --- src/execution/__tests__/stream-test.js | 141 +++++++++++++++++++++++++ src/execution/execute.js | 26 +++++ 2 files changed, 167 insertions(+) diff --git a/src/execution/__tests__/stream-test.js b/src/execution/__tests__/stream-test.js index 8be71a0333..5d9ab32735 100644 --- a/src/execution/__tests__/stream-test.js +++ b/src/execution/__tests__/stream-test.js @@ -1,6 +1,7 @@ import { expect } from 'chai'; import { describe, it } from 'mocha'; +import invariant from '../../jsutils/invariant'; import isAsyncIterable from '../../jsutils/isAsyncIterable'; import { parse } from '../../language/parser'; @@ -72,6 +73,36 @@ const query = new GraphQLObjectType({ yield {}; }, }, + asyncIterableListDelayed: { + type: new GraphQLList(friendType), + async *resolve() { + for (const friend of friends) { + // pause an additional ms before yielding to allow time + // for tests to return or throw before next value is processed. + // eslint-disable-next-line no-await-in-loop + await new Promise((r) => setTimeout(r, 1)); + yield friend; + } + }, + }, + asyncIterableListNoReturn: { + type: new GraphQLList(friendType), + resolve() { + let i = 0; + return { + [Symbol.asyncIterator]: () => ({ + async next() { + const friend = friends[i++]; + if (friend) { + await new Promise((r) => setTimeout(r, 1)); + return { value: friend, done: false }; + } + return { value: undefined, done: true }; + }, + }), + }; + }, + }, asyncIterableListDelayedClose: { type: new GraphQLList(friendType), async *resolve() { @@ -626,4 +657,114 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Returns underlying async iterables when dispatcher is returned', async () => { + const document = parse(` + query { + asyncIterableListDelayed @stream(initialCount: 1) { + name + id + } + } + `); + const schema = new GraphQLSchema({ query }); + + const executeResult = await execute(schema, document, {}); + invariant(isAsyncIterable(executeResult)); + + const result1 = await executeResult.next(); + expect(result1).to.deep.equal({ + done: false, + value: { + data: { + asyncIterableListDelayed: [ + { + id: '1', + name: 'Luke', + }, + ], + }, + hasNext: true, + }, + }); + + executeResult.return(); + + // this result had started processing before return was called + const result2 = await executeResult.next(); + expect(result2).to.deep.equal({ + done: false, + value: { + data: { + id: '2', + name: 'Han', + }, + hasNext: true, + path: ['asyncIterableListDelayed', 1], + }, + }); + + // third result is not returned because async iterator has returned + const result3 = await executeResult.next(); + expect(result3).to.deep.equal({ + done: false, + value: { + hasNext: false, + }, + }); + }); + it('Can return async iterable when underlying iterable does not have a return method', async () => { + const document = parse(` + query { + asyncIterableListNoReturn @stream(initialCount: 1) { + name + id + } + } + `); + const schema = new GraphQLSchema({ query }); + + const executeResult = await execute(schema, document, {}); + invariant(isAsyncIterable(executeResult)); + + const result1 = await executeResult.next(); + expect(result1).to.deep.equal({ + done: false, + value: { + data: { + asyncIterableListNoReturn: [ + { + id: '1', + name: 'Luke', + }, + ], + }, + hasNext: true, + }, + }); + + executeResult.return(); + + // this result had started processing before return was called + const result2 = await executeResult.next(); + expect(result2).to.deep.equal({ + done: false, + value: { + data: { + id: '2', + name: 'Han', + }, + hasNext: true, + path: ['asyncIterableListNoReturn', 1], + }, + }); + + // third result is not returned because async iterator has returned + const result3 = await executeResult.next(); + expect(result3).to.deep.equal({ + done: false, + value: { + hasNext: false, + }, + }); + }); }); diff --git a/src/execution/execute.js b/src/execution/execute.js index 20ef6c7980..ca9408a7d3 100644 --- a/src/execution/execute.js +++ b/src/execution/execute.js @@ -1645,11 +1645,15 @@ type DispatcherResult = {| */ export class Dispatcher { _subsequentPayloads: Array>>; + _iterators: Array>; + _isDone: boolean; _initialResult: ?ExecutionResult; _hasReturnedInitialResult: boolean; constructor() { this._subsequentPayloads = []; + this._iterators = []; + this._isDone = false; this._hasReturnedInitialResult = false; } @@ -1718,6 +1722,8 @@ export class Dispatcher { itemType: GraphQLOutputType, ): void { const subsequentPayloads = this._subsequentPayloads; + const iterators = this._iterators; + iterators.push(iterator); function next(index) { const fieldPath = addPath(path, index); const patchErrors = []; @@ -1725,6 +1731,7 @@ export class Dispatcher { iterator.next().then( ({ value: data, done }) => { if (done) { + iterators.splice(iterators.indexOf(iterator), 1); return { value: undefined, done: true }; } @@ -1795,6 +1802,14 @@ export class Dispatcher { } _race(): Promise> { + if (this._isDone) { + return Promise.resolve({ + value: { + hasNext: false, + }, + done: false, + }); + } return new Promise((resolve) => { this._subsequentPayloads.forEach((promise) => { promise.then(() => { @@ -1851,6 +1866,16 @@ export class Dispatcher { return this._race(); } + _return(): Promise> { + return Promise.all( + // $FlowFixMe[prop-missing] + this._iterators.map((iterator) => iterator.return?.()), + ).then(() => { + this._isDone = true; + return { value: undefined, done: true }; + }); + } + get(initialResult: ExecutionResult): AsyncIterable { this._initialResult = initialResult; return ({ @@ -1858,6 +1883,7 @@ export class Dispatcher { return this; }, next: () => this._next(), + return: () => this._return(), }: any); } }