Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gs1.1 flood publishing #89

Merged
merged 3 commits into from
Jun 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions ts/getGossipPeers.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,38 @@
import * as constants from './constants'
import { GossipsubIDv10, GossipsubIDv11 } from './constants'
import { shuffle } from './utils'
import { Peer } from './peer'
import Gossipsub = require('./index')

/**
* Given a topic, returns up to count peers subscribed to that topic
* that pass an optional filter function
*
* @param {Gossipsub} router
* @param {String} topic
* @param {Number} count
* @param {Function} [filter] a function to filter acceptable peers
* @returns {Set<Peer>}
*
*/
export function getGossipPeers (router: Gossipsub, topic: string, count: number): Set<Peer> {
export function getGossipPeers (
router: Gossipsub,
topic: string,
count: number,
filter: (peer: Peer) => boolean = () => true
): Set<Peer> {
const peersInTopic = router.topics.get(topic)
if (!peersInTopic) {
return new Set()
}

// Adds all peers using our protocol
// that also pass the filter function
let peers: Peer[] = []
peersInTopic.forEach((peer) => {
if (peer.protocols.includes(constants.GossipsubIDv10)) {
if (
peer.protocols.find(proto => proto === GossipsubIDv10 || proto === GossipsubIDv11) &&
filter(peer)
) {
peers.push(peer)
}
})
Expand Down
71 changes: 46 additions & 25 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ interface GossipInputOptions {
emitSelf: boolean
gossipIncoming: boolean
fallbackToFloodsub: boolean
floodPublish: boolean
msgIdFn: (msg: Message) => string
messageCache: MessageCache
scoreParams: Partial<PeerScoreParams>
Expand Down Expand Up @@ -57,6 +58,7 @@ class Gossipsub extends BasicPubsub {
* @param {bool} [options.emitSelf] if publish should emit to self, if subscribed, defaults to false
* @param {bool} [options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
* @param {bool} [options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true
* @param {bool} [options.floodPublish] if self-published messages should be sent to all peers, defaults to true
* @param {function} [options.msgIdFn] override the default message id function
* @param {Object} [options.messageCache] override the default MessageCache
* @param {Object} [options.scoreParams] peer score parameters
Expand All @@ -73,6 +75,7 @@ class Gossipsub extends BasicPubsub {
const _options = {
gossipIncoming: true,
fallbackToFloodsub: true,
floodPublish: true,
...options,
scoreParams: createPeerScoreParams(options.scoreParams),
scoreThresholds: createPeerScoreThresholds(options.scoreThresholds)
Expand Down Expand Up @@ -546,43 +549,61 @@ class Gossipsub extends BasicPubsub {
this.seenCache.put(msgID)

this.messageCache.put(msgObj)

const tosend = new Set<Peer>()
msgObj.topicIDs.forEach((topic) => {
const peersInTopic = this.topics.get(topic)
if (!peersInTopic) {
return
}

// floodsub peers
peersInTopic.forEach((peer) => {
if (peer.protocols.includes(constants.FloodsubID)) {
tosend.add(peer)
}
})
if (this._options.floodPublish) {
// flood-publish behavior
// send to _all_ peers meeting the publishThreshold
peersInTopic.forEach(peer => {
const score = this.score.score(peer.id.toB58String())
if (score >= this._options.scoreThresholds.publishThreshold) {
tosend.add(peer)
}
})
} else {
// non-flood-publish behavior
// send to subscribed floodsub peers
// and some mesh peers above publishThreshold

// floodsub peers
peersInTopic.forEach((peer) => {
if (peer.protocols.includes(constants.FloodsubID)) {
tosend.add(peer)
}
})

// Gossipsub peers handling
let meshPeers = this.mesh.get(topic)
if (!meshPeers) {
// We are not in the mesh for topic, use fanout peers
meshPeers = this.fanout.get(topic)
// Gossipsub peers handling
let meshPeers = this.mesh.get(topic)
if (!meshPeers) {
// If we are not in the fanout, then pick any peers in topic
const peers = getGossipPeers(this, topic, constants.GossipsubD)

if (peers.size > 0) {
meshPeers = peers
this.fanout.set(topic, peers)
} else {
meshPeers = new Set()
// We are not in the mesh for topic, use fanout peers
meshPeers = this.fanout.get(topic)
if (!meshPeers) {
// If we are not in the fanout, then pick peers in topic above the publishThreshold
const peers = getGossipPeers(this, topic, constants.GossipsubD, peer => {
return this.score.score(peer.id.toB58String()) >= this._options.scoreThresholds.publishThreshold
})

if (peers.size > 0) {
meshPeers = peers
this.fanout.set(topic, peers)
} else {
meshPeers = new Set()
}
}
// Store the latest publishing time
this.lastpub.set(topic, this._now())
}
// Store the latest publishing time
this.lastpub.set(topic, this._now())
}

meshPeers!.forEach((peer) => {
tosend.add(peer)
})
meshPeers!.forEach((peer) => {
tosend.add(peer)
})
}
})
// Publish messages to peers
tosend.forEach((peer) => {
Expand Down