Skip to content

Commit

Permalink
stream: always defer 'readable' with nextTick
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Jan 5, 2018
1 parent f89ee06 commit 102c2f0
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 57 deletions.
10 changes: 5 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -508,16 +508,15 @@ function emitReadable(stream) {
if (!state.emittedReadable) {
debug('emitReadable', state.flowing);
state.emittedReadable = true;
if (state.sync)
process.nextTick(emitReadable_, stream);
else
emitReadable_(stream);
process.nextTick(emitReadable_, stream);
}
}

function emitReadable_(stream) {
var state = stream._readableState;
debug('emit readable');
stream.emit('readable');
state.needReadable = !state.flowing && !state.ended;
flow(stream);
}

Expand Down Expand Up @@ -644,6 +643,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
debug('ondata');
increasedAwaitDrain = false;
var ret = dest.write(chunk);
debug('dest.write', ret);
if (false === ret && !increasedAwaitDrain) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
Expand Down Expand Up @@ -824,8 +824,8 @@ function resume(stream, state) {
}

function resume_(stream, state) {
debug('resume', state.reading);
if (!state.reading) {
debug('resume read 0');
stream.read(0);
}

Expand Down
12 changes: 8 additions & 4 deletions test/parallel/test-net-end-close.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@ const uv = process.binding('uv');
const s = new net.Socket({
handle: {
readStart: function() {
process.nextTick(() => this.onread(uv.UV_EOF, null));
setImmediate(() => this.onread(uv.UV_EOF, null));
},
close: (cb) => process.nextTick(cb)
close: (cb) => setImmediate(cb)
},
writable: false
});
assert.strictEqual(s, s.resume());

const events = [];

s.on('end', () => events.push('end'));
s.on('close', () => events.push('close'));
s.on('end', () => {
events.push('end');
});
s.on('close', () => {
events.push('close');
});

process.on('exit', () => {
assert.deepStrictEqual(events, [ 'end', 'close' ]);
Expand Down
32 changes: 12 additions & 20 deletions test/parallel/test-stream-pipe-await-drain-push-while-write.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,24 @@ const common = require('../common');
const stream = require('stream');
const assert = require('assert');

const awaitDrainStates = [
1, // after first chunk before callback
1, // after second chunk before callback
0 // resolving chunk pushed after first chunk, awaitDrain is decreased
];

// A writable stream which pushes data onto the stream which pipes into it,
// but only the first time it's written to. Since it's not paused at this time,
// a second write will occur. If the pipe increases awaitDrain twice, we'll
// never get subsequent chunks because 'drain' is only emitted once.
const writable = new stream.Writable({
write: common.mustCall(function(chunk, encoding, cb) {
if (chunk.length === 32 * 1024) { // first chunk
const beforePush = readable._readableState.awaitDrain;
readable.push(Buffer.alloc(34 * 1024)); // above hwm
// We should check if awaitDrain counter is increased.
const afterPush = readable._readableState.awaitDrain;
assert.strictEqual(afterPush - beforePush, 1,
'Counter is not increased for awaitDrain');
}

assert.strictEqual(
awaitDrainStates.shift(),
readable._readableState.awaitDrain,
0,
'State variable awaitDrain is not correct.'
);

if (chunk.length === 32 * 1024) { // first chunk
readable.push(Buffer.alloc(34 * 1024)); // above hwm
// We should check if awaitDrain counter is increased in the next
// tick, because awaitDrain is incremented after this method finished
process.nextTick(() => {
assert.strictEqual(readable._readableState.awaitDrain, 1,
'Counter is not increased for awaitDrain');
});
}

cb();
}, 3)
});
Expand Down
17 changes: 10 additions & 7 deletions test/parallel/test-stream-readable-emittedReadable.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,33 @@ const readable = new Readable({
// Initialized to false.
assert.strictEqual(readable._readableState.emittedReadable, false);

const expected = [Buffer.from('foobar'), Buffer.from('quo'), null];
readable.on('readable', common.mustCall(() => {
// emittedReadable should be true when the readable event is emitted
assert.strictEqual(readable._readableState.emittedReadable, true);
readable.read();
assert.deepStrictEqual(readable.read(), expected.shift());
// emittedReadable is reset to false during read()
assert.strictEqual(readable._readableState.emittedReadable, false);
}, 4));
}, 3));

// When the first readable listener is just attached,
// emittedReadable should be false
assert.strictEqual(readable._readableState.emittedReadable, false);

// Each one of these should trigger a readable event.
// These trigger a single 'readable', as things are batched up
process.nextTick(common.mustCall(() => {
readable.push('foo');
}));
process.nextTick(common.mustCall(() => {
readable.push('bar');
}));
process.nextTick(common.mustCall(() => {

// these triggers two readable events
setImmediate(common.mustCall(() => {
readable.push('quo');
}));
process.nextTick(common.mustCall(() => {
readable.push(null);
process.nextTick(common.mustCall(() => {
readable.push(null);
}));
}));

const noRead = new Readable({
Expand Down
23 changes: 12 additions & 11 deletions test/parallel/test-stream-readable-needReadable.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ asyncReadable.on('readable', common.mustCall(() => {
// then we need to notify the reader on future changes.
assert.strictEqual(asyncReadable._readableState.needReadable, true);
}
}, 3));
}, 2));

process.nextTick(common.mustCall(() => {
asyncReadable.push('foooo');
}));
process.nextTick(common.mustCall(() => {
asyncReadable.push('bar');
}));
process.nextTick(common.mustCall(() => {
setImmediate(common.mustCall(() => {
asyncReadable.push(null);
assert.strictEqual(asyncReadable._readableState.needReadable, false);
}));

const flowing = new Readable({
Expand Down Expand Up @@ -84,13 +85,13 @@ slowProducer.on('readable', common.mustCall(() => {

process.nextTick(common.mustCall(() => {
slowProducer.push('foo');
}));
process.nextTick(common.mustCall(() => {
slowProducer.push('foo');
}));
process.nextTick(common.mustCall(() => {
slowProducer.push('foo');
}));
process.nextTick(common.mustCall(() => {
slowProducer.push(null);
process.nextTick(common.mustCall(() => {
slowProducer.push('foo');
process.nextTick(common.mustCall(() => {
slowProducer.push('foo');
process.nextTick(common.mustCall(() => {
slowProducer.push(null);
}));
}));
}));
}));
56 changes: 56 additions & 0 deletions test/parallel/test-stream-readable-object-multi-push-async.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');

const MAX = 42;
const BATCH = 10;

const readable = new Readable({
objectMode: true,
read: common.mustCall(function() {
console.log('>> READ');
fetchData((err, data) => {
if (err) {
this.destroy(err);
return;
}

if (data.length === 0) {
console.log('pushing null');
this.push(null);
return;
}

console.log('pushing');
data.forEach((d) => this.push(d));
});
}, Math.floor(MAX / BATCH) + 2)
});

let i = 0;
function fetchData(cb) {
if (i > MAX) {
setTimeout(cb, 10, null, []);
} else {
const array = [];
const max = i + BATCH;
for (; i < max; i++) {
array.push(i);
}
setTimeout(cb, 10, null, array);
}
}

readable.on('readable', () => {
let data;
console.log('readable emitted');
while (data = readable.read()) {
console.log(data);
}
});

readable.on('end', () => {
assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
});
12 changes: 8 additions & 4 deletions test/parallel/test-stream-readable-reading-readingMore.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@ function onStreamEnd() {
assert.strictEqual(state.reading, false);
}

const expected = [
true, // stream is not ended
false // stream is ended
];

readable.on('readable', common.mustCall(() => {
// 'readable' always gets called before 'end'
// since 'end' hasn't been emitted, more data could be incoming
assert.strictEqual(state.readingMore, true);
assert.strictEqual(state.readingMore, expected.shift());

// if the stream has ended, we shouldn't be reading
assert.strictEqual(state.ended, !state.reading);

if (readable.read() === null) // reached end of stream
const data = readable.read();
if (data === null) // reached end of stream
process.nextTick(common.mustCall(onStreamEnd, 1));
}, 2));

Expand Down
13 changes: 7 additions & 6 deletions test/parallel/test-stream2-transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -306,25 +306,26 @@ const Transform = require('_stream_transform');
pt.write(Buffer.from('foog'));
pt.write(Buffer.from('bark'));

assert.strictEqual(emits, 1);
assert.strictEqual(emits, 0);
assert.strictEqual(pt.read(5).toString(), 'foogb');
assert.strictEqual(String(pt.read(5)), 'null');
assert.strictEqual(emits, 0);

pt.write(Buffer.from('bazy'));
pt.write(Buffer.from('kuel'));

assert.strictEqual(emits, 2);
assert.strictEqual(emits, 0);
assert.strictEqual(pt.read(5).toString(), 'arkba');
assert.strictEqual(pt.read(5).toString(), 'zykue');
assert.strictEqual(pt.read(5), null);

pt.end();

assert.strictEqual(emits, 3);
assert.strictEqual(emits, 0);
assert.strictEqual(pt.read(5).toString(), 'l');
assert.strictEqual(pt.read(5), null);

assert.strictEqual(emits, 3);
assert.strictEqual(emits, 0);
}

{
Expand All @@ -338,7 +339,7 @@ const Transform = require('_stream_transform');
pt.write(Buffer.from('foog'));
pt.write(Buffer.from('bark'));

assert.strictEqual(emits, 1);
assert.strictEqual(emits, 0);
assert.strictEqual(pt.read(5).toString(), 'foogb');
assert.strictEqual(pt.read(5), null);

Expand All @@ -352,7 +353,7 @@ const Transform = require('_stream_transform');
pt.once('readable', common.mustCall(function() {
assert.strictEqual(pt.read(5).toString(), 'l');
assert.strictEqual(pt.read(5), null);
assert.strictEqual(emits, 4);
assert.strictEqual(emits, 3);
}));
pt.end();
}));
Expand Down

0 comments on commit 102c2f0

Please sign in to comment.