Skip to content

Commit 38b4a3f

Browse files
committed
stream: writable state bitmap
1 parent c19b2a7 commit 38b4a3f

File tree

1 file changed

+133
-59
lines changed

1 file changed

+133
-59
lines changed

lib/internal/streams/writable.js

+133-59
Original file line numberDiff line numberDiff line change
@@ -73,27 +73,35 @@ ObjectSetPrototypeOf(Writable, Stream);
7373
function nop() {}
7474

7575
const kOnFinished = Symbol('kOnFinished');
76+
const kErrored = Symbol('kErrored');
77+
const kCorkedValue = Symbol('kCorked');
78+
79+
const kCorked = 0b111111; // 6 bits
80+
const kObjectMode = 1 << 7;
81+
const kEnded = 1 << 8;
82+
const kConstructed = 1 << 9;
83+
const kSync = 1 << 10;
84+
const kErrorEmitted = 1 << 11;
85+
const kEmitClose = 1 << 12;
86+
const kAutoDestroy = 1 << 13;
87+
const kDestroyed = 1 << 14;
88+
const kClosed = 1 << 15;
89+
const kCloseEmitted = 1 << 16;
90+
const kFinalCalled = 1 << 17;
91+
const kNeedDrain = 1 << 18;
92+
const kEnding = 1 << 19;
93+
const kFinished = 1 << 20;
94+
const kDecodeStrings = 1 << 21;
95+
const kWriting = 1 << 22;
96+
const kBufferProcessing = 1 << 23;
97+
const kPrefinished = 1 << 24;
98+
const kAllBuffers = 1 << 25;
99+
const kAllNoop = 1 << 26;
100+
const kHasOnFinished = 1 << 27;
101+
const kHasErrored = 1 << 28;
102+
const kHasWritable = 1 << 29;
103+
const kWritable = 1 << 30;
76104

77-
const kObjectMode = 1 << 0;
78-
const kEnded = 1 << 1;
79-
const kConstructed = 1 << 2;
80-
const kSync = 1 << 3;
81-
const kErrorEmitted = 1 << 4;
82-
const kEmitClose = 1 << 5;
83-
const kAutoDestroy = 1 << 6;
84-
const kDestroyed = 1 << 7;
85-
const kClosed = 1 << 8;
86-
const kCloseEmitted = 1 << 9;
87-
const kFinalCalled = 1 << 10;
88-
const kNeedDrain = 1 << 11;
89-
const kEnding = 1 << 12;
90-
const kFinished = 1 << 13;
91-
const kDecodeStrings = 1 << 14;
92-
const kWriting = 1 << 15;
93-
const kBufferProcessing = 1 << 16;
94-
const kPrefinished = 1 << 17;
95-
const kAllBuffers = 1 << 18;
96-
const kAllNoop = 1 << 19;
97105

