Skip to content

Commit 2c70d27

Browse files
n1ru4lardatan
andauthoredDec 24, 2024··
fix: avoid "possible EventEmitter memory leak detected" warning (#6789)
* fix: possible EventEmitter memory leak detected * Single place for AbortSignal listeners -> #6789 (#6793) * Single place for AbortSignal listeners * Fix * .. * Lets go * Small improvement * Changeset for utils * Update packages/executor/src/execution/__tests__/abort-signal.test.ts --------- Co-authored-by: Arda TANRIKULU <ardatanrikulu@gmail.com>

File tree

7 files changed

+122
-63
lines changed

7 files changed

+122
-63
lines changed
 

‎.changeset/hip-bikes-hunt.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@graphql-tools/utils': minor
3+
---
4+
5+
- New helper function `getAbortPromise` to get a promise rejected when `AbortSignal` is aborted
6+
- New helper function `registerAbortSignalListener` to register a listener to abort a promise when `AbortSignal` is aborted
7+
8+
Instead of using `.addEventListener('abort', () => {/* ... */})`, we register a single listener to avoid warnings on Node.js like `MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 abort listeners added. Use emitter.setMaxListeners() to increase limit`.

‎.changeset/selfish-worms-decide.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@graphql-tools/executor': patch
3+
---
4+
5+
Surpress the "possible EventEmitter memory leak detected." warning occuring on Node.js when passing
6+
a `AbortSignal` to `execute`.
7+
8+
Each execution will now only set up a single listener on the supplied `AbortSignal`. While the warning is harmless it can be misleading, which is the main motivation of this change.

‎packages/executor/src/execution/__tests__/abort-signal.test.ts

+11-11
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,18 @@ import { assertAsyncIterable } from '../../../../loaders/url/tests/test-utils';
77
import { normalizedExecutor } from '../normalizedExecutor';
88

99
describe('Abort Signal', () => {
10+
// Always make sure that listener is registered once or never
11+
let controller: AbortController;
12+
let spy: jest.SpyInstance;
13+
beforeEach(() => {
14+
controller = new AbortController();
15+
spy = jest.spyOn(controller.signal, 'addEventListener');
16+
});
17+
afterEach(() => {
18+
expect(spy.mock.calls.length).toBeLessThanOrEqual(1);
19+
});
1020
it('should stop the subscription', async () => {
11-
expect.assertions(2);
12-
const controller = new AbortController();
21+
expect.assertions(3);
1322
let stopped = false;
1423
const schema = makeExecutableSchema({
1524
typeDefs: /* GraphQL */ `
@@ -64,7 +73,6 @@ describe('Abort Signal', () => {
6473
expect(results).toEqual([0, 1, 2, 3, 4]);
6574
});
6675
it('pending subscription execution is canceled', async () => {
67-
const controller = new AbortController();
6876
const rootResolverGotInvokedD = createDeferred<void>();
6977
const requestGotCancelledD = createDeferred<void>();
7078
let aResolverGotInvoked = false;
@@ -123,8 +131,6 @@ describe('Abort Signal', () => {
123131
expect(aResolverGotInvoked).toEqual(false);
124132
});
125133
it('should stop the serial mutation execution', async () => {
126-
const controller = new AbortController();
127-
128134
let didInvokeFirstFn = false;
129135
let didInvokeSecondFn = false;
130136
let didInvokeThirdFn = false;
@@ -174,7 +180,6 @@ describe('Abort Signal', () => {
174180
expect(didInvokeThirdFn).toBe(false);
175181
});
176182
it('should stop stream execution', async () => {
177-
const controller = new AbortController();
178183
let isAborted = false;
179184

180185
const schema = makeExecutableSchema({
@@ -223,7 +228,6 @@ describe('Abort Signal', () => {
223228
expect(isAborted).toEqual(true);
224229
});
225230
it('stops pending stream execution for incremental delivery (@stream)', async () => {
226-
const controller = new AbortController();
227231
const d = createDeferred<void>();
228232
let isReturnInvoked = false;
229233

@@ -285,7 +289,6 @@ describe('Abort Signal', () => {
285289
expect(isReturnInvoked).toEqual(true);
286290
});
287291
it('stops pending stream execution for parallel sources incremental delivery (@stream)', async () => {
288-
const controller = new AbortController();
289292
const d1 = createDeferred<void>();
290293
const d2 = createDeferred<void>();
291294

@@ -404,7 +407,6 @@ describe('Abort Signal', () => {
404407
},
405408
},
406409
});
407-
const controller = new AbortController();
408410
const result = await normalizedExecutor({
409411
schema,
410412
document: parse(/* GraphQL */ `
@@ -443,7 +445,6 @@ describe('Abort Signal', () => {
443445
expect(bResolverGotInvoked).toBe(false);
444446
});
445447
it('stops promise execution', async () => {
446-
const controller = new AbortController();
447448
const d = createDeferred<void>();
448449

449450
const schema = makeExecutableSchema({
@@ -474,7 +475,6 @@ describe('Abort Signal', () => {
474475
await expect(result$).rejects.toMatchInlineSnapshot(`DOMException {}`);
475476
});
476477
it('does not even try to execute if the signal is already aborted', async () => {
477-
const controller = new AbortController();
478478
let resolverGotInvoked = false;
479479
const schema = makeExecutableSchema({
480480
typeDefs: /* GraphQL */ `

‎packages/executor/src/execution/execute.ts

+37-41
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import {
3535
collectFields,
3636
createGraphQLError,
3737
fakePromise,
38+
getAbortPromise,
3839
getArgumentValues,
3940
getDefinedRootType,
4041
GraphQLResolveInfo,
@@ -52,6 +53,7 @@ import {
5253
Path,
5354
pathToArray,
5455
promiseReduce,
56+
registerAbortSignalListener,
5557
} from '@graphql-tools/utils';
5658
import { TypedDocumentNode } from '@graphql-typed-document-node/core';
5759
import { DisposableSymbols } from '@whatwg-node/disposablestack';
@@ -290,9 +292,7 @@ export function execute<TData = any, TVariables = any, TContext = any>(
290292
function executeImpl<TData = any, TVariables = any, TContext = any>(
291293
exeContext: ExecutionContext<TVariables, TContext>,
292294
): MaybePromise<SingularExecutionResult<TData> | IncrementalExecutionResults<TData>> {
293-
if (exeContext.signal?.aborted) {
294-
throw exeContext.signal.reason;
295-
}
295+
exeContext.signal?.throwIfAborted();
296296

297297
// Return a Promise that will eventually resolve to the data described by
298298
// The "Response" section of the GraphQL specification.
@@ -322,9 +322,7 @@ function executeImpl<TData = any, TVariables = any, TContext = any>(
322322
return initialResult;
323323
},
324324
(error: any) => {
325-
if (exeContext.signal?.aborted) {
326-
throw exeContext.signal.reason;
327-
}
325+
exeContext.signal?.throwIfAborted();
328326

329327
if (error.errors) {
330328
exeContext.errors.push(...error.errors);
@@ -558,9 +556,7 @@ function executeFieldsSerially<TData>(
558556
fields,
559557
(results, [responseName, fieldNodes]) => {
560558
const fieldPath = addPath(path, responseName, parentType.name);
561-
if (exeContext.signal?.aborted) {
562-
throw exeContext.signal.reason;
563-
}
559+
exeContext.signal?.throwIfAborted();
564560

565561
return new ValueOrPromise(() =>
566562
executeField(exeContext, parentType, sourceValue, fieldNodes, fieldPath),
@@ -595,9 +591,7 @@ function executeFields(
595591

596592
try {
597593
for (const [responseName, fieldNodes] of fields) {
598-
if (exeContext.signal?.aborted) {
599-
throw exeContext.signal.reason;
600-
}
594+
exeContext.signal?.throwIfAborted();
601595

602596
const fieldPath = addPath(path, responseName, parentType.name);
603597
const result = executeField(
@@ -958,13 +952,12 @@ async function completeAsyncIteratorValue(
958952
iterator: AsyncIterator<unknown>,
959953
asyncPayloadRecord?: AsyncPayloadRecord,
960954
): Promise<ReadonlyArray<unknown>> {
961-
exeContext.signal?.addEventListener(
962-
'abort',
963-
() => {
955+
if (exeContext.signal && iterator.return) {
956+
registerAbortSignalListener(exeContext.signal, () => {
964957
iterator.return?.();
965-
},
966-
{ once: true },
967-
);
958+
});
959+
}
960+
968961
const errors = asyncPayloadRecord?.errors ?? exeContext.errors;
969962
const stream = getStreamValues(exeContext, fieldNodes, path);
970963
let containsPromise = false;
@@ -1758,15 +1751,22 @@ function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable
17581751
'Subscription field must return Async Iterable. ' + `Received: ${inspect(result)}.`,
17591752
);
17601753
}
1761-
return {
1762-
[Symbol.asyncIterator]() {
1763-
const asyncIterator = result[Symbol.asyncIterator]();
1764-
signal?.addEventListener('abort', () => {
1765-
asyncIterator.return?.();
1766-
});
1767-
return asyncIterator;
1768-
},
1769-
};
1754+
if (signal) {
1755+
return {
1756+
[Symbol.asyncIterator]() {
1757+
const asyncIterator = result[Symbol.asyncIterator]();
1758+
1759+
if (asyncIterator.return) {
1760+
registerAbortSignalListener(signal, () => {
1761+
asyncIterator.return?.();
1762+
});
1763+
}
1764+
1765+
return asyncIterator;
1766+
},
1767+
};
1768+
}
1769+
return result;
17701770
}
17711771

17721772
function executeDeferredFragment(
@@ -2084,26 +2084,22 @@ function yieldSubsequentPayloads(
20842084
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
20852085
let isDone = false;
20862086

2087-
const abortPromise = new Promise<void>((_, reject) => {
2088-
exeContext.signal?.addEventListener(
2089-
'abort',
2090-
() => {
2091-
isDone = true;
2092-
reject(exeContext.signal?.reason);
2093-
},
2094-
{ once: true },
2095-
);
2096-
});
2087+
const abortPromise = exeContext.signal ? getAbortPromise(exeContext.signal) : undefined;
20972088

20982089
async function next(): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
20992090
if (isDone) {
21002091
return { value: undefined, done: true };
21012092
}
21022093

2103-
await Promise.race([
2104-
abortPromise,
2105-
...Array.from(exeContext.subsequentPayloads).map(p => p.promise),
2106-
]);
2094+
const subSequentPayloadPromises = Array.from(exeContext.subsequentPayloads).map(
2095+
record => record.promise,
2096+
);
2097+
2098+
if (abortPromise) {
2099+
await Promise.race([abortPromise, ...subSequentPayloadPromises]);
2100+
} else {
2101+
await Promise.race(subSequentPayloadPromises);
2102+
}
21072103

21082104
if (isDone) {
21092105
// a different call to next has exhausted all payloads
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { getAbortPromise } from '@graphql-tools/utils';
2+
13
type ResolvedObject<TData> = {
24
[TKey in keyof TData]: TData[TKey] extends Promise<infer TValue> ? TValue : TData[TKey];
35
};
@@ -14,15 +16,14 @@ export async function promiseForObject<TData>(
1416
signal?: AbortSignal,
1517
): Promise<ResolvedObject<TData>> {
1618
const resolvedObject = Object.create(null);
17-
await new Promise<void>((resolve, reject) => {
18-
signal?.addEventListener('abort', () => {
19-
reject(signal.reason);
20-
});
21-
Promise.all(
22-
Object.entries(object as any).map(async ([key, value]) => {
23-
resolvedObject[key] = await value;
24-
}),
25-
).then(() => resolve(), reject);
26-
});
27-
return resolvedObject;
19+
const promises = Promise.all(
20+
Object.entries(object as any).map(async ([key, value]) => {
21+
resolvedObject[key] = await value;
22+
}),
23+
);
24+
if (signal) {
25+
const abortPromise = getAbortPromise(signal);
26+
return Promise.race([abortPromise, promises]).then(() => resolvedObject);
27+
}
28+
return promises.then(() => resolvedObject);
2829
}

‎packages/utils/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,4 @@ export * from './getDirectiveExtensions.js';
5858
export * from './map-maybe-promise.js';
5959
export * from './fakePromise.js';
6060
export * from './createDeferred.js';
61+
export * from './registerAbortSignalListener.js';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { memoize1 } from './memoize.js';
2+
3+
// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected"
4+
// on Node.js
5+
const getListenersOfAbortSignal = memoize1(function getListenersOfAbortSignal(signal: AbortSignal) {
6+
const listeners = new Set<EventListener>();
7+
signal.addEventListener(
8+
'abort',
9+
e => {
10+
for (const listener of listeners) {
11+
listener(e);
12+
}
13+
},
14+
{ once: true },
15+
);
16+
return listeners;
17+
});
18+
19+
/**
20+
* Register an AbortSignal handler for a signal.
21+
* This helper function mainly exists to work around the
22+
* "possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit."
23+
* warning occuring on Node.js
24+
*/
25+
export function registerAbortSignalListener(signal: AbortSignal, listener: VoidFunction) {
26+
// If the signal is already aborted, call the listener immediately
27+
if (signal.aborted) {
28+
listener();
29+
return;
30+
}
31+
getListenersOfAbortSignal(signal).add(listener);
32+
}
33+
34+
export const getAbortPromise = memoize1(function getAbortPromise(signal: AbortSignal) {
35+
return new Promise<void>((_resolve, reject) => {
36+
// If the signal is already aborted, return a rejected promise
37+
if (signal.aborted) {
38+
reject(signal.reason);
39+
return;
40+
}
41+
registerAbortSignalListener(signal, () => {
42+
reject(signal.reason);
43+
});
44+
});
45+
});

0 commit comments

Comments
 (0)
Please sign in to comment.