From 46f9638576a0f532156788f587c7edff48643855 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 5 Nov 2021 13:28:25 +0200 Subject: [PATCH] stream: adjust src hwm when pipelining --- lib/internal/streams/pipeline.js | 38 ++++++++++++++++++++++++++- test/parallel/test-stream-pipeline.js | 11 +++----- test/parallel/test-stream-promises.js | 4 +-- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 8dc4e5792c47d8..2be0a0399fd8a1 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -294,7 +294,7 @@ function pipelineImpl(streams, callback, opts) { } } else if (isNodeStream(stream)) { if (isReadableNodeStream(ret)) { - ret.pipe(stream); + pipe(ret, stream); // Compat. Before node v10.12.0 stdio used to throw an error so // pipe() did/does not end() stdio destinations. @@ -321,4 +321,40 @@ function pipelineImpl(streams, callback, opts) { return ret; } +function pipe (src, dst) { + const state = src._readableState; + + if (typeof src.read !== 'function' && state) { + src.pipe(dst); + return; + } + + src + .on('end', end) + .on('readable', pump); + dst + .on('drain', pump); + + function end () { + dst.end(); + } + + function pump () { + if (dst.writableNeedDrain) { + return; + } + + while (true) { + state.highwaterMark = dst.writableHighwaterMark || dst.highwaterMark || null; + + const chunk = src.read(); + if (chunk === null || !dst.write(chunk)) { + return; + } + } + } + + process.nextTick(pump); +} + module.exports = { pipelineImpl, pipeline }; diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 061ef923d03a59..f95084c93fe90e 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -22,9 +22,7 @@ const tsp = require('timers/promises'); let finished = false; const processed = []; const expected = [ - Buffer.from('a'), - Buffer.from('b'), - Buffer.from('c'), + Buffer.from('abc') ]; const read = new Readable({ @@ -348,8 +346,7 @@ const tsp = require('timers/promises'); }; const expected = [ - Buffer.from('hello'), - Buffer.from('world'), + Buffer.from('helloworld') ]; const rs = new Readable({ @@ -985,7 +982,7 @@ const tsp = require('timers/promises'); // Make sure 'close' before 'end' finishes without error // if readable has received eof. // Ref: https://github.com/nodejs/node/issues/29699 - const r = new Readable(); + const r = new Readable(({ read() {} })); const w = new Writable({ write(chunk, encoding, cb) { cb(); @@ -1350,7 +1347,7 @@ const tsp = require('timers/promises'); }); const cb = common.mustCall((err) => { assert.strictEqual(err.name, 'AbortError'); - assert.strictEqual(res, '012345'); + assert.strictEqual(res, '01234'); assert.strictEqual(w.destroyed, true); assert.strictEqual(r.destroyed, true); assert.strictEqual(pipelined.destroyed, true); diff --git a/test/parallel/test-stream-promises.js b/test/parallel/test-stream-promises.js index 33bfa292720da1..2ceee5176a57ae 100644 --- a/test/parallel/test-stream-promises.js +++ b/test/parallel/test-stream-promises.js @@ -25,9 +25,7 @@ assert.strictEqual(finished, promisify(stream.finished)); let finished = false; const processed = []; const expected = [ - Buffer.from('a'), - Buffer.from('b'), - Buffer.from('c'), + Buffer.from('abc'), ]; const read = new Readable({