Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: graft/prune events and mesh peer tagging #383

Merged
merged 17 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

113 changes: 88 additions & 25 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import { removeFirstNItemsFromSet, removeItemsFromSet } from './utils/set.js'
import { SimpleTimeCache } from './utils/time-cache.js'
import type { GossipsubOptsSpec } from './config.js'
import type {
Connection, Stream, PeerId, Peer, PeerStore,
Connection, Direction, Stream, PeerId, Peer, PeerStore,
Message,
PublishResult,
PubSub,
Expand Down Expand Up @@ -188,6 +188,11 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
* Limits to bound protobuf decoding
*/
decodeRpcLimits?: DecodeRPCLimits

/**
* If true, will utilize the libp2p connection manager tagging system to prune/graft connections to peers, defaults to true
*/
tagMeshPeers: boolean
}

export interface GossipsubMessage {
Expand All @@ -196,9 +201,17 @@ export interface GossipsubMessage {
msg: Message
}

export interface MeshPeer {
peerId: string
topic: string
direction: Direction
}

export interface GossipsubEvents extends PubSubEvents {
'gossipsub:heartbeat': CustomEvent
'gossipsub:message': CustomEvent<GossipsubMessage>
'gossipsub:graft': CustomEvent<MeshPeer>
'gossipsub:prune': CustomEvent<MeshPeer>
}

enum GossipStatusCode {
Expand Down Expand Up @@ -407,6 +420,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
fallbackToFloodsub: true,
floodPublish: true,
batchPublish: false,
tagMeshPeers: true,
doPX: false,
directPeers: [],
D: constants.GossipsubD,
Expand Down Expand Up @@ -634,6 +648,11 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
})
}, constants.GossipsubDirectConnectInitialDelay)

if (this.opts.tagMeshPeers) {
this.addEventListener('gossipsub:graft', this.tagMeshPeer)
this.addEventListener('gossipsub:prune', this.untagMeshPeer)
}

this.log('started')
}

Expand All @@ -651,6 +670,11 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
const { registrarTopologyIds } = this.status
this.status = { code: GossipStatusCode.stopped }

if (this.opts.tagMeshPeers) {
this.removeEventListener('gossipsub:graft', this.tagMeshPeer)
this.removeEventListener('gossipsub:prune', this.untagMeshPeer)
}

// unregister protocol and handlers
const registrar = this.components.registrar
await Promise.all(this.multicodecs.map(async (multicodec) => registrar.unhandle(multicodec)))
Expand Down Expand Up @@ -1493,6 +1517,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
if (topicID == null) {
return
}

const peersInMesh = this.mesh.get(topicID)
if (peersInMesh == null) {
// don't do PX when there is an unknown topic to avoid leaking our peers
Expand All @@ -1506,38 +1531,38 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
return
}

const backoffExpiry = this.backoff.get(topicID)?.get(id)

// This if/else chain contains the various cases of valid (and semi-valid) GRAFTs
// Most of these cases result in a PRUNE immediately being sent in response

// we don't GRAFT to/from direct peers; complain loudly if this happens
if (this.direct.has(id)) {
this.log('GRAFT: ignoring request from direct peer %s', id)
// this is possibly a bug from a non-reciprical configuration; send a PRUNE
prune.push(topicID)
// but don't px
doPX = false
return
}

// make sure we are not backing off that peer
const expire = this.backoff.get(topicID)?.get(id)
if (typeof expire === 'number' && now < expire) {
// make sure we are not backing off that peer
} else if (typeof backoffExpiry === 'number' && now < backoffExpiry) {
this.log('GRAFT: ignoring backed off peer %s', id)
// add behavioral penalty
this.score.addPenalty(id, 1, ScorePenalty.GraftBackoff)
// no PX
doPX = false
// check the flood cutoff -- is the GRAFT coming too fast?
const floodCutoff = expire + this.opts.graftFloodThreshold - this.opts.pruneBackoff
const floodCutoff = backoffExpiry + this.opts.graftFloodThreshold - this.opts.pruneBackoff
if (now < floodCutoff) {
// extra penalty
this.score.addPenalty(id, 1, ScorePenalty.GraftBackoff)
}
// refresh the backoff
this.addBackoff(id, topicID)
prune.push(topicID)
return
}

// check the score
if (score < 0) {
// check the score
} else if (score < 0) {
// we don't GRAFT peers with negative score
this.log('GRAFT: ignoring peer %s with negative score: score=%d, topic=%s', id, score, topicID)
// we do send them PRUNE however, because it's a matter of protocol correctness
Expand All @@ -1546,23 +1571,24 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
doPX = false
// add/refresh backoff so that we don't reGRAFT too early even if the score decays
this.addBackoff(id, topicID)
return
}

// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
// from peers with outbound connections; this is a defensive check to restrict potential
// mesh takeover attacks combined with love bombing
if (peersInMesh.size >= this.opts.Dhi && !(this.outbound.get(id) ?? false)) {
// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
// from peers with outbound connections; this is a defensive check to restrict potential
// mesh takeover attacks combined with love bombing
} else if (peersInMesh.size >= this.opts.Dhi && !(this.outbound.get(id) ?? false)) {
prune.push(topicID)
this.addBackoff(id, topicID)
return
}

