Skip to content

Commit

Permalink
feat: add direct peer connections
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed Jul 2, 2020
1 parent b040dc8 commit 7103b83
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 16 deletions.
21 changes: 13 additions & 8 deletions ts/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ export class Heartbeat {
// clean up expired backoffs
this.gossipsub._clearBackoff()

// ensure direct peers are connected
this.gossipsub._directConnect()

// maintain the mesh for topics we have joined
this.gossipsub.mesh.forEach((peers, topic) => {
// prune/graft helper functions (defined per topic)
Expand Down Expand Up @@ -151,8 +154,8 @@ export class Heartbeat {
const ineed = constants.GossipsubD - peers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => {
const id = p.id.toB58String()
// filter out mesh peers, peers we are backing off, peers with negative score
return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
// filter out mesh peers, direct peers, peers we are backing off, peers with negative score
return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
})

peersSet.forEach(graftPeer)
Expand Down Expand Up @@ -229,8 +232,8 @@ export class Heartbeat {
const backoff = this.gossipsub.backoff.get(topic)
getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => {
const id = p.id.toB58String()
// filter our current mesh peers, peers we are backing off, peers with negative score
return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
// filter our current mesh peers, direct peers, peers we are backing off, peers with negative score
return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
}).forEach(graftPeer)
}
}
Expand All @@ -255,8 +258,8 @@ export class Heartbeat {
const backoff = this.gossipsub.backoff.get(topic)
const peersToGraft = getGossipPeers(this.gossipsub, topic, constants.GossipsubOpportunisticGraftPeers, (p: Peer): boolean => {
const id = p.id.toB58String()
// filter out current mesh peers, peres we are backing off, peers below or at threshold
return peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore
// filter out current mesh peers, direct peers, peers we are backing off, peers below or at threshold
return peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore
})
peersToGraft.forEach(p => {
this.gossipsub.log(
Expand Down Expand Up @@ -299,9 +302,11 @@ export class Heartbeat {
if (fanoutPeers.size < constants.GossipsubD) {
const ineed = constants.GossipsubD - fanoutPeers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => {
// filter out existing fanout peers and peers with score above the publish threshold
const id = p.id.toB58String()
// filter out existing fanout peers, direct peers, and peers with score above the publish threshold
return !fanoutPeers.has(p) &&
getScore(p.id.toB58String()) >= this.gossipsub._options.scoreThresholds.publishThreshold
!this.gossipsub.direct.has(id) &&
getScore(id) >= this.gossipsub._options.scoreThresholds.publishThreshold
})
peersSet.forEach(p => {
fanoutPeers.add(p)
Expand Down
119 changes: 111 additions & 8 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { getGossipPeers } from './getGossipPeers'
import { createGossipRpc, shuffle, hasGossipProtocol } from './utils'
import { Peer } from './peer'
import { PeerScore, PeerScoreParams, PeerScoreThresholds, createPeerScoreParams, createPeerScoreThresholds } from './score'
import { Libp2p } from './interfaces'
import { AddrInfo, Libp2p } from './interfaces'
// @ts-ignore
import TimeCache = require('time-cache')
import PeerId = require('peer-id')
Expand All @@ -29,6 +29,7 @@ interface GossipInputOptions {
messageCache: MessageCache
scoreParams: Partial<PeerScoreParams>
scoreThresholds: Partial<PeerScoreThresholds>
directPeers: AddrInfo[]
}

interface GossipOptions extends GossipInputOptions {
Expand All @@ -38,6 +39,7 @@ interface GossipOptions extends GossipInputOptions {

class Gossipsub extends BasicPubsub {
peers: Map<string, Peer>
direct: Set<string>
topics: Map<string, Set<Peer>>
mesh: Map<string, Set<Peer>>
fanout: Map<string, Set<Peer>>
Expand All @@ -64,6 +66,7 @@ class Gossipsub extends BasicPubsub {
* @param {Object} [options.messageCache] override the default MessageCache
* @param {Object} [options.scoreParams] peer score parameters
* @param {Object} [options.scoreThresholds] peer score thresholds
* @param {AddrInfo[]} [options.directPeers] peers with which we will maintain direct connections
* @constructor
*/
constructor (
Expand All @@ -75,6 +78,7 @@ class Gossipsub extends BasicPubsub {
gossipIncoming: true,
fallbackToFloodsub: true,
floodPublish: true,
directPeers: [],
...options,
scoreParams: createPeerScoreParams(options.scoreParams),
scoreThresholds: createPeerScoreThresholds(options.scoreThresholds)
Expand All @@ -92,6 +96,17 @@ class Gossipsub extends BasicPubsub {
options: _options
})

/**
* Direct peers
* @type {Set<string>}
*/
this.direct = new Set(_options.directPeers.map(p => p.id.toB58String()))

// set direct peer addresses in the address book
_options.directPeers.forEach(p => {
p.addrs.forEach(ma => libp2p.peerStore.addressBook.add(p.id, ma))
})

/**
* Cache of seen messages
*
Expand Down Expand Up @@ -349,6 +364,16 @@ class Gossipsub extends BasicPubsub {
super._publishFrom(peer, msg)
}

/**
* Whether to accept a message from a peer
* @override
* @param {string} id
* @returns {boolean}
*/
_acceptFrom (id: string): boolean {
return this.direct.has(id) || this.score.score(id) >= this._options.scoreThresholds.graylistThreshold
}

/**
* Coerse topic validator result to valid/invalid boolean
* Provide extended validator support
Expand Down Expand Up @@ -467,6 +492,14 @@ class Gossipsub extends BasicPubsub {
return
}

// we don't GRAFT to/from direct peers; complain loudly if this happens
if (this.direct.has(id)) {
this.log('GRAFT: ignoring request from direct peer %s', id)
// this is possibly a bug from a non-reciprical configuration; send a PRUNE
prune.push(topicID)
return
}

// make sure we are not backing off that peer
const expire = this.backoff.get(topicID)?.get(id)
if (typeof expire === 'number' && now < expire) {
Expand Down Expand Up @@ -600,6 +633,31 @@ class Gossipsub extends BasicPubsub {
})
}

/**
* Maybe reconnect to direct peers
* @returns {void}
*/
_directConnect (): void {
// we only do this every few ticks to allow pending connections to complete and account for
// restarts/downtime
if (this.heartbeatTicks % constants.GossipsubDirectConnectTicks !== 0) {
return
}

const toconnect: string[] = []
this.direct.forEach(id => {
const peer = this.peers.get(id)
if (!peer || !peer.isConnected) {
toconnect.push(id)
}
})
if (toconnect.length) {
toconnect.forEach(id => {
this._connect(id)
})
}
}

/**
* Mounts the gossipsub protocol onto the libp2p node and sends our
* our subscriptions to every peer connected
Expand All @@ -610,6 +668,12 @@ class Gossipsub extends BasicPubsub {
await super.start()
this.heartbeat.start()
this.score.start()
// connect to direct peers
this._directPeerInitial = setTimeout(() => {
this.direct.forEach(id => {
this._connect(id)
})
}, constants.GossipsubDirectConnectInitialDelay)
}

/**
Expand All @@ -629,6 +693,16 @@ class Gossipsub extends BasicPubsub {
this.control = new Map()
this.backoff = new Map()
this.outbound = new Map()
clearTimeout(this._directPeerInitial)
}

/**
* Connect to a peer using the gossipsub protocol
* @param {string} id
* @returns {void}
*/
_connect (id: string): void {
this._libp2p.dialProtocol(id, this.multicodecs)
}

/**
Expand Down Expand Up @@ -669,14 +743,32 @@ class Gossipsub extends BasicPubsub {
this.log('JOIN %s', topics)

;(topics as string[]).forEach((topic) => {
// Send GRAFT to mesh peers
const fanoutPeers = this.fanout.get(topic)
if (fanoutPeers) {
// these peers have a score above the publish threshold, which may be negative
// so drop the ones with a negative score
fanoutPeers.forEach(p => {
if (this.score.score(p.id.toB58String()) < 0) {
fanoutPeers.delete(p)
}
})
if (fanoutPeers.size < constants.GossipsubD) {
// we need more peers; eager, as this would get fixed in the next heartbeat
getGossipPeers(this, topic, constants.GossipsubD - fanoutPeers.size, (p: Peer): boolean => {
const id = p.id.toB58String()
// filter our current peers, direct peers, and peers with negative scores
return !fanoutPeers.has(p) && !this.direct.has(id) && this.score.score(id) >= 0
}).forEach(p => fanoutPeers.add(p))
}
this.mesh.set(topic, fanoutPeers)
this.fanout.delete(topic)
this.lastpub.delete(topic)
} else {
const peers = getGossipPeers(this, topic, constants.GossipsubD)
const peers = getGossipPeers(this, topic, constants.GossipsubD, (p: Peer): boolean => {
const id = p.id.toB58String()
// filter direct peers and peers with negative score
return !this.direct.has(id) && this.score.score(id) >= 0
})
this.mesh.set(topic, peers)
}
this.mesh.get(topic)!.forEach((peer) => {
Expand Down Expand Up @@ -744,18 +836,26 @@ class Gossipsub extends BasicPubsub {

if (this._options.floodPublish) {
// flood-publish behavior
// send to _all_ peers meeting the publishThreshold
// send to direct peers and _all_ peers meeting the publishThreshold
peersInTopic.forEach(peer => {
const score = this.score.score(peer.id.toB58String())
if (score >= this._options.scoreThresholds.publishThreshold) {
const id = peer.id.toB58String()
if (this.direct.has(id) || this.score.score(id) >= this._options.scoreThresholds.publishThreshold) {
tosend.add(peer)
}
})
} else {
// non-flood-publish behavior
// send to subscribed floodsub peers
// send to direct peers, subscribed floodsub peers
// and some mesh peers above publishThreshold

// direct peers
this.direct.forEach(id => {
const peer = this.peers.get(id)
if (peer) {
tosend.add(peer)
}
})

// floodsub peers
peersInTopic.forEach((peer) => {
if (peer.protocols.includes(constants.FloodsubID)) {
Expand Down Expand Up @@ -929,17 +1029,20 @@ class Gossipsub extends BasicPubsub {
// Send gossip to GossipFactor peers above threshold with a minimum of D_lazy
// First we collect the peers above gossipThreshold that are not in the exclude set
// and then randomly select from that set
// We also exclude direct peers, as there is no reason to emit gossip to them
const peersToGossip: Peer[] = []
const topicPeers = this.topics.get(topic)
if (!topicPeers) {
// no topic peers, no gossip
return
}
topicPeers.forEach(p => {
const id = p.id.toB58String()
if (
!exclude.has(p) &&
!this.direct.has(id) &&
hasGossipProtocol(p.protocols) &&
this.score.score(p.id.toB58String()) >= this._options.scoreThresholds.gossipThreshold
this.score.score(id) >= this._options.scoreThresholds.gossipThreshold
) {
peersToGossip.push(p)
}
Expand Down

0 comments on commit 7103b83

Please sign in to comment.