From 2c258ab4b81c002e371ee512f9cd195ba0a32ebf Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 21 Dec 2019 12:08:36 +0100 Subject: [PATCH] stream: pipeline should use req.abort() to destroy response destroy(err) on http response will propagate the error to the request causing 'error' to be unexpectedly emitted. Furthermore, response.destroy() unlike request.abort() does not _dump buffered data. Fixes a breaking change introduced in https://github.com/nodejs/node/commit/648088289d619bfb149fe90316ce0127083c4c99. Prefer res.req.abort() over res.destroy() until this situation is clarified. Fixes: https://github.com/nodejs/node/issues/31029 Refs: https://github.com/nodejs/node/commit/648088289d619bfb149fe90316ce0127083c4c99 PR-URL: https://github.com/nodejs/node/pull/31054 Reviewed-By: Luigi Pinca Reviewed-By: Matteo Collina Reviewed-By: Stephen Belanger Reviewed-By: Rich Trott Reviewed-By: Ruben Bridgewater --- lib/internal/streams/pipeline.js | 15 +++--------- test/parallel/test-stream-pipeline.js | 35 ++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ed5556e5d0a600..92a91c30171af1 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -17,7 +17,7 @@ const { } = require('internal/errors').codes; function isRequest(stream) { - return stream.setHeader && typeof stream.abort === 'function'; + return stream && stream.setHeader && typeof stream.abort === 'function'; } function destroyer(stream, reading, writing, callback) { @@ -43,22 +43,13 @@ function destroyer(stream, reading, writing, callback) { // request.destroy just do .end - .abort is what we want if (isRequest(stream)) return stream.abort(); - if (typeof stream.destroy === 'function') { - if (stream.req && stream._writableState === undefined) { - // This is a ClientRequest - // TODO(mcollina): backward compatible fix to avoid crashing. - // Possibly remove in a later semver-major change. - stream.req.on('error', noop); - } - return stream.destroy(err); - } + if (isRequest(stream.req)) return stream.req.abort(); + if (typeof stream.destroy === 'function') return stream.destroy(err); callback(err || new ERR_STREAM_DESTROYED('pipe')); }; } -function noop() {} - function pipe(from, to) { return from.pipe(to); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 4a41f053bd0a85..f6ee97ba43d053 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1,7 +1,14 @@ 'use strict'; const common = require('../common'); -const { Stream, Writable, Readable, Transform, pipeline } = require('stream'); +const { + Stream, + Writable, + Readable, + Transform, + pipeline, + PassThrough +} = require('stream'); const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); @@ -483,3 +490,29 @@ const { promisify } = require('util'); { code: 'ERR_INVALID_CALLBACK' } ); } + +{ + const server = http.Server(function(req, res) { + res.write('asd'); + }); + server.listen(0, function() { + http.get({ port: this.address().port }, (res) => { + const stream = new PassThrough(); + + stream.on('error', common.mustCall()); + + pipeline( + res, + stream, + common.mustCall((err) => { + assert.ok(err); + // TODO(ronag): + // assert.strictEqual(err.message, 'oh no'); + server.close(); + }) + ); + + stream.destroy(new Error('oh no')); + }).on('error', common.mustNotCall()); + }); +}