Skip to content

Commit

Permalink
feat: add peer exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed Jul 21, 2020
1 parent 89bf606 commit a0a691b
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 12 deletions.
7 changes: 5 additions & 2 deletions ts/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ export class Heartbeat {
const tograft = new Map<string, string[]>()
// peer id => topic[]
const toprune = new Map<string, string[]>()
// peer id => don't px
const noPX = new Map<string, boolean>()

// clean up expired backoffs
this.gossipsub._clearBackoff()
Expand Down Expand Up @@ -141,7 +143,7 @@ export class Heartbeat {
}
}

// drop all peers with negative score
// drop all peers with negative score, without PX
peers.forEach(id => {
const score = getScore(id)
if (score < 0) {
Expand All @@ -150,6 +152,7 @@ export class Heartbeat {
id, score, topic
)
prunePeer(id)
noPX.set(id, true)
}
})

Expand Down Expand Up @@ -320,7 +323,7 @@ export class Heartbeat {
})

// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
this.gossipsub._sendGraftPrune(tograft, toprune)
this.gossipsub._sendGraftPrune(tograft, toprune, noPX)

// flush pending gossip that wasn't piggybacked above
this.gossipsub._flush()
Expand Down
121 changes: 111 additions & 10 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { MessageCache } from './messageCache'
import {
RPCCodec,
RPC, Message, InMessage,
ControlMessage, ControlIHave, ControlGraft, ControlIWant, ControlPrune
ControlMessage, ControlIHave, ControlGraft, ControlIWant, ControlPrune, PeerInfo
} from './message'
import * as constants from './constants'
import { ExtendedValidatorResult } from './constants'
Expand All @@ -15,17 +15,20 @@ import { createGossipRpc, shuffle, hasGossipProtocol } from './utils'
import { PeerStreams } from './peerStreams'
import { PeerScore, PeerScoreParams, PeerScoreThresholds, createPeerScoreParams, createPeerScoreThresholds } from './score'
import { IWantTracer } from './tracer'
import { AddrInfo, Libp2p } from './interfaces'
import { AddrInfo, Libp2p, EnvelopeClass } from './interfaces'
// @ts-ignore
import TimeCache = require('time-cache')
import PeerId = require('peer-id')
import BasicPubsub = require('./pubsub')
// @ts-ignore
import Envelope = require('libp2p/src/record/envelope')

