From 579d456d2551fea3db9ee2cf9d425d4c61166cad Mon Sep 17 00:00:00 2001 From: jakecastelli <959672929@qq.com> Date: Sat, 1 Jun 2024 22:59:38 +0930 Subject: [PATCH 1/2] stream: pipe to a closed or destroyed stream is not allowed in pipeline --- doc/api/errors.md | 6 ++++++ lib/internal/errors.js | 1 + lib/internal/streams/pipeline.js | 6 ++++++ test/parallel/test-stream-pipeline.js | 13 +++++++++++++ 4 files changed, 26 insertions(+) diff --git a/doc/api/errors.md b/doc/api/errors.md index dde490c29b477c..0727c83386c16d 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -2621,6 +2621,12 @@ or a pipeline ends non gracefully with no explicit error. An attempt was made to call [`stream.push()`][] after a `null`(EOF) had been pushed to the stream. + + +### `ERR_STREAM_UNABLE_TO_PIIPE` + +An attempt was made to pipe to a closed or destroyed stream in a pipeline. + ### `ERR_STREAM_UNSHIFT_AFTER_END_EVENT` diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 8ab4a63a0f7110..9bee977f63198d 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1716,6 +1716,7 @@ E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); +E('ERR_STREAM_UNABLE_TO_PIIPE', 'Connot pipe to a closed or destroyed stream', Error); E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT', 'stream.unshift() after end event', Error); E('ERR_STREAM_WRAP', 'Stream has StringDecoder set or is in objectMode', Error); diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index edf1d37f9fe3bf..82150e35d92e42 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -23,6 +23,7 @@ const { ERR_MISSING_ARGS, ERR_STREAM_DESTROYED, ERR_STREAM_PREMATURE_CLOSE, + ERR_STREAM_UNABLE_TO_PIIPE, }, } = require('internal/errors'); @@ -253,10 +254,15 @@ function pipelineImpl(streams, callback, opts) { const stream = streams[i]; const reading = i < streams.length - 1; const writing = i > 0; + const next = i + 1 < streams.length ? streams[i + 1] : null; const end = reading || opts?.end !== false; const isLastStream = i === streams.length - 1; if (isNodeStream(stream)) { + if (next !== null && (next?.closed || next?.destroyed)) { + throw new ERR_STREAM_UNABLE_TO_PIIPE(); + } + if (end) { const { destroy, cleanup } = destroyer(stream, reading, writing); destroys.push(destroy); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 8237fff33b3ac8..8a9cb8ba64a57c 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -17,6 +17,8 @@ const http = require('http'); const { promisify } = require('util'); const net = require('net'); const tsp = require('timers/promises'); +const tmpdir = require('../common/tmpdir'); +const fs = require('fs'); { let finished = false; @@ -69,6 +71,17 @@ const tsp = require('timers/promises'); }, /ERR_INVALID_ARG_TYPE/); } +tmpdir.refresh(); +{ + assert.rejects(async () => { + const read = fs.createReadStream(__filename); + const write = fs.createWriteStream(tmpdir.resolve('a')); + const close = promisify(write.close); + await close.call(write); + await pipelinep(read, write); + }, /ERR_STREAM_UNABLE_TO_PIIPE/).then(common.mustCall()); +} + { const read = new Readable({ read() {} From 845c66c866ec5790e1104a387c30061bcc7d5454 Mon Sep 17 00:00:00 2001 From: jakecastelli <959672929@qq.com> Date: Sun, 2 Jun 2024 02:02:10 +0930 Subject: [PATCH 2/2] fixup! typo --- doc/api/errors.md | 4 ++-- lib/internal/errors.js | 2 +- lib/internal/streams/pipeline.js | 4 ++-- test/parallel/test-stream-pipeline.js | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/doc/api/errors.md b/doc/api/errors.md index 0727c83386c16d..4d3829f78e7b95 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -2621,9 +2621,9 @@ or a pipeline ends non gracefully with no explicit error. An attempt was made to call [`stream.push()`][] after a `null`(EOF) had been pushed to the stream. - + -### `ERR_STREAM_UNABLE_TO_PIIPE` +### `ERR_STREAM_UNABLE_TO_PIPE` An attempt was made to pipe to a closed or destroyed stream in a pipeline. diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 9bee977f63198d..f78be397d5dce2 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1716,7 +1716,7 @@ E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); -E('ERR_STREAM_UNABLE_TO_PIIPE', 'Connot pipe to a closed or destroyed stream', Error); +E('ERR_STREAM_UNABLE_TO_PIPE', 'Connot pipe to a closed or destroyed stream', Error); E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT', 'stream.unshift() after end event', Error); E('ERR_STREAM_WRAP', 'Stream has StringDecoder set or is in objectMode', Error); diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 82150e35d92e42..bb34759b1fea12 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -23,7 +23,7 @@ const { ERR_MISSING_ARGS, ERR_STREAM_DESTROYED, ERR_STREAM_PREMATURE_CLOSE, - ERR_STREAM_UNABLE_TO_PIIPE, + ERR_STREAM_UNABLE_TO_PIPE, }, } = require('internal/errors'); @@ -260,7 +260,7 @@ function pipelineImpl(streams, callback, opts) { if (isNodeStream(stream)) { if (next !== null && (next?.closed || next?.destroyed)) { - throw new ERR_STREAM_UNABLE_TO_PIIPE(); + throw new ERR_STREAM_UNABLE_TO_PIPE(); } if (end) { diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 8a9cb8ba64a57c..7e69754b36d771 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -79,7 +79,7 @@ tmpdir.refresh(); const close = promisify(write.close); await close.call(write); await pipelinep(read, write); - }, /ERR_STREAM_UNABLE_TO_PIIPE/).then(common.mustCall()); + }, /ERR_STREAM_UNABLE_TO_PIPE/).then(common.mustCall()); } {