Skip to content

Commit

Permalink
Revert "http: simplify drain()"
Browse files Browse the repository at this point in the history
This reverts commit bdf07f4.

The commit causes somewhat rampant MaxListenerExceeded warnings on
fairly generic npm installs.

Fixes: nodejs#29239
  • Loading branch information
Trott committed Aug 21, 2019
1 parent f70261f commit cefd461
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 26 deletions.
15 changes: 5 additions & 10 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const {
_checkIsHttpToken: checkIsHttpToken,
debug,
freeParser,
httpSocketSetup,
parsers,
HTTPParser,
prepareError,
Expand All @@ -39,7 +40,7 @@ const Agent = require('_http_agent');
const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { URL, urlToOptions, searchParamsSymbol } = require('internal/url');
const { kOutHeaders, kNeedDrain } = require('internal/http');
const { kOutHeaders, ondrain } = require('internal/http');
const { connResetException, codes } = require('internal/errors');
const {
ERR_HTTP_HEADERS_SENT,
Expand Down Expand Up @@ -334,14 +335,6 @@ function emitAbortNT() {
this.emit('abort');
}

function ondrain() {
const msg = this._httpMessage;
if (msg && !msg.finished && msg[kNeedDrain]) {
msg[kNeedDrain] = false;
msg.emit('drain');
}
}

function socketCloseListener() {
const socket = this;
const req = socket._httpMessage;
Expand Down Expand Up @@ -659,6 +652,9 @@ function tickOnSocket(req, socket) {
socket.parser = parser;
socket._httpMessage = req;

// Setup "drain" propagation.
httpSocketSetup(socket);

// Propagate headers limit from request object to parser
if (typeof req.maxHeadersCount === 'number') {
parser.maxHeaderPairs = req.maxHeadersCount << 1;
Expand All @@ -670,7 +666,6 @@ function tickOnSocket(req, socket) {
socket.on('data', socketOnData);
socket.on('end', socketOnEnd);
socket.on('close', socketCloseListener);
socket.on('drain', ondrain);

if (
req.timeout !== undefined ||
Expand Down
8 changes: 8 additions & 0 deletions lib/_http_common.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const { methods, HTTPParser } =
internalBinding('http_parser') : internalBinding('http_parser_llhttp');

const FreeList = require('internal/freelist');
const { ondrain } = require('internal/http');
const incoming = require('_http_incoming');
const {
IncomingMessage,
Expand Down Expand Up @@ -200,6 +201,12 @@ function freeParser(parser, req, socket) {
}
}


function httpSocketSetup(socket) {
socket.removeListener('drain', ondrain);
socket.on('drain', ondrain);
}

const tokenRegExp = /^[\^_`a-zA-Z\-0-9!#$%&'*+.|~]+$/;
/**
* Verifies that the given val is a valid HTTP token
Expand Down Expand Up @@ -246,6 +253,7 @@ module.exports = {
CRLF: '\r\n',
debug,
freeParser,
httpSocketSetup,
methods,
parsers,
kIncomingMessage,
Expand Down
12 changes: 4 additions & 8 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const { getDefaultHighWaterMark } = require('internal/streams/state');
const assert = require('internal/assert');
const Stream = require('stream');
const internalUtil = require('internal/util');
const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http');
const { kOutHeaders, utcDate } = require('internal/http');
const { Buffer } = require('buffer');
const common = require('_http_common');
const checkIsHttpToken = common._checkIsHttpToken;
Expand Down Expand Up @@ -97,7 +97,6 @@ function OutgoingMessage() {
this._contentLength = null;
this._hasBody = true;
this._trailer = '';
this[kNeedDrain] = false;

this.finished = false;
this._headerSent = false;
Expand Down Expand Up @@ -592,10 +591,7 @@ Object.defineProperty(OutgoingMessage.prototype, 'writableEnded', {

const crlf_buf = Buffer.from('\r\n');
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
const ret = write_(this, chunk, encoding, callback, false);
if (!ret)
this[kNeedDrain] = true;
return ret;
return write_(this, chunk, encoding, callback, false);
};

function write_(msg, chunk, encoding, callback, fromEnd) {
Expand Down Expand Up @@ -802,8 +798,8 @@ OutgoingMessage.prototype._flush = function _flush() {
if (this.finished) {
// This is a queue to the server or client to bring in the next this.
this._finish();
} else if (ret && this[kNeedDrain]) {
this[kNeedDrain] = false;
} else if (ret) {
// This is necessary to prevent https from breaking
this.emit('drain');
}
}
Expand Down
12 changes: 5 additions & 7 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const {
CRLF,
continueExpression,
chunkExpression,
httpSocketSetup,
kIncomingMessage,
HTTPParser,
_checkInvalidHeaderChar: checkInvalidHeaderChar,
Expand All @@ -40,7 +41,7 @@ const {
const { OutgoingMessage } = require('_http_outgoing');
const {
kOutHeaders,
kNeedDrain,
ondrain,
nowDate,
emitStatistics
} = require('internal/http');
Expand Down Expand Up @@ -358,6 +359,8 @@ function connectionListener(socket) {
function connectionListenerInternal(server, socket) {
debug('SERVER new http connection');

httpSocketSetup(socket);

// Ensure that the server property of the socket is correctly set.
// See https://github.com/nodejs/node/issues/13435
if (socket.server === null)
Expand Down Expand Up @@ -452,12 +455,6 @@ function socketOnDrain(socket, state) {
socket.parser.resume();
socket.resume();
}

const msg = socket._httpMessage;
if (msg && !msg.finished && msg[kNeedDrain]) {
msg[kNeedDrain] = false;
msg.emit('drain');
}
}

function socketOnTimeout() {
Expand Down Expand Up @@ -584,6 +581,7 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) {
socket.removeListener('end', state.onEnd);
socket.removeListener('close', state.onClose);
socket.removeListener('drain', state.onDrain);
socket.removeListener('drain', ondrain);
socket.removeListener('error', socketOnError);
socket.removeListener('timeout', socketOnTimeout);
unconsume(parser, socket);
Expand Down
6 changes: 5 additions & 1 deletion lib/internal/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ function resetCache() {
utcCache = undefined;
}

function ondrain() {
if (this._httpMessage) this._httpMessage.emit('drain');
}

class HttpRequestTiming extends PerformanceEntry {
constructor(statistics) {
super();
Expand All @@ -46,7 +50,7 @@ function emitStatistics(statistics) {

module.exports = {
kOutHeaders: Symbol('kOutHeaders'),
kNeedDrain: Symbol('kNeedDrain'),
ondrain,
nowDate,
utcDate,
emitStatistics
Expand Down

0 comments on commit cefd461

Please sign in to comment.