From b582120988a17d225960eed5249667d13057a24b Mon Sep 17 00:00:00 2001 From: Cayman Date: Wed, 17 Jun 2020 12:23:54 -0500 Subject: [PATCH 1/3] chore: update getGossipPeers --- ts/getGossipPeers.ts | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/ts/getGossipPeers.ts b/ts/getGossipPeers.ts index 8faf67e1..caed0a9f 100644 --- a/ts/getGossipPeers.ts +++ b/ts/getGossipPeers.ts @@ -1,27 +1,38 @@ -import * as constants from './constants' +import { GossipsubIDv10, GossipsubIDv11 } from './constants' import { shuffle } from './utils' import { Peer } from './peer' import Gossipsub = require('./index') /** * Given a topic, returns up to count peers subscribed to that topic + * that pass an optional filter function * * @param {Gossipsub} router * @param {String} topic * @param {Number} count + * @param {Function} [filter] a function to filter acceptable peers * @returns {Set} * */ -export function getGossipPeers (router: Gossipsub, topic: string, count: number): Set { +export function getGossipPeers ( + router: Gossipsub, + topic: string, + count: number, + filter: (peer: Peer) => boolean = () => true +): Set { const peersInTopic = router.topics.get(topic) if (!peersInTopic) { return new Set() } // Adds all peers using our protocol + // that also pass the filter function let peers: Peer[] = [] peersInTopic.forEach((peer) => { - if (peer.protocols.includes(constants.GossipsubIDv10)) { + if ( + peer.protocols.find(proto => proto === GossipsubIDv10 || proto === GossipsubIDv11) && + filter(peer) + ) { peers.push(peer) } }) From 5854d2629a596fb174f1f82b7124e72c77baa256 Mon Sep 17 00:00:00 2001 From: Cayman Date: Wed, 17 Jun 2020 10:46:06 -0500 Subject: [PATCH 2/3] feat: add flood publishing --- ts/index.ts | 74 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 28 deletions(-) diff --git a/ts/index.ts b/ts/index.ts index 3318c9cc..f8fede45 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -22,6 +22,7 @@ interface GossipInputOptions { emitSelf: boolean gossipIncoming: boolean fallbackToFloodsub: boolean + floodPublish: boolean msgIdFn: (msg: Message) => string messageCache: MessageCache scoreParams: Partial @@ -57,6 +58,7 @@ class Gossipsub extends BasicPubsub { * @param {bool} [options.emitSelf] if publish should emit to self, if subscribed, defaults to false * @param {bool} [options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true * @param {bool} [options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true + * @param {bool} [options.floodPublish] if self-published messages should be sent to all peers, defaults to true * @param {function} [options.msgIdFn] override the default message id function * @param {Object} [options.messageCache] override the default MessageCache * @param {Object} [options.scoreParams] peer score parameters @@ -73,6 +75,7 @@ class Gossipsub extends BasicPubsub { const _options = { gossipIncoming: true, fallbackToFloodsub: true, + floodPublish: true, ...options, scoreParams: createPeerScoreParams(options.scoreParams), scoreThresholds: createPeerScoreThresholds(options.scoreThresholds) @@ -546,44 +549,59 @@ class Gossipsub extends BasicPubsub { this.seenCache.put(msgID) this.messageCache.put(msgObj) - const tosend = new Set() - msgObj.topicIDs.forEach((topic) => { - const peersInTopic = this.topics.get(topic) - if (!peersInTopic) { - return - } - // floodsub peers - peersInTopic.forEach((peer) => { - if (peer.protocols.includes(constants.FloodsubID)) { + const tosend = new Set() + if (this._options.floodPublish) { + // flood-publish behavior + // send to _all_ peers meeting the publishThreshold + this.peers.forEach((peer, id) => { + const score = this.score.score(id) + if (score >= this._options.scoreThresholds.publishThreshold) { tosend.add(peer) } }) + } else { + // non-flood-publish behavior + // send to subscribed floodsub peers + // and some mesh peers + msgObj.topicIDs.forEach((topic) => { + const peersInTopic = this.topics.get(topic) + if (!peersInTopic) { + return + } - // Gossipsub peers handling - let meshPeers = this.mesh.get(topic) - if (!meshPeers) { - // We are not in the mesh for topic, use fanout peers - meshPeers = this.fanout.get(topic) + // floodsub peers + peersInTopic.forEach((peer) => { + if (peer.protocols.includes(constants.FloodsubID)) { + tosend.add(peer) + } + }) + + // Gossipsub peers handling + let meshPeers = this.mesh.get(topic) if (!meshPeers) { - // If we are not in the fanout, then pick any peers in topic - const peers = getGossipPeers(this, topic, constants.GossipsubD) - - if (peers.size > 0) { - meshPeers = peers - this.fanout.set(topic, peers) - } else { - meshPeers = new Set() + // We are not in the mesh for topic, use fanout peers + meshPeers = this.fanout.get(topic) + if (!meshPeers) { + // If we are not in the fanout, then pick any peers in topic + const peers = getGossipPeers(this, topic, constants.GossipsubD) + + if (peers.size > 0) { + meshPeers = peers + this.fanout.set(topic, peers) + } else { + meshPeers = new Set() + } } + // Store the latest publishing time + this.lastpub.set(topic, this._now()) } - // Store the latest publishing time - this.lastpub.set(topic, this._now()) - } - meshPeers!.forEach((peer) => { - tosend.add(peer) + meshPeers!.forEach((peer) => { + tosend.add(peer) + }) }) - }) + } // Publish messages to peers tosend.forEach((peer) => { if (peer.id.toB58String() === msgObj.from) { From 0a49b20000864bc46122b2f4b7cd0753258752ba Mon Sep 17 00:00:00 2001 From: Cayman Date: Wed, 17 Jun 2020 12:37:12 -0500 Subject: [PATCH 3/3] chore: tweak non-flood-publish behavior --- ts/index.ts | 47 +++++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/ts/index.ts b/ts/index.ts index f8fede45..1c8eb221 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -551,24 +551,25 @@ class Gossipsub extends BasicPubsub { this.messageCache.put(msgObj) const tosend = new Set() - if (this._options.floodPublish) { - // flood-publish behavior - // send to _all_ peers meeting the publishThreshold - this.peers.forEach((peer, id) => { - const score = this.score.score(id) - if (score >= this._options.scoreThresholds.publishThreshold) { - tosend.add(peer) - } - }) - } else { - // non-flood-publish behavior - // send to subscribed floodsub peers - // and some mesh peers - msgObj.topicIDs.forEach((topic) => { - const peersInTopic = this.topics.get(topic) - if (!peersInTopic) { - return - } + msgObj.topicIDs.forEach((topic) => { + const peersInTopic = this.topics.get(topic) + if (!peersInTopic) { + return + } + + if (this._options.floodPublish) { + // flood-publish behavior + // send to _all_ peers meeting the publishThreshold + peersInTopic.forEach(peer => { + const score = this.score.score(peer.id.toB58String()) + if (score >= this._options.scoreThresholds.publishThreshold) { + tosend.add(peer) + } + }) + } else { + // non-flood-publish behavior + // send to subscribed floodsub peers + // and some mesh peers above publishThreshold // floodsub peers peersInTopic.forEach((peer) => { @@ -583,8 +584,10 @@ class Gossipsub extends BasicPubsub { // We are not in the mesh for topic, use fanout peers meshPeers = this.fanout.get(topic) if (!meshPeers) { - // If we are not in the fanout, then pick any peers in topic - const peers = getGossipPeers(this, topic, constants.GossipsubD) + // If we are not in the fanout, then pick peers in topic above the publishThreshold + const peers = getGossipPeers(this, topic, constants.GossipsubD, peer => { + return this.score.score(peer.id.toB58String()) >= this._options.scoreThresholds.publishThreshold + }) if (peers.size > 0) { meshPeers = peers @@ -600,8 +603,8 @@ class Gossipsub extends BasicPubsub { meshPeers!.forEach((peer) => { tosend.add(peer) }) - }) - } + } + }) // Publish messages to peers tosend.forEach((peer) => { if (peer.id.toB58String() === msgObj.from) {