Skip to content

Commit

Permalink
feat: allow only defined list of topics (#348)
Browse files Browse the repository at this point in the history
* Allow only defined list of topics

* Allow allowedTopics unit test
  • Loading branch information
dapplion committed Oct 3, 2022
1 parent 27bdee7 commit 6b5ff4d
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 21 deletions.
58 changes: 37 additions & 21 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
* If full it will throw and reject sending any more data.
*/
maxOutboundBufferSize?: number

/**
* If provided, only allow topics in this list
*/
allowedTopics?: string[] | Set<string>
}

export interface GossipsubMessage {
Expand Down Expand Up @@ -339,6 +344,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
private status: GossipStatus = { code: GossipStatusCode.stopped }
private maxInboundStreams?: number
private maxOutboundStreams?: number
private allowedTopics: Set<TopicStr> | null

private heartbeatTimer: {
_intervalId: ReturnType<typeof setInterval> | undefined
Expand Down Expand Up @@ -462,6 +468,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

this.maxInboundStreams = options.maxInboundStreams
this.maxOutboundStreams = options.maxOutboundStreams

this.allowedTopics = opts.allowedTopics ? new Set(opts.allowedTopics) : null
}

getPeers(): PeerId[] {
Expand Down Expand Up @@ -918,23 +926,29 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
// Handle received subscriptions
if (rpc.subscriptions && rpc.subscriptions.length > 0) {
// update peer subscriptions

const subscriptions: { topic: TopicStr; subscribe: boolean }[] = []

rpc.subscriptions.forEach((subOpt) => {
this.handleReceivedSubscription(from, subOpt)
const topic = subOpt.topic
const subscribe = subOpt.subscribe === true

if (topic != null) {
if (this.allowedTopics && !this.allowedTopics.has(topic)) {
// Not allowed: subscription data-structures are not bounded by topic count
// TODO: Should apply behaviour penalties?
return
}

this.handleReceivedSubscription(from, topic, subscribe)

subscriptions.push({ topic, subscribe })
}
})

this.dispatchEvent(
new CustomEvent<SubscriptionChangeData>('subscription-change', {
detail: {
peerId: from,
subscriptions: rpc.subscriptions
.filter((sub) => sub.topic !== null)
.map((sub) => {
return {
topic: sub.topic ?? '',
subscribe: Boolean(sub.subscribe)
}
})
}
detail: { peerId: from, subscriptions }
})
)
}
Expand All @@ -943,6 +957,12 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
// TODO: (up to limit)
if (rpc.messages) {
for (const message of rpc.messages) {
if (this.allowedTopics && !this.allowedTopics.has(message.topic)) {
// Not allowed: message cache data-structures are not bounded by topic count
// TODO: Should apply behaviour penalties?
continue
}

const handleReceivedMessagePromise = this.handleReceivedMessage(from, message)
// Should never throw, but handle just in case
.catch((err) => this.log(err))
Expand All @@ -962,20 +982,16 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Handles a subscription change from a peer
*/
private handleReceivedSubscription(from: PeerId, subOpt: RPC.ISubOpts): void {
if (subOpt.topic == null) {
return
}

this.log('subscription update from %p topic %s', from, subOpt.topic)
private handleReceivedSubscription(from: PeerId, topic: TopicStr, subscribe: boolean): void {
this.log('subscription update from %p topic %s', from, topic)

let topicSet = this.topics.get(subOpt.topic)
let topicSet = this.topics.get(topic)
if (topicSet == null) {
topicSet = new Set()
this.topics.set(subOpt.topic, topicSet)
this.topics.set(topic, topicSet)
}

if (subOpt.subscribe) {
if (subscribe) {
// subscribe peer to new topic
topicSet.add(from.toString())
} else {
Expand Down
57 changes: 57 additions & 0 deletions test/allowedTopics.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { expect } from 'aegir/chai'
import { GossipSub } from '../src/index.js'
import { pEvent } from 'p-event'
import { connectAllPubSubNodes, createComponentsArray } from './utils/create-pubsub.js'
import { Components } from '@libp2p/components'
import { stop } from '@libp2p/interfaces/startable'
import { mockNetwork } from '@libp2p/interface-mocks'

/* eslint-disable dot-notation */
describe('gossip / allowedTopics', () => {
let nodes: Components[]

const allowedTopic = 'topic_allowed'
const notAllowedTopic = 'topic_not_allowed'
const allowedTopics = [allowedTopic]
const allTopics = [allowedTopic, notAllowedTopic]

// Create pubsub nodes
beforeEach(async () => {
mockNetwork.reset()
nodes = await createComponentsArray({
number: 2,
connected: false,
init: {
allowedTopics
}
})
})

afterEach(async () => {
await stop(...nodes)
mockNetwork.reset()
})

it('should send gossip to non-mesh peers in topic', async function () {
this.timeout(10 * 1000)
const [nodeA, nodeB] = nodes

// add subscriptions to each node
for (const topic of allTopics) {
nodeA.getPubSub().subscribe(topic)
}

// every node connected to every other
await Promise.all([
connectAllPubSubNodes(nodes),
// nodeA should send nodeB all its subscriptions on connection
pEvent(nodeB.getPubSub(), 'subscription-change')
])

const nodeASubscriptions = Array.from((nodeA.getPubSub() as GossipSub)['subscriptions'].keys())
expect(nodeASubscriptions).deep.equals(allTopics, 'nodeA.subscriptions should be subcribed to all')

const nodeBTopics = Array.from((nodeB.getPubSub() as GossipSub)['topics'].keys())
expect(nodeBTopics).deep.equals(allowedTopics, 'nodeB.topics should only contain allowedTopics')
})
})

0 comments on commit 6b5ff4d

Please sign in to comment.