Skip to content

Commit

Permalink
refactor: avoid http2 dynamic dispatch in socket handlers
Browse files Browse the repository at this point in the history
Steps towards more clean separation between h1 and h2.

Refs: #2816
  • Loading branch information
ronag committed Feb 25, 2024
1 parent 2a368b2 commit 9ec64a0
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 64 deletions.
3 changes: 2 additions & 1 deletion lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ module.exports = {
kHTTP2CopyHeaders: Symbol('http2 copy headers'),
kHTTPConnVersion: Symbol('http connection version'),
kRetryHandlerDefaultRetry: Symbol('retry agent default retry'),
kConstruct: Symbol('constructable')
kConstruct: Symbol('constructable'),
kListeners: Symbol('listeners')
}
143 changes: 80 additions & 63 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const {
kLocalAddress,
kMaxResponseSize,
kHTTPConnVersion,
kListeners,
// HTTP2
kHost,
kHTTP2Session,
Expand Down Expand Up @@ -111,6 +112,20 @@ const FastBuffer = Buffer[Symbol.species]

const kClosedResolve = Symbol('kClosedResolve')

function addListener (obj, name, listener) {
const listeners = (obj[kListeners] ??= [])
listeners.push([name, listener])
obj.on(name, listener)
return obj
}

function removeAllListeners (obj) {
for (const [name, listener] of obj[kListeners] ?? []) {
obj.removeListener(name, listener)
}
obj[kListeners] = null
}

/**
* @type {import('../../types/client.js').default}
*/
Expand Down Expand Up @@ -803,11 +818,8 @@ class Parser {

socket[kClient] = null
socket[kError] = null
socket
.removeListener('error', onSocketError)
.removeListener('readable', onSocketReadable)
.removeListener('end', onSocketEnd)
.removeListener('close', onSocketClose)

removeAllListeners(socket)

client[kSocket] = null
client[kHTTP2Session] = null
Expand Down Expand Up @@ -1050,33 +1062,6 @@ function onParserTimeout (parser) {
}
}

function onSocketReadable () {
const { [kParser]: parser } = this
if (parser) {
parser.readMore()
}
}

function onSocketError (err) {
const { [kClient]: client, [kParser]: parser } = this

assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

if (client[kHTTPConnVersion] !== 'h2') {
// On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
// to the user.
if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so for as a valid response.
parser.onMessageComplete()
return
}
}

this[kError] = err

onError(this[kClient], err)
}

function onError (client, err) {
if (
client[kRunning] === 0 &&
Expand All @@ -1097,32 +1082,8 @@ function onError (client, err) {
}
}

function onSocketEnd () {
const { [kParser]: parser, [kClient]: client } = this

if (client[kHTTPConnVersion] !== 'h2') {
if (parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
return
}
}

util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
}

function onSocketClose () {
const { [kClient]: client, [kParser]: parser } = this

if (client[kHTTPConnVersion] === 'h1' && parser) {
if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
}

this[kParser].destroy()
this[kParser] = null
}
const { [kClient]: client } = this

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

Expand Down Expand Up @@ -1241,6 +1202,18 @@ async function connect (client) {

client[kHTTP2Session] = session
socket[kHTTP2Session] = session

addListener(socket, 'error', function (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

this[kError] = err

onError(this[kClient], err)
})
addListener(socket, 'end', function () {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})
addListener(socket, 'close', onSocketClose)
} else {
if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
Expand All @@ -1252,19 +1225,63 @@ async function connect (client) {
socket[kReset] = false
socket[kBlocking] = false
socket[kParser] = new Parser(client, socket, llhttpInstance)

addListener(socket, 'error', function (err) {
const { [kParser]: parser } = this

assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

// On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
// to the user.
if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so for as a valid response.
parser.onMessageComplete()
return
}

this[kError] = err

onError(this[kClient], err)
})
addListener(socket, 'readable', function () {
const { [kParser]: parser } = this
if (parser) {
parser.readMore()
}
})
addListener(socket, 'end', function () {
const { [kParser]: parser } = this

if (parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
return
}

util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})
addListener(socket, 'close', function () {
const { [kParser]: parser } = this

if (parser) {
if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
}

this[kParser].destroy()
this[kParser] = null
}

onSocketClose.call(this)
})
}

socket[kCounter] = 0
socket[kMaxRequests] = client[kMaxRequests]
socket[kClient] = client
socket[kError] = null

socket
.on('error', onSocketError)
.on('readable', onSocketReadable)
.on('end', onSocketEnd)
.on('close', onSocketClose)

client[kSocket] = socket

if (channels.connected.hasSubscribers) {
Expand Down

0 comments on commit 9ec64a0

Please sign in to comment.