diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 528137418d42db..2e9f6526f14819 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -30,6 +30,7 @@ const { isNodeStream, willEmitClose: _willEmitClose, } = require('internal/streams/utils'); +const console = require('console'); function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; @@ -52,7 +53,7 @@ function eos(stream, options, callback) { callback = once(callback); const readable = options.readable || - (options.readable !== false && isReadableNodeStream(stream)); + (options.readable !== false && isReadableNodeStream(stream, true)); const writable = options.writable || (options.writable !== false && isWritableNodeStream(stream)); @@ -75,7 +76,7 @@ function eos(stream, options, callback) { // this generic check. let willEmitClose = ( _willEmitClose(stream) && - isReadableNodeStream(stream) === readable && + isReadableNodeStream(stream, true) === readable && isWritableNodeStream(stream) === writable ); diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 854daae9de41b8..d5c4f57d447142 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -9,11 +9,16 @@ const { const kDestroyed = Symbol('kDestroyed'); const kIsDisturbed = Symbol('kIsDisturbed'); -function isReadableNodeStream(obj) { +function isReadableNodeStream(obj, strict = false) { return !!( obj && typeof obj.pipe === 'function' && typeof obj.on === 'function' && + ( + !strict || + typeof obj.read === 'function' || + (typeof obj.pause === 'function' && typeof obj.resume === 'function') + ) && (!obj._writableState || obj._readableState?.readable !== false) && // Duplex (!obj._writableState || obj._readableState) // Writable has .pipe. ); diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 51838a382ee3bb..28406de519e41c 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -643,3 +643,19 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); const s = new Stream(); finished(s, common.mustNotCall()); } + +{ + const server = http.createServer(common.mustCall(function (req, res) { + fs.createReadStream(__filename).pipe(res) + finished(res, common.mustCall(function (err) { + if (err) { + throw err + } + })) + })).listen(0, function () { + http.request({ method: 'GET', port: this.address().port }, common.mustCall(function (res) { + res.resume() + server.close() + })).end() + }) +}