From 454b0da2e3f7b4a2741ec4a1385cc63f5e160081 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 10 Feb 2023 11:54:44 +0530 Subject: [PATCH 1/3] stream: fix pipeline callback not called on ended stream Fixes: https://github.com/nodejs/node/issues/46595 --- lib/internal/streams/pipeline.js | 12 +++++++++-- test/parallel/test-stream-pipeline.js | 25 +++++++++++++++++++++++ test/parallel/test-webstreams-pipeline.js | 12 ++++++++++- 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 44c0e06ee30557..ee342f8861d5bc 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -38,6 +38,7 @@ const { isTransformStream, isWebStream, isReadableStream, + isReadableFinished, } = require('internal/streams/utils'); const { AbortController } = require('internal/abort_controller'); @@ -417,10 +418,17 @@ function pipe(src, dst, finish, { end }) { // Compat. Before node v10.12.0 stdio used to throw an error so // pipe() did/does not end() stdio destinations. // Now they allow it but "secretly" don't close the underlying fd. - src.once('end', () => { + + function endFn() { ended = true; dst.end(); - }); + } + + src.once('end', endFn); + + if (isReadableFinished(src)) { // End the destination if the source has already ended. + process.nextTick(endFn); + } } else { finish(); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 65ef5164c14b4c..d37ca275f1dddf 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1591,3 +1591,28 @@ const tsp = require('timers/promises'); assert.strictEqual(writable.endCount, 1); })); } + +{ + const readable = new Readable({ + read() {} + }); + readable.on('end', common.mustCall(() => { + pipeline(readable, new PassThrough(), common.mustSucceed()); + })); + readable.push(null); + readable.read(); +} + +{ + const dup = new Duplex({ + read() {}, + write(chunk, enc, cb) { + cb(); + } + }); + dup.on('end', common.mustCall(() => { + pipeline(dup, new PassThrough(), common.mustSucceed()); + })); + dup.push(null); + dup.read(); +} diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js index 46bdf8718ea97a..ac673dd9d42ea7 100644 --- a/test/parallel/test-webstreams-pipeline.js +++ b/test/parallel/test-webstreams-pipeline.js @@ -2,7 +2,7 @@ const common = require('../common'); const assert = require('assert'); -const { Readable, Writable, Transform, pipeline } = require('stream'); +const { Readable, Writable, Transform, pipeline, PassThrough } = require('stream'); const { pipeline: pipelinePromise } = require('stream/promises'); const { ReadableStream, WritableStream, TransformStream } = require('stream/web'); const http = require('http'); @@ -410,3 +410,13 @@ const http = require('http'); } c.close(); } + +{ + const rs = new ReadableStream({ + start(controller) { + controller.close(); + } + }); + + pipeline(rs, new PassThrough(), common.mustSucceed()); +} From e818207a17cc992deeb370cd1807dc2c39f105bc Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 10 Feb 2023 12:30:19 +0530 Subject: [PATCH 2/3] fixup! Co-authored-by: Robert Nagy --- lib/internal/streams/pipeline.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ee342f8861d5bc..59b165733a7e4e 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -424,11 +424,11 @@ function pipe(src, dst, finish, { end }) { dst.end(); } - src.once('end', endFn); - - if (isReadableFinished(src)) { // End the destination if the source has already ended. + if (isReadableEnded(src)) { // End the destination if the source has already ended. process.nextTick(endFn); - } + } else { + src.once('end', endFn); + } } else { finish(); } From 56e63b838f87d24306c1914b0869ea4a4b338c74 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 10 Feb 2023 12:37:44 +0530 Subject: [PATCH 3/3] fixup! --- lib/internal/streams/pipeline.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 59b165733a7e4e..8d86c718b871c7 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -38,7 +38,7 @@ const { isTransformStream, isWebStream, isReadableStream, - isReadableFinished, + isReadableEnded, } = require('internal/streams/utils'); const { AbortController } = require('internal/abort_controller'); @@ -428,7 +428,7 @@ function pipe(src, dst, finish, { end }) { process.nextTick(endFn); } else { src.once('end', endFn); - } + } } else { finish(); }