Skip to content

Commit

Permalink
stream: support readable/writableHWM for Duplex
Browse files Browse the repository at this point in the history
This commits adds support for readableHighWaterMark and
writableHighWaterMark in Duplex stream, so that they can be set without
accessing the internal state.

Fixes: nodejs#14555
PR-URL: nodejs#14636
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
guymguym authored and mcollina committed Aug 8, 2017
1 parent 4da8b99 commit c3c045a
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 4 deletions.
4 changes: 4 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1752,6 +1752,10 @@ constructor and implement *both* the `readable._read()` and
* `writableObjectMode` {boolean} Defaults to `false`. Sets `objectMode`
for writable side of the stream. Has no effect if `objectMode`
is `true`.
* `readableHighWaterMark` {number} Sets `highWaterMark` for the readable side
of the stream. Has no effect if `highWaterMark` is provided.
* `writableHighWaterMark` {number} Sets `highWaterMark` for the writable side
of the stream. Has no effect if `highWaterMark` is provided.

For example:

Expand Down
18 changes: 16 additions & 2 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,32 @@ function prependListener(emitter, event, fn) {
function ReadableState(options, stream) {
options = options || {};

// Duplex streams are both readable and writable, but share
// the same options object.
// However, some cases require setting options to different
// values for the readable and the writable sides of the duplex stream.
// These options can be provided separately as readableXXX and writableXXX.
var isDuplex = stream instanceof Stream.Duplex;

// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;

if (stream instanceof Stream.Duplex)
if (isDuplex)
this.objectMode = this.objectMode || !!options.readableObjectMode;

// the point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
var hwm = options.highWaterMark;
var readableHwm = options.readableHighWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

if (hwm || hwm === 0)
this.highWaterMark = hwm;
else if (isDuplex && (readableHwm || readableHwm === 0))
this.highWaterMark = readableHwm;
else
this.highWaterMark = defaultHwm;

// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);
Expand Down
18 changes: 16 additions & 2 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,33 @@ function nop() {}
function WritableState(options, stream) {
options = options || {};

// Duplex streams are both readable and writable, but share
// the same options object.
// However, some cases require setting options to different
// values for the readable and the writable sides of the duplex stream.
// These options can be provided separately as readableXXX and writableXXX.
var isDuplex = stream instanceof Stream.Duplex;

// object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!options.objectMode;

if (stream instanceof Stream.Duplex)
if (isDuplex)
this.objectMode = this.objectMode || !!options.writableObjectMode;

// the point at which write() starts returning false
// Note: 0 is a valid value, means that we always return false if
// the entire buffer is not flushed immediately on write()
var hwm = options.highWaterMark;
var writableHwm = options.writableHighWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

if (hwm || hwm === 0)
this.highWaterMark = hwm;
else if (isDuplex && (writableHwm || writableHwm === 0))
this.highWaterMark = writableHwm;
else
this.highWaterMark = defaultHwm;

// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);
Expand Down
71 changes: 71 additions & 0 deletions test/parallel/test-stream-transform-split-highwatermark.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
'use strict';
require('../common');
const assert = require('assert');

const { Transform, Readable, Writable } = require('stream');

const DEFAULT = 16 * 1024;

function testTransform(expectedReadableHwm, expectedWritableHwm, options) {
const t = new Transform(options);
assert.strictEqual(t._readableState.highWaterMark, expectedReadableHwm);
assert.strictEqual(t._writableState.highWaterMark, expectedWritableHwm);
}

// test overriding defaultHwm
testTransform(666, DEFAULT, { readableHighWaterMark: 666 });
testTransform(DEFAULT, 777, { writableHighWaterMark: 777 });
testTransform(666, 777, {
readableHighWaterMark: 666,
writableHighWaterMark: 777,
});

// test 0 overriding defaultHwm
testTransform(0, DEFAULT, { readableHighWaterMark: 0 });
testTransform(DEFAULT, 0, { writableHighWaterMark: 0 });

// test highWaterMark overriding
testTransform(555, 555, {
highWaterMark: 555,
readableHighWaterMark: 666,
});
testTransform(555, 555, {
highWaterMark: 555,
writableHighWaterMark: 777,
});
testTransform(555, 555, {
highWaterMark: 555,
readableHighWaterMark: 666,
writableHighWaterMark: 777,
});

// test highWaterMark = 0 overriding
testTransform(0, 0, {
highWaterMark: 0,
readableHighWaterMark: 666,
});
testTransform(0, 0, {
highWaterMark: 0,
writableHighWaterMark: 777,
});
testTransform(0, 0, {
highWaterMark: 0,
readableHighWaterMark: 666,
writableHighWaterMark: 777,
});

// test undefined, null, NaN
[undefined, null, NaN].forEach((v) => {
testTransform(DEFAULT, DEFAULT, { readableHighWaterMark: v });
testTransform(DEFAULT, DEFAULT, { writableHighWaterMark: v });
testTransform(666, DEFAULT, { highWaterMark: v, readableHighWaterMark: 666 });
testTransform(DEFAULT, 777, { highWaterMark: v, writableHighWaterMark: 777 });
});

// test non Duplex streams ignore the options
{
const r = new Readable({ readableHighWaterMark: 666 });
assert.strictEqual(r._readableState.highWaterMark, DEFAULT);
const w = new Writable({ writableHighWaterMark: 777 });
assert.strictEqual(w._writableState.highWaterMark, DEFAULT);
}

0 comments on commit c3c045a

Please sign in to comment.