this.log('GRAFT: Add mesh link from %s in %s', id, topicID)
this.score.graft(id, topicID)
peersInMesh.add(id)
// valid graft
} else {
this.log('GRAFT: Add mesh link from %s in %s', id, topicID)
this.score.graft(id, topicID)
peersInMesh.add(id)

this.metrics?.onAddToMesh(topicID, InclusionReason.Subscribed, 1)
}

this.metrics?.onAddToMesh(topicID, InclusionReason.Subscribed, 1)
this.safeDispatchEvent<MeshPeer>('gossipsub:graft', { detail: { peerId: id, topic: topicID, direction: 'inbound' } })
})

if (prune.length === 0) {
Expand Down Expand Up @@ -1613,10 +1639,12 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
score,
topicID
)
continue
} else {
await this.pxConnect(peers)
}
await this.pxConnect(peers)
}

this.safeDispatchEvent<MeshPeer>('gossipsub:prune', { detail: { peerId: id, topic: topicID, direction: 'inbound' } })
}
}

Expand Down Expand Up @@ -2311,6 +2339,21 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

this.metrics?.onRpcSent(rpc, rpcBytes.length)

if (rpc.control?.graft != null) {
for (const topic of rpc.control?.graft) {
if (topic.topicID != null) {
this.safeDispatchEvent<MeshPeer>('gossipsub:graft', { detail: { peerId: id, topic: topic.topicID, direction: 'outbound' } })
}
}
}
if (rpc.control?.prune != null) {
for (const topic of rpc.control?.prune) {
if (topic.topicID != null) {
this.safeDispatchEvent<MeshPeer>('gossipsub:prune', { detail: { peerId: id, topic: topic.topicID, direction: 'outbound' } })
}
}
}

return true
}

Expand Down Expand Up @@ -3015,6 +3058,26 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

metrics.registerScoreWeights(sw)
}

private readonly tagMeshPeer = (evt: CustomEvent<MeshPeer>): void => {
const { peerId, topic } = evt.detail
this.components.peerStore.merge(peerIdFromString(peerId), {
tags: {
[topic]: {
value: 100
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This value could be parameterised?

}
}
}).catch((err) => { this.log.error('Error tagging peer %s with topic %s', peerId, topic, err) })
}

private readonly untagMeshPeer = (evt: CustomEvent<MeshPeer>): void => {
const { peerId, topic } = evt.detail
this.components.peerStore.merge(peerIdFromString(peerId), {
tags: {
[topic]: undefined
}
}).catch((err) => { this.log.error('Error untagging peer %s with topic %s', peerId, topic, err) })
}
}

export function gossipsub (
Expand Down
61 changes: 61 additions & 0 deletions test/gossip.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,67 @@ describe('gossip', () => {
expect(publishResult.recipients).to.deep.equal([])
})

it('should tag peers', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
const nodeB = nodes[1]
const topic = 'Z'

const twoNodes = [nodeA, nodeB]

const graftPromises = twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:graft'))

// add subscriptions to each node
twoNodes.forEach((n) => { n.pubsub.subscribe(topic) })

// every node connected to every other
await connectAllPubSubNodes(twoNodes)

// await grafts
await Promise.all(graftPromises)

// await mesh rebalancing
await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat')))

const peerInfoA = await nodeA.components.peerStore.get(nodeB.components.peerId).catch((e) => undefined)
const peerInfoB = await nodeB.components.peerStore.get(nodeA.components.peerId).catch((e) => undefined)
expect(peerInfoA?.tags.get(topic)?.value).to.equal(100)
expect(peerInfoB?.tags.get(topic)?.value).to.equal(100)
})

it('should remove the tags upon pruning', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
const nodeB = nodes[1]
const topic = 'Z'

const twoNodes = [nodeA, nodeB]

const subscriptionPromises = nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change'))
// add subscriptions to each node
twoNodes.forEach((n) => { n.pubsub.subscribe(topic) })

// every node connected to every other
await connectAllPubSubNodes(nodes)

// await for subscriptions to be transmitted
await Promise.all(subscriptionPromises)

// await mesh rebalancing
await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat')))

twoNodes.forEach((n) => { n.pubsub.unsubscribe(topic) })

// await for unsubscriptions to be transmitted
// await mesh rebalancing
await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat')))

const peerInfoA = await nodeA.components.peerStore.get(nodeB.components.peerId).catch((e) => undefined)
const peerInfoB = await nodeB.components.peerStore.get(nodeA.components.peerId).catch((e) => undefined)
expect(peerInfoA?.tags.get(topic)).to.be.undefined()
expect(peerInfoB?.tags.get(topic)).to.be.undefined()
})

it('should reject incoming messages bigger than maxInboundDataLength limit', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
Expand Down
Loading