Skip to content

Commit

Permalink
stream: fix pipe deadlock when starting with needDrain
Browse files Browse the repository at this point in the history
Fixes: #36544

PR-URL: #36563
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
  • Loading branch information
ronag committed Dec 20, 2020
1 parent daa1322 commit ab895bd
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 28 deletions.
52 changes: 28 additions & 24 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -716,35 +716,39 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
ondrain();
}

function pause() {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
src.pause();
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeOnDrain(src, dest);
dest.on('drain', ondrain);
}
}

src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
const ret = dest.write(chunk);
debug('dest.write', ret);
if (ret === false) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
src.pause();
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeOnDrain(src, dest);
dest.on('drain', ondrain);
}
pause();
}
}

Expand Down Expand Up @@ -793,7 +797,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {

if (dest.writableNeedDrain === true) {
if (state.flowing) {
src.pause();
pause();
}
} else if (!state.flowing) {
debug('pipe resume');
Expand Down
11 changes: 7 additions & 4 deletions test/parallel/test-stream-pipe-needDrain.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ const assert = require('assert');
const Readable = require('_stream_readable');
const Writable = require('_stream_writable');

// Pipe should not continue writing if writable needs drain.
// Pipe should pause temporarily if writable needs drain.
{
const w = new Writable({
write(buf, encoding, callback) {

}
process.nextTick(callback);
},
highWaterMark: 1
});

while (w.write('asd'));
Expand All @@ -20,10 +21,12 @@ const Writable = require('_stream_writable');
const r = new Readable({
read() {
this.push('asd');
this.push(null);
}
});

w.write = common.mustNotCall();
r.on('pause', common.mustCall(2));
r.on('end', common.mustCall());

r.pipe(w);
}

0 comments on commit ab895bd

Please sign in to comment.