diff --git a/package.json b/package.json index ebe277ee08..b042edff03 100644 --- a/package.json +++ b/package.json @@ -148,6 +148,7 @@ "stream-to-it": "^0.2.2" }, "devDependencies": { + "@libp2p/interface-metrics": "^4.0.0", "@libp2p/interface-mocks": "^7.0.1", "@libp2p/interface-transport-compliance-tests": "^3.0.0", "aegir": "^37.5.3", diff --git a/src/index.ts b/src/index.ts index 7b2930f8de..a57d4ef0f2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,6 +11,7 @@ import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr' import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net' import type { Connection } from '@libp2p/interface-connection' +import type { CounterGroup, Metrics } from '@libp2p/interface-metrics' const log = logger('libp2p:tcp') @@ -55,11 +56,36 @@ export interface TCPCreateListenerOptions extends CreateListenerOptions, TCPSock } +export interface TCPComponents { + metrics?: Metrics +} + +export interface TCPMetrics { + dialerEvents: CounterGroup + listenerEvents: CounterGroup +} + class TCP implements Transport { private readonly opts: TCPOptions + private readonly metrics?: TCPMetrics + private readonly components: TCPComponents - constructor (options: TCPOptions = {}) { + constructor (components: TCPComponents, options: TCPOptions = {}) { this.opts = options + this.components = components + + if (components.metrics != null) { + this.metrics = { + dialerEvents: components.metrics.registerCounterGroup('libp2p_tcp_dialer_errors_total', { + label: 'event', + help: 'Total count of TCP dialer errors by error type' + }), + listenerEvents: components.metrics.registerCounterGroup('libp2p_tcp_listener_errors_total', { + label: 'event', + help: 'Total count of TCP listener errors by error type' + }) + } + } } get [symbol] (): true { @@ -84,7 +110,8 @@ class TCP implements Transport { remoteAddr: ma, signal: options.signal, socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout, - socketCloseTimeout: this.opts.socketCloseTimeout + socketCloseTimeout: this.opts.socketCloseTimeout, + metrics: this.metrics?.dialerEvents }) log('new outbound connection %s', maConn.remoteAddr) const conn = await options.upgrader.upgradeOutbound(maConn) @@ -107,12 +134,14 @@ class TCP implements Transport { const onError = (err: Error) => { err.message = `connection error ${cOptsStr}: ${err.message}` + this.metrics?.dialerEvents.increment({ error: true }) done(err) } const onTimeout = () => { log('connection timeout %s', cOptsStr) + this.metrics?.dialerEvents.increment({ timeout: true }) const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT') // Note: this will result in onError() being called @@ -121,11 +150,13 @@ class TCP implements Transport { const onConnect = () => { log('connection opened %j', cOpts) + this.metrics?.dialerEvents.increment({ connect: true }) done() } const onAbort = () => { log('connection aborted %j', cOpts) + this.metrics?.dialerEvents.increment({ abort: true }) rawSocket.destroy() done(new AbortError()) } @@ -166,7 +197,8 @@ class TCP implements Transport { ...options, maxConnections: this.opts.maxConnections, socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout, - socketCloseTimeout: this.opts.socketCloseTimeout + socketCloseTimeout: this.opts.socketCloseTimeout, + metrics: this.components.metrics }) } @@ -190,8 +222,8 @@ class TCP implements Transport { } } -export function tcp (init: TCPOptions = {}): (components?: any) => Transport { - return () => { - return new TCP(init) +export function tcp (init: TCPOptions = {}): (components?: TCPComponents) => Transport { + return (components: TCPComponents = {}) => { + return new TCP(components, init) } } diff --git a/src/listener.ts b/src/listener.ts index 05e31dc5bd..85460848e8 100644 --- a/src/listener.ts +++ b/src/listener.ts @@ -11,6 +11,7 @@ import type { MultiaddrConnection, Connection } from '@libp2p/interface-connecti import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport' import type { Multiaddr } from '@multiformats/multiaddr' import type { TCPCreateListenerOptions } from './index.js' +import type { CounterGroup, Metric, Metrics } from '@libp2p/interface-metrics' const log = logger('libp2p:tcp:listener') @@ -31,6 +32,16 @@ interface Context extends TCPCreateListenerOptions { socketInactivityTimeout?: number socketCloseTimeout?: number maxConnections?: number + metrics?: Metrics +} + +const SERVER_STATUS_UP = 1 +const SERVER_STATUS_DOWN = 0 + +export interface TCPListenerMetrics { + status: Metric + errors: CounterGroup + events: CounterGroup } type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null } @@ -39,8 +50,8 @@ export class TCPListener extends EventEmitter implements Listene private readonly server: net.Server /** Keep track of open connections to destroy in case of timeout */ private readonly connections = new Set() - private status: Status = { started: false } + private metrics?: TCPListenerMetrics constructor (private readonly context: Context) { super() @@ -57,15 +68,62 @@ export class TCPListener extends EventEmitter implements Listene } this.server - .on('listening', () => this.dispatchEvent(new CustomEvent('listening'))) - .on('error', err => this.dispatchEvent(new CustomEvent('error', { detail: err }))) - .on('close', () => this.dispatchEvent(new CustomEvent('close'))) + .on('listening', () => { + if (context.metrics != null) { + // we are listening, register metrics for our port + const address = this.server.address() + let addr: string + + if (address == null) { + addr = 'unknown' + } else if (typeof address === 'string') { + // unix socket + addr = address + } else { + addr = `${address.address}:${address.port}` + } + + context.metrics?.registerMetric(`libp2p_tcp_connections_${addr}_count`, { + help: 'Current active connections in TCP listener', + calculate: () => { + return this.connections.size + } + }) + + this.metrics = { + status: context.metrics.registerMetric(`libp2p_tcp_${addr}_server_status`, { + help: 'Current status of the TCP server' + }), + errors: context.metrics.registerCounterGroup(`libp2p_tcp_${addr}_server_errors_total`, { + label: 'error', + help: 'Total count of TCP listener errors by error type' + }), + events: context.metrics.registerCounterGroup(`libp2p_tcp_$${addr}_socket_events`, { + label: 'event', + help: 'Total count of TCP socket events by event' + }) + } + + this.metrics?.status.update(SERVER_STATUS_UP) + } + + this.dispatchEvent(new CustomEvent('listening')) + }) + .on('error', err => { + this.metrics?.errors.increment({ listen_error: true }) + this.dispatchEvent(new CustomEvent('error', { detail: err })) + }) + .on('close', () => { + this.metrics?.status.update(SERVER_STATUS_DOWN) + this.dispatchEvent(new CustomEvent('close')) + }) } private onSocket (socket: net.Socket) { // Avoid uncaught errors caused by unstable connections socket.on('error', err => { log('socket error', err) + this.metrics?.events.increment({ error: true }) }) let maConn: MultiaddrConnection @@ -73,10 +131,12 @@ export class TCPListener extends EventEmitter implements Listene maConn = toMultiaddrConnection(socket, { listeningAddr: this.status.started ? this.status.listeningAddr : undefined, socketInactivityTimeout: this.context.socketInactivityTimeout, - socketCloseTimeout: this.context.socketCloseTimeout + socketCloseTimeout: this.context.socketCloseTimeout, + metrics: this.metrics?.events }) } catch (err) { log.error('inbound connection failed', err) + this.metrics?.errors.increment({ inbound_to_connection: true }) return } @@ -99,6 +159,7 @@ export class TCPListener extends EventEmitter implements Listene }) .catch(async err => { log.error('inbound connection failed', err) + this.metrics?.errors.increment({ inbound_upgrade: true }) await attemptClose(maConn) }) @@ -111,6 +172,7 @@ export class TCPListener extends EventEmitter implements Listene attemptClose(maConn) .catch(err => { log.error('closing inbound connection failed', err) + this.metrics?.errors.increment({ inbound_closing_failed: true }) }) } } diff --git a/src/socket-to-conn.ts b/src/socket-to-conn.ts index c390f46bc4..a8230cc62f 100644 --- a/src/socket-to-conn.ts +++ b/src/socket-to-conn.ts @@ -9,6 +9,7 @@ import errCode from 'err-code' import type { Socket } from 'net' import type { Multiaddr } from '@multiformats/multiaddr' import type { MultiaddrConnection } from '@libp2p/interface-connection' +import type { CounterGroup } from '@libp2p/interface-metrics' const log = logger('libp2p:tcp:socket') @@ -19,14 +20,15 @@ interface ToConnectionOptions { signal?: AbortSignal socketInactivityTimeout?: number socketCloseTimeout?: number + metrics?: CounterGroup } /** * Convert a socket into a MultiaddrConnection * https://github.com/libp2p/interface-transport#multiaddrconnection */ -export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOptions) => { - options = options ?? {} +export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions) => { + const metrics = options.metrics const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT @@ -61,6 +63,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti // https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback socket.setTimeout(inactivityTimeout, () => { log('%s socket read timeout', lOptsStr) + metrics?.increment({ timeout: true }) // only destroy with an error if the remote has not sent the FIN message let err: Error | undefined @@ -75,6 +78,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti socket.once('close', () => { log('%s socket read timeout', lOptsStr) + metrics?.increment({ close: true }) // In instances where `close` was not explicitly called, // such as an iterable stream ending, ensure we have set the close @@ -88,6 +92,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti // the remote sent a FIN packet which means no more data will be sent // https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end log('socket ended', maConn.remoteAddr.toString()) + metrics?.increment({ end: true }) }) const maConn: MultiaddrConnection = {