Skip to content

Commit

Permalink
Merge pull request #111 from ChainSafe/cayman/gs1.1-refactor-maps
Browse files Browse the repository at this point in the history
gs1.1: use string peer id consistently
  • Loading branch information
wemeetagain committed Jul 21, 2020
2 parents 6a792de + 61698eb commit 89bf606
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 194 deletions.
4 changes: 2 additions & 2 deletions test/2-nodes.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ describe('2 nodes', () => {
new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))
])

expect(first(nodes[0].mesh.get(topic)).id.toB58String()).to.equal(first(nodes[0].peers).id.toB58String())
expect(first(nodes[1].mesh.get(topic)).id.toB58String()).to.equal(first(nodes[1].peers).id.toB58String())
expect(first(nodes[0].mesh.get(topic))).to.equal(first(nodes[0].peers).id.toB58String())
expect(first(nodes[1].mesh.get(topic))).to.equal(first(nodes[1].peers).id.toB58String())
})
})

Expand Down
9 changes: 4 additions & 5 deletions test/gossip.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ describe('gossip', () => {

nodeA._pushGossip.getCalls()
.map((call) => call.args[0])
.forEach((peer) => {
const peerId = peer.id.toB58String()
nodeA.mesh.get(topic).forEach((meshPeer) => {
expect(meshPeer.id.toB58String()).to.not.equal(peerId)
.forEach((peerId) => {
nodeA.mesh.get(topic).forEach((meshPeerId) => {
expect(meshPeerId).to.not.equal(peerId)
})
})

Expand All @@ -74,7 +73,7 @@ describe('gossip', () => {
await delay(500)

const peerB = first(nodeA.mesh.get(topic))
const nodeB = nodes.find((n) => n.peerId.toB58String() === peerB.id.toB58String())
const nodeB = nodes.find((n) => n.peerId.toB58String() === peerB)

// set spy
sinon.spy(nodeA, '_piggybackControl')
Expand Down
13 changes: 6 additions & 7 deletions ts/getGossipPeers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { shuffle, hasGossipProtocol } from './utils'
import { PeerStreams } from './peerStreams'
import Gossipsub = require('./index')

/**
Expand All @@ -10,33 +9,33 @@ import Gossipsub = require('./index')
* @param {String} topic
* @param {Number} count
* @param {Function} [filter] a function to filter acceptable peers
* @returns {Set<Peer>}
* @returns {Set<string>}
*
*/
export function getGossipPeers (
router: Gossipsub,
topic: string,
count: number,
filter: (peerStreams: PeerStreams) => boolean = () => true
): Set<PeerStreams> {
filter: (id: string) => boolean = () => true
): Set<string> {
const peersInTopic = router.topics.get(topic)
if (!peersInTopic) {
return new Set()
}

// Adds all peers using our protocol
// that also pass the filter function
let peers: PeerStreams[] = []
let peers: string[] = []
peersInTopic.forEach((id) => {
const peerStreams = router.peers.get(id)
if (!peerStreams) {
return
}
if (
hasGossipProtocol(peerStreams.protocol) &&
filter(peerStreams)
filter(id)
) {
peers.push(peerStreams)
peers.push(id)
}
})

Expand Down
71 changes: 32 additions & 39 deletions ts/heartbeat.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as constants from './constants'
import { getGossipPeers } from './getGossipPeers'
import { shuffle } from './utils'
import { PeerStreams } from './peerStreams'
import Gossipsub = require('./index')
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
Expand Down Expand Up @@ -84,8 +83,10 @@ export class Heartbeat {
return s
}

const tograft = new Map<PeerStreams, string[]>()
const toprune = new Map<PeerStreams, string[]>()
// peer id => topic[]
const tograft = new Map<string, string[]>()
// peer id => topic[]
const toprune = new Map<string, string[]>()

// clean up expired backoffs
this.gossipsub._clearBackoff()
Expand All @@ -103,8 +104,7 @@ export class Heartbeat {
// maintain the mesh for topics we have joined
this.gossipsub.mesh.forEach((peers, topic) => {
// prune/graft helper functions (defined per topic)
const prunePeer = (p: PeerStreams): void => {
const id = p.id.toB58String()
const prunePeer = (id: string): void => {
this.gossipsub.log(
'HEARTBEAT: Remove mesh link to %s in %s',
id, topic
Expand All @@ -114,55 +114,52 @@ export class Heartbeat {
// add prune backoff record
this.gossipsub._addBackoff(id, topic)
// remove peer from mesh
peers.delete(p)
peers.delete(id)
// add to toprune
const topics = toprune.get(p)
const topics = toprune.get(id)
if (!topics) {
toprune.set(p, [topic])
toprune.set(id, [topic])
} else {
topics.push(topic)
}
}
const graftPeer = (p: PeerStreams): void => {
const id = p.id.toB58String()
const graftPeer = (id: string): void => {
this.gossipsub.log(
'HEARTBEAT: Add mesh link to %s in %s',
id, topic
)
// update peer score
this.gossipsub.score.graft(id, topic)
// add peer to mesh
peers.add(p)
peers.add(id)
// add to tograft
const topics = tograft.get(p)
const topics = tograft.get(id)
if (!topics) {
tograft.set(p, [topic])
tograft.set(id, [topic])
} else {
topics.push(topic)
}
}

// drop all peers with negative score
peers.forEach(p => {
const id = p.id.toB58String()
peers.forEach(id => {
const score = getScore(id)
if (score < 0) {
this.gossipsub.log(
'HEARTBEAT: Prune peer %s with negative score: score=%d, topic=%s',
id, score, topic
)
prunePeer(p)
prunePeer(id)
}
})

// do we have enough peers?
if (peers.size < constants.GossipsubDlo) {
const backoff = this.gossipsub.backoff.get(topic)
const ineed = constants.GossipsubD - peers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => {
const id = p.id.toB58String()
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, id => {
// filter out mesh peers, direct peers, peers we are backing off, peers with negative score
return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
return !peers.has(id) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
})

peersSet.forEach(graftPeer)
Expand All @@ -172,7 +169,7 @@ export class Heartbeat {
if (peers.size > constants.GossipsubDhi) {
let peersArray = Array.from(peers)
// sort by score
peersArray.sort((a, b) => getScore(b.id.toB58String()) - getScore(a.id.toB58String()))
peersArray.sort((a, b) => getScore(b) - getScore(a))
// We keep the first D_score peers by score and the remaining up to D randomly
// under the constraint that we keep D_out peers in the mesh (if we have that many)
peersArray = peersArray.slice(0, constants.GossipsubDscore).concat(
Expand Down Expand Up @@ -237,10 +234,9 @@ export class Heartbeat {
if (outbound < constants.GossipsubDout) {
const ineed = constants.GossipsubDout - outbound
const backoff = this.gossipsub.backoff.get(topic)
getGossipPeers(this.gossipsub, topic, ineed, (p: PeerStreams): boolean => {
const id = p.id.toB58String()
getGossipPeers(this.gossipsub, topic, ineed, (id: string): boolean => {
// filter our current mesh peers, direct peers, peers we are backing off, peers with negative score
return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
return !peers.has(id) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
}).forEach(graftPeer)
}
}
Expand All @@ -256,24 +252,23 @@ export class Heartbeat {

// now compute the median peer score in the mesh
const peersList = Array.from(peers)
.sort((a, b) => getScore(a.id.toB58String()) - getScore(b.id.toB58String()))
.sort((a, b) => getScore(a) - getScore(b))
const medianIndex = peers.size / 2
const medianScore = getScore(peersList[medianIndex].id.toB58String())
const medianScore = getScore(peersList[medianIndex])

// if the median score is below the threshold, select a better peer (if any) and GRAFT
if (medianScore < this.gossipsub._options.scoreThresholds.opportunisticGraftThreshold) {
const backoff = this.gossipsub.backoff.get(topic)
const peersToGraft = getGossipPeers(this.gossipsub, topic, constants.GossipsubOpportunisticGraftPeers, (p: PeerStreams): boolean => {
const id = p.id.toB58String()
const peersToGraft = getGossipPeers(this.gossipsub, topic, constants.GossipsubOpportunisticGraftPeers, (id: string): boolean => {
// filter out current mesh peers, direct peers, peers we are backing off, peers below or at threshold
return peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore
return peers.has(id) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore
})
peersToGraft.forEach(p => {
peersToGraft.forEach(id => {
this.gossipsub.log(
'HEARTBEAT: Opportunistically graft peer %s on topic %s',
p.id.toB58String(), topic
id, topic
)
graftPeer(p)
graftPeer(id)
})
}
}
Expand All @@ -296,28 +291,26 @@ export class Heartbeat {
this.gossipsub.fanout.forEach((fanoutPeers, topic) => {
// checks whether our peers are still in the topic and have a score above the publish threshold
const topicPeers = this.gossipsub.topics.get(topic)
fanoutPeers.forEach(p => {
const id = p.id.toB58String()
fanoutPeers.forEach(id => {
if (
!topicPeers!.has(id) ||
getScore(id) < this.gossipsub._options.scoreThresholds.publishThreshold
) {
fanoutPeers.delete(p)
fanoutPeers.delete(id)
}
})

// do we need more peers?
if (fanoutPeers.size < constants.GossipsubD) {
const ineed = constants.GossipsubD - fanoutPeers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, (p: PeerStreams): boolean => {
const id = p.id.toB58String()
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, (id: string): boolean => {
// filter out existing fanout peers, direct peers, and peers with score above the publish threshold
return !fanoutPeers.has(p) &&
return !fanoutPeers.has(id) &&
!this.gossipsub.direct.has(id) &&
getScore(id) >= this.gossipsub._options.scoreThresholds.publishThreshold
})
peersSet.forEach(p => {
fanoutPeers.add(p)
peersSet.forEach(id => {
fanoutPeers.add(id)
})
}

Expand Down
Loading

0 comments on commit 89bf606

Please sign in to comment.