From 586ec48e5ff7b488199ef0d861658c727de4202a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 25 Oct 2023 23:55:49 +0200 Subject: [PATCH] stream: readable use bitmap accessors PR-URL: https://github.com/nodejs/node/pull/50350 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Raz Luvaton --- lib/internal/streams/readable.js | 58 +++++++++++++++++--------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 902709352a34b5..a18c6d32d7b92c 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -431,7 +431,7 @@ function readableAddChunkUnshiftObjectMode(stream, state, chunk) { function readableAddChunkUnshiftValue(stream, state, chunk) { if ((state[kState] & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); - else if (state.destroyed || state.errored) + else if ((state[kState] & (kDestroyed | kErrored)) !== 0) return false; else addChunk(stream, state, chunk, true); @@ -608,7 +608,7 @@ function computeNewHighWaterMark(n) { // This function is designed to be inlinable, so please take care when making // changes to the function body. function howMuchToRead(n, state) { - if (n <= 0 || (state.length === 0 && state.ended)) + if (n <= 0 || (state.length === 0 && (state[kState] & kEnded) !== 0)) return 0; if ((state[kState] & kObjectMode) !== 0) return 1; @@ -652,7 +652,7 @@ Readable.prototype.read = function(n) { state.length >= state.highWaterMark : state.length > 0) || (state[kState] & kEnded) !== 0)) { - debug('read: emitReadable', state.length, (state[kState] & kEnded) !== 0); + debug('read: emitReadable'); if (state.length === 0 && (state[kState] & kEnded) !== 0) endReadable(this); else @@ -810,7 +810,7 @@ function emitReadable(stream) { function emitReadable_(stream) { const state = stream._readableState; debug('emitReadable_'); - if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || state.ended)) { + if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || (state[kState] & kEnded) !== 0)) { stream.emit('readable'); state[kState] &= ~kEmittedReadable; } @@ -891,7 +891,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { const state = this._readableState; if (state.pipes.length === 1) { - if (!state.multiAwaitDrain) { + if ((state[kState] & kMultiAwaitDrain) === 0) { state[kState] |= kMultiAwaitDrain; state.awaitDrainWriters = new SafeSet( state.awaitDrainWriters ? [state.awaitDrainWriters] : [], @@ -907,7 +907,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest !== process.stderr; const endFn = doEnd ? onend : unpipe; - if (state.endEmitted) + if ((state[kState] & kEndEmitted) !== 0) process.nextTick(endFn); else src.once('end', endFn); @@ -966,7 +966,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { if (state.pipes.length === 1 && state.pipes[0] === dest) { debug('false write response, pause', 0); state.awaitDrainWriters = dest; - state.multiAwaitDrain = false; + state[kState] &= ~kMultiAwaitDrain; } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { debug('false write response, pause', state.awaitDrainWriters.size); state.awaitDrainWriters.add(dest); @@ -1038,7 +1038,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { if (dest.writableNeedDrain === true) { pause(); - } else if (!state.flowing) { + } else if ((state[kState] & kFlowing) === 0) { debug('pipe resume'); src.resume(); } @@ -1056,7 +1056,7 @@ function pipeOnDrain(src, dest) { if (state.awaitDrainWriters === dest) { debug('pipeOnDrain', 1); state.awaitDrainWriters = null; - } else if (state.multiAwaitDrain) { + } else if ((state[kState] & kMultiAwaitDrain) !== 0) { debug('pipeOnDrain', state.awaitDrainWriters.size); state.awaitDrainWriters.delete(dest); } @@ -1111,20 +1111,20 @@ Readable.prototype.on = function(ev, fn) { if (ev === 'data') { // Update readableListening so that resume() may be a no-op // a few lines down. This is needed to support once('readable'). - state.readableListening = this.listenerCount('readable') > 0; + state[kState] |= this.listenerCount('readable') > 0 ? kReadableListening : 0; // Try start flowing on next tick if stream isn't explicitly paused. - if (state.flowing !== false) + if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) { this.resume(); + } } else if (ev === 'readable') { - if (!state.endEmitted && !state.readableListening) { - state.readableListening = state.needReadable = true; - state.flowing = false; - state.emittedReadable = false; - debug('on readable', state.length, state.reading); + if ((state[kState] & (kEndEmitted | kReadableListening)) === 0) { + state[kState] |= kReadableListening | kNeedReadable | kHasFlowing; + state[kState] &= ~(kFlowing | kEmittedReadable); + debug('on readable'); if (state.length) { emitReadable(this); - } else if (!state.reading) { + } else if ((state[kState] & kReading) === 0) { process.nextTick(nReadingNextTick, this); } } @@ -1171,7 +1171,12 @@ Readable.prototype.removeAllListeners = function(ev) { function updateReadableListening(self) { const state = self._readableState; - state.readableListening = self.listenerCount('readable') > 0; + + if (self.listenerCount('readable') > 0) { + state[kState] |= kReadableListening; + } else { + state[kState] &= ~kReadableListening; + } if ((state[kState] & (kHasPaused | kPaused | kResumeScheduled)) === (kHasPaused | kResumeScheduled)) { // Flowing needs to be set to true now, otherwise @@ -1201,7 +1206,7 @@ Readable.prototype.resume = function() { // for readable, but we still have to call // resume(). state[kState] |= kHasFlowing; - if (!state.readableListening) { + if ((state[kState] & kReadableListening) === 0) { state[kState] |= kFlowing; } else { state[kState] &= ~kFlowing; @@ -1214,8 +1219,8 @@ Readable.prototype.resume = function() { }; function resume(stream, state) { - if (!state.resumeScheduled) { - state.resumeScheduled = true; + if ((state[kState] & kResumeScheduled) === 0) { + state[kState] |= kResumeScheduled; process.nextTick(resume_, stream, state); } } @@ -1236,7 +1241,7 @@ function resume_(stream, state) { Readable.prototype.pause = function() { const state = this._readableState; debug('call pause'); - if (state.flowing !== false) { + if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) { debug('pause'); state[kState] |= kHasFlowing; state[kState] &= ~kFlowing; @@ -1651,7 +1656,7 @@ function fromList(n, state) { function endReadable(stream) { const state = stream._readableState; - debug('endReadable', (state[kState] & kEndEmitted) !== 0); + debug('endReadable'); if ((state[kState] & kEndEmitted) === 0) { state[kState] |= kEnded; process.nextTick(endReadableNT, state, stream); @@ -1659,12 +1664,11 @@ function endReadable(stream) { } function endReadableNT(state, stream) { - debug('endReadableNT', state.endEmitted, state.length); + debug('endReadableNT'); // Check that we didn't get one last unshift. - if (!state.errored && !state.closeEmitted && - !state.endEmitted && state.length === 0) { - state.endEmitted = true; + if ((state[kState] & (kErrored | kCloseEmitted | kEndEmitted)) === 0 && state.length === 0) { + state[kState] |= kEndEmitted; stream.emit('end'); if (stream.writable && stream.allowHalfOpen === false) {