diff --git a/lib/_http_client.js b/lib/_http_client.js index 6fb5dd65cb368c..782b8e9714fe81 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -430,25 +430,13 @@ function socketCloseListener() { req.destroyed = true; if (res) { // Socket closed before we emitted 'end' below. - // TOOD(ronag): res.destroy(err) if (!res.complete) { - res.aborted = true; - res.emit('aborted'); - if (res.listenerCount('error') > 0) { - res.emit('error', connResetException('aborted')); - } + res.destroy(connResetException('aborted')); } req._closed = true; req.emit('close'); if (!res.aborted && res.readable) { - res.on('end', function() { - this.destroyed = true; - this.emit('close'); - }); res.push(null); - } else { - res.destroyed = true; - res.emit('close'); } } else { if (!req.socket._hadError) { @@ -697,7 +685,6 @@ function responseKeepAlive(req) { req.destroyed = true; if (req.res) { - req.res.destroyed = true; // Detach socket from IncomingMessage to avoid destroying the freed // socket in IncomingMessage.destroy(). req.res.socket = null; @@ -752,10 +739,6 @@ function requestOnPrefinish() { function emitFreeNT(req) { req._closed = true; req.emit('close'); - if (req.res) { - req.res.emit('close'); - } - if (req.socket) { req.socket.emit('free'); } diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index 7943c69f54d911..22ab591b7ad78c 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -27,7 +27,7 @@ const { Symbol } = primordials; -const Stream = require('stream'); +const { Readable, finished } = require('stream'); const kHeaders = Symbol('kHeaders'); const kHeadersCount = Symbol('kHeadersCount'); @@ -54,7 +54,7 @@ function IncomingMessage(socket) { }; } - Stream.Readable.call(this, { autoDestroy: false, ...streamOptions }); + Readable.call(this, streamOptions); this._readableState.readingMore = true; @@ -89,8 +89,8 @@ function IncomingMessage(socket) { // read by the user, so there's no point continuing to handle it. this._dumped = false; } -ObjectSetPrototypeOf(IncomingMessage.prototype, Stream.Readable.prototype); -ObjectSetPrototypeOf(IncomingMessage, Stream.Readable); +ObjectSetPrototypeOf(IncomingMessage.prototype, Readable.prototype); +ObjectSetPrototypeOf(IncomingMessage, Readable); ObjectDefineProperty(IncomingMessage.prototype, 'connection', { get: function() { @@ -160,18 +160,30 @@ IncomingMessage.prototype._read = function _read(n) { readStart(this.socket); }; - // It's possible that the socket will be destroyed, and removed from // any messages, before ever calling this. In that case, just skip // it, since something else is destroying this connection anyway. -IncomingMessage.prototype.destroy = function destroy(error) { - // TODO(ronag): Implement in terms of _destroy - this.destroyed = true; - if (this.socket) - this.socket.destroy(error); - return this; -}; +IncomingMessage.prototype._destroy = function _destroy(err, cb) { + if (!this.readableEnded || !this.complete) { + this.aborted = true; + this.emit('aborted'); + } + // If aborted and the underlying socket is not already destroyed, + // destroy it. + // We have to check if the socket is already destroyed because finished + // does not call the callback when this methdod is invoked from `_http_client` + // in `test/parallel/test-http-client-spurious-aborted.js` + if (this.socket && !this.socket.destroyed && this.aborted) { + this.socket.destroy(err); + const cleanup = finished(this.socket, (e) => { + cleanup(); + onError(this, e || err, cb); + }); + } else { + onError(this, err, cb); + } +}; IncomingMessage.prototype._addHeaderLines = _addHeaderLines; function _addHeaderLines(headers, n) { @@ -349,6 +361,16 @@ IncomingMessage.prototype._dump = function _dump() { } }; +function onError(self, error, cb) { + // This is to keep backward compatible behavior. + // An error is emitted only if there are listeners attached to the event. + if (self.listenerCount('error') === 0) { + cb(); + } else { + cb(error); + } +} + module.exports = { IncomingMessage, readStart, diff --git a/lib/_http_server.js b/lib/_http_server.js index 419b08a7a0986d..8cd10bb3a00194 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -575,14 +575,7 @@ function socketOnClose(socket, state) { function abortIncoming(incoming) { while (incoming.length) { const req = incoming.shift(); - // TODO(ronag): req.destroy(err) - req.aborted = true; - req.destroyed = true; - req.emit('aborted'); - if (req.listenerCount('error') > 0) { - req.emit('error', connResetException('aborted')); - } - req.emit('close'); + req.destroy(connResetException('aborted')); } // Abort socket._httpMessage ? } @@ -741,14 +734,9 @@ function clearIncoming(req) { if (parser && parser.incoming === req) { if (req.readableEnded) { parser.incoming = null; - req.destroyed = true; - req.emit('close'); } else { req.on('end', clearIncoming); } - } else { - req.destroyed = true; - req.emit('close'); } } diff --git a/test/parallel/test-http-client-incomingmessage-destroy.js b/test/parallel/test-http-client-incomingmessage-destroy.js new file mode 100644 index 00000000000000..a0823d37786365 --- /dev/null +++ b/test/parallel/test-http-client-incomingmessage-destroy.js @@ -0,0 +1,25 @@ +'use strict'; + +const common = require('../common'); +const { createServer, get } = require('http'); +const assert = require('assert'); + +const server = createServer(common.mustCall((req, res) => { + res.writeHead(200); + res.write('Part of res.'); +})); + +function onUncaught(error) { + assert.strictEqual(error.message, 'Destroy test'); + server.close(); +} + +process.on('uncaughtException', common.mustCall(onUncaught)); + +server.listen(0, () => { + get({ + port: server.address().port + }, common.mustCall((res) => { + res.destroy(new Error('Destroy test')); + })); +}); diff --git a/test/parallel/test-http-server-incomingmessage-destroy.js b/test/parallel/test-http-server-incomingmessage-destroy.js new file mode 100644 index 00000000000000..cfe7e4feecba45 --- /dev/null +++ b/test/parallel/test-http-server-incomingmessage-destroy.js @@ -0,0 +1,25 @@ +'use strict'; + +const common = require('../common'); +const { createServer, get } = require('http'); +const assert = require('assert'); + +const server = createServer(common.mustCall((req, res) => { + req.destroy(new Error('Destroy test')); +})); + +function onUncaught(error) {} + +process.on('uncaughtException', common.mustNotCall(onUncaught)); + +server.listen(0, common.mustCall(() => { + get({ + port: server.address().port + }, (res) => { + res.resume(); + }).on('error', (error) => { + assert.strictEqual(error.message, 'socket hang up'); + assert.strictEqual(error.code, 'ECONNRESET'); + server.close(); + }); +}));