Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "stream: invoke callback before emitting error always" #29741

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,7 @@ The `writable.write()` method writes some data to the stream, and calls the
supplied `callback` once the data has been fully handled. If an error
occurs, the `callback` *may or may not* be called with the error as its
first argument. To reliably detect write errors, add a listener for the
`'error'` event. If `callback` is called with an error, it will be called
before the `'error'` event is emitted.
`'error'` event.

The return value is `true` if the internal buffer is less than the
`highWaterMark` configured when the stream was created after admitting `chunk`.
Expand Down
37 changes: 16 additions & 21 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,6 @@ function WritableState(options, stream, isDuplex) {
// Should .destroy() be called after 'finish' (and potentially 'end')
this.autoDestroy = !!(options && options.autoDestroy);

// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;

// Count buffered requests
this.bufferedRequestCount = 0;

Expand Down Expand Up @@ -406,7 +401,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!ret)
state.needDrain = true;

if (state.writing || state.corked || state.errored) {
if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
Expand All @@ -425,9 +420,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
doWrite(stream, state, false, len, chunk, encoding, cb);
}

// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
return ret;
}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
Expand All @@ -444,11 +437,18 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.sync = false;
}

function onwriteError(stream, state, er, cb) {
function onwriteError(stream, state, sync, er, cb) {
--state.pendingcb;

cb(er);
// This can emit error, but error must always follow cb.
if (sync) {
// Defer the callback if we are being called synchronously
// to avoid piling up things on the stack
process.nextTick(cb, er);
} else {
// The caller expect this to happen before if
// it is async
cb(er);
}
errorOrDestroy(stream, er);
}

Expand All @@ -465,14 +465,9 @@ function onwrite(stream, er) {
state.length -= state.writelen;
state.writelen = 0;

if (er) {
state.errored = true;
if (sync) {
process.nextTick(onwriteError, stream, state, er, cb);
} else {
onwriteError(stream, state, er, cb);
}
} else {
if (er)
onwriteError(stream, state, sync, er, cb);
else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state) || stream.destroyed;

Expand Down Expand Up @@ -627,7 +622,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', {
function needFinish(state) {
return (state.ending &&
state.length === 0 &&
!state.errored &&
!state.errorEmitted &&
state.bufferedRequest === null &&
!state.finished &&
!state.writing);
Expand Down
13 changes: 1 addition & 12 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ function destroy(err, cb) {
const r = this._readableState;
const w = this._writableState;

if (w && err) {
w.errored = true;
}

if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
Expand All @@ -54,12 +50,10 @@ function destroy(err, cb) {
this._destroy(err || null, (err) => {
const emitClose = (w && w.emitClose) || (r && r.emitClose);
if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
cb(err);
if (emitClose) {
process.nextTick(emitCloseNT, this);
}
cb(err);
} else if (needError(this, err)) {
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
} else if (emitClose) {
Expand Down Expand Up @@ -97,7 +91,6 @@ function undestroy() {

if (w) {
w.destroyed = false;
w.errored = false;
w.ended = false;
w.ending = false;
w.finalCalled = false;
Expand All @@ -117,10 +110,6 @@ function errorOrDestroy(stream, err) {
const r = stream._readableState;
const w = stream._writableState;

if (w & err) {
w.errored = true;
}

if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else if (needError(stream, err))
Expand Down
5 changes: 1 addition & 4 deletions test/parallel/test-http2-reset-flood.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ const worker = new Worker(__filename).on('message', common.mustCall((port) => {
h2header.writeIntBE(1, 0, 3); // Length: 1
h2header.writeIntBE(i, 5, 4); // Stream ID
// 0x88 = :status: 200
if (!conn.write(Buffer.concat([h2header, Buffer.from([0x88])]))) {
process.nextTick(writeRequests);
break;
}
conn.write(Buffer.concat([h2header, Buffer.from([0x88])]));
}
}

Expand Down
14 changes: 0 additions & 14 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,6 @@ const assert = require('assert');
assert.strictEqual(write.destroyed, true);
}

{
const write = new Writable({
write(chunk, enc, cb) {
this.destroy(new Error('asd'));
cb();
}
});

write.on('error', common.mustCall());
write.on('finish', common.mustNotCall());
write.end('asd');
assert.strictEqual(write.destroyed, true);
}

{
const write = new Writable({
write(chunk, enc, cb) { cb(); }
Expand Down
58 changes: 0 additions & 58 deletions test/parallel/test-stream-writable-write-cb-error.js

This file was deleted.

6 changes: 1 addition & 5 deletions test/parallel/test-wrap-js-stream-exceptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,4 @@ const socket = new JSStreamWrap(new Duplex({
})
}));

socket.end('foo');
socket.on('error', common.expectsError({
type: Error,
message: 'write EPROTO'
}));
assert.throws(() => socket.end('foo'), /Error: write EPROTO/);
14 changes: 8 additions & 6 deletions test/parallel/test-zlib-write-after-close.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const zlib = require('zlib');
zlib.gzip('hello', common.mustCall(function(err, out) {
const unzip = zlib.createGunzip();
unzip.close(common.mustCall());

unzip.write(out);
unzip.on('error', common.expectsError({
code: 'ERR_STREAM_DESTROYED',
type: Error
}));
common.expectsError(
() => unzip.write(out),
{
code: 'ERR_STREAM_DESTROYED',
type: Error,
message: 'Cannot call write after a stream was destroyed'
}
);
}));
Copy link
Member

@ronag ronag Sep 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This updated test is what's causing the failure. Only reverting this part would also make CI pass.