Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(incremental): do not ignore errors from filtered streams #4142

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 35 additions & 18 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ interface SubsequentIncrementalExecutionResultContext {
completed: Array<CompletedResult>;
}

/**
* The IncrementalPublisherState Enum tracks the state of the IncrementalPublisher, which is initialized to
* "Started". When there are no more incremental results to publish, the state is set to "Completed". On the
* next call to next, clean-up is potentially performed and the state is set to "Finished".
*
* If the IncrementalPublisher is ended early, it may be advanced directly from "Started" to "Finished".
*/
enum IncrementalPublisherState {
Started = 1,
Completed = 2,
Finished = 3,
}

/**
* This class is used to publish incremental results to the client, enabling semi-concurrent
* execution while preserving result order.
Expand Down Expand Up @@ -119,14 +132,29 @@ class IncrementalPublisher {
void,
void
> {
let isDone = false;
let incrementalPublisherState: IncrementalPublisherState =
IncrementalPublisherState.Started;

const _finish = async (): Promise<void> => {
incrementalPublisherState = IncrementalPublisherState.Finished;
this._incrementalGraph.abort();
await this._returnAsyncIterators();
};

const _next = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
if (isDone) {
await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
switch (incrementalPublisherState) {
case IncrementalPublisherState.Finished: {
return { value: undefined, done: true };
}
case IncrementalPublisherState.Completed: {
await _finish();
return { value: undefined, done: true };
}
case IncrementalPublisherState.Started: {
// continue
}
}

const context: SubsequentIncrementalExecutionResultContext = {
Expand All @@ -147,7 +175,7 @@ class IncrementalPublisher {
const hasNext = this._incrementalGraph.hasNext();

if (!hasNext) {
isDone = true;
incrementalPublisherState = IncrementalPublisherState.Completed;
}

const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult =
Expand All @@ -171,25 +199,20 @@ class IncrementalPublisher {
batch = await this._incrementalGraph.nextCompletedBatch();
} while (batch !== undefined);

await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
};

const _return = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
isDone = true;
this._incrementalGraph.abort();
await this._returnAsyncIterators();
await _finish();
return { value: undefined, done: true };
};

const _throw = async (
error?: unknown,
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
isDone = true;
this._incrementalGraph.abort();
await this._returnAsyncIterators();
await _finish();
return Promise.reject(error);
};

Expand Down Expand Up @@ -372,10 +395,4 @@ class IncrementalPublisher {
}
await Promise.all(promises);
}

private async _returnAsyncIteratorsIgnoringErrors(): Promise<void> {
await this._returnAsyncIterators().catch(() => {
// Ignore errors
});
}
}
21 changes: 9 additions & 12 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { assert, expect } from 'chai';
import { describe, it } from 'mocha';

import { expectJSON } from '../../__testUtils__/expectJSON.js';
import { expectPromise } from '../../__testUtils__/expectPromise.js';
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';

import type { PromiseOrValue } from '../../jsutils/PromiseOrValue.js';
Expand Down Expand Up @@ -1791,7 +1792,7 @@ describe('Execute: stream directive', () => {
]);
});

it('Returns iterator and ignores errors when stream payloads are filtered', async () => {
it('Returns iterator and passes through errors when stream payloads are filtered', async () => {
let returned = false;
let requested = false;
const iterable = {
Expand All @@ -1814,7 +1815,7 @@ describe('Execute: stream directive', () => {
},
return: () => {
returned = true;
// Ignores errors from return.
// This error should be passed through.
return Promise.reject(new Error('Oops'));
},
}),
Expand Down Expand Up @@ -1889,8 +1890,8 @@ describe('Execute: stream directive', () => {
},
});

const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({ done: true, value: undefined });
const result3Promise = iterator.next();
await expectPromise(result3Promise).toRejectWith('Oops');

assert(returned);
});
Expand Down Expand Up @@ -2339,6 +2340,8 @@ describe('Execute: stream directive', () => {
}),
return: () => {
returned = true;
// This error should be passed through.
return Promise.reject(new Error('Oops'));
},
}),
};
Expand Down Expand Up @@ -2378,7 +2381,7 @@ describe('Execute: stream directive', () => {
done: true,
value: undefined,
});
await returnPromise;
await expectPromise(returnPromise).toRejectWith('Oops');
assert(returned);
});
it('Can return async iterable when underlying iterable does not have a return method', async () => {
Expand Down Expand Up @@ -2498,13 +2501,7 @@ describe('Execute: stream directive', () => {
done: true,
value: undefined,
});
try {
await throwPromise; /* c8 ignore start */
// Not reachable, always throws
/* c8 ignore stop */
} catch (e) {
// ignore error
}
await expectPromise(throwPromise).toRejectWith('bad');
assert(returned);
});
});
Loading