From 29ee68a9194aab18f46d0e9ffe9bc88f5162d6f3 Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 14 Mar 2024 10:52:31 +0200 Subject: [PATCH 01/10] events: extract addAbortListener for safe internal use Refs: https://github.com/nodejs/node/pull/48596 --- lib/events.js | 31 +------------- lib/internal/events/abort_listener.js | 54 +++++++++++++++++++++++++ test/parallel/test-bootstrap-modules.js | 1 + 3 files changed, 56 insertions(+), 30 deletions(-) create mode 100644 lib/internal/events/abort_listener.js diff --git a/lib/events.js b/lib/events.js index 996658e1d3f565..3d749a355592b8 100644 --- a/lib/events.js +++ b/lib/events.js @@ -48,7 +48,6 @@ const { Symbol, SymbolFor, SymbolAsyncIterator, - SymbolDispose, } = primordials; const kRejection = SymbolFor('nodejs.rejection'); @@ -84,6 +83,7 @@ const { validateObject, validateString, } = require('internal/validators'); +const { addAbortListener } = require('internal/events/abort_listener'); const kCapture = Symbol('kCapture'); const kErrorMonitor = Symbol('events.errorMonitor'); @@ -1222,32 +1222,3 @@ function listenersController() { }, }; } - -let queueMicrotask; - -function addAbortListener(signal, listener) { - if (signal === undefined) { - throw new ERR_INVALID_ARG_TYPE('signal', 'AbortSignal', signal); - } - validateAbortSignal(signal, 'signal'); - validateFunction(listener, 'listener'); - - let removeEventListener; - if (signal.aborted) { - queueMicrotask ??= require('internal/process/task_queues').queueMicrotask; - queueMicrotask(() => listener()); - } else { - kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation; - // TODO(atlowChemi) add { subscription: true } and return directly - signal.addEventListener('abort', listener, { __proto__: null, once: true, [kResistStopPropagation]: true }); - removeEventListener = () => { - signal.removeEventListener('abort', listener); - }; - } - return { - __proto__: null, - [SymbolDispose]() { - removeEventListener?.(); - }, - }; -} diff --git a/lib/internal/events/abort_listener.js b/lib/internal/events/abort_listener.js new file mode 100644 index 00000000000000..033cbf7b250fe0 --- /dev/null +++ b/lib/internal/events/abort_listener.js @@ -0,0 +1,54 @@ +'use strict'; + +const { + SymbolDispose, +} = primordials; +const { + validateAbortSignal, + validateFunction, +} = require('internal/validators'); +const { + codes: { + ERR_INVALID_ARG_TYPE, + }, +} = require('internal/errors'); + +let queueMicrotask; +let kResistStopPropagation; + +/** + * @param {AbortSignal} signal + * @param {EventListener} listener + * @returns {Disposable} + */ +function addAbortListener(signal, listener) { + if (signal === undefined) { + throw new ERR_INVALID_ARG_TYPE('signal', 'AbortSignal', signal); + } + validateAbortSignal(signal, 'signal'); + validateFunction(listener, 'listener'); + + let removeEventListener; + if (signal.aborted) { + queueMicrotask ??= require('internal/process/task_queues').queueMicrotask; + queueMicrotask(() => listener()); + } else { + kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation; + // TODO(atlowChemi) add { subscription: true } and return directly + signal.addEventListener('abort', listener, { __proto__: null, once: true, [kResistStopPropagation]: true }); + removeEventListener = () => { + signal.removeEventListener('abort', listener); + }; + } + return { + __proto__: null, + [SymbolDispose]() { + removeEventListener?.(); + }, + }; +} + +module.exports = { + __proto__: null, + addAbortListener, +}; diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index 124e106067640f..6327fbeb2e7e1b 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -99,6 +99,7 @@ expected.beforePreExec = new Set([ 'Internal Binding module_wrap', 'NativeModule internal/modules/cjs/loader', 'Internal Binding wasm_web_api', + 'NativeModule internal/events/abort_listener', ]); expected.atRunTime = new Set([ From 814ff465d6509fac2619fcbc1adde12796f64c8c Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 14 Mar 2024 10:53:11 +0200 Subject: [PATCH 02/10] child_process: use internal addAbortListener --- lib/child_process.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/child_process.js b/lib/child_process.js index 449013906e93e5..c09fca512584ce 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -785,7 +785,7 @@ function spawn(file, args, options) { if (signal.aborted) { process.nextTick(onAbortListener); } else { - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(signal, onAbortListener); child.once('exit', disposable[SymbolDispose]); } From 0e2ac27563035d4a0294006d220eeb0713caeebd Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 14 Mar 2024 10:53:38 +0200 Subject: [PATCH 03/10] dgram: use internal addAbortListener --- lib/dgram.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/dgram.js b/lib/dgram.js index 969f696b9e50da..e34e2e81f8d43d 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -63,6 +63,7 @@ const { Buffer } = require('buffer'); const { deprecate, guessHandleType, promisify } = require('internal/util'); const { isArrayBufferView } = require('internal/util/types'); const EventEmitter = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const { defaultTriggerAsyncIdScope, symbols: { async_id_symbol, owner_symbol }, @@ -146,7 +147,7 @@ function Socket(type, listener) { if (signal.aborted) { onAborted(); } else { - const disposable = EventEmitter.addAbortListener(signal, onAborted); + const disposable = addAbortListener(signal, onAborted); this.once('close', disposable[SymbolDispose]); } } From 0169b22d59d1c5ce2b717b4ef7dfd974e235bc2f Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 14 Mar 2024 10:54:19 +0200 Subject: [PATCH 04/10] net: use internal addAbortListener --- lib/net.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/net.js b/lib/net.js index de2cd517eb4580..c72beaaef34f65 100644 --- a/lib/net.js +++ b/lib/net.js @@ -41,6 +41,7 @@ const { } = primordials; const EventEmitter = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const stream = require('stream'); let debug = require('internal/util/debuglog').debuglog('net', (fn) => { debug = fn; @@ -1631,7 +1632,7 @@ function addClientAbortSignalOption(self, options) { process.nextTick(onAbort); } else { process.nextTick(() => { - disposable = EventEmitter.addAbortListener(signal, onAbort); + disposable = addAbortListener(signal, onAbort); }); } } @@ -1723,7 +1724,7 @@ function addServerAbortSignalOption(self, options) { if (signal.aborted) { process.nextTick(onAborted); } else { - const disposable = EventEmitter.addAbortListener(signal, onAborted); + const disposable = addAbortListener(signal, onAborted); self.once('close', disposable[SymbolDispose]); } } From 82602238b15dc02c45b9561a5d2856452cd83f5c Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 14 Mar 2024 10:55:07 +0200 Subject: [PATCH 05/10] readline: use internal addAbortListener --- lib/internal/readline/interface.js | 3 ++- lib/readline.js | 4 ++-- lib/readline/promises.js | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/internal/readline/interface.js b/lib/internal/readline/interface.js index f7f06674ef7c41..f8120ca47b2829 100644 --- a/lib/internal/readline/interface.js +++ b/lib/internal/readline/interface.js @@ -53,6 +53,7 @@ const { stripVTControlCharacters, } = require('internal/util/inspect'); const EventEmitter = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const { charLengthAt, charLengthLeft, @@ -326,7 +327,7 @@ function InterfaceConstructor(input, output, completer, terminal) { if (signal.aborted) { process.nextTick(onAborted); } else { - const disposable = EventEmitter.addAbortListener(signal, onAborted); + const disposable = addAbortListener(signal, onAborted); self.once('close', disposable[SymbolDispose]); } } diff --git a/lib/readline.js b/lib/readline.js index 5276d9401b4c12..509c7022ee95cc 100644 --- a/lib/readline.js +++ b/lib/readline.js @@ -145,7 +145,7 @@ Interface.prototype.question = function question(query, options, cb) { const onAbort = () => { this[kQuestionCancel](); }; - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(options.signal, onAbort); const originalCb = cb; cb = typeof cb === 'function' ? (answer) => { @@ -175,7 +175,7 @@ Interface.prototype.question[promisify.custom] = function question(query, option const onAbort = () => { reject(new AbortError(undefined, { cause: options.signal.reason })); }; - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(options.signal, onAbort); cb = (answer) => { disposable[SymbolDispose](); diff --git a/lib/readline/promises.js b/lib/readline/promises.js index 4c2ce90479ef8f..ccd0745a33f941 100644 --- a/lib/readline/promises.js +++ b/lib/readline/promises.js @@ -45,7 +45,7 @@ class Interface extends _Interface { this[kQuestionCancel](); reject(new AbortError(undefined, { cause: options.signal.reason })); }; - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(options.signal, onAbort); cb = (answer) => { From 4a76d0d4be9c7a8295cf3d036bf9f963f8976af7 Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 14 Mar 2024 10:55:37 +0200 Subject: [PATCH 06/10] http2: use internal addAbortListener --- lib/internal/http2/core.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index a82691a37344f9..296968688e6802 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -46,6 +46,7 @@ assertCrypto(); const assert = require('assert'); const EventEmitter = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const fs = require('fs'); const http = require('http'); const { readUInt16BE, readUInt32BE } = require('internal/buffer'); @@ -1832,7 +1833,7 @@ class ClientHttp2Session extends Http2Session { if (signal.aborted) { aborter(); } else { - const disposable = EventEmitter.addAbortListener(signal, aborter); + const disposable = addAbortListener(signal, aborter); stream.once('close', disposable[SymbolDispose]); } } From 54b7633a7cbac5b8d5e487dc59d8e36011923fd3 Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 14 Mar 2024 10:56:35 +0200 Subject: [PATCH 07/10] stream: use internal addAbortListener --- lib/internal/streams/add-abort-signal.js | 2 +- lib/internal/streams/end-of-stream.js | 4 ++-- lib/internal/streams/pipeline.js | 2 +- lib/internal/webstreams/readablestream.js | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index 819be3ff63e915..10e5545a28345b 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -51,7 +51,7 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) { if (signal.aborted) { onAbort(); } else { - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(signal, onAbort); eos(stream, disposable[SymbolDispose]); } diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 663222e3149bad..eda08b5fc60b93 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -254,7 +254,7 @@ function eos(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort); } else { - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(options.signal, abort); const originalCallback = callback; callback = once((...args) => { @@ -278,7 +278,7 @@ function eosWeb(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort); } else { - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; const disposable = addAbortListener(options.signal, abort); const originalCallback = callback; callback = once((...args) => { diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index aac7f65f0404d8..4cf6b02766b6a8 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -208,7 +208,7 @@ function pipelineImpl(streams, callback, opts) { finishImpl(new AbortError()); } - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; let disposable; if (outerSignal) { disposable = addAbortListener(outerSignal, abort); diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index d4526011bec46d..f30788421f2cc4 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1523,7 +1523,7 @@ function readableStreamPipeTo( abortAlgorithm(); return promise.promise; } - addAbortListener ??= require('events').addAbortListener; + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; disposable = addAbortListener(signal, abortAlgorithm); } From be862032623dff1b7617a3ace9edaf10d968b8a9 Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 14 Mar 2024 10:56:47 +0200 Subject: [PATCH 08/10] test_runner: use internal addAbortListener --- lib/internal/test_runner/test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/test_runner/test.js b/lib/internal/test_runner/test.js index 1642cfdd15e3c6..d57c1d6d12610e 100644 --- a/lib/internal/test_runner/test.js +++ b/lib/internal/test_runner/test.js @@ -27,7 +27,7 @@ const { Symbol, } = primordials; const { getCallerLocation } = internalBinding('util'); -const { addAbortListener } = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const { AsyncResource } = require('async_hooks'); const { AbortController } = require('internal/abort_controller'); const { From 4b7ac753b2758bf9c6d7bf67f346c992249ec925 Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 14 Mar 2024 10:57:28 +0200 Subject: [PATCH 09/10] watch: use internal addAbortListener --- lib/internal/watch_mode/files_watcher.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/internal/watch_mode/files_watcher.js b/lib/internal/watch_mode/files_watcher.js index 9e018045e520fd..e3f37557a627dc 100644 --- a/lib/internal/watch_mode/files_watcher.js +++ b/lib/internal/watch_mode/files_watcher.js @@ -13,6 +13,7 @@ const { kEmptyObject } = require('internal/util'); const { TIMEOUT_MAX } = require('internal/timers'); const EventEmitter = require('events'); +const { addAbortListener } = require('internal/events/abort_listener'); const { watch } = require('fs'); const { fileURLToPath } = require('internal/url'); const { resolve, dirname } = require('path'); @@ -41,7 +42,7 @@ class FilesWatcher extends EventEmitter { this.#signal = signal; if (signal) { - EventEmitter.addAbortListener(signal, () => this.clear()); + addAbortListener(signal, () => this.clear()); } } From 3117855bf7893c901bb171743b5cdc007b7b7f9f Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Fri, 15 Mar 2024 17:19:30 +0200 Subject: [PATCH 10/10] fixup! events: extract addAbortListener for safe internal use --- lib/events.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/events.js b/lib/events.js index 3d749a355592b8..9bb22059c1fd9a 100644 --- a/lib/events.js +++ b/lib/events.js @@ -48,6 +48,7 @@ const { Symbol, SymbolFor, SymbolAsyncIterator, + SymbolDispose, } = primordials; const kRejection = SymbolFor('nodejs.rejection');