Skip to content

Commit

Permalink
Merge pull request #89 from ChainSafe/cayman/gs1.1-flood-publish
Browse files Browse the repository at this point in the history
gs1.1 flood publishing
  • Loading branch information
wemeetagain committed Jun 18, 2020
2 parents b841c4e + 0a49b20 commit a30801b
Showing 1 changed file with 46 additions and 25 deletions.
71 changes: 46 additions & 25 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ interface GossipInputOptions {
emitSelf: boolean
gossipIncoming: boolean
fallbackToFloodsub: boolean
floodPublish: boolean
msgIdFn: (msg: Message) => string
messageCache: MessageCache
scoreParams: Partial<PeerScoreParams>
Expand Down Expand Up @@ -57,6 +58,7 @@ class Gossipsub extends BasicPubsub {
* @param {bool} [options.emitSelf] if publish should emit to self, if subscribed, defaults to false
* @param {bool} [options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
* @param {bool} [options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true
* @param {bool} [options.floodPublish] if self-published messages should be sent to all peers, defaults to true
* @param {function} [options.msgIdFn] override the default message id function
* @param {Object} [options.messageCache] override the default MessageCache
* @param {Object} [options.scoreParams] peer score parameters
Expand All @@ -73,6 +75,7 @@ class Gossipsub extends BasicPubsub {
const _options = {
gossipIncoming: true,
fallbackToFloodsub: true,
floodPublish: true,
...options,
scoreParams: createPeerScoreParams(options.scoreParams),
scoreThresholds: createPeerScoreThresholds(options.scoreThresholds)
Expand Down Expand Up @@ -546,43 +549,61 @@ class Gossipsub extends BasicPubsub {
this.seenCache.put(msgID)

this.messageCache.put(msgObj)

const tosend = new Set<Peer>()
msgObj.topicIDs.forEach((topic) => {
const peersInTopic = this.topics.get(topic)
if (!peersInTopic) {
return
}

// floodsub peers
peersInTopic.forEach((peer) => {
if (peer.protocols.includes(constants.FloodsubID)) {
tosend.add(peer)
}
})
if (this._options.floodPublish) {
// flood-publish behavior
// send to _all_ peers meeting the publishThreshold
peersInTopic.forEach(peer => {
const score = this.score.score(peer.id.toB58String())
if (score >= this._options.scoreThresholds.publishThreshold) {
tosend.add(peer)
}
})
} else {
// non-flood-publish behavior
// send to subscribed floodsub peers
// and some mesh peers above publishThreshold

// floodsub peers
peersInTopic.forEach((peer) => {
if (peer.protocols.includes(constants.FloodsubID)) {
tosend.add(peer)
}
})

// Gossipsub peers handling
let meshPeers = this.mesh.get(topic)
if (!meshPeers) {
// We are not in the mesh for topic, use fanout peers
meshPeers = this.fanout.get(topic)
// Gossipsub peers handling
let meshPeers = this.mesh.get(topic)
if (!meshPeers) {
// If we are not in the fanout, then pick any peers in topic
const peers = getGossipPeers(this, topic, constants.GossipsubD)

if (peers.size > 0) {
meshPeers = peers
this.fanout.set(topic, peers)
} else {
meshPeers = new Set()
// We are not in the mesh for topic, use fanout peers
meshPeers = this.fanout.get(topic)
if (!meshPeers) {
// If we are not in the fanout, then pick peers in topic above the publishThreshold
const peers = getGossipPeers(this, topic, constants.GossipsubD, peer => {
return this.score.score(peer.id.toB58String()) >= this._options.scoreThresholds.publishThreshold
})

if (peers.size > 0) {
meshPeers = peers
this.fanout.set(topic, peers)
} else {
meshPeers = new Set()
}
}
// Store the latest publishing time
this.lastpub.set(topic, this._now())
}
// Store the latest publishing time
this.lastpub.set(topic, this._now())
}

meshPeers!.forEach((peer) => {
tosend.add(peer)
})
meshPeers!.forEach((peer) => {
tosend.add(peer)
})
}
})
// Publish messages to peers
tosend.forEach((peer) => {
Expand Down

0 comments on commit a30801b

Please sign in to comment.