Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: improve readable stream read perf #50173

Merged
merged 1 commit into from
Oct 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 140 additions & 53 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -284,77 +284,164 @@ Readable.prototype[SymbolAsyncDispose] = function() {
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
return readableAddChunk(this, chunk, encoding, false);
debug('push', chunk);

const state = this._readableState;
return (state[kState] & kObjectMode) === 0 ?
readableAddChunkPushByteMode(this, state, chunk, encoding) :
readableAddChunkPushObjectMode(this, state, chunk, encoding);
};

// Unshift should *always* be something directly out of read().
Readable.prototype.unshift = function(chunk, encoding) {
return readableAddChunk(this, chunk, encoding, true);
debug('unshift', chunk);
const state = this._readableState;
return (state[kState] & kObjectMode) === 0 ?
readableAddChunkUnshiftByteMode(this, state, chunk, encoding) :
readableAddChunkUnshiftObjectMode(this, state, chunk);
};

function readableAddChunk(stream, chunk, encoding, addToFront) {
debug('readableAddChunk', chunk);
const state = stream._readableState;

let err;
if ((state[kState] & kObjectMode) === 0) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
if (addToFront && state.encoding) {
// When unshifting, if state.encoding is set, we have to save
// the string in the BufferList with the state encoding.
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
} else {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
function readableAddChunkUnshiftByteMode(stream, state, chunk, encoding) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);

return false;
}

if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
if (state.encoding) {
// When unshifting, if state.encoding is set, we have to save
// the string in the BufferList with the state encoding.
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
} else {
chunk = Buffer.from(chunk, encoding);
}
} else if (chunk instanceof Buffer) {
encoding = '';
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
encoding = '';
} else if (chunk != null) {
err = new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
}
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
} else if (chunk !== undefined && !(chunk instanceof Buffer)) {
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
return false;
}

if (err) {
errorOrDestroy(stream, err);
} else if (chunk === null) {

if (!(chunk && chunk.length > 0)) {
return canPushMore(state);
}

return readableAddChunkUnshiftValue(stream, state, chunk);
}

function readableAddChunkUnshiftObjectMode(stream, state, chunk) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);
} else if (((state[kState] & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
if (addToFront) {
if ((state[kState] & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
return false;
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed || state.errored) {
return false;
} else {
state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
addChunk(stream, state, chunk, false);
else
maybeReadMore(stream, state);
} else {
addChunk(stream, state, chunk, false);
}

return false;
}

return readableAddChunkUnshiftValue(stream, state, chunk);
}

function readableAddChunkUnshiftValue(stream, state, chunk) {
if ((state[kState] & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
return false;
else
addChunk(stream, state, chunk, true);

return canPushMore(state);
}

function readableAddChunkPushByteMode(stream, state, chunk, encoding) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);

return false;
}

if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
} else if (!addToFront) {
} else if (chunk instanceof Buffer) {
encoding = '';
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
encoding = '';
} else if (chunk !== undefined) {
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
return false;
}

if (!chunk || chunk.length <= 0) {
state[kState] &= ~kReading;
maybeReadMore(stream, state);

return canPushMore(state);
}

if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());

return false;
}

if (state.destroyed || state.errored) {
return false;
}

state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (chunk.length === 0) {
maybeReadMore(stream, state);

return canPushMore(state);
}
}

addChunk(stream, state, chunk, false);
return canPushMore(state);
}

function readableAddChunkPushObjectMode(stream, state, chunk, encoding) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);

return false;
}

if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
return false;
}

if (state.destroyed || state.errored) {
return false;
}

state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
}

addChunk(stream, state, chunk, false);
return canPushMore(state);
}

function canPushMore(state) {
// We can push more data if we are below the highWaterMark.
// Also, if we have no data yet, we can stand some more bytes.
// This is to work around cases where hwm=0, such as the repl.
Expand Down