From d016b9d70897b7702e7862252d768ecdde89bc48 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 25 Jan 2020 15:35:38 +0100 Subject: [PATCH] stream: finished callback for closed streams Previously finished(stream, cb) would not invoke the callback for streams that have already finished, ended or errored before being passed to finished(stream, cb). PR-URL: https://github.com/nodejs/node/pull/31509 Refs: https://github.com/nodejs/node/pull/31508 Reviewed-By: Matteo Collina Reviewed-By: James M Snell Reviewed-By: Rich Trott --- lib/_stream_readable.js | 3 + lib/_stream_writable.js | 3 + lib/internal/streams/async_iterator.js | 10 --- lib/internal/streams/destroy.js | 9 ++ lib/internal/streams/end-of-stream.js | 30 ++++++- test/parallel/test-stream-finished.js | 117 +++++++++++++++++++++++++ 6 files changed, 158 insertions(+), 14 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 4c64fd1b4fb346..30c808efd5cacb 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -148,6 +148,9 @@ function ReadableState(options, stream, isDuplex) { // Indicates whether the stream has errored. this.errored = false; + // Indicates whether the stream has finished destroying. + this.closed = false; + // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. // Everything else in the universe uses 'utf8', though. diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 854da3ef058e94..c42760c2b25e63 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -175,6 +175,9 @@ function WritableState(options, stream, isDuplex) { // is disabled we need a way to tell whether the stream has failed. this.errored = false; + // Indicates whether the stream has finished destroying. + this.closed = false; + // Count buffered requests this.bufferedRequestCount = 0; diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 1df891d30c4dc4..7e2a5ed56b90fc 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -67,16 +67,6 @@ function finish(self, err) { return new Promise((resolve, reject) => { const stream = self[kStream]; - // TODO(ronag): Remove this check once finished() handles - // already ended and/or destroyed streams. - const ended = stream.destroyed || stream.readableEnded || - (stream._readableState && stream._readableState.endEmitted); - - if (ended) { - resolve(createIterResult(undefined, true)); - return; - } - finished(stream, (err) => { if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { reject(err); diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 8837af2d717781..a5c74e3f158915 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -48,6 +48,13 @@ function destroy(err, cb) { } } + if (w) { + w.closed = true; + } + if (r) { + r.closed = true; + } + if (cb) { // Invoke callback before scheduling emitClose so that callback // can schedule before. @@ -101,6 +108,7 @@ function undestroy() { const w = this._writableState; if (r) { + r.closed = false; r.destroyed = false; r.errored = false; r.reading = false; @@ -110,6 +118,7 @@ function undestroy() { } if (w) { + w.closed = false; w.destroyed = false; w.errored = false; w.ended = false; diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index d6e1c5804eaf4a..fcbca7d21a5350 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -32,6 +32,8 @@ function isWritableFinished(stream) { return wState.finished || (wState.ended && wState.length === 0); } +function nop() {} + function eos(stream, opts, callback) { if (arguments.length === 2) { callback = opts; @@ -52,12 +54,15 @@ function eos(stream, opts, callback) { let writable = opts.writable || (opts.writable !== false && isWritable(stream)); + const wState = stream._writableState; + const rState = stream._readableState; + const onlegacyfinish = () => { if (!stream.writable) onfinish(); }; let writableFinished = stream.writableFinished || - (stream._writableState && stream._writableState.finished); + (rState && rState.finished); const onfinish = () => { writable = false; writableFinished = true; @@ -65,7 +70,7 @@ function eos(stream, opts, callback) { }; let readableEnded = stream.readableEnded || - (stream._readableState && stream._readableState.endEmitted); + (rState && rState.endEmitted); const onend = () => { readable = false; readableEnded = true; @@ -79,7 +84,7 @@ function eos(stream, opts, callback) { const onclose = () => { let err; if (readable && !readableEnded) { - if (!stream._readableState || !stream._readableState.ended) + if (!rState || !rState.ended) err = new ERR_STREAM_PREMATURE_CLOSE(); return callback.call(stream, err); } @@ -99,7 +104,7 @@ function eos(stream, opts, callback) { stream.on('abort', onclose); if (stream.req) onrequest(); else stream.on('request', onrequest); - } else if (writable && !stream._writableState) { // legacy streams + } else if (writable && !wState) { // legacy streams stream.on('end', onlegacyfinish); stream.on('close', onlegacyfinish); } @@ -114,7 +119,24 @@ function eos(stream, opts, callback) { if (opts.error !== false) stream.on('error', onerror); stream.on('close', onclose); + const closed = (wState && wState.closed) || (rState && rState.closed) || + (wState && wState.errorEmitted) || (rState && rState.errorEmitted) || + (wState && wState.finished) || (rState && rState.endEmitted) || + (rState && stream.req && stream.aborted); + + if (closed) { + // TODO(ronag): Re-throw error if errorEmitted? + // TODO(ronag): Throw premature close as if finished was called? + // before being closed? i.e. if closed but not errored, ended or finished. + // TODO(ronag): Throw some kind of error? Does it make sense + // to call finished() on a "finished" stream? + process.nextTick(() => { + callback(); + }); + } + return function() { + callback = nop; stream.removeListener('aborted', onclose); stream.removeListener('complete', onfinish); stream.removeListener('abort', onclose); diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index f6515a01b8d077..e866ba3d740fcc 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -215,3 +215,120 @@ const { promisify } = require('util'); w.end('asd'); w.destroy(); } + +function testClosed(factory) { + { + // If already destroyed but finished is cancelled in same tick + // don't invoke the callback, + + const s = factory(); + s.destroy(); + const dispose = finished(s, common.mustNotCall()); + dispose(); + } + + { + // If already destroyed invoked callback. + + const s = factory(); + s.destroy(); + finished(s, common.mustCall()); + } + + { + // Don't invoke until destroy has completed. + + let destroyed = false; + const s = factory({ + destroy(err, cb) { + setImmediate(() => { + destroyed = true; + cb(); + }); + } + }); + s.destroy(); + finished(s, common.mustCall(() => { + assert.strictEqual(destroyed, true); + })); + } + + { + // Invoke callback even if close is inhibited. + + const s = factory({ + emitClose: false, + destroy(err, cb) { + cb(); + finished(s, common.mustCall()); + } + }); + s.destroy(); + } + + { + // Invoke with deep async. + + const s = factory({ + destroy(err, cb) { + setImmediate(() => { + cb(); + setImmediate(() => { + finished(s, common.mustCall()); + }); + }); + } + }); + s.destroy(); + } +} + +testClosed((opts) => new Readable({ ...opts })); +testClosed((opts) => new Writable({ write() {}, ...opts })); + +{ + const w = new Writable({ + write(chunk, encoding, cb) { + cb(); + }, + autoDestroy: false + }); + w.end('asd'); + process.nextTick(() => { + finished(w, common.mustCall()); + }); +} + +{ + const w = new Writable({ + write(chunk, encoding, cb) { + cb(new Error()); + }, + autoDestroy: false + }); + w.write('asd'); + w.on('error', common.mustCall(() => { + finished(w, common.mustCall()); + })); +} + + +{ + const r = new Readable({ + autoDestroy: false + }); + r.push(null); + r.resume(); + r.on('end', common.mustCall(() => { + finished(r, common.mustCall()); + })); +} + +{ + const rs = fs.createReadStream(__filename, { autoClose: false }); + rs.resume(); + rs.on('close', common.mustNotCall()); + rs.on('end', common.mustCall(() => { + finished(rs, common.mustCall()); + })); +}