From 4861ad6431ab52f5ffaf3a11e45ff7d0bebdccde Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 15 Oct 2023 21:31:08 +0300 Subject: [PATCH] stream: call helper function from push and unshift PR-URL: https://github.com/nodejs/node/pull/50173 Reviewed-By: Matteo Collina Reviewed-By: Robert Nagy Reviewed-By: Benjamin Gruenbaum --- lib/internal/streams/readable.js | 193 ++++++++++++++++++++++--------- 1 file changed, 140 insertions(+), 53 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index f551053bf7b79c..7ceb83d3f20523 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -284,77 +284,164 @@ Readable.prototype[SymbolAsyncDispose] = function() { // similar to how Writable.write() returns true if you should // write() some more. Readable.prototype.push = function(chunk, encoding) { - return readableAddChunk(this, chunk, encoding, false); + debug('push', chunk); + + const state = this._readableState; + return (state[kState] & kObjectMode) === 0 ? + readableAddChunkPushByteMode(this, state, chunk, encoding) : + readableAddChunkPushObjectMode(this, state, chunk, encoding); }; // Unshift should *always* be something directly out of read(). Readable.prototype.unshift = function(chunk, encoding) { - return readableAddChunk(this, chunk, encoding, true); + debug('unshift', chunk); + const state = this._readableState; + return (state[kState] & kObjectMode) === 0 ? + readableAddChunkUnshiftByteMode(this, state, chunk, encoding) : + readableAddChunkUnshiftObjectMode(this, state, chunk); }; -function readableAddChunk(stream, chunk, encoding, addToFront) { - debug('readableAddChunk', chunk); - const state = stream._readableState; - let err; - if ((state[kState] & kObjectMode) === 0) { - if (typeof chunk === 'string') { - encoding = encoding || state.defaultEncoding; - if (state.encoding !== encoding) { - if (addToFront && state.encoding) { - // When unshifting, if state.encoding is set, we have to save - // the string in the BufferList with the state encoding. - chunk = Buffer.from(chunk, encoding).toString(state.encoding); - } else { - chunk = Buffer.from(chunk, encoding); - encoding = ''; - } +function readableAddChunkUnshiftByteMode(stream, state, chunk, encoding) { + if (chunk === null) { + state[kState] &= ~kReading; + onEofChunk(stream, state); + + return false; + } + + if (typeof chunk === 'string') { + encoding = encoding || state.defaultEncoding; + if (state.encoding !== encoding) { + if (state.encoding) { + // When unshifting, if state.encoding is set, we have to save + // the string in the BufferList with the state encoding. + chunk = Buffer.from(chunk, encoding).toString(state.encoding); + } else { + chunk = Buffer.from(chunk, encoding); } - } else if (chunk instanceof Buffer) { - encoding = ''; - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); - encoding = ''; - } else if (chunk != null) { - err = new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); } + } else if (Stream._isUint8Array(chunk)) { + chunk = Stream._uint8ArrayToBuffer(chunk); + } else if (chunk !== undefined && !(chunk instanceof Buffer)) { + errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE( + 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk)); + return false; } - if (err) { - errorOrDestroy(stream, err); - } else if (chunk === null) { + + if (!(chunk && chunk.length > 0)) { + return canPushMore(state); + } + + return readableAddChunkUnshiftValue(stream, state, chunk); +} + +function readableAddChunkUnshiftObjectMode(stream, state, chunk) { + if (chunk === null) { state[kState] &= ~kReading; onEofChunk(stream, state); - } else if (((state[kState] & kObjectMode) !== 0) || (chunk && chunk.length > 0)) { - if (addToFront) { - if ((state[kState] & kEndEmitted) !== 0) - errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); - else if (state.destroyed || state.errored) - return false; - else - addChunk(stream, state, chunk, true); - } else if (state.ended) { - errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); - } else if (state.destroyed || state.errored) { - return false; - } else { - state[kState] &= ~kReading; - if (state.decoder && !encoding) { - chunk = state.decoder.write(chunk); - if (state.objectMode || chunk.length !== 0) - addChunk(stream, state, chunk, false); - else - maybeReadMore(stream, state); - } else { - addChunk(stream, state, chunk, false); - } + + return false; + } + + return readableAddChunkUnshiftValue(stream, state, chunk); +} + +function readableAddChunkUnshiftValue(stream, state, chunk) { + if ((state[kState] & kEndEmitted) !== 0) + errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); + else if (state.destroyed || state.errored) + return false; + else + addChunk(stream, state, chunk, true); + + return canPushMore(state); +} + +function readableAddChunkPushByteMode(stream, state, chunk, encoding) { + if (chunk === null) { + state[kState] &= ~kReading; + onEofChunk(stream, state); + + return false; + } + + if (typeof chunk === 'string') { + encoding = encoding || state.defaultEncoding; + if (state.encoding !== encoding) { + chunk = Buffer.from(chunk, encoding); + encoding = ''; } - } else if (!addToFront) { + } else if (chunk instanceof Buffer) { + encoding = ''; + } else if (Stream._isUint8Array(chunk)) { + chunk = Stream._uint8ArrayToBuffer(chunk); + encoding = ''; + } else if (chunk !== undefined) { + errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE( + 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk)); + return false; + } + + if (!chunk || chunk.length <= 0) { state[kState] &= ~kReading; maybeReadMore(stream, state); + + return canPushMore(state); + } + + if (state.ended) { + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); + + return false; + } + + if (state.destroyed || state.errored) { + return false; + } + + state[kState] &= ~kReading; + if (state.decoder && !encoding) { + chunk = state.decoder.write(chunk); + if (chunk.length === 0) { + maybeReadMore(stream, state); + + return canPushMore(state); + } } + addChunk(stream, state, chunk, false); + return canPushMore(state); +} + +function readableAddChunkPushObjectMode(stream, state, chunk, encoding) { + if (chunk === null) { + state[kState] &= ~kReading; + onEofChunk(stream, state); + + return false; + } + + if (state.ended) { + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); + return false; + } + + if (state.destroyed || state.errored) { + return false; + } + + state[kState] &= ~kReading; + if (state.decoder && !encoding) { + chunk = state.decoder.write(chunk); + } + + addChunk(stream, state, chunk, false); + return canPushMore(state); +} + +function canPushMore(state) { // We can push more data if we are below the highWaterMark. // Also, if we have no data yet, we can stand some more bytes. // This is to work around cases where hwm=0, such as the repl.