From 4eb492ccbe1d4edf8e1ca014b866a47629066320 Mon Sep 17 00:00:00 2001 From: Cayman Date: Mon, 22 Jun 2020 19:29:54 -0500 Subject: [PATCH] feat: add prune backoff --- ts/heartbeat.ts | 19 ++++-- ts/index.ts | 148 +++++++++++++++++++++++++++++++++++----- ts/message/index.ts | 7 ++ ts/message/rpc.proto.ts | 7 ++ 4 files changed, 161 insertions(+), 20 deletions(-) diff --git a/ts/heartbeat.ts b/ts/heartbeat.ts index 5becc5b9..65e9c1b6 100644 --- a/ts/heartbeat.ts +++ b/ts/heartbeat.ts @@ -71,6 +71,8 @@ export class Heartbeat { * @returns {void} */ _heartbeat (): void { + this.gossipsub.heartbeatTicks++ + // cache scores throught the heartbeat const scores = new Map() const getScore = (id: string): number => { @@ -85,6 +87,9 @@ export class Heartbeat { const tograft = new Map() const toprune = new Map() + // clean up expired backoffs + this.gossipsub._clearBackoff() + // maintain the mesh for topics we have joined this.gossipsub.mesh.forEach((peers, topic) => { // prune/graft helper functions (defined per topic) @@ -96,6 +101,8 @@ export class Heartbeat { ) // update peer score this.gossipsub.score.prune(id, topic) + // add prune backoff record + this.gossipsub._addBackoff(id, topic) // remove peer from mesh peers.delete(p) // add to toprune @@ -140,10 +147,12 @@ export class Heartbeat { // do we have enough peers? if (peers.size < constants.GossipsubDlo) { + const backoff = this.gossipsub.backoff.get(topic) const ineed = constants.GossipsubD - peers.size const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => { - // filter out mesh peers, peers with negative score - return !peers.has(p) && getScore(p.id.toB58String()) >= 0 + const id = p.id.toB58String() + // filter out mesh peers, peers we are backing off, peers with negative score + return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0 }) peersSet.forEach(graftPeer) @@ -217,9 +226,11 @@ export class Heartbeat { // if it's less than D_out, select some peers with outbound connections and graft them if (outbound < constants.GossipsubDout) { const ineed = constants.GossipsubDout - outbound + const backoff = this.gossipsub.backoff.get(topic) getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => { - // filter our current mesh peers and peers with negative score - return !peers.has(p) && getScore(p.id.toB58String()) >= 0 + const id = p.id.toB58String() + // filter our current mesh peers, peers we are backing off, peers with negative score + return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0 }).forEach(graftPeer) } } diff --git a/ts/index.ts b/ts/index.ts index 06daa4a3..b19571b1 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -42,8 +42,10 @@ class Gossipsub extends BasicPubsub { lastpub: Map gossip: Map control: Map + backoff: Map> outbound: Map score: PeerScore + heartbeatTicks: number _connectionManager: ConnectionManager _options: GossipOptions @@ -138,6 +140,11 @@ class Gossipsub extends BasicPubsub { */ this.control = new Map() + /** + * Prune backoff map + */ + this.backoff = new Map() + /** * Connection direction cache, marks peers with outbound connections * @@ -161,6 +168,12 @@ class Gossipsub extends BasicPubsub { */ this.heartbeat = new Heartbeat(this) + /** + * Number of heartbeats since the beginning of time + * This allows us to amortize some resource cleanup -- eg: backoff cleanup + */ + this.heartbeatTicks = 0 + /** * Connection manager */ @@ -224,7 +237,7 @@ class Gossipsub extends BasicPubsub { this.gossip.delete(peer) // Remove from control mapping this.control.delete(peer) - // Remove from outbound tracking + // Remove from backoff mapping this.outbound.delete(peer) // Remove from peer scoring @@ -397,6 +410,7 @@ class Gossipsub extends BasicPubsub { const prune: string[] = [] const id = peer.id.toB58String() const score = this.score.score(id) + const now = this._now() graft.forEach(({ topicID }) => { if (!topicID) { @@ -413,6 +427,24 @@ class Gossipsub extends BasicPubsub { return } + // make sure we are not backing off that peer + const expire = this.backoff.get(topicID)?.get(id) + if (typeof expire === 'number' && now < expire) { + this.log('GRAFT: ignoring backed off peer %s', id) + // add behavioral penalty + this.score.addPenalty(id, 1) + // check the flood cutoff -- is the GRAFT coming too fast? + const floodCutoff = expire + constants.GossipsubGraftFloodThreshold - constants.GossipsubPruneBackoff + if (now < floodCutoff) { + // extra penalty + this.score.addPenalty(id, 1) + } + // refresh the backoff + this._addBackoff(id, topicID) + prune.push(topicID) + return + } + // check the score if (score < 0) { // we don't GRAFT peers with negative score @@ -422,6 +454,8 @@ class Gossipsub extends BasicPubsub { ) // we do send them PRUNE however, because it's a matter of protocol correctness prune.push(topicID) + // add/refresh backoff so that we don't reGRAFT too early even if the score decays + this._addBackoff(id, topicID) return } @@ -430,6 +464,7 @@ class Gossipsub extends BasicPubsub { // mesh takeover attacks combined with love bombing if (peersInMesh.size >= constants.GossipsubDhi && !this.outbound.get(peer)) { prune.push(topicID) + this._addBackoff(id, topicID) return } @@ -442,13 +477,7 @@ class Gossipsub extends BasicPubsub { return } - const buildCtrlPruneMsg = (topic: string) => { - return { - topicID: topic - } - } - - return prune.map(buildCtrlPruneMsg) + return prune.map(topic => this._makePrune(id, topic)) } /** @@ -458,7 +487,8 @@ class Gossipsub extends BasicPubsub { * @returns {void} */ _handlePrune (peer: Peer, prune: ControlPrune[]): void { - prune.forEach(({ topicID }) => { + const id = peer.id.toB58String() + prune.forEach(({ topicID, backoff }) => { if (!topicID) { return } @@ -467,6 +497,65 @@ class Gossipsub extends BasicPubsub { this.log('PRUNE: Remove mesh link to %s in %s', peer.id.toB58String(), topicID) peers.delete(peer) peer.topics.delete(topicID) + // is there a backoff specified by the peer? if so obey it + if (typeof backoff === 'number' && backoff > 0) { + this._doAddBackoff(id, topicID, backoff * 1000) + } else { + this._addBackoff(id, topicID) + } + } + }) + } + + /** + * Add standard backoff log for a peer in a topic + * @param {string} id + * @param {string} topic + * @returns {void} + */ + _addBackoff (id: string, topic: string): void { + this._doAddBackoff(id, topic, constants.GossipsubPruneBackoff) + } + + /** + * Add backoff expiry interval for a peer in a topic + * @param {string} id + * @param {string} topic + * @param {number} interval backoff duration in milliseconds + * @returns {void} + */ + _doAddBackoff (id: string, topic: string, interval: number): void { + let backoff = this.backoff.get(topic) + if (!backoff) { + backoff = new Map() + this.backoff.set(topic, backoff) + } + const expire = this._now() + interval + const existingExpire = backoff.get(id) || 0 + if (existingExpire < expire) { + backoff.set(id, expire) + } + } + + /** + * Clear expired backoff expiries + * @returns {void} + */ + _clearBackoff (): void { + // we only clear once every 15 ticks to avoid iterating over the maps too much + if (this.heartbeatTicks % 15 !== 0) { + return + } + + const now = this._now() + this.backoff.forEach((backoff, topic) => { + backoff.forEach((expire, id) => { + if (expire < now) { + backoff.delete(id) + } + }) + if (backoff.size === 0) { + this.backoff.delete(topic) } }) } @@ -498,6 +587,7 @@ class Gossipsub extends BasicPubsub { this.lastpub = new Map() this.gossip = new Map() this.control = new Map() + this.backoff = new Map() this.outbound = new Map() } @@ -693,9 +783,9 @@ class Gossipsub extends BasicPubsub { * @returns {void} */ _sendPrune (peer: Peer, topic: string): void { - const prune = [{ - topicID: topic - }] + const prune = [ + this._makePrune(peer.id.toB58String(), topic) + ] const out = createGossipRpc([], { prune }) this._sendRpc(peer, out) @@ -755,12 +845,13 @@ class Gossipsub extends BasicPubsub { */ _sendGraftPrune (tograft: Map, toprune: Map): void { for (const [p, topics] of tograft) { + const id = p.id.toB58String() const graft = topics.map((topicID) => ({ topicID })) let prune: ControlPrune[] = [] // If a peer also has prunes, process them now - const pruneMsg = toprune.get(p) - if (pruneMsg) { - prune = pruneMsg.map((topicID) => ({ topicID })) + const pruning = toprune.get(p) + if (pruning) { + prune = pruning.map((topicID) => this._makePrune(id, topicID)) toprune.delete(p) } @@ -768,7 +859,8 @@ class Gossipsub extends BasicPubsub { this._sendRpc(p, outRpc) } for (const [p, topics] of toprune) { - const prune = topics.map((topicID) => ({ topicID })) + const id = p.id.toB58String() + const prune = topics.map((topicID) => this._makePrune(id, topicID)) const outRpc = createGossipRpc([], { prune }) this._sendRpc(p, outRpc) } @@ -877,6 +969,30 @@ class Gossipsub extends BasicPubsub { _now (): number { return Date.now() } + + /** + * Make a PRUNE control message for a peer in a topic + * @param {string} id + * @param {string} topic + * @returns {ControlPrune} + */ + _makePrune (id: string, topic: string): ControlPrune { + if (this.peers.get(id)!.protocols.includes(constants.GossipsubIDv10)) { + // Gossipsub v1.0 -- no backoff, the peer won't be able to parse it anyway + return { + topicID: topic, + peers: [] + } + } + // backoff is measured in seconds + // GossipsubPruneBackoff is measured in milliseconds + const backoff = constants.GossipsubPruneBackoff / 1000 + return { + topicID: topic, + peers: [], + backoff: backoff + } + } } export = Gossipsub diff --git a/ts/message/index.ts b/ts/message/index.ts index fb984c12..eb8484f4 100644 --- a/ts/message/index.ts +++ b/ts/message/index.ts @@ -104,6 +104,13 @@ export interface ControlGraft { */ export interface ControlPrune { topicID?: string + peers: PeerInfo[] + backoff?: number +} + +export interface PeerInfo { + peerID?: Buffer + signedPeerRecord?: Buffer } /** diff --git a/ts/message/rpc.proto.ts b/ts/message/rpc.proto.ts index 89fecf6c..a7ba771d 100644 --- a/ts/message/rpc.proto.ts +++ b/ts/message/rpc.proto.ts @@ -40,5 +40,12 @@ message RPC { message ControlPrune { optional string topicID = 1; + repeated PeerInfo peers = 2; + optional uint64 backoff = 3; + } + + message PeerInfo { + optional bytes peerID = 1; + optional bytes signedPeerRecord = 2; } }`