Skip to content

Commit

Permalink
feat: add prune backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed Jun 23, 2020
1 parent 12b4fa3 commit 4eb492c
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 20 deletions.
19 changes: 15 additions & 4 deletions ts/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ export class Heartbeat {
* @returns {void}
*/
_heartbeat (): void {
this.gossipsub.heartbeatTicks++

// cache scores throught the heartbeat
const scores = new Map<string, number>()
const getScore = (id: string): number => {
Expand All @@ -85,6 +87,9 @@ export class Heartbeat {
const tograft = new Map<Peer, string[]>()
const toprune = new Map<Peer, string[]>()

// clean up expired backoffs
this.gossipsub._clearBackoff()

// maintain the mesh for topics we have joined
this.gossipsub.mesh.forEach((peers, topic) => {
// prune/graft helper functions (defined per topic)
Expand All @@ -96,6 +101,8 @@ export class Heartbeat {
)
// update peer score
this.gossipsub.score.prune(id, topic)
// add prune backoff record
this.gossipsub._addBackoff(id, topic)
// remove peer from mesh
peers.delete(p)
// add to toprune
Expand Down Expand Up @@ -140,10 +147,12 @@ export class Heartbeat {

// do we have enough peers?
if (peers.size < constants.GossipsubDlo) {
const backoff = this.gossipsub.backoff.get(topic)
const ineed = constants.GossipsubD - peers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => {
// filter out mesh peers, peers with negative score
return !peers.has(p) && getScore(p.id.toB58String()) >= 0
const id = p.id.toB58String()
// filter out mesh peers, peers we are backing off, peers with negative score
return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
})

peersSet.forEach(graftPeer)
Expand Down Expand Up @@ -217,9 +226,11 @@ export class Heartbeat {
// if it's less than D_out, select some peers with outbound connections and graft them
if (outbound < constants.GossipsubDout) {
const ineed = constants.GossipsubDout - outbound
const backoff = this.gossipsub.backoff.get(topic)
getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => {
// filter our current mesh peers and peers with negative score
return !peers.has(p) && getScore(p.id.toB58String()) >= 0
const id = p.id.toB58String()
// filter our current mesh peers, peers we are backing off, peers with negative score
return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
}).forEach(graftPeer)
}
}
Expand Down
148 changes: 132 additions & 16 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ class Gossipsub extends BasicPubsub {
lastpub: Map<string, number>
gossip: Map<Peer, ControlIHave[]>
control: Map<Peer, ControlMessage>
backoff: Map<string, Map<string, number>>
outbound: Map<Peer, boolean>
score: PeerScore
heartbeatTicks: number
_connectionManager: ConnectionManager
_options: GossipOptions

Expand Down Expand Up @@ -138,6 +140,11 @@ class Gossipsub extends BasicPubsub {
*/
this.control = new Map()

/**
* Prune backoff map
*/
this.backoff = new Map()

/**
* Connection direction cache, marks peers with outbound connections
*
Expand All @@ -161,6 +168,12 @@ class Gossipsub extends BasicPubsub {
*/
this.heartbeat = new Heartbeat(this)

/**
* Number of heartbeats since the beginning of time
* This allows us to amortize some resource cleanup -- eg: backoff cleanup
*/
this.heartbeatTicks = 0

/**
* Connection manager
*/
Expand Down Expand Up @@ -224,7 +237,7 @@ class Gossipsub extends BasicPubsub {
this.gossip.delete(peer)
// Remove from control mapping
this.control.delete(peer)
// Remove from outbound tracking
// Remove from backoff mapping
this.outbound.delete(peer)

// Remove from peer scoring
Expand Down Expand Up @@ -397,6 +410,7 @@ class Gossipsub extends BasicPubsub {
const prune: string[] = []
const id = peer.id.toB58String()
const score = this.score.score(id)
const now = this._now()

graft.forEach(({ topicID }) => {
if (!topicID) {
Expand All @@ -413,6 +427,24 @@ class Gossipsub extends BasicPubsub {
return
}

// make sure we are not backing off that peer
const expire = this.backoff.get(topicID)?.get(id)
if (typeof expire === 'number' && now < expire) {
this.log('GRAFT: ignoring backed off peer %s', id)
// add behavioral penalty
this.score.addPenalty(id, 1)
// check the flood cutoff -- is the GRAFT coming too fast?
const floodCutoff = expire + constants.GossipsubGraftFloodThreshold - constants.GossipsubPruneBackoff
if (now < floodCutoff) {
// extra penalty
this.score.addPenalty(id, 1)
}
// refresh the backoff
this._addBackoff(id, topicID)
prune.push(topicID)
return
}

// check the score
if (score < 0) {
// we don't GRAFT peers with negative score
Expand All @@ -422,6 +454,8 @@ class Gossipsub extends BasicPubsub {
)
// we do send them PRUNE however, because it's a matter of protocol correctness
prune.push(topicID)
// add/refresh backoff so that we don't reGRAFT too early even if the score decays
this._addBackoff(id, topicID)
return
}

Expand All @@ -430,6 +464,7 @@ class Gossipsub extends BasicPubsub {
// mesh takeover attacks combined with love bombing
if (peersInMesh.size >= constants.GossipsubDhi && !this.outbound.get(peer)) {
prune.push(topicID)
this._addBackoff(id, topicID)
return
}

Expand All @@ -442,13 +477,7 @@ class Gossipsub extends BasicPubsub {
return
}

const buildCtrlPruneMsg = (topic: string) => {
return {
topicID: topic
}
}

return prune.map(buildCtrlPruneMsg)
return prune.map(topic => this._makePrune(id, topic))
}

/**
Expand All @@ -458,7 +487,8 @@ class Gossipsub extends BasicPubsub {
* @returns {void}
*/
_handlePrune (peer: Peer, prune: ControlPrune[]): void {
prune.forEach(({ topicID }) => {
const id = peer.id.toB58String()
prune.forEach(({ topicID, backoff }) => {
if (!topicID) {
return
}
Expand All @@ -467,6 +497,65 @@ class Gossipsub extends BasicPubsub {
this.log('PRUNE: Remove mesh link to %s in %s', peer.id.toB58String(), topicID)
peers.delete(peer)
peer.topics.delete(topicID)
// is there a backoff specified by the peer? if so obey it
if (typeof backoff === 'number' && backoff > 0) {
this._doAddBackoff(id, topicID, backoff * 1000)
} else {
this._addBackoff(id, topicID)
}
}
})
}

/**
* Add standard backoff log for a peer in a topic
* @param {string} id
* @param {string} topic
* @returns {void}
*/
_addBackoff (id: string, topic: string): void {
this._doAddBackoff(id, topic, constants.GossipsubPruneBackoff)
}

/**
* Add backoff expiry interval for a peer in a topic
* @param {string} id
* @param {string} topic
* @param {number} interval backoff duration in milliseconds
* @returns {void}
*/
_doAddBackoff (id: string, topic: string, interval: number): void {
let backoff = this.backoff.get(topic)
if (!backoff) {
backoff = new Map()
this.backoff.set(topic, backoff)
}
const expire = this._now() + interval
const existingExpire = backoff.get(id) || 0
if (existingExpire < expire) {
backoff.set(id, expire)
}
}

/**
* Clear expired backoff expiries
* @returns {void}
*/
_clearBackoff (): void {
// we only clear once every 15 ticks to avoid iterating over the maps too much
if (this.heartbeatTicks % 15 !== 0) {
return
}

const now = this._now()
this.backoff.forEach((backoff, topic) => {
backoff.forEach((expire, id) => {
if (expire < now) {
backoff.delete(id)
}
})
if (backoff.size === 0) {
this.backoff.delete(topic)
}
})
}
Expand Down Expand Up @@ -498,6 +587,7 @@ class Gossipsub extends BasicPubsub {
this.lastpub = new Map()
this.gossip = new Map()
this.control = new Map()
this.backoff = new Map()
this.outbound = new Map()
}

Expand Down Expand Up @@ -693,9 +783,9 @@ class Gossipsub extends BasicPubsub {
* @returns {void}
*/
_sendPrune (peer: Peer, topic: string): void {
const prune = [{
topicID: topic
}]
const prune = [
this._makePrune(peer.id.toB58String(), topic)
]

const out = createGossipRpc([], { prune })
this._sendRpc(peer, out)
Expand Down Expand Up @@ -755,20 +845,22 @@ class Gossipsub extends BasicPubsub {
*/
_sendGraftPrune (tograft: Map<Peer, string[]>, toprune: Map<Peer, string[]>): void {
for (const [p, topics] of tograft) {
const id = p.id.toB58String()
const graft = topics.map((topicID) => ({ topicID }))
let prune: ControlPrune[] = []
// If a peer also has prunes, process them now
const pruneMsg = toprune.get(p)
if (pruneMsg) {
prune = pruneMsg.map((topicID) => ({ topicID }))
const pruning = toprune.get(p)
if (pruning) {
prune = pruning.map((topicID) => this._makePrune(id, topicID))
toprune.delete(p)
}

const outRpc = createGossipRpc([], { graft, prune })
this._sendRpc(p, outRpc)
}
for (const [p, topics] of toprune) {
const prune = topics.map((topicID) => ({ topicID }))
const id = p.id.toB58String()
const prune = topics.map((topicID) => this._makePrune(id, topicID))
const outRpc = createGossipRpc([], { prune })
this._sendRpc(p, outRpc)
}
Expand Down Expand Up @@ -877,6 +969,30 @@ class Gossipsub extends BasicPubsub {
_now (): number {
return Date.now()
}

/**
* Make a PRUNE control message for a peer in a topic
* @param {string} id
* @param {string} topic
* @returns {ControlPrune}
*/
_makePrune (id: string, topic: string): ControlPrune {
if (this.peers.get(id)!.protocols.includes(constants.GossipsubIDv10)) {
// Gossipsub v1.0 -- no backoff, the peer won't be able to parse it anyway
return {
topicID: topic,
peers: []
}
}
// backoff is measured in seconds
// GossipsubPruneBackoff is measured in milliseconds
const backoff = constants.GossipsubPruneBackoff / 1000
return {
topicID: topic,
peers: [],
backoff: backoff
}
}
}

export = Gossipsub
7 changes: 7 additions & 0 deletions ts/message/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ export interface ControlGraft {
*/
export interface ControlPrune {
topicID?: string
peers: PeerInfo[]
backoff?: number
}

export interface PeerInfo {
peerID?: Buffer
signedPeerRecord?: Buffer
}

/**
Expand Down
7 changes: 7 additions & 0 deletions ts/message/rpc.proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,12 @@ message RPC {
message ControlPrune {
optional string topicID = 1;
repeated PeerInfo peers = 2;
optional uint64 backoff = 3;
}
message PeerInfo {
optional bytes peerID = 1;
optional bytes signedPeerRecord = 2;
}
}`

0 comments on commit 4eb492c

Please sign in to comment.