Skip to content

Commit

Permalink
Merge pull request #1036 from ethereumjs/network-improvements
Browse files Browse the repository at this point in the history
Network improvements
  • Loading branch information
holgerd77 authored Jan 15, 2021
2 parents 5ac5bc7 + c110b70 commit 816e0bc
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 25 deletions.
5 changes: 4 additions & 1 deletion packages/client/lib/sync/fullsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,17 @@ export class FullSynchronizer extends Synchronizer {
let oldHead = Buffer.alloc(0)
const newHeadBlock = await this.vm.blockchain.getHead()
let newHead = newHeadBlock.hash()
const firstHeadBlock = newHeadBlock
let firstHeadBlock = newHeadBlock
let lastHeadBlock = newHeadBlock
while (!newHead.equals(oldHead) && !this.stopSyncing) {
oldHead = newHead
this.vmPromise = this.vm.runBlockchain(this.vm.blockchain, 1)
await this.vmPromise
const headBlock = await this.vm.blockchain.getHead()
newHead = headBlock.hash()
if (blockCounter === 0) {
firstHeadBlock = headBlock
}
// check if we did run a new block:
if (!newHead.equals(oldHead)) {
blockCounter += 1
Expand Down
20 changes: 15 additions & 5 deletions packages/devp2p/src/dpt/dpt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class DPT extends EventEmitter {
this._kbucket = new KBucket(this._id)
this._kbucket.on('added', (peer: PeerInfo) => this.emit('peer:added', peer))
this._kbucket.on('removed', (peer: PeerInfo) => this.emit('peer:removed', peer))
this._kbucket.on('ping', this._onKBucketPing)
this._kbucket.on('ping', this._onKBucketPing.bind(this))

this._server = new DPTServer(this, this.privateKey, {
timeout: options.timeout,
Expand Down Expand Up @@ -108,15 +108,23 @@ export class DPT extends EventEmitter {
})
.then(() => {
if (++count < oldPeers.length) return

if (err === null) this.banlist.add(newPeer, ms('5m'))
else this._kbucket.add(newPeer)
})
}
}

_onServerPeers(peers: any[]): void {
for (const peer of peers) this.addPeer(peer).catch(() => {})
_onServerPeers(peers: PeerInfo[]): void {
const DIFF_TIME_MS = 200
let ms = 0
for (const peer of peers) {
setTimeout(() => {
this.addPeer(peer).catch((error) => {
this.emit('error', error)
})
}, ms)
ms += DIFF_TIME_MS
}
}

async bootstrap(peer: PeerInfo): Promise<void> {
Expand Down Expand Up @@ -173,7 +181,9 @@ export class DPT extends EventEmitter {
this._refreshIntervalSelectionCounter = (this._refreshIntervalSelectionCounter + 1) % 10

const peers = this.getPeers()
debug(`call .refresh (${peers.length} peers in table)`)
debug(
`call .refresh() (selector ${this._refreshIntervalSelectionCounter}) (${peers.length} peers in table)`
)

for (const peer of peers) {
// Randomly distributed selector based on peer ID
Expand Down
4 changes: 2 additions & 2 deletions packages/devp2p/src/dpt/kbucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ export class KBucket extends EventEmitter {
this.emit('removed', peer)
})

this._kbucket.on('ping', (oldPeers: PeerInfo[], newPeer: PeerInfo) => {
this.emit('ping', { oldPeers, newPeer })
this._kbucket.on('ping', (oldPeers: PeerInfo[], newPeer: PeerInfo | undefined) => {
this.emit('ping', oldPeers, newPeer)
})
}

Expand Down
1 change: 1 addition & 0 deletions packages/devp2p/src/rlpx/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ export class Peer extends EventEmitter {
break
}
} catch (err) {
this.disconnect(DISCONNECT_REASONS.SUBPROTOCOL_ERROR)
debug(`Error on peer socket data handling: ${err}`)
this.emit('error', err)
}
Expand Down
36 changes: 27 additions & 9 deletions packages/devp2p/src/rlpx/rlpx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import LRUCache from 'lru-cache'
import Common from '@ethereumjs/common'
// note: relative path only valid in .js file in dist
const { version: pVersion } = require('../../package.json')
import { pk2id, createDeferred, formatLogId } from '../util'
import { pk2id, createDeferred, formatLogId, buffer2int } from '../util'
import { Peer, DISCONNECT_REASONS, Capabilities } from './peer'
import { DPT, PeerInfo } from '../dpt'

Expand All @@ -31,6 +31,7 @@ export interface RLPxOptions {
export class RLPx extends EventEmitter {
_privateKey: Buffer
_id: Buffer

_timeout: number
_maxPeers: number
_clientId: Buffer
Expand All @@ -39,11 +40,14 @@ export class RLPx extends EventEmitter {
_common: Common
_listenPort: number | null
_dpt: DPT | null

_peersLRU: LRUCache<string, boolean>
_peersQueue: { peer: PeerInfo; ts: number }[]
_server: net.Server | null
_peers: Map<string, net.Socket | Peer>

_refillIntervalId: NodeJS.Timeout
_refillIntervalSelectionCounter: number = 0

constructor(privateKey: Buffer, options: RLPxOptions) {
super()
Expand Down Expand Up @@ -80,9 +84,11 @@ export class RLPx extends EventEmitter {
if (this._getOpenSlots() > 0) return this._connectToPeer(peer)
this._peersQueue.push({ peer: peer, ts: 0 }) // save to queue
})
this._dpt.on('peer:removed', (peer: any) => {
this._dpt.on('peer:removed', (peer: PeerInfo) => {
// remove from queue
this._peersQueue = this._peersQueue.filter((item: any) => !item.peer.id.equals(peer.id))
this._peersQueue = this._peersQueue.filter(
(item) => !(item.peer.id! as Buffer).equals(peer.id as Buffer)
)
})
}

Expand All @@ -96,7 +102,9 @@ export class RLPx extends EventEmitter {
this._peers = new Map()
this._peersQueue = []
this._peersLRU = new LRUCache({ max: 25000 })
this._refillIntervalId = setInterval(() => this._refillConnections(), ms('10s'))
const REFILL_INTERVALL = ms('10s')
const refillIntervalSubdivided = Math.floor(REFILL_INTERVALL / 10)
this._refillIntervalId = setInterval(() => this._refillConnections(), refillIntervalSubdivided)
}

listen(...args: any[]) {
Expand Down Expand Up @@ -259,17 +267,27 @@ export class RLPx extends EventEmitter {
_refillConnections() {
if (!this._isAlive()) return
debug(
`refill connections.. queue size: ${this._peersQueue.length}, peers: ${
`refill connections.. (selector ${this._refillIntervalSelectionCounter}) peers: ${
this._peers.size
}, open slots: ${this._getOpenSlots()}`
}, queue size: ${this._peersQueue.length}, open slots: ${this._getOpenSlots()}`
)
// Rotating selection counter going in loop from 0..9
this._refillIntervalSelectionCounter = (this._refillIntervalSelectionCounter + 1) % 10

this._peersQueue = this._peersQueue.filter((item: any) => {
this._peersQueue = this._peersQueue.filter((item) => {
if (this._getOpenSlots() === 0) return true
if (item.ts > Date.now()) return true

this._connectToPeer(item.peer)
return false
// Randomly distributed selector based on peer ID
// to decide on subdivided execution
const selector = buffer2int((item.peer.id! as Buffer).slice(0, 1)) % 10
if (selector === this._refillIntervalSelectionCounter) {
this._connectToPeer(item.peer)
return false
} else {
// Still keep peer in queue
return true
}
})
}
}
11 changes: 10 additions & 1 deletion packages/devp2p/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { randomBytes } from 'crypto'
import { privateKeyVerify, publicKeyConvert } from 'secp256k1'
import createKeccakHash from 'keccak'
import { decode } from 'rlp'
import { ETH } from './eth'
import { LES } from './les'

export function keccak256(...buffers: Buffer[]) {
const buffer = Buffer.concat(buffers)
Expand Down Expand Up @@ -53,7 +55,14 @@ export function xor(a: Buffer, b: any): Buffer {
return buffer
}

export function assertEq(expected: any, actual: any, msg: string, debug: any): void {
type assertInput = Buffer | Buffer[] | ETH.StatusMsg | LES.Status | number | null

export function assertEq(
expected: assertInput,
actual: assertInput,
msg: string,
debug: Function
): void {
let message
if (Buffer.isBuffer(expected) && Buffer.isBuffer(actual)) {
if (expected.equals(actual)) return
Expand Down
13 changes: 8 additions & 5 deletions packages/devp2p/test/integration/dpt-simulator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ test('DPT: remove node', async (t) => {
async.series(
[
function (cb) {
dpts[0].on('peer:added', function (peer: any) {
dpts[0].on('peer:added', function (peer) {
dpts[0].removePeer(peer)
cb(null)
})
Expand Down Expand Up @@ -64,13 +64,13 @@ test('DPT: ban node', async (t) => {
async.series(
[
function (cb) {
dpts[0].on('peer:added', function (peer: any) {
dpts[0].on('peer:added', function (peer) {
dpts[0].banPeer(peer)
cb(null)
})
},
function (cb) {
dpts[0].on('peer:removed', function (peer: any) {
dpts[0].on('peer:removed', function (peer) {
t.equal(dpts[0].banlist.has(peer), true, 'ban-list should contain peer')
t.equal(
dpts[0].getPeers().length,
Expand Down Expand Up @@ -150,11 +150,14 @@ test('DPT: simulate bootstrap', async (t) => {
}

await delay(250)
util.destroyDPTs(dpts)

// dpts.forEach((dpt, i) => console.log(`${i}:${dpt.getPeers().length}`))
for (const dpt of dpts)
for (const dpt of dpts) {
t.equal(dpt.getPeers().length, numDPTs, 'Peers should be distributed to all DPTs')
}
await delay(1000)

util.destroyDPTs(dpts)

t.end()
})
4 changes: 2 additions & 2 deletions packages/devp2p/test/integration/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Common from '@ethereumjs/common'
export const localhost = '127.0.0.1'
export const basePort = 30306

export function getTestDPTs(numDPTs: any) {
export function getTestDPTs(numDPTs: number) {
const dpts = []

for (let i = 0; i < numDPTs; ++i) {
Expand All @@ -30,7 +30,7 @@ export function initTwoPeerDPTSetup() {
return dpts
}

export function destroyDPTs(dpts: any) {
export function destroyDPTs(dpts: DPT[]) {
for (const dpt of dpts) dpt.destroy()
}

Expand Down

0 comments on commit 816e0bc

Please sign in to comment.