Skip to content

Commit

Permalink
stream: adjust src hwm when pipelining
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 7, 2021
1 parent 0a62026 commit 46f9638
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 11 deletions.
38 changes: 37 additions & 1 deletion lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ function pipelineImpl(streams, callback, opts) {
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream);
pipe(ret, stream);

// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
Expand All @@ -321,4 +321,40 @@ function pipelineImpl(streams, callback, opts) {
return ret;
}

function pipe (src, dst) {
const state = src._readableState;

if (typeof src.read !== 'function' && state) {
src.pipe(dst);
return;
}

src
.on('end', end)
.on('readable', pump);
dst
.on('drain', pump);

function end () {
dst.end();
}

function pump () {
if (dst.writableNeedDrain) {
return;
}

while (true) {
state.highwaterMark = dst.writableHighwaterMark || dst.highwaterMark || null;

const chunk = src.read();
if (chunk === null || !dst.write(chunk)) {
return;
}
}
}

process.nextTick(pump);
}

module.exports = { pipelineImpl, pipeline };
11 changes: 4 additions & 7 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ const tsp = require('timers/promises');
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c'),
Buffer.from('abc')
];

const read = new Readable({
Expand Down Expand Up @@ -348,8 +346,7 @@ const tsp = require('timers/promises');
};

const expected = [
Buffer.from('hello'),
Buffer.from('world'),
Buffer.from('helloworld')
];

const rs = new Readable({
Expand Down Expand Up @@ -985,7 +982,7 @@ const tsp = require('timers/promises');
// Make sure 'close' before 'end' finishes without error
// if readable has received eof.
// Ref: https://github.com/nodejs/node/issues/29699
const r = new Readable();
const r = new Readable(({ read() {} }));
const w = new Writable({
write(chunk, encoding, cb) {
cb();
Expand Down Expand Up @@ -1350,7 +1347,7 @@ const tsp = require('timers/promises');
});
const cb = common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.strictEqual(res, '012345');
assert.strictEqual(res, '01234');
assert.strictEqual(w.destroyed, true);
assert.strictEqual(r.destroyed, true);
assert.strictEqual(pipelined.destroyed, true);
Expand Down
4 changes: 1 addition & 3 deletions test/parallel/test-stream-promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ assert.strictEqual(finished, promisify(stream.finished));
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c'),
Buffer.from('abc'),
];

const read = new Readable({
Expand Down

0 comments on commit 46f9638

Please sign in to comment.