From 75d224b86c7c746db1a11eccdac420e78dfb5c83 Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Fri, 8 Jan 2021 12:19:54 +0100 Subject: [PATCH 1/5] devp2p -> connection reliability: RLPx type improvements --- packages/devp2p/src/rlpx/peer.ts | 1 + packages/devp2p/src/rlpx/rlpx.ts | 12 +++++++----- packages/devp2p/src/util.ts | 11 ++++++++++- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/packages/devp2p/src/rlpx/peer.ts b/packages/devp2p/src/rlpx/peer.ts index c8eb153c3f..a85f6c8b2a 100644 --- a/packages/devp2p/src/rlpx/peer.ts +++ b/packages/devp2p/src/rlpx/peer.ts @@ -529,6 +529,7 @@ export class Peer extends EventEmitter { break } } catch (err) { + this.disconnect(DISCONNECT_REASONS.SUBPROTOCOL_ERROR) debug(`Error on peer socket data handling: ${err}`) this.emit('error', err) } diff --git a/packages/devp2p/src/rlpx/rlpx.ts b/packages/devp2p/src/rlpx/rlpx.ts index 08b87f9369..b930545be3 100644 --- a/packages/devp2p/src/rlpx/rlpx.ts +++ b/packages/devp2p/src/rlpx/rlpx.ts @@ -80,9 +80,11 @@ export class RLPx extends EventEmitter { if (this._getOpenSlots() > 0) return this._connectToPeer(peer) this._peersQueue.push({ peer: peer, ts: 0 }) // save to queue }) - this._dpt.on('peer:removed', (peer: any) => { + this._dpt.on('peer:removed', (peer: PeerInfo) => { // remove from queue - this._peersQueue = this._peersQueue.filter((item: any) => !item.peer.id.equals(peer.id)) + this._peersQueue = this._peersQueue.filter( + (item) => !(item.peer.id! as Buffer).equals(peer.id as Buffer) + ) }) } @@ -259,12 +261,12 @@ export class RLPx extends EventEmitter { _refillConnections() { if (!this._isAlive()) return debug( - `refill connections.. queue size: ${this._peersQueue.length}, peers: ${ - this._peers.size + `refill connections.. peers: ${this._peers.size}, queue size: ${ + this._peersQueue.length }, open slots: ${this._getOpenSlots()}` ) - this._peersQueue = this._peersQueue.filter((item: any) => { + this._peersQueue = this._peersQueue.filter((item) => { if (this._getOpenSlots() === 0) return true if (item.ts > Date.now()) return true diff --git a/packages/devp2p/src/util.ts b/packages/devp2p/src/util.ts index 71ad48a824..3e008bd4b0 100644 --- a/packages/devp2p/src/util.ts +++ b/packages/devp2p/src/util.ts @@ -3,6 +3,8 @@ import { randomBytes } from 'crypto' import { privateKeyVerify, publicKeyConvert } from 'secp256k1' import createKeccakHash from 'keccak' import { decode } from 'rlp' +import { ETH } from './eth' +import { LES } from './les' export function keccak256(...buffers: Buffer[]) { const buffer = Buffer.concat(buffers) @@ -53,7 +55,14 @@ export function xor(a: Buffer, b: any): Buffer { return buffer } -export function assertEq(expected: any, actual: any, msg: string, debug: any): void { +type assertInput = Buffer | Buffer[] | ETH.StatusMsg | LES.Status | number | null + +export function assertEq( + expected: assertInput, + actual: assertInput, + msg: string, + debug: Function +): void { let message if (Buffer.isBuffer(expected) && Buffer.isBuffer(actual)) { if (expected.equals(actual)) return From 838d6c4a0d8f3b714ace2d8381739b217748d23a Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Fri, 8 Jan 2021 12:50:29 +0100 Subject: [PATCH 2/5] devp2p -> connection reliability: subdivided interval calls to refill RLPx peer connections to improve networking distribution and connection reliability --- packages/devp2p/src/dpt/dpt.ts | 4 +++- packages/devp2p/src/rlpx/rlpx.ts | 30 +++++++++++++++++++++++------- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/packages/devp2p/src/dpt/dpt.ts b/packages/devp2p/src/dpt/dpt.ts index 4e2dbad160..e0373b3f82 100644 --- a/packages/devp2p/src/dpt/dpt.ts +++ b/packages/devp2p/src/dpt/dpt.ts @@ -173,7 +173,9 @@ export class DPT extends EventEmitter { this._refreshIntervalSelectionCounter = (this._refreshIntervalSelectionCounter + 1) % 10 const peers = this.getPeers() - debug(`call .refresh (${peers.length} peers in table)`) + debug( + `call .refresh() (selector ${this._refreshIntervalSelectionCounter}) (${peers.length} peers in table)` + ) for (const peer of peers) { // Randomly distributed selector based on peer ID diff --git a/packages/devp2p/src/rlpx/rlpx.ts b/packages/devp2p/src/rlpx/rlpx.ts index b930545be3..0786945f1d 100644 --- a/packages/devp2p/src/rlpx/rlpx.ts +++ b/packages/devp2p/src/rlpx/rlpx.ts @@ -8,7 +8,7 @@ import LRUCache from 'lru-cache' import Common from '@ethereumjs/common' // note: relative path only valid in .js file in dist const { version: pVersion } = require('../../package.json') -import { pk2id, createDeferred, formatLogId } from '../util' +import { pk2id, createDeferred, formatLogId, buffer2int } from '../util' import { Peer, DISCONNECT_REASONS, Capabilities } from './peer' import { DPT, PeerInfo } from '../dpt' @@ -31,6 +31,7 @@ export interface RLPxOptions { export class RLPx extends EventEmitter { _privateKey: Buffer _id: Buffer + _timeout: number _maxPeers: number _clientId: Buffer @@ -39,11 +40,14 @@ export class RLPx extends EventEmitter { _common: Common _listenPort: number | null _dpt: DPT | null + _peersLRU: LRUCache _peersQueue: { peer: PeerInfo; ts: number }[] _server: net.Server | null _peers: Map + _refillIntervalId: NodeJS.Timeout + _refillIntervalSelectionCounter: number = 0 constructor(privateKey: Buffer, options: RLPxOptions) { super() @@ -98,7 +102,9 @@ export class RLPx extends EventEmitter { this._peers = new Map() this._peersQueue = [] this._peersLRU = new LRUCache({ max: 25000 }) - this._refillIntervalId = setInterval(() => this._refillConnections(), ms('10s')) + const REFILL_INTERVALL = ms('10s') + const refillIntervalSubdivided = Math.floor(REFILL_INTERVALL / 10) + this._refillIntervalId = setInterval(() => this._refillConnections(), refillIntervalSubdivided) } listen(...args: any[]) { @@ -261,17 +267,27 @@ export class RLPx extends EventEmitter { _refillConnections() { if (!this._isAlive()) return debug( - `refill connections.. peers: ${this._peers.size}, queue size: ${ - this._peersQueue.length - }, open slots: ${this._getOpenSlots()}` + `refill connections.. (selector ${this._refillIntervalSelectionCounter}) peers: ${ + this._peers.size + }, queue size: ${this._peersQueue.length}, open slots: ${this._getOpenSlots()}` ) + // Rotating selection counter going in loop from 0..9 + this._refillIntervalSelectionCounter = (this._refillIntervalSelectionCounter + 1) % 10 this._peersQueue = this._peersQueue.filter((item) => { if (this._getOpenSlots() === 0) return true if (item.ts > Date.now()) return true - this._connectToPeer(item.peer) - return false + // Randomly distributed selector based on peer ID + // to decide on subdivided execution + const selector = buffer2int((item.peer.id! as Buffer).slice(0, 1)) % 10 + if (selector === this._refillIntervalSelectionCounter) { + this._connectToPeer(item.peer) + return false + } else { + // Still keep peer in queue + return true + } }) } } From 88e5f039eb124f03f7dda88047b6e19f1f1e4195 Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Fri, 8 Jan 2021 12:58:04 +0100 Subject: [PATCH 3/5] client -> vm execution: fixed firstBlock number and hash association on execution log msg output --- packages/client/lib/sync/fullsync.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/client/lib/sync/fullsync.ts b/packages/client/lib/sync/fullsync.ts index 9075bf235a..15f61ada56 100644 --- a/packages/client/lib/sync/fullsync.ts +++ b/packages/client/lib/sync/fullsync.ts @@ -71,7 +71,7 @@ export class FullSynchronizer extends Synchronizer { let oldHead = Buffer.alloc(0) const newHeadBlock = await this.vm.blockchain.getHead() let newHead = newHeadBlock.hash() - const firstHeadBlock = newHeadBlock + let firstHeadBlock = newHeadBlock let lastHeadBlock = newHeadBlock while (!newHead.equals(oldHead) && !this.stopSyncing) { oldHead = newHead @@ -79,6 +79,9 @@ export class FullSynchronizer extends Synchronizer { await this.vmPromise const headBlock = await this.vm.blockchain.getHead() newHead = headBlock.hash() + if (blockCounter === 0) { + firstHeadBlock = headBlock + } // check if we did run a new block: if (!newHead.equals(oldHead)) { blockCounter += 1 From 06f31ea42fe5617b784ec4fd07de21fe1221ffdd Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Fri, 8 Jan 2021 15:09:34 +0100 Subject: [PATCH 4/5] devp2p -> connection reliability: fixed an error in DPT not properly banning old peers and replacing with a new peer on KBucket ping --- packages/devp2p/src/dpt/dpt.ts | 14 +++++++++----- packages/devp2p/src/dpt/kbucket.ts | 4 ++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/packages/devp2p/src/dpt/dpt.ts b/packages/devp2p/src/dpt/dpt.ts index e0373b3f82..69d3c7ddef 100644 --- a/packages/devp2p/src/dpt/dpt.ts +++ b/packages/devp2p/src/dpt/dpt.ts @@ -68,7 +68,7 @@ export class DPT extends EventEmitter { this._kbucket = new KBucket(this._id) this._kbucket.on('added', (peer: PeerInfo) => this.emit('peer:added', peer)) this._kbucket.on('removed', (peer: PeerInfo) => this.emit('peer:removed', peer)) - this._kbucket.on('ping', this._onKBucketPing) + this._kbucket.on('ping', this._onKBucketPing.bind(this)) this._server = new DPTServer(this, this.privateKey, { timeout: options.timeout, @@ -77,7 +77,7 @@ export class DPT extends EventEmitter { }) this._server.once('listening', () => this.emit('listening')) this._server.once('close', () => this.emit('close')) - this._server.on('peers', (peers) => this._onServerPeers(peers)) + this._server.on('peers', (peers, remote) => this._onServerPeers(peers)) this._server.on('error', (err) => this.emit('error', err)) const refreshIntervalSubdivided = Math.floor((options.refreshInterval ?? ms('60s')) / 10) @@ -108,15 +108,19 @@ export class DPT extends EventEmitter { }) .then(() => { if (++count < oldPeers.length) return - if (err === null) this.banlist.add(newPeer, ms('5m')) else this._kbucket.add(newPeer) }) } } - _onServerPeers(peers: any[]): void { - for (const peer of peers) this.addPeer(peer).catch(() => {}) + _onServerPeers(peers: PeerInfo[]): void { + const ms = 0 + for (const peer of peers) { + this.addPeer(peer).catch((error) => { + this.emit('error', error ) + }) + } } async bootstrap(peer: PeerInfo): Promise { diff --git a/packages/devp2p/src/dpt/kbucket.ts b/packages/devp2p/src/dpt/kbucket.ts index 07eb8cb435..8f10c6674d 100644 --- a/packages/devp2p/src/dpt/kbucket.ts +++ b/packages/devp2p/src/dpt/kbucket.ts @@ -32,8 +32,8 @@ export class KBucket extends EventEmitter { this.emit('removed', peer) }) - this._kbucket.on('ping', (oldPeers: PeerInfo[], newPeer: PeerInfo) => { - this.emit('ping', { oldPeers, newPeer }) + this._kbucket.on('ping', (oldPeers: PeerInfo[], newPeer: PeerInfo | undefined) => { + this.emit('ping', oldPeers, newPeer) }) } From c110b70b86b09d53f117b73568390e846b33cf31 Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Fri, 8 Jan 2021 15:29:22 +0100 Subject: [PATCH 5/5] devp2p -> connection reliability: distribute network traffic on DPT additions of new neighbour peers --- packages/devp2p/src/dpt/dpt.ts | 16 ++++++++++------ packages/devp2p/src/dpt/kbucket.ts | 2 +- .../devp2p/test/integration/dpt-simulator.ts | 13 ++++++++----- packages/devp2p/test/integration/util.ts | 4 ++-- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/packages/devp2p/src/dpt/dpt.ts b/packages/devp2p/src/dpt/dpt.ts index 69d3c7ddef..da20fa5b19 100644 --- a/packages/devp2p/src/dpt/dpt.ts +++ b/packages/devp2p/src/dpt/dpt.ts @@ -77,7 +77,7 @@ export class DPT extends EventEmitter { }) this._server.once('listening', () => this.emit('listening')) this._server.once('close', () => this.emit('close')) - this._server.on('peers', (peers, remote) => this._onServerPeers(peers)) + this._server.on('peers', (peers) => this._onServerPeers(peers)) this._server.on('error', (err) => this.emit('error', err)) const refreshIntervalSubdivided = Math.floor((options.refreshInterval ?? ms('60s')) / 10) @@ -115,12 +115,16 @@ export class DPT extends EventEmitter { } _onServerPeers(peers: PeerInfo[]): void { - const ms = 0 + const DIFF_TIME_MS = 200 + let ms = 0 for (const peer of peers) { - this.addPeer(peer).catch((error) => { - this.emit('error', error ) - }) - } + setTimeout(() => { + this.addPeer(peer).catch((error) => { + this.emit('error', error) + }) + }, ms) + ms += DIFF_TIME_MS + } } async bootstrap(peer: PeerInfo): Promise { diff --git a/packages/devp2p/src/dpt/kbucket.ts b/packages/devp2p/src/dpt/kbucket.ts index 8f10c6674d..f536becde3 100644 --- a/packages/devp2p/src/dpt/kbucket.ts +++ b/packages/devp2p/src/dpt/kbucket.ts @@ -32,7 +32,7 @@ export class KBucket extends EventEmitter { this.emit('removed', peer) }) - this._kbucket.on('ping', (oldPeers: PeerInfo[], newPeer: PeerInfo | undefined) => { + this._kbucket.on('ping', (oldPeers: PeerInfo[], newPeer: PeerInfo | undefined) => { this.emit('ping', oldPeers, newPeer) }) } diff --git a/packages/devp2p/test/integration/dpt-simulator.ts b/packages/devp2p/test/integration/dpt-simulator.ts index 290f447491..40a209ff92 100644 --- a/packages/devp2p/test/integration/dpt-simulator.ts +++ b/packages/devp2p/test/integration/dpt-simulator.ts @@ -32,7 +32,7 @@ test('DPT: remove node', async (t) => { async.series( [ function (cb) { - dpts[0].on('peer:added', function (peer: any) { + dpts[0].on('peer:added', function (peer) { dpts[0].removePeer(peer) cb(null) }) @@ -64,13 +64,13 @@ test('DPT: ban node', async (t) => { async.series( [ function (cb) { - dpts[0].on('peer:added', function (peer: any) { + dpts[0].on('peer:added', function (peer) { dpts[0].banPeer(peer) cb(null) }) }, function (cb) { - dpts[0].on('peer:removed', function (peer: any) { + dpts[0].on('peer:removed', function (peer) { t.equal(dpts[0].banlist.has(peer), true, 'ban-list should contain peer') t.equal( dpts[0].getPeers().length, @@ -150,11 +150,14 @@ test('DPT: simulate bootstrap', async (t) => { } await delay(250) - util.destroyDPTs(dpts) // dpts.forEach((dpt, i) => console.log(`${i}:${dpt.getPeers().length}`)) - for (const dpt of dpts) + for (const dpt of dpts) { t.equal(dpt.getPeers().length, numDPTs, 'Peers should be distributed to all DPTs') + } + await delay(1000) + + util.destroyDPTs(dpts) t.end() }) diff --git a/packages/devp2p/test/integration/util.ts b/packages/devp2p/test/integration/util.ts index 5991cc40fc..b8602e8246 100644 --- a/packages/devp2p/test/integration/util.ts +++ b/packages/devp2p/test/integration/util.ts @@ -5,7 +5,7 @@ import Common from '@ethereumjs/common' export const localhost = '127.0.0.1' export const basePort = 30306 -export function getTestDPTs(numDPTs: any) { +export function getTestDPTs(numDPTs: number) { const dpts = [] for (let i = 0; i < numDPTs; ++i) { @@ -30,7 +30,7 @@ export function initTwoPeerDPTSetup() { return dpts } -export function destroyDPTs(dpts: any) { +export function destroyDPTs(dpts: DPT[]) { for (const dpt of dpts) dpt.destroy() }