Skip to content

Commit

Permalink
devp2p -> connection reliability: subdivided interval calls to refill…
Browse files Browse the repository at this point in the history
… RLPx peer connections to improve networking distribution and connection reliability
  • Loading branch information
holgerd77 committed Jan 14, 2021
1 parent 75d224b commit 838d6c4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
4 changes: 3 additions & 1 deletion packages/devp2p/src/dpt/dpt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ export class DPT extends EventEmitter {
this._refreshIntervalSelectionCounter = (this._refreshIntervalSelectionCounter + 1) % 10

const peers = this.getPeers()
debug(`call .refresh (${peers.length} peers in table)`)
debug(
`call .refresh() (selector ${this._refreshIntervalSelectionCounter}) (${peers.length} peers in table)`
)

for (const peer of peers) {
// Randomly distributed selector based on peer ID
Expand Down
30 changes: 23 additions & 7 deletions packages/devp2p/src/rlpx/rlpx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import LRUCache from 'lru-cache'
import Common from '@ethereumjs/common'
// note: relative path only valid in .js file in dist
const { version: pVersion } = require('../../package.json')
import { pk2id, createDeferred, formatLogId } from '../util'
import { pk2id, createDeferred, formatLogId, buffer2int } from '../util'
import { Peer, DISCONNECT_REASONS, Capabilities } from './peer'
import { DPT, PeerInfo } from '../dpt'

Expand All @@ -31,6 +31,7 @@ export interface RLPxOptions {
export class RLPx extends EventEmitter {
_privateKey: Buffer
_id: Buffer

_timeout: number
_maxPeers: number
_clientId: Buffer
Expand All @@ -39,11 +40,14 @@ export class RLPx extends EventEmitter {
_common: Common
_listenPort: number | null
_dpt: DPT | null

_peersLRU: LRUCache<string, boolean>
_peersQueue: { peer: PeerInfo; ts: number }[]
_server: net.Server | null
_peers: Map<string, net.Socket | Peer>

_refillIntervalId: NodeJS.Timeout
_refillIntervalSelectionCounter: number = 0

constructor(privateKey: Buffer, options: RLPxOptions) {
super()
Expand Down Expand Up @@ -98,7 +102,9 @@ export class RLPx extends EventEmitter {
this._peers = new Map()
this._peersQueue = []
this._peersLRU = new LRUCache({ max: 25000 })
this._refillIntervalId = setInterval(() => this._refillConnections(), ms('10s'))
const REFILL_INTERVALL = ms('10s')
const refillIntervalSubdivided = Math.floor(REFILL_INTERVALL / 10)
this._refillIntervalId = setInterval(() => this._refillConnections(), refillIntervalSubdivided)
}

listen(...args: any[]) {
Expand Down Expand Up @@ -261,17 +267,27 @@ export class RLPx extends EventEmitter {
_refillConnections() {
if (!this._isAlive()) return
debug(
`refill connections.. peers: ${this._peers.size}, queue size: ${
this._peersQueue.length
}, open slots: ${this._getOpenSlots()}`
`refill connections.. (selector ${this._refillIntervalSelectionCounter}) peers: ${
this._peers.size
}, queue size: ${this._peersQueue.length}, open slots: ${this._getOpenSlots()}`
)
// Rotating selection counter going in loop from 0..9
this._refillIntervalSelectionCounter = (this._refillIntervalSelectionCounter + 1) % 10

this._peersQueue = this._peersQueue.filter((item) => {
if (this._getOpenSlots() === 0) return true
if (item.ts > Date.now()) return true

this._connectToPeer(item.peer)
return false
// Randomly distributed selector based on peer ID
// to decide on subdivided execution
const selector = buffer2int((item.peer.id! as Buffer).slice(0, 1)) % 10
if (selector === this._refillIntervalSelectionCounter) {
this._connectToPeer(item.peer)
return false
} else {
// Still keep peer in queue
return true
}
})
}
}

0 comments on commit 838d6c4

Please sign in to comment.