From 81dfba07fc5d767ad9fbaa2b3b6b0ae9c111d98c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20W=C3=BCrbach?= Date: Mon, 1 Apr 2019 14:38:59 +0200 Subject: [PATCH] Fix stream becoming readable again --- lib/mux.js | 27 ++++++--------------------- test/mux.js | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/lib/mux.js b/lib/mux.js index 26003e6e..09040d8b 100644 --- a/lib/mux.js +++ b/lib/mux.js @@ -55,26 +55,11 @@ Mux.prototype._readIncoming = function() { // are empty, or we exhaust our ability to accept chunks. function roundrobin(streams) { var s; - // if there's just one incoming stream we don't have to - // go through all the dequeue/enqueueing - if (streams.length === 1) { - s = streams.shift(); - while (accepting) { - var chunk = s.read(); - if (chunk !== null) { - accepting = out.write(chunk); - } - else break; - } - if (!accepting) streams.push(s); - } - else { - while (accepting && (s = streams.shift())) { - var chunk = s.read(); - if (chunk !== null) { - accepting = out.write(chunk); - streams.push(s); - } + while (accepting && (s = streams.shift())) { + var chunk = s.read(); + if (chunk !== null) { + accepting = out.write(chunk); + streams.push(s); } } } @@ -105,7 +90,7 @@ Mux.prototype._readIncoming = function() { Mux.prototype._scheduleRead = function() { var self = this; - + if (!self.scheduledRead) { schedule(function() { self.scheduledRead = false; diff --git a/test/mux.js b/test/mux.js index 789aa09f..a0befcec 100644 --- a/test/mux.js +++ b/test/mux.js @@ -51,6 +51,40 @@ test("single input", function(done) { input.end(); }); +test("single input, resuming stream", function(done) { + var input = stream(); + var output = stream(); + input.on('end', function() { output.end() }); + + var mux = new Mux(output); + mux.pipeFrom(input); + + // Streams might be blocked and become readable again, simulate this + // using a special read function and a marker + var data = [1,2,3,4,'skip',6,7,8,9]; + + var oldRead = input.read; + input.read = function(size) { + var val = oldRead.call(input, size) + + if (val === 'skip') { + input.emit('readable'); + return null + } + + return val; + } + + data.forEach(input.write.bind(input)); + + readAllObjects(output, function(vals) { + assert.deepEqual([1,2,3,4,6,7,8,9], vals); + done(); + }); + + input.end(); +}); + test("two sequential inputs", function(done) { var input1 = stream(); var input2 = stream();