From 9ec64a0c79d3a4e6ef846b022cadbfd9eee06ffa Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Feb 2024 11:03:21 +0100 Subject: [PATCH] refactor: avoid http2 dynamic dispatch in socket handlers Steps towards more clean separation between h1 and h2. Refs: https://github.com/nodejs/undici/pull/2816 --- lib/core/symbols.js | 3 +- lib/dispatcher/client.js | 143 ++++++++++++++++++++++----------------- 2 files changed, 82 insertions(+), 64 deletions(-) diff --git a/lib/core/symbols.js b/lib/core/symbols.js index 68d8566fac0..d3edb65dda6 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -59,5 +59,6 @@ module.exports = { kHTTP2CopyHeaders: Symbol('http2 copy headers'), kHTTPConnVersion: Symbol('http connection version'), kRetryHandlerDefaultRetry: Symbol('retry agent default retry'), - kConstruct: Symbol('constructable') + kConstruct: Symbol('constructable'), + kListeners: Symbol('listeners') } diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index cb3feabb37b..2e05169d7ce 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -72,6 +72,7 @@ const { kLocalAddress, kMaxResponseSize, kHTTPConnVersion, + kListeners, // HTTP2 kHost, kHTTP2Session, @@ -111,6 +112,20 @@ const FastBuffer = Buffer[Symbol.species] const kClosedResolve = Symbol('kClosedResolve') +function addListener (obj, name, listener) { + const listeners = (obj[kListeners] ??= []) + listeners.push([name, listener]) + obj.on(name, listener) + return obj +} + +function removeAllListeners (obj) { + for (const [name, listener] of obj[kListeners] ?? []) { + obj.removeListener(name, listener) + } + obj[kListeners] = null +} + /** * @type {import('../../types/client.js').default} */ @@ -803,11 +818,8 @@ class Parser { socket[kClient] = null socket[kError] = null - socket - .removeListener('error', onSocketError) - .removeListener('readable', onSocketReadable) - .removeListener('end', onSocketEnd) - .removeListener('close', onSocketClose) + + removeAllListeners(socket) client[kSocket] = null client[kHTTP2Session] = null @@ -1050,33 +1062,6 @@ function onParserTimeout (parser) { } } -function onSocketReadable () { - const { [kParser]: parser } = this - if (parser) { - parser.readMore() - } -} - -function onSocketError (err) { - const { [kClient]: client, [kParser]: parser } = this - - assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') - - if (client[kHTTPConnVersion] !== 'h2') { - // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded - // to the user. - if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so for as a valid response. - parser.onMessageComplete() - return - } - } - - this[kError] = err - - onError(this[kClient], err) -} - function onError (client, err) { if ( client[kRunning] === 0 && @@ -1097,32 +1082,8 @@ function onError (client, err) { } } -function onSocketEnd () { - const { [kParser]: parser, [kClient]: client } = this - - if (client[kHTTPConnVersion] !== 'h2') { - if (parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so far as a valid response. - parser.onMessageComplete() - return - } - } - - util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) -} - function onSocketClose () { - const { [kClient]: client, [kParser]: parser } = this - - if (client[kHTTPConnVersion] === 'h1' && parser) { - if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so far as a valid response. - parser.onMessageComplete() - } - - this[kParser].destroy() - this[kParser] = null - } + const { [kClient]: client } = this const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) @@ -1241,6 +1202,18 @@ async function connect (client) { client[kHTTP2Session] = session socket[kHTTP2Session] = session + + addListener(socket, 'error', function (err) { + assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') + + this[kError] = err + + onError(this[kClient], err) + }) + addListener(socket, 'end', function () { + util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) + }) + addListener(socket, 'close', onSocketClose) } else { if (!llhttpInstance) { llhttpInstance = await llhttpPromise @@ -1252,6 +1225,56 @@ async function connect (client) { socket[kReset] = false socket[kBlocking] = false socket[kParser] = new Parser(client, socket, llhttpInstance) + + addListener(socket, 'error', function (err) { + const { [kParser]: parser } = this + + assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') + + // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded + // to the user. + if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so for as a valid response. + parser.onMessageComplete() + return + } + + this[kError] = err + + onError(this[kClient], err) + }) + addListener(socket, 'readable', function () { + const { [kParser]: parser } = this + if (parser) { + parser.readMore() + } + }) + addListener(socket, 'end', function () { + const { [kParser]: parser } = this + + if (parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so far as a valid response. + parser.onMessageComplete() + return + } + + util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) + }) + addListener(socket, 'close', function () { + const { [kParser]: parser } = this + + if (parser) { + if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so far as a valid response. + parser.onMessageComplete() + } + + this[kParser].destroy() + this[kParser] = null + } + + onSocketClose.call(this) + }) } socket[kCounter] = 0 @@ -1259,12 +1282,6 @@ async function connect (client) { socket[kClient] = client socket[kError] = null - socket - .on('error', onSocketError) - .on('readable', onSocketReadable) - .on('end', onSocketEnd) - .on('close', onSocketClose) - client[kSocket] = socket if (channels.connected.hasSubscribers) {