interface GossipInputOptions {
emitSelf: boolean
gossipIncoming: boolean
fallbackToFloodsub: boolean
floodPublish: boolean
doPX: boolean
msgIdFn: (msg: Message) => string
messageCache: MessageCache
scoreParams: Partial<PeerScoreParams>
Expand Down Expand Up @@ -66,6 +69,7 @@ class Gossipsub extends BasicPubsub {
* @param {bool} [options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
* @param {bool} [options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true
* @param {bool} [options.floodPublish] if self-published messages should be sent to all peers, defaults to true
* @param {bool} [options.doPX] whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted nodes. defaults to false
* @param {function} [options.msgIdFn] override the default message id function
* @param {Object} [options.messageCache] override the default MessageCache
* @param {Object} [options.scoreParams] peer score parameters
Expand All @@ -82,6 +86,7 @@ class Gossipsub extends BasicPubsub {
gossipIncoming: true,
fallbackToFloodsub: true,
floodPublish: true,
doPX: false,
directPeers: [],
...options,
scoreParams: createPeerScoreParams(options.scoreParams),
Expand Down Expand Up @@ -551,6 +556,7 @@ class Gossipsub extends BasicPubsub {
const prune: string[] = []
const score = this.score.score(id)
const now = this._now()
let doPX = this._options.doPX

graft.forEach(({ topicID }) => {
if (!topicID) {
Expand All @@ -559,6 +565,8 @@ class Gossipsub extends BasicPubsub {
const peersInMesh = this.mesh.get(topicID)
const peersInTopic = this.topics.get(topicID)
if (!peersInMesh || !peersInTopic) {
// don't do PX when there is an unknown topic to avoid leaking our peers
doPX = false
// spam hardening: ignore GRAFTs for unknown topics
return
}
Expand All @@ -573,6 +581,8 @@ class Gossipsub extends BasicPubsub {
this.log('GRAFT: ignoring request from direct peer %s', id)
// this is possibly a bug from a non-reciprical configuration; send a PRUNE
prune.push(topicID)
// but don't px
doPX = false
return
}

Expand All @@ -582,6 +592,8 @@ class Gossipsub extends BasicPubsub {
this.log('GRAFT: ignoring backed off peer %s', id)
// add behavioral penalty
this.score.addPenalty(id, 1)
// no PX
doPX = false
// check the flood cutoff -- is the GRAFT coming too fast?
const floodCutoff = expire + constants.GossipsubGraftFloodThreshold - constants.GossipsubPruneBackoff
if (now < floodCutoff) {
Expand All @@ -603,6 +615,8 @@ class Gossipsub extends BasicPubsub {
)
// we do send them PRUNE however, because it's a matter of protocol correctness
prune.push(topicID)
// but we won't PX to them
doPX = false
// add/refresh backoff so that we don't reGRAFT too early even if the score decays
this._addBackoff(id, topicID)
return
Expand All @@ -626,7 +640,7 @@ class Gossipsub extends BasicPubsub {
return
}

return prune.map(topic => this._makePrune(id, topic))
return prune.map(topic => this._makePrune(id, topic, doPX))
}

/**
Expand All @@ -636,7 +650,8 @@ class Gossipsub extends BasicPubsub {
* @returns {void}
*/
_handlePrune (id: string, prune: ControlPrune[]): void {
prune.forEach(({ topicID, backoff }) => {
const score = this.score.score(id)
prune.forEach(({ topicID, backoff, peers }) => {
if (!topicID) {
return
}
Expand All @@ -654,6 +669,19 @@ class Gossipsub extends BasicPubsub {
} else {
this._addBackoff(id, topicID)
}

// PX
if (peers && peers.length) {
// we ignore PX from peers with insufficient scores
if (score < this._options.scoreThresholds.acceptPXThreshold) {
this.log(
'PRUNE: ignoring PX from peer %s with insufficient score [score = %d, topic = %s]',
id, score, topicID
)
return
}
this._pxConnect(peers)
}
})
}

Expand Down Expand Up @@ -746,6 +774,60 @@ class Gossipsub extends BasicPubsub {
}
}

/**
* Maybe attempt connection given signed peer records
* @param {PeerInfo[]} peers
* @returns {Promise<void>}
*/
async _pxConnect (peers: PeerInfo[]): Promise<void> {
if (peers.length > constants.GossipsubPrunePeers) {
shuffle(peers)
peers = peers.slice(0, constants.GossipsubPrunePeers)
}
const toconnect: string[] = []

peers.forEach(async pi => {
if (!pi.peerID || !pi.signedPeerRecord) {
return
}

const p = PeerId.createFromBytes(pi.peerID)
const id = p.toB58String()

if (this.peers.has(id)) {
return
}

// the peer sent us a signed record; ensure that it is valid
try {
const envelope = await (Envelope as EnvelopeClass).openAndCertify(pi.signedPeerRecord, 'libp2p-peer-record')
const eid = envelope.peerId.toB58String()
if (id !== eid) {
this.log(
'bogus peer record obtained through px: peer ID %s doesn\'t match expected peer %s',
eid, id
)
return
}
if (!this._libp2p.peerStore.addressBook.consumePeerRecord(envelope)) {
this.log(
'bogus peer record obtained through px: could not add peer record to address book'
)
return
}
toconnect.push(id)
} catch (e) {
this.log('bogus peer record obtained through px: invalid signature or not a peer record')
}
})

if (!toconnect.length) {
return
}

toconnect.forEach(id => this._connect(id))
}

/**
* Mounts the gossipsub protocol onto the libp2p node and sends our
* our subscriptions to every peer connected
Expand Down Expand Up @@ -1014,7 +1096,7 @@ class Gossipsub extends BasicPubsub {
*/
_sendPrune (id: string, topic: string): void {
const prune = [
this._makePrune(id, topic)
this._makePrune(id, topic, this._options.doPX)
]

const out = createGossipRpc([], { prune })
Expand Down Expand Up @@ -1077,22 +1159,23 @@ class Gossipsub extends BasicPubsub {
* @param {Map<string, Array<string>>} tograft peer id => topic[]
* @param {Map<string, Array<string>>} toprune peer id => topic[]
*/
_sendGraftPrune (tograft: Map<string, string[]>, toprune: Map<string, string[]>): void {
_sendGraftPrune (tograft: Map<string, string[]>, toprune: Map<string, string[]>, noPX: Map<string, boolean>): void {
const doPX = this._options.doPX
for (const [id, topics] of tograft) {
const graft = topics.map((topicID) => ({ topicID }))
let prune: ControlPrune[] = []
// If a peer also has prunes, process them now
const pruning = toprune.get(id)
if (pruning) {
prune = pruning.map((topicID) => this._makePrune(id, topicID))
prune = pruning.map((topicID) => this._makePrune(id, topicID, doPX && !noPX.get(id)))
toprune.delete(id)
}

const outRpc = createGossipRpc([], { graft, prune })
this._sendRpc(id, outRpc)
}
for (const [id, topics] of toprune) {
const prune = topics.map((topicID) => this._makePrune(id, topicID))
const prune = topics.map((topicID) => this._makePrune(id, topicID, doPX && !noPX.get(id)))
const outRpc = createGossipRpc([], { prune })
this._sendRpc(id, outRpc)
}
Expand Down Expand Up @@ -1212,9 +1295,10 @@ class Gossipsub extends BasicPubsub {
* Make a PRUNE control message for a peer in a topic
* @param {string} id
* @param {string} topic
* @param {boolean} doPX
* @returns {ControlPrune}
*/
_makePrune (id: string, topic: string): ControlPrune {
_makePrune (id: string, topic: string, doPX: boolean): ControlPrune {
if (this.peers.get(id)!.protocol === constants.GossipsubIDv10) {
// Gossipsub v1.0 -- no backoff, the peer won't be able to parse it anyway
return {
Expand All @@ -1225,9 +1309,26 @@ class Gossipsub extends BasicPubsub {
// backoff is measured in seconds
// GossipsubPruneBackoff is measured in milliseconds
const backoff = constants.GossipsubPruneBackoff / 1000
const px: PeerInfo[] = []
if (doPX) {
// select peers for Peer eXchange
const peers = getGossipPeers(this, topic, constants.GossipsubPrunePeers, (xid: string): boolean => {
return xid !== id && this.score.score(xid) >= 0
})
peers.forEach(p => {
// see if we have a signed record to send back; if we don't, just send
// the peer ID and let the pruned peer find them in the DHT -- we can't trust
// unsigned address records through PX anyways
const peerId = PeerId.createFromB58String(p)
px.push({
peerID: peerId.toBytes(),
signedPeerRecord: this._libp2p.peerStore.addressBook.getRawEnvelope(peerId)
})
})
}
return {
topicID: topic,
peers: [],
peers: px,
backoff: backoff
}
}
Expand Down
1 change: 1 addition & 0 deletions ts/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ interface Book<K, V> {

export interface AddressBook extends Book<PeerId, Multiaddr> {
consumePeerRecord(envelope: Envelope): boolean
getRawEnvelope(peerId: PeerId): Buffer | undefined
}

export interface PeerStore {
Expand Down

0 comments on commit a0a691b

Please sign in to comment.