Skip to content

Commit

Permalink
Merge pull request #91 from ChainSafe/cayman/gs1.1-outbound-mesh-quota
Browse files Browse the repository at this point in the history
gs1.1 outbound mesh quota
  • Loading branch information
wemeetagain committed Jun 19, 2020
2 parents a30801b + 5c6df19 commit b25f9d0
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 47 deletions.
17 changes: 8 additions & 9 deletions test/gossip.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ describe('gossip', () => {
nodeA.log.restore()
})

it('should send piggyback gossip into other sent messages', async function () {
it('should send piggyback control into other sent messages', async function () {
this.timeout(10000)
const nodeA = nodes[0]
const topic = 'Z'
Expand All @@ -79,21 +79,20 @@ describe('gossip', () => {
const nodeB = nodes.find((n) => n.peerId.toB58String() === peerB.id.toB58String())

// set spy
sinon.spy(nodeB, 'log')
sinon.spy(nodeA, '_piggybackControl')

// manually add control message to be sent to peerB
nodeA.control.set(peerB, { graft: [{ topicID: topic }] })
const graft = { graft: [{ topicID: topic }] }
nodeA.control.set(peerB, graft)

await nodeA.publish(topic, Buffer.from('hey'))

await new Promise((resolve) => nodeA.once('gossipsub:heartbeat', resolve))
expect(nodeB.log.callCount).to.be.gt(1)
expect(nodeA._piggybackControl.callCount).to.be.equal(1)
// expect control message to be sent alongside published message
const call = nodeB.log.getCalls().find((call) => call.args[0] === 'GRAFT: Add mesh link from %s in %s')
expect(call).to.not.equal(undefined)
expect(call.args[1]).to.equal(nodeA.peerId.toB58String())
const call = nodeA._piggybackControl.getCalls()[0]
expect(call.args[2].graft).to.deep.equal(graft.graft)

// unset spy
nodeB.log.restore()
nodeA._piggybackControl.restore()
})
})
163 changes: 134 additions & 29 deletions ts/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,52 +75,157 @@ export class Heartbeat {
// that hasn't been piggybacked since the last heartbeat
this.gossipsub._flush()

/**
* @type {Map<Peer, Array<String>>}
*/
// cache scores throught the heartbeat
const scores = new Map<string, number>()
const getScore = (id: string): number => {
let s = scores.get(id)
if (s === undefined) {
s = this.gossipsub.score.score(id)
scores.set(id, s)
}
return s
}

const tograft = new Map<Peer, string[]>()
const toprune = new Map<Peer, string[]>()

// maintain the mesh for topics we have joined
this.gossipsub.mesh.forEach((peers, topic) => {
// prune/graft helper functions (defined per topic)
const prunePeer = (p: Peer): void => {
const id = p.id.toB58String()
this.gossipsub.log(
'HEARTBEAT: Remove mesh link to %s in %s',
id, topic
)
// update peer score
this.gossipsub.score.prune(id, topic)
// remove peer from mesh
peers.delete(p)
// add to toprune
const topics = toprune.get(p)
if (!topics) {
toprune.set(p, [topic])
} else {
topics.push(topic)
}
}
const graftPeer = (p: Peer): void => {
const id = p.id.toB58String()
this.gossipsub.log(
'HEARTBEAT: Add mesh link to %s in %s',
id, topic
)
// update peer score
this.gossipsub.score.graft(id, topic)
// add peer to mesh
peers.add(p)
// add to tograft
const topics = tograft.get(p)
if (!topics) {
tograft.set(p, [topic])
} else {
topics.push(topic)
}
}

// drop all peers with negative score
peers.forEach(p => {
const id = p.id.toB58String()
const score = getScore(id)
if (score < 0) {
this.gossipsub.log(
'HEARTBEAT: Prune peer %s with negative score: score=%d, topic=%s',
id, score, topic
)
prunePeer(p)
}
})

// do we have enough peers?
if (peers.size < constants.GossipsubDlo) {
const ineed = constants.GossipsubD - peers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed)
peersSet.forEach((peer) => {
// add topic peers not already in mesh
if (peers.has(peer)) {
return
}

this.gossipsub.log('HEARTBEAT: Add mesh link to %s in %s', peer.id.toB58String(), topic)
peers.add(peer)
const peerGrafts = tograft.get(peer)
if (!peerGrafts) {
tograft.set(peer, [topic])
} else {
peerGrafts.push(topic)
}
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => {
// filter out mesh peers, peers with negative score
return !peers.has(p) && getScore(p.id.toB58String()) >= 0
})

peersSet.forEach(graftPeer)
}

// do we have to many peers?
if (peers.size > constants.GossipsubDhi) {
const idontneed = peers.size - constants.GossipsubD
let peersArray = Array.from(peers)
peersArray = shuffle(peersArray)
peersArray = peersArray.slice(0, idontneed)
// sort by score
peersArray.sort((a, b) => getScore(b.id.toB58String()) - getScore(a.id.toB58String()))
// We keep the first D_score peers by score and the remaining up to D randomly
// under the constraint that we keep D_out peers in the mesh (if we have that many)
peersArray = peersArray.slice(0, constants.GossipsubDscore).concat(
shuffle(peersArray.slice(constants.GossipsubDscore))
)

peersArray.forEach((peer) => {
this.gossipsub.log('HEARTBEAT: Remove mesh link to %s in %s', peer.id.toB58String(), topic)
peers.delete(peer)
const peerPrunes = toprune.get(peer)
if (!peerPrunes) {
toprune.set(peer, [topic])
} else {
peerPrunes.push(topic)
// count the outbound peers we are keeping
let outbound = 0
peersArray.slice(0, constants.GossipsubD).forEach(p => {
if (this.gossipsub.outbound.get(p)) {
outbound++
}
})

// if it's less than D_out, bubble up some outbound peers from the random selection
if (outbound < constants.GossipsubDout) {
const rotate = (i: number): void => {
// rotate the peersArray to the right and put the ith peer in the front
const p = peersArray[i]
for (let j = i; j > 0; j--) {
peersArray[j] = peersArray[j - 1]
}
peersArray[0] = p
}

// first bubble up all outbound peers already in the selection to the front
if (outbound > 0) {
let ihave = outbound
for (let i = 1; i < constants.GossipsubD && ihave > 0; i++) {
if (this.gossipsub.outbound.get(peersArray[i])) {
rotate(i)
ihave--
}
}
}

// now bubble up enough outbound peers outside the selection to the front
let ineed = constants.GossipsubD - outbound
for (let i = constants.GossipsubD; i < peersArray.length && ineed > 0; i++) {
if (this.gossipsub.outbound.get(peersArray[i])) {
rotate(i)
ineed--
}
}
}

// prune the excess peers
peersArray.slice(0, constants.GossipsubD).forEach(prunePeer)
}

// do we have enough outbound peers?
if (peers.size >= constants.GossipsubDlo) {
// count the outbound peers we have
let outbound = 0
peers.forEach(p => {
if (this.gossipsub.outbound.get(p)) {
outbound++
}
})

// if it's less than D_out, select some peers with outbound connections and graft them
if (outbound < constants.GossipsubDout) {
const ineed = constants.GossipsubDout - outbound
getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => {
// filter our current mesh peers and peers with negative score
return !peers.has(p) && getScore(p.id.toB58String()) >= 0
}).forEach(graftPeer)
}
}

this.gossipsub._emitGossip(topic, peers)
Expand Down
70 changes: 63 additions & 7 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ class Gossipsub extends BasicPubsub {
lastpub: Map<string, number>
gossip: Map<Peer, ControlIHave[]>
control: Map<Peer, ControlMessage>
outbound: Map<Peer, boolean>
score: PeerScore
_connectionManager: ConnectionManager
_options: GossipOptions

public static multicodec: string = constants.GossipsubIDv10
Expand Down Expand Up @@ -136,6 +138,13 @@ class Gossipsub extends BasicPubsub {
*/
this.control = new Map()

/**
* Connection direction cache, marks peers with outbound connections
*
* @type {Map<Peer, boolean>}
*/
this.outbound = new Map()

/**
* Use the overriden mesgIdFn or the default one.
*/
Expand All @@ -152,6 +161,11 @@ class Gossipsub extends BasicPubsub {
*/
this.heartbeat = new Heartbeat(this)

/**
* Connection manager
*/
this._connectionManager = connectionManager

/**
* Peer score tracking
*/
Expand All @@ -170,6 +184,18 @@ class Gossipsub extends BasicPubsub {
// Add to peer scoring
this.score.addPeer(peerId.toB58String())

// track the connection direction
let outbound = false
for (const c of this._connectionManager.getAll(peerId)) {
if (c.stat.direction === 'outbound') {
if (Array.from(c.registry.values()).some(rvalue => protocols.includes(rvalue.protocol))) {
outbound = true
break
}
}
}
this.outbound.set(p, outbound)

return p
}

Expand Down Expand Up @@ -198,6 +224,8 @@ class Gossipsub extends BasicPubsub {
this.gossip.delete(peer)
// Remove from control mapping
this.control.delete(peer)
// Remove from outbound tracking
this.outbound.delete(peer)

// Remove from peer scoring
this.score.removePeer(peer.id.toB58String())
Expand Down Expand Up @@ -367,20 +395,47 @@ class Gossipsub extends BasicPubsub {
*/
_handleGraft (peer: Peer, graft: ControlGraft[]): ControlPrune[] | undefined {
const prune: string[] = []
const id = peer.id.toB58String()
const score = this.score.score(id)

graft.forEach(({ topicID }) => {
if (!topicID) {
return
}
const peers = this.mesh.get(topicID)
if (!peers) {
const peersInMesh = this.mesh.get(topicID)
if (!peersInMesh) {
// spam hardening: ignore GRAFTs for unknown topics
return
}

// check if peer is already in the mesh; if so do nothing
if (peersInMesh.has(peer)) {
return
}

// check the score
if (score < 0) {
// we don't GRAFT peers with negative score
this.log(
'GRAFT: ignoring peer %s with negative score: score=%d, topic=%s',
id, score, topicID
)
// we do send them PRUNE however, because it's a matter of protocol correctness
prune.push(topicID)
} else {
this.log('GRAFT: Add mesh link from %s in %s', peer.id.toB58String(), topicID)
peers.add(peer)
peer.topics.add(topicID)
this.mesh.set(topicID, peers)
return
}

// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
// from peers with outbound connections; this is a defensive check to restrict potential
// mesh takeover attacks combined with love bombing
if (peersInMesh.size >= constants.GossipsubDhi && !this.outbound.get(peer)) {
prune.push(topicID)
return
}

this.log('GRAFT: Add mesh link from %s in %s', id, topicID)
peersInMesh.add(peer)
peer.topics.add(topicID)
})

if (!prune.length) {
Expand Down Expand Up @@ -443,6 +498,7 @@ class Gossipsub extends BasicPubsub {
this.lastpub = new Map()
this.gossip = new Map()
this.control = new Map()
this.outbound = new Map()
}

/**
Expand Down
8 changes: 6 additions & 2 deletions ts/score/peerScore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ const log = debug('libp2p:gossipsub:score')
interface Connection {
remoteAddr: Multiaddr
remotePeer: PeerId
stat: {
direction: 'inbound' | 'outbound'
}
registry: Map<string, {protocol: string}>
}

export interface ConnectionManager {
getAll(id: string): Connection[]
getAll(peerId: PeerId): Connection[]
// eslint-disable-next-line @typescript-eslint/ban-types
on(evt: string, fn: Function): void
// eslint-disable-next-line @typescript-eslint/ban-types
Expand Down Expand Up @@ -518,7 +522,7 @@ export class PeerScore {
* @returns {Array<string>}
*/
_getIPs (id: string): string[] {
return this._connectionManager.getAll(id)
return this._connectionManager.getAll(PeerId.createFromB58String(id))
.map(c => c.remoteAddr.toOptions().host)
}

Expand Down

0 comments on commit b25f9d0

Please sign in to comment.