Skip to content

Commit

Permalink
stream: fix pipeline callback error
Browse files Browse the repository at this point in the history
Checked the right place of error with a better solution on refactor

Fix: nodejs#39447
  • Loading branch information
ktfth committed Aug 1, 2021
1 parent 6c748f6 commit d68ccdf
Showing 1 changed file with 3 additions and 45 deletions.
48 changes: 3 additions & 45 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,10 @@ async function pump(iterable, writable, finish) {
}

function pipeline(...streams) {
let isArray = false;
const callback = once(popCallback(streams));

// stream.pipeline(streams, callback)
if (ArrayIsArray(streams[0]) && streams.length === 1) {
isArray = true;
streams = streams[0];
}

Expand All @@ -180,41 +178,8 @@ function pipeline(...streams) {

let finishCount = 0;

let hasTransform = [];

function finish(err) {
hasTransform = streams.filter((s) => {
try {
const hasTransformContext =
s.constructor.toString().indexOf('Transform') > -1;
const hasNotPassThrough =
s.constructor.toString().indexOf('PassThrough') === -1;
const hasNotWritable =
s.constructor.toString().indexOf('Writable') === -1;
const hasNotAsyncGeneratorFunction =
s.constructor.toString().indexOf('AsyncGeneratorFunction') === -1;
const hasNotFlush =
s._flush === undefined;
return (
hasTransform.length === 0 &&
hasTransformContext &&
hasNotPassThrough &&
hasNotWritable &&
hasNotAsyncGeneratorFunction &&
hasNotFlush &&
!isArray
);
} catch {
return false;
}
});
hasTransform = hasTransform.length > 0;
if (hasTransform) {
finishCount -= streams.length;
} else if (!hasTransform) {
--finishCount;
}
const final = finishCount === 0;
const final = --finishCount === 0;

if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
error = err;
Expand All @@ -224,15 +189,8 @@ function pipeline(...streams) {
return;
}

while (
(hasTransform && destroys.length - 1) ||
(!hasTransform && destroys.length)
) {
try {
destroys.shift()(error);
} catch {
// Untreated destroys error
}
while (destroys.length) {
destroys.shift()(error);
}

if (final) {
Expand Down

0 comments on commit d68ccdf

Please sign in to comment.