From 4d93e105bfad79ff6c6f01e4b7c2fdd70caeb43b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 5 Mar 2020 23:18:23 +0100 Subject: [PATCH] stream: don't destroy final readable stream in pipeline If the last stream in a pipeline is still usable/readable don't destroy it to allow further composition. Fixes: https://github.com/nodejs/node/issues/32105 PR-URL: https://github.com/nodejs/node/pull/32110 Reviewed-By: Matteo Collina Reviewed-By: Luigi Pinca --- lib/internal/streams/pipeline.js | 13 ++++++-- test/parallel/test-stream-pipeline.js | 48 ++++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index c1299a6db42a3d..f5535bbff05dfe 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -25,9 +25,12 @@ let EE; let PassThrough; let createReadableStreamAsyncIterator; -function destroyer(stream, reading, writing, callback) { +function destroyer(stream, reading, writing, final, callback) { const _destroy = once((err) => { - destroyImpl.destroyer(stream, err); + const readable = stream.readable || isRequest(stream); + if (err || !final || !readable) { + destroyImpl.destroyer(stream, err); + } callback(err); }); @@ -68,6 +71,10 @@ function popCallback(streams) { return streams.pop(); } +function isRequest(stream) { + return stream.setHeader && typeof stream.abort === 'function'; +} + function isPromise(obj) { return !!(obj && typeof obj.then === 'function'); } @@ -159,7 +166,7 @@ function pipeline(...streams) { } function wrap(stream, reading, writing, final) { - destroys.push(destroyer(stream, reading, writing, (err) => { + destroys.push(destroyer(stream, reading, writing, final, (err) => { finish(err, final); })); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 74473b0db40600..7e0cb9193dc611 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -916,7 +916,7 @@ const { promisify } = require('util'); const dst = new PassThrough({ autoDestroy: false }); pipeline(src, dst, common.mustCall(() => { assert.strictEqual(src.destroyed, true); - assert.strictEqual(dst.destroyed, true); + assert.strictEqual(dst.destroyed, false); })); src.end(); } @@ -938,3 +938,49 @@ const { promisify } = require('util'); r.push(null); r.emit('close'); } + +{ + const server = http.createServer((req, res) => { + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + const body = new PassThrough(); + pipeline( + body, + req, + common.mustCall((err) => { + assert(!err); + assert(!req.res); + assert(!req.aborted); + req.abort(); + server.close(); + }) + ); + body.end(); + }); +} + +{ + const src = new PassThrough(); + const dst = new PassThrough(); + pipeline(src, dst, common.mustCall((err) => { + assert(!err); + assert.strictEqual(dst.destroyed, false); + })); + src.end(); +} + +{ + const src = new PassThrough(); + const dst = new PassThrough(); + dst.readable = false; + pipeline(src, dst, common.mustCall((err) => { + assert(!err); + assert.strictEqual(dst.destroyed, true); + })); + src.end(); +}