Skip to content

Commit

Permalink
fix: replace p-queue with less restrictive queue (#2339)
Browse files Browse the repository at this point in the history
Adds a `Queue` class to `@libp2p/utils` modelled on p-queue with a
few key differences:

1. The queue is externally accessible so we can modify it before jobs run
2. It can be turned into an async generator
3. Jobs remain in the queue while they are executing for better introspection
4. It integrates with libp2p metrics, if desired

The dial queue has been replaced with this new queue class, this means we
don't need to maintain a separate internal queue for pending dials since the
dial queue itself is accessible.
  • Loading branch information
achingbrain authored Jan 6, 2024
1 parent 581574d commit 528d737
Show file tree
Hide file tree
Showing 19 changed files with 1,469 additions and 553 deletions.
16 changes: 16 additions & 0 deletions packages/interface/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ export class CodeError<T extends Record<string, any> = Record<string, never>> ex
}
}

export class AggregateCodeError<T extends Record<string, any> = Record<string, never>> extends AggregateError {
public readonly props: T

constructor (
errors: Error[],
message: string,
public readonly code: string,
props?: T
) {
super(errors, message)

this.name = props?.name ?? 'AggregateCodeError'
this.props = props ?? {} as T // eslint-disable-line @typescript-eslint/consistent-type-assertions
}
}

export class UnexpectedPeerError extends Error {
public code: string

Expand Down
2 changes: 1 addition & 1 deletion packages/interface/src/event-target.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export class TypedEventEmitter<EventMap extends Record<string, any>> extends Eve
return result
}

safeDispatchEvent<Detail>(type: keyof EventMap, detail: CustomEventInit<Detail>): boolean {
safeDispatchEvent<Detail>(type: keyof EventMap, detail: CustomEventInit<Detail> = {}): boolean {
return this.dispatchEvent(new CustomEvent<Detail>(type as string, detail))
}
}
Expand Down
33 changes: 14 additions & 19 deletions packages/kad-dht/src/routing-table/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { CodeError, TypedEventEmitter } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import { PeerJobQueue } from '@libp2p/utils/peer-job-queue'
import { PeerQueue } from '@libp2p/utils/peer-queue'
import { pbStream } from 'it-protobuf-stream'
import { Message, MessageType } from '../message/dht.js'
import * as utils from '../utils.js'
Expand Down Expand Up @@ -44,7 +44,7 @@ export interface RoutingTableEvents {
export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implements Startable {
public kBucketSize: number
public kb?: KBucket
public pingQueue: PeerJobQueue
public pingQueue: PeerQueue<boolean>

private readonly log: Logger
private readonly components: RoutingTableComponents
Expand All @@ -56,8 +56,6 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
private readonly tagValue: number
private readonly metrics?: {
routingTableSize: Metric
pingQueueSize: Metric
pingRunning: Metric
}

constructor (components: RoutingTableComponents, init: RoutingTableInit) {
Expand All @@ -75,23 +73,18 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
this.tagName = tagName ?? KAD_CLOSE_TAG_NAME
this.tagValue = tagValue ?? KAD_CLOSE_TAG_VALUE

const updatePingQueueSizeMetric = (): void => {
this.metrics?.pingQueueSize.update(this.pingQueue.size)
this.metrics?.pingRunning.update(this.pingQueue.pending)
}

this.pingQueue = new PeerJobQueue({ concurrency: this.pingConcurrency })
this.pingQueue.addListener('add', updatePingQueueSizeMetric)
this.pingQueue.addListener('next', updatePingQueueSizeMetric)
this.pingQueue.addListener('error', err => {
this.log.error('error pinging peer', err)
this.pingQueue = new PeerQueue({
concurrency: this.pingConcurrency,
metricName: `${logPrefix.replaceAll(':', '_')}_ping_queue`,
metrics: this.components.metrics
})
this.pingQueue.addEventListener('error', evt => {
this.log.error('error pinging peer', evt.detail)
})

if (this.components.metrics != null) {
this.metrics = {
routingTableSize: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_routing_table_size`),
pingQueueSize: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_ping_queue_size`),
pingRunning: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_ping_running`)
routingTableSize: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_routing_table_size`)
}
}
}
Expand Down Expand Up @@ -204,8 +197,10 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
const results = await Promise.all(
oldContacts.map(async oldContact => {
// if a previous ping wants us to ping this contact, re-use the result
if (this.pingQueue.hasJob(oldContact.peer)) {
return this.pingQueue.joinJob(oldContact.peer)
const pingJob = this.pingQueue.find(oldContact.peer)

if (pingJob != null) {
return pingJob.join()
}

return this.pingQueue.add(async () => {
Expand Down
1 change: 0 additions & 1 deletion packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
"merge-options": "^3.0.4",
"multiformats": "^13.0.0",
"p-defer": "^4.0.0",
"p-queue": "^8.0.0",
"private-ip": "^3.0.1",
"rate-limiter-flexible": "^4.0.0",
"uint8arraylist": "^2.4.3",
Expand Down
19 changes: 11 additions & 8 deletions packages/libp2p/src/connection-manager/auto-dial.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { PeerJobQueue } from '@libp2p/utils/peer-job-queue'
import { PeerQueue } from '@libp2p/utils/peer-queue'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js'
import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore, Startable } from '@libp2p/interface'
import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore, Startable, Metrics } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'

interface AutoDialInit {
Expand All @@ -20,6 +20,7 @@ interface AutoDialComponents {
peerStore: PeerStore
events: TypedEventTarget<Libp2pEvents>
logger: ComponentLogger
metrics?: Metrics
}

const defaultOptions = {
Expand All @@ -35,7 +36,7 @@ const defaultOptions = {
export class AutoDial implements Startable {
private readonly connectionManager: ConnectionManager
private readonly peerStore: PeerStore
private readonly queue: PeerJobQueue
private readonly queue: PeerQueue<void>
private readonly minConnections: number
private readonly autoDialPriority: number
private readonly autoDialIntervalMs: number
Expand Down Expand Up @@ -64,11 +65,13 @@ export class AutoDial implements Startable {
this.log = components.logger.forComponent('libp2p:connection-manager:auto-dial')
this.started = false
this.running = false
this.queue = new PeerJobQueue({
concurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency
this.queue = new PeerQueue({
concurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency,
metricName: 'libp2p_autodial_queue',
metrics: components.metrics
})
this.queue.addListener('error', (err) => {
this.log.error('error during auto-dial', err)
this.queue.addEventListener('error', (evt) => {
this.log.error('error during auto-dial', evt.detail)
})

// check the min connection limit whenever a peer disconnects
Expand Down Expand Up @@ -179,7 +182,7 @@ export class AutoDial implements Startable {
}

// remove peers already in the autodial queue
if (this.queue.hasJob(peer.id)) {
if (this.queue.has(peer.id)) {
this.log.trace('not autodialing %p because they are already being autodialed', peer.id)
return false
}
Expand Down
Loading

0 comments on commit 528d737

Please sign in to comment.