Skip to content

Commit

Permalink
fix(incremental): pass through errors from return functions that throw
Browse files Browse the repository at this point in the history
Early execution may result in non-completed streams after publishing is completed -- these streams must be closed using their return methods. When this occurs, we should pass through any error that occurs in the clean-up function instead of swallowing errors.

Swallowing errors is a bad practice that could lead to memory leaks. The argument in favor of swallowing the error might be that because the stream was "executed early" and the error does not impact any of the returned data, there is no "place" to forward the error.

But there is a way to forward the error, and that's on the next() call that returns { value: undefined, done: true } to end the stream. We will therefore appropriately send all the data and be able to pass through an error. Servers processing our stream should be made aware of this behavior and put in place error handling procedures that allow them to forward the data to clients when they see a payload with { hasNext: false } and then filter any further errors from clients (versus holding that { hasNext: false } until the clean-up has been performed, which would be up to servers.
  • Loading branch information
yaacovCR committed Jul 9, 2024
1 parent 18d284c commit ed9caee
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 30 deletions.
53 changes: 35 additions & 18 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ interface SubsequentIncrementalExecutionResultContext {
completed: Array<CompletedResult>;
}

/**
* The IncrementalPublisherState Enum tracks the state of the IncrementalPublisher, which is initialized to
* "Started". When there are no more incremental results to publish, the state is set to "Completed". On the
* next call to next, clean-up is potentially performed and the state is set to "Finished".
*
* If the IncrementalPublisher is ended early, it may be advanced directly from "Started" to "Finished".
*/
enum IncrementalPublisherState {
Started = 1,
Completed = 2,
Finished = 3,
}

/**
* This class is used to publish incremental results to the client, enabling semi-concurrent
* execution while preserving result order.
Expand Down Expand Up @@ -119,14 +132,29 @@ class IncrementalPublisher {
void,
void
> {
let isDone = false;
let incrementalPublisherState: IncrementalPublisherState =
IncrementalPublisherState.Started;

const _finish = async (): Promise<void> => {
incrementalPublisherState = IncrementalPublisherState.Finished;
this._incrementalGraph.abort();
await this._returnAsyncIterators();
};

const _next = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
if (isDone) {
await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
switch (incrementalPublisherState) {
case IncrementalPublisherState.Finished: {
return { value: undefined, done: true };
}
case IncrementalPublisherState.Completed: {
await _finish();
return { value: undefined, done: true };
}
case IncrementalPublisherState.Started: {
// continue
}
}

const context: SubsequentIncrementalExecutionResultContext = {
Expand All @@ -147,7 +175,7 @@ class IncrementalPublisher {
const hasNext = this._incrementalGraph.hasNext();

if (!hasNext) {
isDone = true;
incrementalPublisherState = IncrementalPublisherState.Completed;
}

const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult =
Expand All @@ -171,25 +199,20 @@ class IncrementalPublisher {
batch = await this._incrementalGraph.nextCompletedBatch();
} while (batch !== undefined);

await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
};

const _return = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
isDone = true;
this._incrementalGraph.abort();
await this._returnAsyncIterators();
await _finish();
return { value: undefined, done: true };
};

const _throw = async (
error?: unknown,
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
isDone = true;
this._incrementalGraph.abort();
await this._returnAsyncIterators();
await _finish();
return Promise.reject(error);
};

Expand Down Expand Up @@ -372,10 +395,4 @@ class IncrementalPublisher {
}
await Promise.all(promises);
}

private async _returnAsyncIteratorsIgnoringErrors(): Promise<void> {
await this._returnAsyncIterators().catch(() => {
// Ignore errors
});
}
}
21 changes: 9 additions & 12 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { assert, expect } from 'chai';
import { describe, it } from 'mocha';

import { expectJSON } from '../../__testUtils__/expectJSON.js';
import { expectPromise } from '../../__testUtils__/expectPromise.js';
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';

import type { PromiseOrValue } from '../../jsutils/PromiseOrValue.js';
Expand Down Expand Up @@ -1791,7 +1792,7 @@ describe('Execute: stream directive', () => {
]);
});

it('Returns iterator and ignores errors when stream payloads are filtered', async () => {
it('Returns iterator and passes through errors when stream payloads are filtered', async () => {
let returned = false;
let requested = false;
const iterable = {
Expand All @@ -1814,7 +1815,7 @@ describe('Execute: stream directive', () => {
},
return: () => {
returned = true;
// Ignores errors from return.
// This error should be passed through.
return Promise.reject(new Error('Oops'));
},
}),
Expand Down Expand Up @@ -1889,8 +1890,8 @@ describe('Execute: stream directive', () => {
},
});

const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({ done: true, value: undefined });
const result3Promise = iterator.next();
await expectPromise(result3Promise).toRejectWith('Oops');

assert(returned);
});
Expand Down Expand Up @@ -2339,6 +2340,8 @@ describe('Execute: stream directive', () => {
}),
return: () => {
returned = true;
// This error should be passed through.
return Promise.reject(new Error('Oops'));
},
}),
};
Expand Down Expand Up @@ -2378,7 +2381,7 @@ describe('Execute: stream directive', () => {
done: true,
value: undefined,
});
await returnPromise;
await expectPromise(returnPromise).toRejectWith('Oops');
assert(returned);
});
it('Can return async iterable when underlying iterable does not have a return method', async () => {
Expand Down Expand Up @@ -2498,13 +2501,7 @@ describe('Execute: stream directive', () => {
done: true,
value: undefined,
});
try {
await throwPromise; /* c8 ignore start */
// Not reachable, always throws
/* c8 ignore stop */
} catch (e) {
// ignore error
}
await expectPromise(throwPromise).toRejectWith('bad');
assert(returned);
});
});

0 comments on commit ed9caee

Please sign in to comment.