diff --git a/lib/_http_client.js b/lib/_http_client.js index 67e14fae20ad42..956dd8da7ab00f 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -30,6 +30,7 @@ const { _checkIsHttpToken: checkIsHttpToken, debug, freeParser, + httpSocketSetup, parsers, HTTPParser, prepareError, @@ -39,7 +40,7 @@ const Agent = require('_http_agent'); const { Buffer } = require('buffer'); const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); const { URL, urlToOptions, searchParamsSymbol } = require('internal/url'); -const { kOutHeaders, kNeedDrain } = require('internal/http'); +const { kOutHeaders, ondrain } = require('internal/http'); const { connResetException, codes } = require('internal/errors'); const { ERR_HTTP_HEADERS_SENT, @@ -334,14 +335,6 @@ function emitAbortNT() { this.emit('abort'); } -function ondrain() { - const msg = this._httpMessage; - if (msg && !msg.finished && msg[kNeedDrain]) { - msg[kNeedDrain] = false; - msg.emit('drain'); - } -} - function socketCloseListener() { const socket = this; const req = socket._httpMessage; @@ -659,6 +652,9 @@ function tickOnSocket(req, socket) { socket.parser = parser; socket._httpMessage = req; + // Setup "drain" propagation. + httpSocketSetup(socket); + // Propagate headers limit from request object to parser if (typeof req.maxHeadersCount === 'number') { parser.maxHeaderPairs = req.maxHeadersCount << 1; @@ -670,7 +666,6 @@ function tickOnSocket(req, socket) { socket.on('data', socketOnData); socket.on('end', socketOnEnd); socket.on('close', socketCloseListener); - socket.on('drain', ondrain); if ( req.timeout !== undefined || diff --git a/lib/_http_common.js b/lib/_http_common.js index c271ec54eb3ab2..b3aeb0721ae58e 100644 --- a/lib/_http_common.js +++ b/lib/_http_common.js @@ -31,6 +31,7 @@ const { methods, HTTPParser } = internalBinding('http_parser') : internalBinding('http_parser_llhttp'); const FreeList = require('internal/freelist'); +const { ondrain } = require('internal/http'); const incoming = require('_http_incoming'); const { IncomingMessage, @@ -200,6 +201,12 @@ function freeParser(parser, req, socket) { } } + +function httpSocketSetup(socket) { + socket.removeListener('drain', ondrain); + socket.on('drain', ondrain); +} + const tokenRegExp = /^[\^_`a-zA-Z\-0-9!#$%&'*+.|~]+$/; /** * Verifies that the given val is a valid HTTP token @@ -246,6 +253,7 @@ module.exports = { CRLF: '\r\n', debug, freeParser, + httpSocketSetup, methods, parsers, kIncomingMessage, diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 32a51d120bad2b..b09cb611bdf662 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -27,7 +27,7 @@ const { getDefaultHighWaterMark } = require('internal/streams/state'); const assert = require('internal/assert'); const Stream = require('stream'); const internalUtil = require('internal/util'); -const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http'); +const { kOutHeaders, utcDate } = require('internal/http'); const { Buffer } = require('buffer'); const common = require('_http_common'); const checkIsHttpToken = common._checkIsHttpToken; @@ -97,7 +97,6 @@ function OutgoingMessage() { this._contentLength = null; this._hasBody = true; this._trailer = ''; - this[kNeedDrain] = false; this.finished = false; this._headerSent = false; @@ -592,10 +591,7 @@ Object.defineProperty(OutgoingMessage.prototype, 'writableEnded', { const crlf_buf = Buffer.from('\r\n'); OutgoingMessage.prototype.write = function write(chunk, encoding, callback) { - const ret = write_(this, chunk, encoding, callback, false); - if (!ret) - this[kNeedDrain] = true; - return ret; + return write_(this, chunk, encoding, callback, false); }; function write_(msg, chunk, encoding, callback, fromEnd) { @@ -802,8 +798,8 @@ OutgoingMessage.prototype._flush = function _flush() { if (this.finished) { // This is a queue to the server or client to bring in the next this. this._finish(); - } else if (ret && this[kNeedDrain]) { - this[kNeedDrain] = false; + } else if (ret) { + // This is necessary to prevent https from breaking this.emit('drain'); } } diff --git a/lib/_http_server.js b/lib/_http_server.js index 2c48ab9191846b..b083983c944ebe 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -32,6 +32,7 @@ const { CRLF, continueExpression, chunkExpression, + httpSocketSetup, kIncomingMessage, HTTPParser, _checkInvalidHeaderChar: checkInvalidHeaderChar, @@ -40,7 +41,7 @@ const { const { OutgoingMessage } = require('_http_outgoing'); const { kOutHeaders, - kNeedDrain, + ondrain, nowDate, emitStatistics } = require('internal/http'); @@ -358,6 +359,8 @@ function connectionListener(socket) { function connectionListenerInternal(server, socket) { debug('SERVER new http connection'); + httpSocketSetup(socket); + // Ensure that the server property of the socket is correctly set. // See https://github.com/nodejs/node/issues/13435 if (socket.server === null) @@ -452,12 +455,6 @@ function socketOnDrain(socket, state) { socket.parser.resume(); socket.resume(); } - - const msg = socket._httpMessage; - if (msg && !msg.finished && msg[kNeedDrain]) { - msg[kNeedDrain] = false; - msg.emit('drain'); - } } function socketOnTimeout() { @@ -584,6 +581,7 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) { socket.removeListener('end', state.onEnd); socket.removeListener('close', state.onClose); socket.removeListener('drain', state.onDrain); + socket.removeListener('drain', ondrain); socket.removeListener('error', socketOnError); socket.removeListener('timeout', socketOnTimeout); unconsume(parser, socket); diff --git a/lib/internal/http.js b/lib/internal/http.js index fcfd2c91f22d33..440d8afa2d3182 100644 --- a/lib/internal/http.js +++ b/lib/internal/http.js @@ -28,6 +28,10 @@ function resetCache() { utcCache = undefined; } +function ondrain() { + if (this._httpMessage) this._httpMessage.emit('drain'); +} + class HttpRequestTiming extends PerformanceEntry { constructor(statistics) { super(); @@ -46,7 +50,7 @@ function emitStatistics(statistics) { module.exports = { kOutHeaders: Symbol('kOutHeaders'), - kNeedDrain: Symbol('kNeedDrain'), + ondrain, nowDate, utcDate, emitStatistics