Skip to content

Commit

Permalink
feat(libp2p): add autodial retry threshold config option (#1943)
Browse files Browse the repository at this point in the history
When auto-dialing peers, if we have failed to dial them recently, there's little point in redialing them: the dial is probably just going to fail again, which consumes a slot in the dial queue and other resources.

Adds an `autoDialPeerRetryThreshold` config key to the connection manager which is a value in ms.

If we have attempted to dial a peer and that dial attempt failed but we are under our min connection count, do not auto dial the peer within the retry threshold.

Defaults to 1 minute.

Closes #1899
  • Loading branch information
achingbrain committed Aug 16, 2023
1 parent 87dc7e9 commit 4ef9c79
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 6 deletions.
30 changes: 26 additions & 4 deletions packages/libp2p/src/connection-manager/auto-dial.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { logger } from '@libp2p/logger'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { PeerJobQueue } from '../utils/peer-job-queue.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, MIN_CONNECTIONS } from './constants.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { EventEmitter } from '@libp2p/interface/events'
import type { PeerStore } from '@libp2p/interface/peer-store'
Expand All @@ -16,6 +17,7 @@ interface AutoDialInit {
autoDialConcurrency?: number
autoDialPriority?: number
autoDialInterval?: number
autoDialPeerRetryThreshold?: number
}

interface AutoDialComponents {
Expand All @@ -29,7 +31,8 @@ const defaultOptions = {
maxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH,
autoDialConcurrency: AUTO_DIAL_CONCURRENCY,
autoDialPriority: AUTO_DIAL_PRIORITY,
autoDialInterval: AUTO_DIAL_INTERVAL
autoDialInterval: AUTO_DIAL_INTERVAL,
autoDialPeerRetryThreshold: AUTO_DIAL_PEER_RETRY_THRESHOLD
}

export class AutoDial implements Startable {
Expand All @@ -40,6 +43,7 @@ export class AutoDial implements Startable {
private readonly autoDialPriority: number
private readonly autoDialIntervalMs: number
private readonly autoDialMaxQueueLength: number
private readonly autoDialPeerRetryThresholdMs: number
private autoDialInterval?: ReturnType<typeof setInterval>
private started: boolean
private running: boolean
Expand All @@ -56,6 +60,7 @@ export class AutoDial implements Startable {
this.autoDialPriority = init.autoDialPriority ?? defaultOptions.autoDialPriority
this.autoDialIntervalMs = init.autoDialInterval ?? defaultOptions.autoDialInterval
this.autoDialMaxQueueLength = init.maxQueueLength ?? defaultOptions.maxQueueLength
this.autoDialPeerRetryThresholdMs = init.autoDialPeerRetryThreshold ?? defaultOptions.autoDialPeerRetryThreshold
this.started = false
this.running = false
this.queue = new PeerJobQueue({
Expand Down Expand Up @@ -207,9 +212,26 @@ export class AutoDial implements Startable {
return 0
})

log('selected %d/%d peers to dial', sortedPeers.length, peers.length)
const peersThatHaveNotFailed = sortedPeers.filter(peer => {
const lastDialFailure = peer.metadata.get(LAST_DIAL_FAILURE_KEY)

for (const peer of sortedPeers) {
if (lastDialFailure == null) {
return true
}

const lastDialFailureTimestamp = parseInt(uint8ArrayToString(lastDialFailure))

if (isNaN(lastDialFailureTimestamp)) {
return true
}

// only dial if the time since the last failure is above the retry threshold
return Date.now() - lastDialFailureTimestamp > this.autoDialPeerRetryThresholdMs
})

log('selected %d/%d peers to dial', peersThatHaveNotFailed.length, peers.length)

for (const peer of peersThatHaveNotFailed) {
this.queue.add(async () => {
const numConnections = this.connectionManager.getConnectionsMap().size

Expand Down
15 changes: 15 additions & 0 deletions packages/libp2p/src/connection-manager/constants.defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ export const AUTO_DIAL_PRIORITY = 0
*/
export const AUTO_DIAL_MAX_QUEUE_LENGTH = 100

/**
* Default minimum time in ms that must pass after a failed dial before the
* same peer is auto-dialed again (1 minute).
*
* @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialPeerRetryThreshold
*/
export const AUTO_DIAL_PEER_RETRY_THRESHOLD = 1000 * 60

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#inboundConnectionThreshold
*/
Expand All @@ -42,3 +47,13 @@ export const INBOUND_CONNECTION_THRESHOLD = 5
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxIncomingPendingConnections
*/
export const MAX_INCOMING_PENDING_CONNECTIONS = 10

/**
* Stored as part of the peer store metadata for a given peer, the value for
* this key is a timestamp of the last time a dial attempt failed with the
* relevant peer, stored as a string.
*
* Used to ensure we do not endlessly try to auto dial peers we have recently
* failed to dial.
*/
export const LAST_DIAL_FAILURE_KEY = 'last-dial-failure'
19 changes: 17 additions & 2 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { type ClearableSignal, anySignal } from 'any-signal'
import pDefer from 'p-defer'
import PQueue from 'p-queue'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import {
DIAL_TIMEOUT,
MAX_PARALLEL_DIALS_PER_PEER,
MAX_PARALLEL_DIALS,
MAX_PEER_ADDRS_TO_DIAL
MAX_PEER_ADDRS_TO_DIAL,
LAST_DIAL_FAILURE_KEY
} from './constants.js'
import { combineSignals, resolveMultiaddrs } from './utils.js'
import type { AddressSorter, AbortOptions, PendingDial } from '@libp2p/interface'
Expand Down Expand Up @@ -230,9 +232,22 @@ export class DialQueue {
// clean up abort signals/controllers
signal.clear()
})
.catch(err => {
.catch(async err => {
log.error('dial failed to %s', pendingDial.multiaddrs.map(ma => ma.toString()).join(', '), err)

if (peerId != null) {
// record the last failed dial
try {
await this.peerStore.patch(peerId, {
metadata: {
[LAST_DIAL_FAILURE_KEY]: uint8ArrayFromString(Date.now().toString())
}
})
} catch (err: any) {
log.error('could not update last dial failure key for %p', peerId, err)
}
}

// Error is a timeout
if (signal.aborted) {
const error = new CodeError(err.message, codes.ERR_TIMEOUT)
Expand Down
6 changes: 6 additions & 0 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ export interface ConnectionManagerInit {
*/
autoDialMaxQueueLength?: number

/**
* When we've failed to dial a peer, do not autodial them again within this
* number of ms. (default: 1 minute)
*/
autoDialPeerRetryThreshold?: number

/**
* Sort the known addresses of a peer before trying to dial, By default public
* addresses will be dialled before private (e.g. loopback or LAN) addresses.
Expand Down
67 changes: 67 additions & 0 deletions packages/libp2p/test/connection-manager/auto-dial.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import delay from 'delay'
import pWaitFor from 'p-wait-for'
import Sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { AutoDial } from '../../src/connection-manager/auto-dial.js'
import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js'
import { matchPeerId } from '../fixtures/match-peer-id.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { Connection } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -224,4 +226,69 @@ describe('auto-dial', () => {
// should only have queried peer store once
expect(peerStoreAllSpy.callCount).to.equal(1)
})

// Verifies that a peer whose metadata records a dial failure inside the retry
// threshold is skipped by auto-dial, and is dialed again once the threshold
// has elapsed.
it('should not re-dial peers we have recently failed to dial', async () => {
// a peer with no recorded dial failure - eligible for auto-dial straight away
const peerWithAddress: Peer = {
id: await createEd25519PeerId(),
protocols: [],
addresses: [{
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'),
isCertified: true
}],
metadata: new Map(),
tags: new Map()
}
const undialablePeer: Peer = {
id: await createEd25519PeerId(),
protocols: [],
addresses: [{
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'),
isCertified: true
}],
// we failed to dial them recently
metadata: new Map([[LAST_DIAL_FAILURE_KEY, uint8ArrayFromString(`${Date.now() - 10}`)]]),
tags: new Map()
}

await peerStore.save(peerWithAddress.id, peerWithAddress)
await peerStore.save(undialablePeer.id, undialablePeer)

// stubbed connection manager reporting no open connections and an empty dial queue
const connectionManager = stubInterface<ConnectionManager>({
getConnectionsMap: new PeerMap(),
getDialQueue: []
})

autoDialler = new AutoDial({
peerStore,
connectionManager,
events
}, {
minConnections: 10,
// 2s threshold - much longer than the 10ms since the recorded failure above
autoDialPeerRetryThreshold: 2000
})
autoDialler.start()

void autoDialler.autoDial()

// first pass: only the peer without a recent failure should be dialed
await pWaitFor(() => {
return connectionManager.openConnection.callCount === 1
})

expect(connectionManager.openConnection.callCount).to.equal(1)
expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithAddress.id))).to.be.true()
expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.false()

// pass the retry threshold
await delay(2000)

// autodial again
void autoDialler.autoDial()

// 1 call from the first pass plus 2 more from this pass - presumably one per
// peer, now that the recorded failure is older than the threshold
await pWaitFor(() => {
return connectionManager.openConnection.callCount === 3
})

// should have retried the unreachable peer
expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.true()
})
})
13 changes: 13 additions & 0 deletions packages/libp2p/test/connection-manager/direct.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { pEvent } from 'p-event'
import sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { defaultComponents, type Components } from '../../src/components.js'
import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { codes as ErrorCodes } from '../../src/errors.js'
import { type IdentifyService, identifyService } from '../../src/identify/index.js'
Expand Down Expand Up @@ -104,6 +105,18 @@ describe('dialing (direct, WebSockets)', () => {
.and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
})

// Verifies that a failed dial records LAST_DIAL_FAILURE_KEY in the peer store
// metadata of the peer that was dialed.
it('should mark a peer as having recently failed to connect', async () => {
connectionManager = new DefaultConnectionManager(localComponents)
await connectionManager.start()

// the dial to this address is expected to be rejected - presumably nothing is
// listening on port 12984
await expect(connectionManager.openConnection(multiaddr(`/ip4/127.0.0.1/tcp/12984/ws/p2p/${remoteComponents.peerId.toString()}`)))
.to.eventually.be.rejected()

const peer = await localComponents.peerStore.get(remoteComponents.peerId)

// only the presence of the key is checked, not the stored timestamp value
expect(peer.metadata.has(LAST_DIAL_FAILURE_KEY)).to.be.true()
})

it('should be able to connect to a given peer', async () => {
connectionManager = new DefaultConnectionManager(localComponents)
await connectionManager.start()
Expand Down

0 comments on commit 4ef9c79

Please sign in to comment.