diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index c4e06945260d5f..6798411fa4b038 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -43,9 +43,6 @@ function from(Readable, iterable, opts) { // being called before last iteration completion. let reading = false; - // Flag for when iterator needs to be explicitly closed. - let needToClose = false; - readable._read = function() { if (!reading) { reading = true; @@ -54,19 +51,23 @@ function from(Readable, iterable, opts) { }; readable._destroy = function(error, cb) { - if (needToClose) { - needToClose = false; - PromisePrototypeThen( - close(), - () => process.nextTick(cb, error), - (e) => process.nextTick(cb, error || e), - ); - } else { - cb(error); - } + PromisePrototypeThen( + close(error), + () => process.nextTick(cb, error), // nextTick is here in case cb throws + (e) => process.nextTick(cb, e || error), + ); }; - async function close() { + async function close(error) { + const hadError = (error !== undefined) && (error !== null); + const hasThrow = typeof iterator.throw === 'function'; + if (hadError && hasThrow) { + const { value, done } = await iterator.throw(error); + await value; + if (done) { + return; + } + } if (typeof iterator.return === 'function') { const { value } = await iterator.return(); await value; @@ -75,13 +76,9 @@ function from(Readable, iterable, opts) { async function next() { try { - needToClose = false; const { value, done } = await iterator.next(); - needToClose = !done; if (done) { readable.push(null); - } else if (readable.destroyed) { - await close(); } else { const res = await value; if (res === null) { diff --git a/test/parallel/test-readable-from.js b/test/parallel/test-readable-from.js index a4e0f1b566c32c..aca8f5548a7129 100644 --- a/test/parallel/test-readable-from.js +++ b/test/parallel/test-readable-from.js @@ -4,6 +4,7 @@ const { mustCall } = require('../common'); const { once } = require('events'); const { Readable } = require('stream'); const { strictEqual, throws } = require('assert'); +const common = require('../common'); { throws(() => { @@ -187,6 +188,25 @@ async function endWithError() { } } +async function destroyingStreamWithErrorThrowsInGenerator() { + const validateError = common.mustCall((e) => { + strictEqual(e, 'Boum'); + }); + async function* generate() { + try { + yield 1; + yield 2; + yield 3; + throw new Error(); + } catch (e) { + validateError(e); + } + } + const stream = Readable.from(generate()); + stream.read(); + stream.once('error', common.mustCall()); + stream.destroy('Boum'); +} Promise.all([ toReadableBasicSupport(), @@ -198,5 +218,6 @@ Promise.all([ toReadableOnDataNonObject(), destroysTheStreamWhenThrowing(), asTransformStream(), - endWithError() + endWithError(), + destroyingStreamWithErrorThrowsInGenerator(), ]).then(mustCall());