98106
// TODO(benjamingr) it is likely slower to do it this way than with free functions
99107
function makeBitMapDescriptor(bit) {
@@ -176,6 +184,58 @@ ObjectDefineProperties(WritableState.prototype, {
176184

177185
allBuffers: makeBitMapDescriptor(kAllBuffers),
178186
allNoop: makeBitMapDescriptor(kAllNoop),
187+
188+
// Indicates whether the stream has errored. When true all write() calls
189+
// should return false. This is needed since when autoDestroy
190+
// is disabled we need a way to tell whether the stream has failed.
191+
// This is/should be a cold path.
192+
errored: {
193+
enumerable: false,
194+
get() { return (this.state & kHasErrored) !== 0 ? this[kErrored] : null; },
195+
set(value) {
196+
if (value) {
197+
this[kErrored] = value;
198+
this.state |= kHasErrored;
199+
} else {
200+
this.state &= ~kHasErrored;
201+
}
202+
},
203+
},
204+
205+
206+
writable: {
207+
enumerable: false,
208+
get() { return (this.state & kHasWritable) !== 0 ? (this.state & kWritable) !== 0 : null; },
209+
set(value) {
210+
if (value == null) {
211+
this.state &= (kHasWritable | kWritable);
212+
} else if (value) {
213+
this.state |= (kHasWritable | kWritable);
214+
} else {
215+
this.state |= kHasWritable;
216+
this.state &= ~kWritable;
217+
}
218+
},
219+
},
220+
221+
// When true all writes will be buffered until .uncork() call.
222+
// This is/should be a cold path.
223+
corked: {
224+
enumerable: false,
225+
get() {
226+
const corked = this.state & kCorked;
227+
return corked !== kCorked ? val : this[kCorkedValue];
228+
},
229+
set(value) {
230+
if (value < kCorked) {
231+
this.state &= ~kCorked;
232+
this.state |= value;
233+
} else {
234+
this.state |= kCorked
235+
this[kCorkedValue] = value;
236+
}
237+
},
238+
},
179239
});
180240

181241
function WritableState(options, stream, isDuplex) {
@@ -226,9 +286,6 @@ function WritableState(options, stream, isDuplex) {
226286
// socket or file.
227287
this.length = 0;
228288

229-
// When true all writes will be buffered until .uncork() call.
230-
this.corked = 0;
231-
232289
// The callback that's passed to _write(chunk, cb).
233290
this.onwrite = onwrite.bind(undefined, stream);
234291

@@ -247,13 +304,6 @@ function WritableState(options, stream, isDuplex) {
247304
// Number of pending user-supplied write callbacks
248305
// this must be 0 before 'finish' can be emitted.
249306
this.pendingcb = 0;
250-
251-
// Indicates whether the stream has errored. When true all write() calls
252-
// should return false. This is needed since when autoDestroy
253-
// is disabled we need a way to tell whether the stream has failed.
254-
this.errored = null;
255-
256-
this[kOnFinished] = [];
257307
}
258308

259309
function resetBuffer(state) {
@@ -394,17 +444,32 @@ Writable.prototype.write = function(chunk, encoding, cb) {
394444
};
395445

396446
Writable.prototype.cork = function() {
397-
this._writableState.corked++;
447+
const state = this._writableState;
448+
449+
const corked = (state & kCorked) + 1;
450+
if (corked < kCorked) {
451+
state.state += 1;
452+
} else {
453+
state.corked++;
454+
}
398455
};
399456

400457
Writable.prototype.uncork = function() {
401458
const state = this._writableState;
402459

403-
if (state.corked) {
460+
if ((state.state & kCorked) === 0) {
461+
return
462+
}
463+
464+
const corked = state & kCorked;
465+
if (corked < kCorked) {
466+
state.state -= 1;
467+
} else {
404468
state.corked--;
469+
}
405470

406-
if ((state.state & kWriting) === 0)
407-
clearBuffer(this, state);
471+
if ((state.state & kWriting) === 0) {
472+
clearBuffer(this, state);
408473
}
409474
};
410475

@@ -432,7 +497,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
432497
if (!ret)
433498
state.state |= kNeedDrain;
434499

435-
if ((state.state & kWriting) !== 0 || state.corked || state.errored || (state.state & kConstructed) === 0) {
500+
if ((state.state & kWriting) !== 0 || (state.state & (kHasErrored | kCorked)) || (state.state & kConstructed) === 0) {
436501
state.buffered.push({ chunk, encoding, callback });
437502
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
438503
state.state &= ~kAllBuffers;
@@ -450,7 +515,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
450515

451516
// Return false if errored or destroyed in order to break
452517
// any synchronous while(stream.write(data)) loops.
453-
return ret && !state.errored && (state.state & kDestroyed) === 0;
518+
return ret && (state.state & kHasErrored) === 0 && (state.state & kDestroyed) === 0;
454519
}
455520

456521
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
@@ -498,7 +563,7 @@ function onwrite(stream, er) {
498563
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
499564
er.stack; // eslint-disable-line no-unused-expressions
500565

501-
if (!state.errored) {
566+
if ((state.state & kHasErrored) === 0) {
502567
state.errored = er;
503568
}
504569

@@ -573,18 +638,19 @@ function errorBuffer(state) {
573638
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
574639
}
575640

576-
const onfinishCallbacks = state[kOnFinished].splice(0);
577-
for (let i = 0; i < onfinishCallbacks.length; i++) {
578-
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
641+
if ((state.state & kHasOnFinished) !== 0) {
642+
const onfinishCallbacks = state[kOnFinished].splice(0);
643+
for (let i = 0; i < onfinishCallbacks.length; i++) {
644+
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
645+
}
579646
}
580647

581648
resetBuffer(state);
582649
}
583650

584651
// If there's something in the buffer waiting, then process it.
585652
function clearBuffer(stream, state) {
586-
if (state.corked ||
587-
(state.state & (kDestroyed | kBufferProcessing)) !== 0 ||
653+
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
588654
(state.state & kConstructed) === 0) {
589655
return;
590656
}
@@ -669,14 +735,16 @@ Writable.prototype.end = function(chunk, encoding, cb) {
669735
}
670736

671737
// .end() fully uncorks.
672-
if (state.corked) {
673-
state.corked = 1;
674-
this.uncork();
738+
if ((state.state & kCorked) !== 0) {
739+
state.state &= ~kCorked;
740+
if ((state.state & kWriting) === 0) {
741+
clearBuffer(this, state);
742+
}
675743
}
676744

677745
if (err) {
678746
// Do nothing...
679-
} else if (!state.errored && (state.state & kEnding) === 0) {
747+
} else if ((state.state & kErrored) === 0 && (state.state & kEnding) === 0) {
680748
// This is forgiving in terms of unnecessary calls to end() and can hide
681749
// logic errors. However, usually such errors are harmless and causing a
682750
// hard error can be disproportionately destructive. It is not always
@@ -698,6 +766,8 @@ Writable.prototype.end = function(chunk, encoding, cb) {
698766
} else if ((state.state & kFinished) !== 0) {
699767
process.nextTick(cb, null);
700768
} else {
769+
state.state |= kHasOnFinished;
770+
state[kOnFinished] ??= [];
701771
state[kOnFinished].push(cb);
702772
}
703773
}
@@ -715,10 +785,10 @@ function needFinish(state) {
715785
kFinished |
716786
kWriting |
717787
kErrorEmitted |
718-
kCloseEmitted
788+
kCloseEmitted |
789+
kHasErrored
719790
)) === (kEnding | kConstructed) &&
720791
state.length === 0 &&
721-
!state.errored &&
722792
state.buffered.length === 0);
723793
}
724794

@@ -734,9 +804,11 @@ function callFinal(stream, state) {
734804

735805
state.pendingcb--;
736806
if (err) {
737-
const onfinishCallbacks = state[kOnFinished].splice(0);
738-
for (let i = 0; i < onfinishCallbacks.length; i++) {
739-
onfinishCallbacks[i](err);
807+
if ((state.state & kHasOnFinished) !== 0) {
808+
const onfinishCallbacks = state[kOnFinished].splice(0);
809+
for (let i = 0; i < onfinishCallbacks.length; i++) {
810+
onfinishCallbacks[i](err);
811+
}
740812
}
741813
errorOrDestroy(stream, err, (state.state & kSync) !== 0);
742814
} else if (needFinish(state)) {
@@ -799,9 +871,11 @@ function finish(stream, state) {
799871
state.pendingcb--;
800872
state.state |= kFinished;
801873

802-
const onfinishCallbacks = state[kOnFinished].splice(0);
803-
for (let i = 0; i < onfinishCallbacks.length; i++) {
804-
onfinishCallbacks[i](null);
874+
if ((state.state & kHasOnFinished) !== 0) {
875+
const onfinishCallbacks = state[kOnFinished].splice(0);
876+
for (let i = 0; i < onfinishCallbacks.length; i++) {
877+
onfinishCallbacks[i](null);
878+
}
805879
}
806880

807881
stream.emit('finish');
@@ -853,8 +927,8 @@ ObjectDefineProperties(Writable.prototype, {
853927
// where the writable side was disabled upon construction.
854928
// Compat. The user might manually disable writable side through
855929
// deprecated setter.
856-
return !!w && w.writable !== false && !w.errored &&
857-
(w.state & (kEnding | kEnded | kDestroyed)) === 0;
930+
return !!w && w.writable !== false &&
931+
(w.state & (kEnding | kEnded | kDestroyed | kHasErrored)) === 0;
858932
},
859933
set(val) {
860934
// Backwards compatible.
@@ -928,7 +1002,7 @@ ObjectDefineProperties(Writable.prototype, {
9281002
__proto__: null,
9291003
enumerable: false,
9301004
get() {
931-
return this._writableState ? this._writableState.errored : null;
1005+
return this._writableState && (this._writableState.state & kHasErrored) !== 0 ? this._writableState.errored : null;
9321006
},
9331007
},
9341008

@@ -938,7 +1012,7 @@ ObjectDefineProperties(Writable.prototype, {
9381012
get: function() {
9391013
return !!(
9401014
this._writableState.writable !== false &&
941-
((this._writableState.state & kDestroyed) !== 0 || this._writableState.errored) &&
1015+
(this._writableState.state & (kDestroyed | kHasErrored)) !== 0 &&
9421016
(this._writableState.state & kFinished) === 0
9431017
);
9441018
},
@@ -952,7 +1026,7 @@ Writable.prototype.destroy = function(err, cb) {
9521026
// Invoke pending callbacks.
9531027
if ((state.state & kDestroyed) === 0 &&
9541028
(state.bufferedIndex < state.buffered.length ||
955-
state[kOnFinished].length)) {
1029+
(((state.state & kHasOnFinished) !== 0) && state[kOnFinished].length))) {
9561030
process.nextTick(errorBuffer, state);
9571031
}
9581032

0 commit comments

Comments
 (0)