Skip to content

Commit

Permalink
events: allow use of AbortController with once
Browse files Browse the repository at this point in the history
Allows an AbortSignal to be passed in to events.once() to cancel
waiting on an event.

Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: nodejs#34911
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
jasnell authored and targos committed Apr 26, 2021
1 parent 1e6a488 commit 7f46ec7
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 4 deletions.
30 changes: 29 additions & 1 deletion doc/api/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ class MyClass extends EventEmitter {
}
```

## `events.once(emitter, name)`
## `events.once(emitter, name[, options])`
<!-- YAML
added:
- v11.13.0
Expand All @@ -838,6 +838,9 @@ added:

* `emitter` {EventEmitter}
* `name` {string}
* `options` {Object}
* `signal` {AbortSignal} An {AbortSignal} that may be used to cancel waiting
for the event.
* Returns: {Promise}

Creates a `Promise` that is fulfilled when the `EventEmitter` emits the given
Expand Down Expand Up @@ -896,6 +899,31 @@ ee.emit('error', new Error('boom'));
// Prints: ok boom
```

An {AbortSignal} may be used to cancel waiting for the event early:

```js
const { EventEmitter, once } = require('events');

const ee = new EventEmitter();
const ac = new AbortController();

async function foo(emitter, event, signal) {
try {
await once(emitter, event, { signal });
console.log('event emitted!');
} catch (error) {
if (error.name === 'AbortError') {
console.error('Waiting for the event was canceled!');
} else {
console.error('There was an error', error.message);
}
}
}

foo(ee, 'foo', ac.signal);
ac.abort(); // Abort waiting for the event
```

### Awaiting multiple events emitted on `process.nextTick()`

There is an edge case worth noting when using the `events.once()` function
Expand Down
53 changes: 52 additions & 1 deletion lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const kRejection = SymbolFor('nodejs.rejection');
let spliceOne;

const {
hideStackFrames,
kEnhanceStackBeforeInspector,
codes
} = require('internal/errors');
Expand All @@ -59,9 +60,20 @@ const {
inspect
} = require('internal/util/inspect');

const {
validateAbortSignal
} = require('internal/validators');

const kCapture = Symbol('kCapture');
const kErrorMonitor = Symbol('events.errorMonitor');

let DOMException;
const lazyDOMException = hideStackFrames((message, name) => {
if (DOMException === undefined)
DOMException = internalBinding('messaging').DOMException;
return new DOMException(message, name);
});

function EventEmitter(opts) {
EventEmitter.init.call(this, opts);
}
Expand Down Expand Up @@ -622,22 +634,61 @@ function unwrapListeners(arr) {
return ret;
}

function once(emitter, name) {
async function once(emitter, name, options = {}) {
const signal = options ? options.signal : undefined;
validateAbortSignal(signal, 'options.signal');
if (signal && signal.aborted)
throw lazyDOMException('The operation was aborted', 'AbortError');
return new Promise((resolve, reject) => {
const errorListener = (err) => {
emitter.removeListener(name, resolver);
if (signal != null) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}
reject(err);
};
const resolver = (...args) => {
if (typeof emitter.removeListener === 'function') {
emitter.removeListener('error', errorListener);
}
if (signal != null) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}
resolve(args);
};
eventTargetAgnosticAddListener(emitter, name, resolver, { once: true });
if (name !== 'error') {
addErrorHandlerIfEventEmitter(emitter, errorListener, { once: true });
}
function abortListener() {
if (typeof emitter.removeListener === 'function') {
emitter.removeListener(name, resolver);
emitter.removeListener('error', errorListener);
} else {
eventTargetAgnosticRemoveListener(
emitter,
name,
resolver,
{ once: true });
eventTargetAgnosticRemoveListener(
emitter,
'error',
errorListener,
{ once: true });
}
reject(lazyDOMException('The operation was aborted', 'AbortError'));
}
if (signal != null) {
signal.addEventListener('abort', abortListener, { once: true });
}
});
}

Expand Down
10 changes: 10 additions & 0 deletions lib/internal/validators.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,15 @@ const validateCallback = hideStackFrames((callback) => {
throw new ERR_INVALID_CALLBACK(callback);
});

const validateAbortSignal = hideStackFrames((signal, name) => {
if (signal !== undefined &&
(signal === null ||
typeof signal !== 'object' ||
!('aborted' in signal))) {
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
}
});

module.exports = {
isInt32,
isUint32,
Expand All @@ -236,4 +245,5 @@ module.exports = {
validateString,
validateUint32,
validateCallback,
validateAbortSignal,
};
90 changes: 88 additions & 2 deletions test/parallel/test-events-once.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
'use strict';
// Flags: --expose-internals
// Flags: --expose-internals --no-warnings --experimental-abortcontroller

const common = require('../common');
const { once, EventEmitter } = require('events');
const { strictEqual, deepStrictEqual, fail } = require('assert');
const {
strictEqual,
deepStrictEqual,
fail,
rejects,
} = require('assert');
const { EventTarget, Event } = require('internal/event_target');

async function onceAnEvent() {
Expand Down Expand Up @@ -114,6 +119,81 @@ async function prioritizesEventEmitter() {
process.nextTick(() => ee.emit('foo'));
await once(ee, 'foo');
}

async function abortSignalBefore() {
const ee = new EventEmitter();
const ac = new AbortController();
ee.on('error', common.mustNotCall());
ac.abort();

await Promise.all([1, {}, 'hi', null, false].map((signal) => {
return rejects(once(ee, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
}));

return rejects(once(ee, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}

async function abortSignalAfter() {
const ee = new EventEmitter();
const ac = new AbortController();
ee.on('error', common.mustNotCall());
const r = rejects(once(ee, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
process.nextTick(() => ac.abort());
return r;
}

async function abortSignalAfterEvent() {
const ee = new EventEmitter();
const ac = new AbortController();
process.nextTick(() => {
ee.emit('foo');
ac.abort();
});
await once(ee, 'foo', { signal: ac.signal });
}

async function eventTargetAbortSignalBefore() {
const et = new EventTarget();
const ac = new AbortController();
ac.abort();

await Promise.all([1, {}, 'hi', null, false].map((signal) => {
return rejects(once(et, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
}));

return rejects(once(et, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}

async function eventTargetAbortSignalAfter() {
const et = new EventTarget();
const ac = new AbortController();
const r = rejects(once(et, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
process.nextTick(() => ac.abort());
return r;
}

async function eventTargetAbortSignalAfterEvent() {
const et = new EventTarget();
const ac = new AbortController();
process.nextTick(() => {
et.dispatchEvent(new Event('foo'));
ac.abort();
});
await once(et, 'foo', { signal: ac.signal });
}

Promise.all([
onceAnEvent(),
onceAnEventWithTwoArgs(),
Expand All @@ -123,4 +203,10 @@ Promise.all([
onceWithEventTarget(),
onceWithEventTargetError(),
prioritizesEventEmitter(),
abortSignalBefore(),
abortSignalAfter(),
abortSignalAfterEvent(),
eventTargetAbortSignalBefore(),
eventTargetAbortSignalAfter(),
eventTargetAbortSignalAfterEvent(),
]).then(common.mustCall());

0 comments on commit 7f46ec7

Please sign in to comment.