Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): make Readable.from performance better #37609

Closed
wants to merge 11 commits into from
27 changes: 27 additions & 0 deletions benchmark/streams/readable-from.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';

const common = require('../common');
const Readable = require('stream').Readable;

const bench = common.createBenchmark(main, {
n: [1e7],
});

async function main({ n }) {
const arr = [];
for (let i = 0; i < n; i++) {
arr.push(`${i}`);
}

const s = new Readable.from(arr);

bench.start();
s.on('data', (data) => {
// eslint-disable-next-line no-unused-expressions
data;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why "data" is here?

});
s.on('close', () => {
bench.end(n);
});

wwwzbwcom marked this conversation as resolved.
Show resolved Hide resolved
}
49 changes: 30 additions & 19 deletions lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

const {
PromisePrototypeThen,
PromiseResolve,
SymbolAsyncIterator,
SymbolIterator
SymbolIterator,
} = primordials;
const { Buffer } = require('buffer');

Expand All @@ -25,18 +26,22 @@ function from(Readable, iterable, opts) {
});
}

if (iterable && iterable[SymbolAsyncIterator])
let isAsync = false;
if (iterable && iterable[SymbolAsyncIterator]) {
isAsync = true;
iterator = iterable[SymbolAsyncIterator]();
else if (iterable && iterable[SymbolIterator])
} else if (iterable && iterable[SymbolIterator]) {
isAsync = false;
iterator = iterable[SymbolIterator]();
else
} else {
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
}

const readable = new Readable({
objectMode: true,
highWaterMark: 1,
// TODO(ronag): What options should be allowed?
...opts
...opts,
});

// Flag to protect against _read
Expand Down Expand Up @@ -75,23 +80,29 @@ function from(Readable, iterable, opts) {
}

async function next() {
try {
const { value, done } = await iterator.next();
if (done) {
readable.push(null);
} else {
const res = await value;
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) {
next();
for (;;) {
try {
const { value, done } = isAsync ?
await iterator.next() :
iterator.next();

if (done) {
readable.push(null);
} else {
reading = false;
const res = PromiseResolve(value) === value ? await value : value;
wwwzbwcom marked this conversation as resolved.
Show resolved Hide resolved
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) {
continue;
} else {
reading = false;
}
}
} catch (err) {
readable.destroy(err);
}
} catch (err) {
readable.destroy(err);
break;
}
}
return readable;
Expand Down