Skip to content

Commit

Permalink
stream: improve tee perf by reduce ReflectConstruct usages
Browse files Browse the repository at this point in the history
also added more webstream creation benchmarks

PR-URL: nodejs#49546
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
rluvaton authored and alexfernandez committed Nov 1, 2023
1 parent d67aefd commit 6a5796a
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 35 deletions.
68 changes: 60 additions & 8 deletions benchmark/webstreams/creation.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,99 @@
const common = require('../common.js');
const {
ReadableStream,
ReadableStreamDefaultReader,
ReadableStreamBYOBReader,
TransformStream,
WritableStream,
} = require('node:stream/web');
const assert = require('assert');

const bench = common.createBenchmark(main, {
n: [50e3],
kind: ['ReadableStream', 'TransformStream', 'WritableStream'],
kind: [
'ReadableStream',
'TransformStream',
'WritableStream',

'ReadableStreamDefaultReader',
'ReadableStreamBYOBReader',

'ReadableStream.tee',
],
});

let rs, ws, ts;
let readableStream;
let transformStream;
let writableStream;
let readableStreamDefaultReader;
let readableStreamBYOBReader;
let teeResult;

function main({ n, kind }) {
switch (kind) {
case 'ReadableStream':
bench.start();
for (let i = 0; i < n; ++i)
rs = new ReadableStream();
readableStream = new ReadableStream();
bench.end(n);

// Avoid V8 deadcode (elimination)
assert.ok(rs);
assert.ok(readableStream);
break;
case 'WritableStream':
bench.start();
for (let i = 0; i < n; ++i)
ws = new WritableStream();
writableStream = new WritableStream();
bench.end(n);

// Avoid V8 deadcode (elimination)
assert.ok(ws);
assert.ok(writableStream);
break;
case 'TransformStream':
bench.start();
for (let i = 0; i < n; ++i)
ts = new TransformStream();
transformStream = new TransformStream();
bench.end(n);

// Avoid V8 deadcode (elimination)
assert.ok(transformStream);
break;
case 'ReadableStreamDefaultReader': {
const readers = Array.from({ length: n }, () => new ReadableStream());

bench.start();
for (let i = 0; i < n; ++i)
readableStreamDefaultReader = new ReadableStreamDefaultReader(readers[i]);
bench.end(n);

// Avoid V8 deadcode (elimination)
assert.ok(readableStreamDefaultReader);
break;
}
case 'ReadableStreamBYOBReader': {
const readers = Array.from({ length: n }, () => new ReadableStream({ type: 'bytes' }));

bench.start();
for (let i = 0; i < n; ++i)
readableStreamBYOBReader = new ReadableStreamBYOBReader(readers[i]);
bench.end(n);

// Avoid V8 deadcode (elimination)
assert.ok(readableStreamBYOBReader);
break;
}
case 'ReadableStream.tee': {
const streams = Array.from({ length: n }, () => new ReadableStream());

bench.start();
for (let i = 0; i < n; ++i)
teeResult = streams[i].tee();
bench.end(n);

// Avoid V8 deadcode (elimination)
assert.ok(ts);
assert.ok(teeResult);
break;
}
default:
throw new Error('Invalid kind');
}
Expand Down
61 changes: 34 additions & 27 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -1199,34 +1199,41 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name),
});

function TeeReadableStream(start, pull, cancel) {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port: undefined,
promise: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
setupReadableStreamDefaultControllerFromSource(
this,
ObjectCreate(null, {
start: { __proto__: null, value: start },
pull: { __proto__: null, value: pull },
cancel: { __proto__: null, value: cancel },
}),
1,
() => 1);
}

ObjectSetPrototypeOf(TeeReadableStream.prototype, ReadableStream.prototype);
ObjectSetPrototypeOf(TeeReadableStream, ReadableStream);

function createTeeReadableStream(start, pull, cancel) {
return ReflectConstruct(
function() {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port: undefined,
promise: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
setupReadableStreamDefaultControllerFromSource(
this,
ObjectCreate(null, {
start: { __proto__: null, value: start },
pull: { __proto__: null, value: pull },
cancel: { __proto__: null, value: cancel },
}),
1,
() => 1);
}, [], ReadableStream,
);
const tee = new TeeReadableStream(start, pull, cancel);

// For spec compliance the Tee must be a ReadableStream
tee.constructor = ReadableStream;
return tee;
}

const isReadableStream =
Expand Down

0 comments on commit 6a5796a

Please sign in to comment.