Skip to content

Commit 9529f21

Browse files
committed
stream: writable state bitmap
1 parent c19b2a7 commit 9529f21

File tree

1 file changed

+88
-35
lines changed

1 file changed

+88
-35
lines changed

lib/internal/streams/writable.js

+88-35
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ ObjectSetPrototypeOf(Writable, Stream);
7373
function nop() {}
7474

7575
const kOnFinished = Symbol('kOnFinished');
76+
const kErrored = Symbol('kErrored');
77+
const kCorked = Symbol('kCorked');
7678

7779
const kObjectMode = 1 << 0;
7880
const kEnded = 1 << 1;
@@ -94,6 +96,12 @@ const kBufferProcessing = 1 << 16;
9496
const kPrefinished = 1 << 17;
9597
const kAllBuffers = 1 << 18;
9698
const kAllNoop = 1 << 19;
99+
const kHasOnFinished = 1 << 20;
100+
const kHasErrored = 1 << 21;
101+
102+
const kCorkedShift = 22;
103+
const kCorkedMask = 0b1111
104+
const kCorked = kCorkedMask << kCorkedShift; // 4 bits
97105

98106
// TODO(benjamingr) it is likely slower to do it this way than with free functions
99107
function makeBitMapDescriptor(bit) {
@@ -176,6 +184,46 @@ ObjectDefineProperties(WritableState.prototype, {
176184

177185
allBuffers: makeBitMapDescriptor(kAllBuffers),
178186
allNoop: makeBitMapDescriptor(kAllNoop),
187+
188+
// Indicates whether the stream has errored. When true all write() calls
189+
// should return false. This is needed since when autoDestroy
190+
// is disabled we need a way to tell whether the stream has failed.
191+
// This is/should be a cold path.
192+
errored: {
193+
enumerable: false,
194+
get() { return (this.state & kHasErrored) !== 0 ? this[kErrored] : null; },
195+
set(value) {
196+
if (value) {
197+
this[kErrored] = value;
198+
this.state |= kHasErrored;
199+
} else {
200+
delete this[kErrored];
201+
this.state &= ~kHasErrored;
202+
}
203+
},
204+
},
205+
206+
// When true all writes will be buffered until .uncork() call.
207+
// This is/should be a cold path.
208+
corked: {
209+
enumerable: false,
210+
get() {
211+
const val = (this.state >>> kCorkedShift) & kCorkedMask;
212+
return val < kCorkedMask ? val : this[kCorked];
213+
},
214+
set(value) {
215+
if (value >= kCorkedMask) {
216+
this[kCorked] = value;
217+
this.state |= kCorkedMask << kCorkedShift;
218+
} else {
219+
if ((this.state >>> kCorkedShift) === kCorkedMask) {
220+
delete this[kCorked];
221+
}
222+
this.state &= ~kCorked;
223+
this.state |= value << kCorkedShift;
224+
}
225+
},
226+
}
179227
});
180228

181229
function WritableState(options, stream, isDuplex) {
@@ -226,9 +274,6 @@ function WritableState(options, stream, isDuplex) {
226274
// socket or file.
227275
this.length = 0;
228276

229-
// When true all writes will be buffered until .uncork() call.
230-
this.corked = 0;
231-
232277
// The callback that's passed to _write(chunk, cb).
233278
this.onwrite = onwrite.bind(undefined, stream);
234279

@@ -247,13 +292,6 @@ function WritableState(options, stream, isDuplex) {
247292
// Number of pending user-supplied write callbacks
248293
// this must be 0 before 'finish' can be emitted.
249294
this.pendingcb = 0;
250-
251-
// Indicates whether the stream has errored. When true all write() calls
252-
// should return false. This is needed since when autoDestroy
253-
// is disabled we need a way to tell whether the stream has failed.
254-
this.errored = null;
255-
256-
this[kOnFinished] = [];
257295
}
258296

259297
function resetBuffer(state) {
@@ -394,13 +432,21 @@ Writable.prototype.write = function(chunk, encoding, cb) {
394432
};
395433

396434
Writable.prototype.cork = function() {
397-
this._writableState.corked++;
435+
const state = this._writableState;
436+
437+
const corked = ((state & kCorked) >>> kCorkedShift) + 1;
438+
if (corked < kCorkedMask) {
439+
this.state |= corked << kCorkedShift;
440+
} else {
441+
this._writableState.corked++;
442+
}
398443
};
399444

400445
Writable.prototype.uncork = function() {
401446
const state = this._writableState;
402447

403-
if (state.corked) {
448+
if ((state.state & kCorked) !== 0) {
449+
// TODO: Optimize
404450
state.corked--;
405451

406452
if ((state.state & kWriting) === 0)
@@ -432,7 +478,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
432478
if (!ret)
433479
state.state |= kNeedDrain;
434480

435-
if ((state.state & kWriting) !== 0 || state.corked || state.errored || (state.state & kConstructed) === 0) {
481+
if ((state.state & kWriting) !== 0 || (state.state & (kHasErrored | kCorked)) || (state.state & kConstructed) === 0) {
436482
state.buffered.push({ chunk, encoding, callback });
437483
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
438484
state.state &= ~kAllBuffers;
@@ -450,7 +496,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
450496

451497
// Return false if errored or destroyed in order to break
452498
// any synchronous while(stream.write(data)) loops.
453-
return ret && !state.errored && (state.state & kDestroyed) === 0;
499+
return ret && (state.state & kHasErrored) === 0 && (state.state & kDestroyed) === 0;
454500
}
455501

456502
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
@@ -498,7 +544,7 @@ function onwrite(stream, er) {
498544
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
499545
er.stack; // eslint-disable-line no-unused-expressions
500546

501-
if (!state.errored) {
547+
if ((state.state & kHasErrored) === 0) {
502548
state.errored = er;
503549
}
504550

@@ -573,18 +619,19 @@ function errorBuffer(state) {
573619
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
574620
}
575621

576-
const onfinishCallbacks = state[kOnFinished].splice(0);
577-
for (let i = 0; i < onfinishCallbacks.length; i++) {
578-
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
622+
if ((state.state & kHasOnFinished) !== 0) {
623+
const onfinishCallbacks = state[kOnFinished].splice(0);
624+
for (let i = 0; i < onfinishCallbacks.length; i++) {
625+
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
626+
}
579627
}
580628

581629
resetBuffer(state);
582630
}
583631

584632
// If there's something in the buffer waiting, then process it.
585633
function clearBuffer(stream, state) {
586-
if (state.corked ||
587-
(state.state & (kDestroyed | kBufferProcessing)) !== 0 ||
634+
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
588635
(state.state & kConstructed) === 0) {
589636
return;
590637
}
@@ -669,14 +716,14 @@ Writable.prototype.end = function(chunk, encoding, cb) {
669716
}
670717

671718
// .end() fully uncorks.
672-
if (state.corked) {
719+
if ((state.state & kCorked) !== 0) {
673720
state.corked = 1;
674721
this.uncork();
675722
}
676723

677724
if (err) {
678725
// Do nothing...
679-
} else if (!state.errored && (state.state & kEnding) === 0) {
726+
} else if ((state.state & kErrored) === 0 && (state.state & kEnding) === 0) {
680727
// This is forgiving in terms of unnecessary calls to end() and can hide
681728
// logic errors. However, usually such errors are harmless and causing a
682729
// hard error can be disproportionately destructive. It is not always
@@ -698,6 +745,8 @@ Writable.prototype.end = function(chunk, encoding, cb) {
698745
} else if ((state.state & kFinished) !== 0) {
699746
process.nextTick(cb, null);
700747
} else {
748+
state.state |= kHasOnFinished;
749+
state[kOnFinished] ??= [];
701750
state[kOnFinished].push(cb);
702751
}
703752
}
@@ -715,10 +764,10 @@ function needFinish(state) {
715764
kFinished |
716765
kWriting |
717766
kErrorEmitted |
718-
kCloseEmitted
767+
kCloseEmitted |
768+
kHasErrored
719769
)) === (kEnding | kConstructed) &&
720770
state.length === 0 &&
721-
!state.errored &&
722771
state.buffered.length === 0);
723772
}
724773

@@ -734,9 +783,11 @@ function callFinal(stream, state) {
734783

735784
state.pendingcb--;
736785
if (err) {
737-
const onfinishCallbacks = state[kOnFinished].splice(0);
738-
for (let i = 0; i < onfinishCallbacks.length; i++) {
739-
onfinishCallbacks[i](err);
786+
if ((state.state & kHasOnFinished) !== 0) {
787+
const onfinishCallbacks = state[kOnFinished].splice(0);
788+
for (let i = 0; i < onfinishCallbacks.length; i++) {
789+
onfinishCallbacks[i](err);
790+
}
740791
}
741792
errorOrDestroy(stream, err, (state.state & kSync) !== 0);
742793
} else if (needFinish(state)) {
@@ -799,9 +850,11 @@ function finish(stream, state) {
799850
state.pendingcb--;
800851
state.state |= kFinished;
801852

802-
const onfinishCallbacks = state[kOnFinished].splice(0);
803-
for (let i = 0; i < onfinishCallbacks.length; i++) {
804-
onfinishCallbacks[i](null);
853+
if ((state.state & kHasOnFinished) !== 0) {
854+
const onfinishCallbacks = state[kOnFinished].splice(0);
855+
for (let i = 0; i < onfinishCallbacks.length; i++) {
856+
onfinishCallbacks[i](null);
857+
}
805858
}
806859

807860
stream.emit('finish');
@@ -853,8 +906,8 @@ ObjectDefineProperties(Writable.prototype, {
853906
// where the writable side was disabled upon construction.
854907
// Compat. The user might manually disable writable side through
855908
// deprecated setter.
856-
return !!w && w.writable !== false && !w.errored &&
857-
(w.state & (kEnding | kEnded | kDestroyed)) === 0;
909+
return !!w && w.writable !== false &&
910+
(w.state & (kEnding | kEnded | kDestroyed | kHasErrored)) === 0;
858911
},
859912
set(val) {
860913
// Backwards compatible.
@@ -928,7 +981,7 @@ ObjectDefineProperties(Writable.prototype, {
928981
__proto__: null,
929982
enumerable: false,
930983
get() {
931-
return this._writableState ? this._writableState.errored : null;
984+
return this._writableState && (this._writableState.state & kHasErrored) !== 0 ? this._writableState.errored : null;
932985
},
933986
},
934987

@@ -938,7 +991,7 @@ ObjectDefineProperties(Writable.prototype, {
938991
get: function() {
939992
return !!(
940993
this._writableState.writable !== false &&
941-
((this._writableState.state & kDestroyed) !== 0 || this._writableState.errored) &&
994+
(this._writableState.state & (kDestroyed | kHasErrored)) !== 0 &&
942995
(this._writableState.state & kFinished) === 0
943996
);
944997
},
@@ -952,7 +1005,7 @@ Writable.prototype.destroy = function(err, cb) {
9521005
// Invoke pending callbacks.
9531006
if ((state.state & kDestroyed) === 0 &&
9541007
(state.bufferedIndex < state.buffered.length ||
955-
state[kOnFinished].length)) {
1008+
(((state.state & kHasOnFinished) !== 0) && state[kOnFinished].length))) {
9561009
process.nextTick(errorBuffer, state);
9571010
}
9581011

0 commit comments

Comments
 (0)