Skip to content

Commit

Permalink
feat!: add metrics (libp2p#223)
Browse files Browse the repository at this point in the history
Uses new metrics interface from libp2p/js-libp2p-interfaces#310 to report useful connection metrics.

Similar to libp2p#217, but it adds the listening host/port to the metric names so that multiple TCP listeners can report their metrics separately.

BREAKING CHANGE: requires metrics interface v4
  • Loading branch information
achingbrain committed Nov 5, 2022
1 parent 73240c4 commit c004357
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 13 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
"stream-to-it": "^0.2.2"
},
"devDependencies": {
"@libp2p/interface-metrics": "^4.0.0",
"@libp2p/interface-mocks": "^7.0.1",
"@libp2p/interface-transport-compliance-tests": "^3.0.0",
"aegir": "^37.5.3",
Expand Down
44 changes: 38 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import type { Connection } from '@libp2p/interface-connection'
import type { CounterGroup, Metrics } from '@libp2p/interface-metrics'

const log = logger('libp2p:tcp')

Expand Down Expand Up @@ -55,11 +56,36 @@ export interface TCPCreateListenerOptions extends CreateListenerOptions, TCPSock

}

/**
 * Components that can be supplied to the TCP transport when it is created.
 */
export interface TCPComponents {
// Optional metrics implementation. When present, the transport registers its
// counter groups on it and hands it to the listeners it creates.
metrics?: Metrics
}

/**
 * Counter groups registered by the TCP transport when a metrics
 * implementation is available.
 */
export interface TCPMetrics {
// Incremented by the dial handlers (connect, error, timeout, abort) and passed
// to outbound connections as their socket event counter.
dialerEvents: CounterGroup
// Registered alongside dialerEvents in the constructor.
// NOTE(review): no increment of this group is visible in this diff - confirm where it is used.
listenerEvents: CounterGroup
}

