diff --git a/lib/child_process.js b/lib/child_process.js index 449013906e93e5..c09fca512584ce 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -785,7 +785,7 @@ function spawn(file, args, options) { if (signal.aborted) { process.nextTick(onAbortListener); } else { - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(signal, onAbortListener); child.once('exit', disposable[SymbolDispose]); } diff --git a/lib/dgram.js b/lib/dgram.js index 969f696b9e50da..e34e2e81f8d43d 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -63,6 +63,7 @@ const { Buffer } = require('buffer'); const { deprecate, guessHandleType, promisify } = require('internal/util'); const { isArrayBufferView } = require('internal/util/types'); const EventEmitter = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const { defaultTriggerAsyncIdScope, symbols: { async_id_symbol, owner_symbol }, @@ -146,7 +147,7 @@ function Socket(type, listener) { if (signal.aborted) { onAborted(); } else { - const disposable = EventEmitter.addAbortListener(signal, onAborted); + const disposable = addAbortListener(signal, onAborted); this.once('close', disposable[SymbolDispose]); } } diff --git a/lib/events.js b/lib/events.js index 996658e1d3f565..9bb22059c1fd9a 100644 --- a/lib/events.js +++ b/lib/events.js @@ -84,6 +84,7 @@ const { validateObject, validateString, } = require('internal/validators'); +const { addAbortListener } = require('internal/events/abort_listener'); const kCapture = Symbol('kCapture'); const kErrorMonitor = Symbol('events.errorMonitor'); @@ -1222,32 +1223,3 @@ function listenersController() { }, }; } - -let queueMicrotask; - -function addAbortListener(signal, listener) { - if (signal === undefined) { - throw new ERR_INVALID_ARG_TYPE('signal', 'AbortSignal', signal); - } - validateAbortSignal(signal, 'signal'); - validateFunction(listener, 'listener'); - - let removeEventListener; - if (signal.aborted) { - queueMicrotask ??= require('internal/process/task_queues').queueMicrotask; - queueMicrotask(() => listener()); - } else { - kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation; - // TODO(atlowChemi) add { subscription: true } and return directly - signal.addEventListener('abort', listener, { __proto__: null, once: true, [kResistStopPropagation]: true }); - removeEventListener = () => { - signal.removeEventListener('abort', listener); - }; - } - return { - __proto__: null, - [SymbolDispose]() { - removeEventListener?.(); - }, - }; -} diff --git a/lib/internal/events/abort_listener.js b/lib/internal/events/abort_listener.js new file mode 100644 index 00000000000000..033cbf7b250fe0 --- /dev/null +++ b/lib/internal/events/abort_listener.js @@ -0,0 +1,54 @@ +'use strict'; + +const { + SymbolDispose, +} = primordials; +const { + validateAbortSignal, + validateFunction, +} = require('internal/validators'); +const { + codes: { + ERR_INVALID_ARG_TYPE, + }, +} = require('internal/errors'); + +let queueMicrotask; +let kResistStopPropagation; + +/** + * @param {AbortSignal} signal + * @param {EventListener} listener + * @returns {Disposable} + */ +function addAbortListener(signal, listener) { + if (signal === undefined) { + throw new ERR_INVALID_ARG_TYPE('signal', 'AbortSignal', signal); + } + validateAbortSignal(signal, 'signal'); + validateFunction(listener, 'listener'); + + let removeEventListener; + if (signal.aborted) { + queueMicrotask ??= require('internal/process/task_queues').queueMicrotask; + queueMicrotask(() => listener()); + } else { + kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation; + // TODO(atlowChemi) add { subscription: true } and return directly + signal.addEventListener('abort', listener, { __proto__: null, once: true, [kResistStopPropagation]: true }); + removeEventListener = () => { + signal.removeEventListener('abort', listener); + }; + } + return { + __proto__: null, + [SymbolDispose]() { + removeEventListener?.(); + }, + }; +} + +module.exports = { + __proto__: null, + addAbortListener, +}; diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index a82691a37344f9..296968688e6802 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -46,6 +46,7 @@ assertCrypto(); const assert = require('assert'); const EventEmitter = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const fs = require('fs'); const http = require('http'); const { readUInt16BE, readUInt32BE } = require('internal/buffer'); @@ -1832,7 +1833,7 @@ class ClientHttp2Session extends Http2Session { if (signal.aborted) { aborter(); } else { - const disposable = EventEmitter.addAbortListener(signal, aborter); + const disposable = addAbortListener(signal, aborter); stream.once('close', disposable[SymbolDispose]); } } diff --git a/lib/internal/readline/interface.js b/lib/internal/readline/interface.js index f7f06674ef7c41..f8120ca47b2829 100644 --- a/lib/internal/readline/interface.js +++ b/lib/internal/readline/interface.js @@ -53,6 +53,7 @@ const { stripVTControlCharacters, } = require('internal/util/inspect'); const EventEmitter = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const { charLengthAt, charLengthLeft, @@ -326,7 +327,7 @@ function InterfaceConstructor(input, output, completer, terminal) { if (signal.aborted) { process.nextTick(onAborted); } else { - const disposable = EventEmitter.addAbortListener(signal, onAborted); + const disposable = addAbortListener(signal, onAborted); self.once('close', disposable[SymbolDispose]); } } diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index 819be3ff63e915..10e5545a28345b 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -51,7 +51,7 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) { if (signal.aborted) { onAbort(); } else { - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(signal, onAbort); eos(stream, disposable[SymbolDispose]); } diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 663222e3149bad..eda08b5fc60b93 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -254,7 +254,7 @@ function eos(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort); } else { - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(options.signal, abort); const originalCallback = callback; callback = once((...args) => { @@ -278,7 +278,7 @@ function eosWeb(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort); } else { - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(options.signal, abort); const originalCallback = callback; callback = once((...args) => { diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index aac7f65f0404d8..4cf6b02766b6a8 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -208,7 +208,7 @@ function pipelineImpl(streams, callback, opts) { finishImpl(new AbortError()); } - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; let disposable; if (outerSignal) { disposable = addAbortListener(outerSignal, abort); diff --git a/lib/internal/test_runner/test.js b/lib/internal/test_runner/test.js index 1642cfdd15e3c6..d57c1d6d12610e 100644 --- a/lib/internal/test_runner/test.js +++ b/lib/internal/test_runner/test.js @@ -27,7 +27,7 @@ const { Symbol, } = primordials; const { getCallerLocation } = internalBinding('util'); -const { addAbortListener } = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const { AsyncResource } = require('async_hooks'); const { AbortController } = require('internal/abort_controller'); const { diff --git a/lib/internal/watch_mode/files_watcher.js b/lib/internal/watch_mode/files_watcher.js index 9e018045e520fd..e3f37557a627dc 100644 --- a/lib/internal/watch_mode/files_watcher.js +++ b/lib/internal/watch_mode/files_watcher.js @@ -13,6 +13,7 @@ const { kEmptyObject } = require('internal/util'); const { TIMEOUT_MAX } = require('internal/timers'); const EventEmitter = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const { watch } = require('fs'); const { fileURLToPath } = require('internal/url'); const { resolve, dirname } = require('path'); @@ -41,7 +42,7 @@ class FilesWatcher extends EventEmitter { this.#signal = signal; if (signal) { - EventEmitter.addAbortListener(signal, () => this.clear()); + addAbortListener(signal, () => this.clear()); } } diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index d4526011bec46d..f30788421f2cc4 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1523,7 +1523,7 @@ function readableStreamPipeTo( abortAlgorithm(); return promise.promise; } - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; disposable = addAbortListener(signal, abortAlgorithm); } diff --git a/lib/net.js b/lib/net.js index de2cd517eb4580..c72beaaef34f65 100644 --- a/lib/net.js +++ b/lib/net.js @@ -41,6 +41,7 @@ const { } = primordials; const EventEmitter = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const stream = require('stream'); let debug = require('internal/util/debuglog').debuglog('net', (fn) => { debug = fn; @@ -1631,7 +1632,7 @@ function addClientAbortSignalOption(self, options) { process.nextTick(onAbort); } else { process.nextTick(() => { - disposable = EventEmitter.addAbortListener(signal, onAbort); + disposable = addAbortListener(signal, onAbort); }); } } @@ -1723,7 +1724,7 @@ function addServerAbortSignalOption(self, options) { if (signal.aborted) { process.nextTick(onAborted); } else { - const disposable = EventEmitter.addAbortListener(signal, onAborted); + const disposable = addAbortListener(signal, onAborted); self.once('close', disposable[SymbolDispose]); } } diff --git a/lib/readline.js b/lib/readline.js index 5276d9401b4c12..509c7022ee95cc 100644 --- a/lib/readline.js +++ b/lib/readline.js @@ -145,7 +145,7 @@ Interface.prototype.question = function question(query, options, cb) { const onAbort = () => { this[kQuestionCancel](); }; - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(options.signal, onAbort); const originalCb = cb; cb = typeof cb === 'function' ? (answer) => { @@ -175,7 +175,7 @@ Interface.prototype.question[promisify.custom] = function question(query, option const onAbort = () => { reject(new AbortError(undefined, { cause: options.signal.reason })); }; - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(options.signal, onAbort); cb = (answer) => { disposable[SymbolDispose](); diff --git a/lib/readline/promises.js b/lib/readline/promises.js index 4c2ce90479ef8f..ccd0745a33f941 100644 --- a/lib/readline/promises.js +++ b/lib/readline/promises.js @@ -45,7 +45,7 @@ class Interface extends _Interface { this[kQuestionCancel](); reject(new AbortError(undefined, { cause: options.signal.reason })); }; - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(options.signal, onAbort); cb = (answer) => { diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index 124e106067640f..6327fbeb2e7e1b 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -99,6 +99,7 @@ expected.beforePreExec = new Set([ 'Internal Binding module_wrap', 'NativeModule internal/modules/cjs/loader', 'Internal Binding wasm_web_api', + 'NativeModule internal/events/abort_listener', ]); expected.atRunTime = new Set([