Skip to content

Commit

Permalink
Merge pull request #94 from ChainSafe/cayman/gs1.1-adaptive-gossip
Browse files Browse the repository at this point in the history
gs1.1 adaptive gossip
  • Loading branch information
wemeetagain committed Jun 22, 2020
2 parents b25f9d0 + 0c56763 commit 12b4fa3
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 34 deletions.
5 changes: 2 additions & 3 deletions ts/getGossipPeers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { GossipsubIDv10, GossipsubIDv11 } from './constants'
import { shuffle } from './utils'
import { shuffle, hasGossipProtocol } from './utils'
import { Peer } from './peer'
import Gossipsub = require('./index')

Expand Down Expand Up @@ -30,7 +29,7 @@ export function getGossipPeers (
let peers: Peer[] = []
peersInTopic.forEach((peer) => {
if (
peer.protocols.find(proto => proto === GossipsubIDv10 || proto === GossipsubIDv11) &&
hasGossipProtocol(peer.protocols) &&
filter(peer)
) {
peers.push(peer)
Expand Down
47 changes: 27 additions & 20 deletions ts/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ export class Heartbeat {
* @returns {void}
*/
_heartbeat (): void {
// flush pending control message from retries and gossip
// that hasn't been piggybacked since the last heartbeat
this.gossipsub._flush()

// cache scores throught the heartbeat
const scores = new Map<string, number>()
const getScore = (id: string): number => {
Expand Down Expand Up @@ -228,6 +224,8 @@ export class Heartbeat {
}
}

// 2nd arg are mesh peers excluded from gossip. We have already pushed
// messages to them, so its redundant to gossip IHAVEs.
this.gossipsub._emitGossip(topic, peers)
})

Expand All @@ -241,33 +239,42 @@ export class Heartbeat {
})

// maintain our fanout for topics we are publishing but we have not joined
this.gossipsub.fanout.forEach((peers, topic) => {
// checks whether our peers are still in the topic
const topicGossip = this.gossipsub.topics.get(topic)
peers.forEach((peer) => {
if (topicGossip!.has(peer)) {
peers.delete(peer)
this.gossipsub.fanout.forEach((fanoutPeers, topic) => {
// checks whether our peers are still in the topic and have a score above the publish threshold
const topicPeers = this.gossipsub.topics.get(topic)
fanoutPeers.forEach(p => {
if (
!topicPeers!.has(p) ||
getScore(p.id.toB58String()) < this.gossipsub._options.scoreThresholds.publishThreshold
) {
fanoutPeers.delete(p)
}
})

// do we need more peers?
if (peers.size < constants.GossipsubD) {
const ineed = constants.GossipsubD - peers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed)
peersSet.forEach((peer) => {
if (!peers.has(peer)) {
return
}

peers.add(peer)
if (fanoutPeers.size < constants.GossipsubD) {
const ineed = constants.GossipsubD - fanoutPeers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => {
// filter out existing fanout peers and peers with score above the publish threshold
return !fanoutPeers.has(p) &&
getScore(p.id.toB58String()) >= this.gossipsub._options.scoreThresholds.publishThreshold
})
peersSet.forEach(p => {
fanoutPeers.add(p)
})
}

this.gossipsub._emitGossip(topic, peers)
// 2nd arg are fanout peers excluded from gossip.
// We have already pushed messages to them, so its redundant to gossip IHAVEs
this.gossipsub._emitGossip(topic, fanoutPeers)
})

// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
this.gossipsub._sendGraftPrune(tograft, toprune)

// flush pending gossip that wasn't piggybacked above
this.gossipsub._flush()

// advance the message history window
this.gossipsub.messageCache.shift()

Expand Down
64 changes: 53 additions & 11 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
import * as constants from './constants'
import { Heartbeat } from './heartbeat'
import { getGossipPeers } from './getGossipPeers'
import { createGossipRpc } from './utils'
import { createGossipRpc, shuffle, hasGossipProtocol } from './utils'
import { Peer, Registrar } from './peer'
import { PeerScore, PeerScoreParams, PeerScoreThresholds, createPeerScoreParams, createPeerScoreThresholds, ConnectionManager } from './score'
// @ts-ignore
Expand Down Expand Up @@ -777,25 +777,67 @@ class Gossipsub extends BasicPubsub {
/**
* Emits gossip to peers in a particular topic
* @param {String} topic
* @param {Set<Peer>} peers - peers to exclude
* @param {Set<Peer>} exclude peers to exclude
* @returns {void}
*/
_emitGossip (topic: string, peers: Set<Peer>): void {
_emitGossip (topic: string, exclude: Set<Peer>): void {
const messageIDs = this.messageCache.getGossipIDs(topic)
if (!messageIDs.length) {
return
}

const gossipSubPeers = getGossipPeers(this, topic, constants.GossipsubD)
gossipSubPeers.forEach((peer) => {
// skip mesh peers
if (!peers.has(peer)) {
this._pushGossip(peer, {
topicID: topic,
messageIDs: messageIDs
})
// shuffle to emit in random order
shuffle(messageIDs)

// if we are emitting more than GossipsubMaxIHaveLength ids, truncate the list
if (messageIDs.length > constants.GossipsubMaxIHaveLength) {
// we do the truncation (with shuffling) per peer below
this.log('too many messages for gossip; will truncate IHAVE list (%d messages)', messageIDs.length)
}

// Send gossip to GossipFactor peers above threshold with a minimum of D_lazy
// First we collect the peers above gossipThreshold that are not in the exclude set
// and then randomly select from that set
const peersToGossip: Peer[] = []
const topicPeers = this.topics.get(topic)
if (!topicPeers) {
// no topic peers, no gossip
return
}
topicPeers.forEach(p => {
if (
!exclude.has(p) &&
hasGossipProtocol(p.protocols) &&
this.score.score(p.id.toB58String()) >= this._options.scoreThresholds.gossipThreshold
) {
peersToGossip.push(p)
}
})

let target = constants.GossipsubDlazy
const factor = constants.GossipsubGossipFactor * peersToGossip.length
if (factor > target) {
target = factor
}
if (target > peersToGossip.length) {
target = peersToGossip.length
} else {
shuffle(peersToGossip)
}
// Emit the IHAVE gossip to the selected peers up to the target
peersToGossip.slice(0, target).forEach(p => {
let peerMessageIDs = messageIDs
if (messageIDs.length > constants.GossipsubMaxIHaveLength) {
// shuffle and slice message IDs per peer so that we emit a different set for each peer
// we have enough reduncancy in the system that this will significantly increase the message
// coverage when we do truncate
peerMessageIDs = shuffle(peerMessageIDs.slice()).slice(0, constants.GossipsubMaxIHaveLength)
}
this._pushGossip(p, {
topicID: topic,
messageIDs: peerMessageIDs
})
})
}

/**
Expand Down
7 changes: 7 additions & 0 deletions ts/utils/hasGossipProtocol.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { GossipsubIDv10, GossipsubIDv11 } from '../constants'

export function hasGossipProtocol (protocols: string[]): boolean {
return Boolean(protocols.find(p =>
p === GossipsubIDv10 || p === GossipsubIDv11
))
}
1 change: 1 addition & 0 deletions ts/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './createGossipRpc'
export * from './shuffle'
export * from './hasGossipProtocol'

0 comments on commit 12b4fa3

Please sign in to comment.