class TCP implements Transport {
private readonly opts: TCPOptions
private readonly metrics?: TCPMetrics
private readonly components: TCPComponents

constructor (options: TCPOptions = {}) {
/**
 * Stores the supplied components and options and, when a metrics
 * implementation is present, registers the counter groups used by the dialer
 * and the listener.
 *
 * @param components - services supplied to the transport; only `metrics` is read here
 * @param options - TCP transport options, defaults to an empty object
 */
constructor (components: TCPComponents, options: TCPOptions = {}) {
  this.opts = options
  this.components = components

  if (components.metrics != null) {
    // These groups count every event kind (connect, timeout, abort, close,
    // end, error), not only failures, so the help text describes events.
    // The metric names are left untouched so that existing time series and
    // dashboards are not renamed.
    this.metrics = {
      dialerEvents: components.metrics.registerCounterGroup('libp2p_tcp_dialer_errors_total', {
        label: 'event',
        help: 'Total count of TCP dialer events by type'
      }),
      listenerEvents: components.metrics.registerCounterGroup('libp2p_tcp_listener_errors_total', {
        label: 'event',
        help: 'Total count of TCP listener events by type'
      })
    }
  }
}

get [symbol] (): true {
Expand All @@ -84,7 +110,8 @@ class TCP implements Transport {
remoteAddr: ma,
signal: options.signal,
socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
socketCloseTimeout: this.opts.socketCloseTimeout,
metrics: this.metrics?.dialerEvents
})
log('new outbound connection %s', maConn.remoteAddr)
const conn = await options.upgrader.upgradeOutbound(maConn)
Expand All @@ -107,12 +134,14 @@ class TCP implements Transport {

const onError = (err: Error) => {
err.message = `connection error ${cOptsStr}: ${err.message}`
this.metrics?.dialerEvents.increment({ error: true })

done(err)
}

const onTimeout = () => {
log('connection timeout %s', cOptsStr)
this.metrics?.dialerEvents.increment({ timeout: true })

const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
// Note: this will result in onError() being called
Expand All @@ -121,11 +150,13 @@ class TCP implements Transport {

const onConnect = () => {
log('connection opened %j', cOpts)
this.metrics?.dialerEvents.increment({ connect: true })
done()
}

const onAbort = () => {
log('connection aborted %j', cOpts)
this.metrics?.dialerEvents.increment({ abort: true })
rawSocket.destroy()
done(new AbortError())
}
Expand Down Expand Up @@ -166,7 +197,8 @@ class TCP implements Transport {
...options,
maxConnections: this.opts.maxConnections,
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
socketCloseTimeout: this.opts.socketCloseTimeout,
metrics: this.components.metrics
})
}

Expand All @@ -190,8 +222,8 @@ class TCP implements Transport {
}
}

export function tcp (init: TCPOptions = {}): (components?: any) => Transport {
return () => {
return new TCP(init)
/**
 * Creates a factory for the TCP transport.
 *
 * @param init - TCP transport options captured by the returned factory
 * @returns a function that accepts the (optional) components and builds a
 * new TCP transport from them and `init`
 */
export function tcp (init: TCPOptions = {}): (components?: TCPComponents) => Transport {
  return (components: TCPComponents = {}) => new TCP(components, init)
}
72 changes: 67 additions & 5 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { MultiaddrConnection, Connection } from '@libp2p/interface-connecti
import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { TCPCreateListenerOptions } from './index.js'
import type { CounterGroup, Metric, Metrics } from '@libp2p/interface-metrics'

const log = logger('libp2p:tcp:listener')

Expand All @@ -31,6 +32,16 @@ interface Context extends TCPCreateListenerOptions {
socketInactivityTimeout?: number
socketCloseTimeout?: number
maxConnections?: number
metrics?: Metrics
}

// Value written to the listener's `status` metric when the server emits 'listening'.
const SERVER_STATUS_UP = 1
// Value written to the listener's `status` metric when the server emits 'close'.
const SERVER_STATUS_DOWN = 0

/**
 * Metrics registered by a TCP listener once its server is listening.
 */
export interface TCPListenerMetrics {
// Updated to SERVER_STATUS_UP on 'listening' and SERVER_STATUS_DOWN on 'close'.
status: Metric
// Incremented on server errors and on failures while handling inbound
// connections (listen_error, inbound_to_connection, inbound_upgrade,
// inbound_closing_failed).
errors: CounterGroup
// Incremented on inbound socket errors and passed to each inbound connection
// as its socket event counter.
events: CounterGroup
}

type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null }
Expand All @@ -39,8 +50,8 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
private readonly server: net.Server
/** Keep track of open connections to destroy in case of timeout */
private readonly connections = new Set<MultiaddrConnection>()

private status: Status = { started: false }
private metrics?: TCPListenerMetrics

constructor (private readonly context: Context) {
super()
Expand All @@ -57,26 +68,75 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

this.server
.on('listening', () => this.dispatchEvent(new CustomEvent('listening')))
.on('error', err => this.dispatchEvent(new CustomEvent<Error>('error', { detail: err })))
.on('close', () => this.dispatchEvent(new CustomEvent('close')))
.on('listening', () => {
  if (context.metrics != null) {
    // Metrics are registered only once the server is listening because the
    // bound address is embedded in each metric name, which lets several TCP
    // listeners report their metrics separately.
    const address = this.server.address()
    let addr: string

    if (address == null) {
      addr = 'unknown'
    } else if (typeof address === 'string') {
      // unix socket
      addr = address
    } else {
      addr = `${address.address}:${address.port}`
    }

    // Calculated on demand from the set of currently tracked connections
    context.metrics.registerMetric(`libp2p_tcp_connections_${addr}_count`, {
      help: 'Current active connections in TCP listener',
      calculate: () => {
        return this.connections.size
      }
    })

    this.metrics = {
      status: context.metrics.registerMetric(`libp2p_tcp_${addr}_server_status`, {
        help: 'Current status of the TCP server'
      }),
      errors: context.metrics.registerCounterGroup(`libp2p_tcp_${addr}_server_errors_total`, {
        label: 'error',
        help: 'Total count of TCP listener errors by error type'
      }),
      // Fixed: the name previously contained a stray `$` before the address
      // (`libp2p_tcp_$<addr>_socket_events`), unlike the sibling metrics.
      events: context.metrics.registerCounterGroup(`libp2p_tcp_${addr}_socket_events`, {
        label: 'event',
        help: 'Total count of TCP socket events by event'
      })
    }

    this.metrics?.status.update(SERVER_STATUS_UP)
  }

  this.dispatchEvent(new CustomEvent('listening'))
})
.on('error', err => {
  // Count the server error, then re-emit it as a listener 'error' event
  this.metrics?.errors.increment({ listen_error: true })
  this.dispatchEvent(new CustomEvent<Error>('error', { detail: err }))
})
.on('close', () => {
  // Mark the server as down before notifying listeners of the close
  this.metrics?.status.update(SERVER_STATUS_DOWN)
  this.dispatchEvent(new CustomEvent('close'))
})
}

private onSocket (socket: net.Socket) {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
this.metrics?.events.increment({ error: true })
})

let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, {
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
socketInactivityTimeout: this.context.socketInactivityTimeout,
socketCloseTimeout: this.context.socketCloseTimeout
socketCloseTimeout: this.context.socketCloseTimeout,
metrics: this.metrics?.events
})
} catch (err) {
log.error('inbound connection failed', err)
this.metrics?.errors.increment({ inbound_to_connection: true })
return
}

Expand All @@ -99,6 +159,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
})
.catch(async err => {
log.error('inbound connection failed', err)
this.metrics?.errors.increment({ inbound_upgrade: true })

await attemptClose(maConn)
})
Expand All @@ -111,6 +172,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
attemptClose(maConn)
.catch(err => {
log.error('closing inbound connection failed', err)
this.metrics?.errors.increment({ inbound_closing_failed: true })
})
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import errCode from 'err-code'
import type { Socket } from 'net'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { MultiaddrConnection } from '@libp2p/interface-connection'
import type { CounterGroup } from '@libp2p/interface-metrics'

const log = logger('libp2p:tcp:socket')

Expand All @@ -19,14 +20,15 @@ interface ToConnectionOptions {
signal?: AbortSignal
socketInactivityTimeout?: number
socketCloseTimeout?: number
metrics?: CounterGroup
}

/**
* Convert a socket into a MultiaddrConnection
* https://github.com/libp2p/interface-transport#multiaddrconnection
*/
export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOptions) => {
options = options ?? {}
export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions) => {
const metrics = options.metrics
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT

Expand Down Expand Up @@ -61,6 +63,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ timeout: true })

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
Expand All @@ -75,6 +78,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti

socket.once('close', () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ close: true })

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
Expand All @@ -88,6 +92,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
// the remote sent a FIN packet which means no more data will be sent
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
log('socket ended', maConn.remoteAddr.toString())
metrics?.increment({ end: true })
})

const maConn: MultiaddrConnection = {
Expand Down

0 comments on commit c004357

Please sign in to comment.