From debd387ba8b24b317a4273e64afa4c02801d8cc0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 17 Nov 2021 09:38:33 +0100 Subject: [PATCH] fixup: don't read more if failed --- lib/internal/streams/operators.js | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 5765a803582453..5a8ccff0bed437 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -20,6 +20,7 @@ module.exports.map = function map(stream, fn, options) { const queue = []; let reading = false; + let failed = false; // TODO: What about hwm? This will cause some unnecessary buffering. const ret = new Readable({ @@ -70,6 +71,7 @@ module.exports.map = function map(stream, fn, options) { try { return [null, await val]; } catch (err) { + failed = true; return [err, null]; } } @@ -82,21 +84,25 @@ module.exports.map = function map(stream, fn, options) { } function pump () { - while (queue.length < concurrency) { - let val = stream.read(); - if (val === null) { - return; - } - try { + if (failed) { + return; + } + try { + while (queue.length < concurrency) { + let val = stream.read(); + if (val === null) { + return; + } val = fn(val, { signal }); if (val && typeof val.then === 'function') { enqueue(wrap(val)); } else { enqueue([null, val]); } - } catch (err) { - enqueue([err, null]); } + } catch (err) { + failed = true; + enqueue([err, null]); } }