Skip to content

Commit

Permalink
stream: restore flow if there are 'data' handlers after once('readable')
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Aug 21, 2018
1 parent 0f85e20 commit 861124a
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 7 deletions.
22 changes: 19 additions & 3 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,12 @@ instance, when the `readable.resume()` method is called without a listener
attached to the `'data'` event, or when a `'data'` event handler is removed
from the stream.

Adding a [`'readable'`][] event handler automatically make the stream to
stop flowing, and the data to be consumed via
[`readable.read()`][stream-read]. If the [`'readable'`] event handler is
removed, then the stream will start flowing again if there is a
[`'data'`][] event handler.

#### Three States

The "two modes" of operation for a `Readable` stream are a simplified
Expand Down Expand Up @@ -666,12 +672,15 @@ within the streams internal buffer.
The `Readable` stream API evolved across multiple Node.js versions and provides
multiple methods of consuming stream data. In general, developers should choose
*one* of the methods of consuming data and *should never* use multiple methods
to consume data from a single stream.
to consume data from a single stream. Specifically, using a combination
of `on('data')`, `on('readable')`, `pipe()` or async iterators could
lead to unintuitive behavior.

Use of the `readable.pipe()` method is recommended for most users as it has been
implemented to provide the easiest way of consuming stream data. Developers that
require more fine-grained control over the transfer and generation of data can
use the [`EventEmitter`][] and `readable.pause()`/`readable.resume()` APIs.
use the [`EventEmitter`][] and `readable.on('readable')`/`readable.read()`
or the `readable.pause()`/`readable.resume()` APIs.

#### Class: stream.Readable
<!-- YAML
Expand Down Expand Up @@ -825,7 +834,11 @@ result in increased throughput.

If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
takes precedence in controlling the flow, i.e. `'data'` will be emitted
only when [`stream.read()`][stream-read] is called.
only when [`stream.read()`][stream-read] is called. The
`readableFlowing` property would become `false`.
If there are `'data'` listeners when `'readable'` is removed, the stream
will start flowing, i.e. `'data'` events will be emitted without calling
`.resume()`.

##### readable.destroy([error])
<!-- YAML
Expand Down Expand Up @@ -887,6 +900,9 @@ readable.on('data', (chunk) => {
});
```

The `readable.pause()` method has no effect if there is a `'readable'`
event listener.

##### readable.pipe(destination[, options])
<!-- YAML
added: v0.9.4
Expand Down
9 changes: 8 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ Readable.prototype.on = function(ev, fn) {
} else if (ev === 'readable') {
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.flowing = false;
state.emittedReadable = false;
debug('on readable', state.length, state.reading);
if (state.length) {
Expand Down Expand Up @@ -858,6 +859,11 @@ Readable.prototype.removeAllListeners = function(ev) {

function updateReadableListening(self) {
self._readableState.readableListening = self.listenerCount('readable') > 0;

// crude way to check if we should resume
if (self.listenerCount('data') > 0) {
self.resume();
}
}

function nReadingNextTick(self) {
Expand All @@ -872,7 +878,8 @@ Readable.prototype.resume = function() {
if (!state.flowing) {
debug('resume');
// we flow only if there is no one listening
// for readable
// for readable, but we still have to call
// resume()
state.flowing = !state.readableListening;
resume(this, state);
}
Expand Down
61 changes: 61 additions & 0 deletions test/parallel/test-stream-once-readable-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Readable, Writable } = require('stream');

// This test ensures that if have 'readable' listener
// on Readable instance it will not disrupt the pipe.

{
let receivedData = '';
const w = new Writable({
write: (chunk, env, callback) => {
receivedData += chunk;
callback();
},
});

const data = ['foo', 'bar', 'baz'];
const r = new Readable({
read: () => {},
});

r.once('readable', common.mustCall());

r.pipe(w);
r.push(data[0]);
r.push(data[1]);
r.push(data[2]);
r.push(null);

w.on('finish', common.mustCall(() => {
assert.strictEqual(receivedData, data.join(''));
}));
}

{
let receivedData = '';
const w = new Writable({
write: (chunk, env, callback) => {
receivedData += chunk;
callback();
},
});

const data = ['foo', 'bar', 'baz'];
const r = new Readable({
read: () => {},
});

r.pipe(w);
r.push(data[0]);
r.push(data[1]);
r.push(data[2]);
r.push(null);
r.once('readable', common.mustCall());

w.on('finish', common.mustCall(() => {
assert.strictEqual(receivedData, data.join(''));
}));
}
7 changes: 4 additions & 3 deletions test/parallel/test-stream-readable-reading-readingMore.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ const Readable = require('stream').Readable;
assert.strictEqual(state.reading, false);
}

const expectedReadingMore = [true, false];
readable.on('readable', common.mustCall(() => {
// 'readable' always gets called before 'end'
// since 'end' hasn't been emitted, more data could be incoming
assert.strictEqual(state.readingMore, true);
// there is only one readingMore scheduled from on('data'),
// after which everything is governed by the .read() call
assert.strictEqual(state.readingMore, expectedReadingMore.shift());

// if the stream has ended, we shouldn't be reading
assert.strictEqual(state.ended, !state.reading);
Expand Down

0 comments on commit 861124a

Please sign in to comment.