diff --git a/events.js b/events.js index 34b69a0..4657e66 100644 --- a/events.js +++ b/events.js @@ -55,6 +55,7 @@ function EventEmitter() { } module.exports = EventEmitter; module.exports.once = once; +module.exports.on = on; // Backwards-compat with node 0.10.x EventEmitter.EventEmitter = EventEmitter; @@ -467,6 +468,109 @@ function once(emitter, name) { }); } +function createIterResult(value, done) { + return { value: value, done: done }; +} + +var AsyncIteratorPrototype = undefined; + +function on(emitter, event) { + // Initialize it on first run + if (AsyncIteratorPrototype === undefined) { + var asyncGenerator; + try { + asyncGenerator = Function('return async function*() {};')(); + } catch (err) {} + if (asyncGenerator) { + AsyncIteratorPrototype = Object.getPrototypeOf( + Object.getPrototypeOf(asyncGenerator).prototype); + } else { + AsyncIteratorPrototype = null; + } + } + + var unconsumedEvents = []; + var unconsumedPromises = []; + var error = null; + var finished = false; + var iterator = { + next: function next() { + // First, we consume all unread events + var value = unconsumedEvents.shift(); + if (value) { + return Promise.resolve(createIterResult(value, false)); + } + + // Then we error, if an error happened + // This happens one time if at all, because after 'error' + // we stop listening + if (error) { + var p = Promise.reject(error); + error = null; + return p; + } + + // If the iterator is finished, resolve to done + if (finished) { + return Promise.resolve(createIterResult(undefined, true)); + } + return new Promise(function (resolve, reject) { + unconsumedPromises.push({ resolve: resolve, reject: reject }); + }); + }, + 'return': function _return() { + emitter.removeListener(event, eventHandler); + emitter.removeListener('error', errorHandler); + finished = true; + + for (var i = 0, l = unconsumedPromises.length; i < l; i++) { + unconsumedPromises[i].resolve(createIterResult(undefined, true)); + } + return Promise.resolve(createIterResult(undefined, true)); + }, + 'throw': function _throw(err) { + if (!err || !(err instanceof Error)) { + throw new TypeError('The "EventEmitter.AsyncIterator" property must be an instance of Error. Received ' + typeof err); + } + error = err; + emitter.removeListener(event, eventHandler); + emitter.removeListener('error', errorHandler); + } + }; + + iterator[Symbol.asyncIterator] = function () { return this; }; + + Object.setPrototypeOf(iterator, AsyncIteratorPrototype); + + emitter.on(event, eventHandler); + emitter.on('error', errorHandler); + + return iterator; + + function eventHandler() { + var args = [].slice.call(arguments); + var promise = unconsumedPromises.shift(); + if (promise) { + promise.resolve(createIterResult(args, false)); + } else { + unconsumedEvents.push(args); + } + } + + function errorHandler(err) { + finished = true; + + var toError = unconsumedPromises.shift(); + if (toError) { + toError.reject(err); + } else { + // The next time we call next() + error = err; + } + iterator.return(); + } +} + function addErrorHandlerIfEventEmitter(emitter, handler, flags) { if (typeof emitter.on === 'function') { eventTargetAgnosticAddListener(emitter, 'error', handler, flags); diff --git a/tests/index.js b/tests/index.js index 2d739e6..50430b0 100644 --- a/tests/index.js +++ b/tests/index.js @@ -50,6 +50,12 @@ if (functionsHaveNames()) { require('./modify-in-emit.js'); require('./num-args.js'); require('./once.js'); +if (typeof Promise === 'function' && hasSymbols() && Symbol.asyncIterator) { + require('./on-async-iterator.js'); +} else { + // Async iterator support is not available. + test('./on-async-iterator.js', { skip: true }, function () {}); +} require('./prepend.js'); require('./set-max-listeners-side-effects.js'); require('./special-event-names.js'); diff --git a/tests/on-async-iterator.js b/tests/on-async-iterator.js new file mode 100644 index 0000000..fa8d842 --- /dev/null +++ b/tests/on-async-iterator.js @@ -0,0 +1,224 @@ +'use strict'; + +var common = require('./common'); +var assert = require('assert'); +var EventEmitter = require('../').EventEmitter; +var on = require('../').on; + +async function basic() { + var ee = new EventEmitter(); + process.nextTick(function () { + ee.emit('foo', 'bar'); + // 'bar' is a spurious event, we are testing + // that it does not show up in the iterable + ee.emit('bar', 24); + ee.emit('foo', 42); + }); + + var iterable = on(ee, 'foo'); + + var expected = [['bar'], [42]]; + + for await (var event of iterable) { + var current = expected.shift(); + + assert.deepStrictEqual(current, event); + + if (expected.length === 0) { + break; + } + } + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function error() { + var ee = new EventEmitter(); + var _err = new Error('kaboom'); + process.nextTick(function () { + ee.emit('error', _err); + }); + + var iterable = on(ee, 'foo'); + let looped = false; + let thrown = false; + + try { + // eslint-disable-next-line no-unused-vars + for await (var event of iterable) { + looped = true; + } + } catch (err) { + thrown = true; + assert.strictEqual(err, _err); + } + assert.strictEqual(thrown, true); + assert.strictEqual(looped, false); +} + +async function errorDelayed() { + var ee = new EventEmitter(); + var _err = new Error('kaboom'); + process.nextTick(function () { + ee.emit('foo', 42); + ee.emit('error', _err); + }); + + var iterable = on(ee, 'foo'); + var expected = [[42]]; + let thrown = false; + + try { + for await (var event of iterable) { + var current = expected.shift(); + assert.deepStrictEqual(current, event); + } + } catch (err) { + thrown = true; + assert.strictEqual(err, _err); + } + assert.strictEqual(thrown, true); + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function throwInLoop() { + var ee = new EventEmitter(); + var _err = new Error('kaboom'); + + process.nextTick(function () { + ee.emit('foo', 42); + }); + + try { + for await (var event of on(ee, 'foo')) { + assert.deepStrictEqual(event, [42]); + throw _err; + } + } catch (err) { + assert.strictEqual(err, _err); + } + + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function next() { + var ee = new EventEmitter(); + var iterable = on(ee, 'foo'); + + process.nextTick(function() { + ee.emit('foo', 'bar'); + ee.emit('foo', 42); + iterable.return(); + }); + + var results = await Promise.all([ + iterable.next(), + iterable.next(), + iterable.next() + ]); + + assert.deepStrictEqual(results, [{ + value: ['bar'], + done: false + }, { + value: [42], + done: false + }, { + value: undefined, + done: true + }]); + + assert.deepStrictEqual(await iterable.next(), { + value: undefined, + done: true + }); +} + +async function nextError() { + var ee = new EventEmitter(); + var iterable = on(ee, 'foo'); + var _err = new Error('kaboom'); + process.nextTick(function() { + ee.emit('error', _err); + }); + var results = await Promise.allSettled([ + iterable.next(), + iterable.next(), + iterable.next() + ]); + assert.deepStrictEqual(results, [{ + status: 'rejected', + reason: _err + }, { + status: 'fulfilled', + value: { + value: undefined, + done: true + } + }, { + status: 'fulfilled', + value: { + value: undefined, + done: true + } + }]); + assert.strictEqual(ee.listeners('error').length, 0); +} + +async function iterableThrow() { + var ee = new EventEmitter(); + var iterable = on(ee, 'foo'); + + process.nextTick(function () { + ee.emit('foo', 'bar'); + ee.emit('foo', 42); // lost in the queue + iterable.throw(_err); + }); + + var _err = new Error('kaboom'); + let thrown = false; + + assert.throws(function () { + // No argument + iterable.throw(); + }, { + message: 'The "EventEmitter.AsyncIterator" property must be' + + ' an instance of Error. Received undefined', + name: 'TypeError' + }); + + var expected = [['bar'], [42]]; + + try { + for await (var event of iterable) { + assert.deepStrictEqual(event, expected.shift()); + } + } catch (err) { + thrown = true; + assert.strictEqual(err, _err); + } + assert.strictEqual(thrown, true); + assert.strictEqual(expected.length, 0); + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function run() { + var funcs = [ + basic, + error, + errorDelayed, + throwInLoop, + next, + nextError, + iterableThrow, + ]; + + for (var fn of funcs) { + await fn(); + } +} + +module.exports = run();