From 6b5ff4d3e40139b2a6cf8cbf670564c9d1b91090 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 3 Oct 2022 15:12:58 +0200 Subject: [PATCH] feat: allow only defined list of topics (#348) * Allow only defined list of topics * Allow allowedTopics unit test --- src/index.ts | 58 ++++++++++++++++++++++++-------------- test/allowedTopics.spec.ts | 57 +++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 21 deletions(-) create mode 100644 test/allowedTopics.spec.ts diff --git a/src/index.ts b/src/index.ts index 8a7691b1..7cba33fa 100644 --- a/src/index.ts +++ b/src/index.ts @@ -157,6 +157,11 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit { * If full it will throw and reject sending any more data. */ maxOutboundBufferSize?: number + + /** + * If provided, only allow topics in this list + */ + allowedTopics?: string[] | Set } export interface GossipsubMessage { @@ -339,6 +344,7 @@ export class GossipSub extends EventEmitter implements Initiali private status: GossipStatus = { code: GossipStatusCode.stopped } private maxInboundStreams?: number private maxOutboundStreams?: number + private allowedTopics: Set | null private heartbeatTimer: { _intervalId: ReturnType | undefined @@ -462,6 +468,8 @@ export class GossipSub extends EventEmitter implements Initiali this.maxInboundStreams = options.maxInboundStreams this.maxOutboundStreams = options.maxOutboundStreams + + this.allowedTopics = opts.allowedTopics ? new Set(opts.allowedTopics) : null } getPeers(): PeerId[] { @@ -918,23 +926,29 @@ export class GossipSub extends EventEmitter implements Initiali // Handle received subscriptions if (rpc.subscriptions && rpc.subscriptions.length > 0) { // update peer subscriptions + + const subscriptions: { topic: TopicStr; subscribe: boolean }[] = [] + rpc.subscriptions.forEach((subOpt) => { - this.handleReceivedSubscription(from, subOpt) + const topic = subOpt.topic + const subscribe = subOpt.subscribe === true + + if (topic != null) { + if (this.allowedTopics && !this.allowedTopics.has(topic)) { + // Not allowed: subscription data-structures are not bounded by topic count + // TODO: Should apply behaviour penalties? + return + } + + this.handleReceivedSubscription(from, topic, subscribe) + + subscriptions.push({ topic, subscribe }) + } }) this.dispatchEvent( new CustomEvent('subscription-change', { - detail: { - peerId: from, - subscriptions: rpc.subscriptions - .filter((sub) => sub.topic !== null) - .map((sub) => { - return { - topic: sub.topic ?? '', - subscribe: Boolean(sub.subscribe) - } - }) - } + detail: { peerId: from, subscriptions } }) ) } @@ -943,6 +957,12 @@ export class GossipSub extends EventEmitter implements Initiali // TODO: (up to limit) if (rpc.messages) { for (const message of rpc.messages) { + if (this.allowedTopics && !this.allowedTopics.has(message.topic)) { + // Not allowed: message cache data-structures are not bounded by topic count + // TODO: Should apply behaviour penalties? + continue + } + const handleReceivedMessagePromise = this.handleReceivedMessage(from, message) // Should never throw, but handle just in case .catch((err) => this.log(err)) @@ -962,20 +982,16 @@ export class GossipSub extends EventEmitter implements Initiali /** * Handles a subscription change from a peer */ - private handleReceivedSubscription(from: PeerId, subOpt: RPC.ISubOpts): void { - if (subOpt.topic == null) { - return - } - - this.log('subscription update from %p topic %s', from, subOpt.topic) + private handleReceivedSubscription(from: PeerId, topic: TopicStr, subscribe: boolean): void { + this.log('subscription update from %p topic %s', from, topic) - let topicSet = this.topics.get(subOpt.topic) + let topicSet = this.topics.get(topic) if (topicSet == null) { topicSet = new Set() - this.topics.set(subOpt.topic, topicSet) + this.topics.set(topic, topicSet) } - if (subOpt.subscribe) { + if (subscribe) { // subscribe peer to new topic topicSet.add(from.toString()) } else { diff --git a/test/allowedTopics.spec.ts b/test/allowedTopics.spec.ts new file mode 100644 index 00000000..7c4e18eb --- /dev/null +++ b/test/allowedTopics.spec.ts @@ -0,0 +1,57 @@ +import { expect } from 'aegir/chai' +import { GossipSub } from '../src/index.js' +import { pEvent } from 'p-event' +import { connectAllPubSubNodes, createComponentsArray } from './utils/create-pubsub.js' +import { Components } from '@libp2p/components' +import { stop } from '@libp2p/interfaces/startable' +import { mockNetwork } from '@libp2p/interface-mocks' + +/* eslint-disable dot-notation */ +describe('gossip / allowedTopics', () => { + let nodes: Components[] + + const allowedTopic = 'topic_allowed' + const notAllowedTopic = 'topic_not_allowed' + const allowedTopics = [allowedTopic] + const allTopics = [allowedTopic, notAllowedTopic] + + // Create pubsub nodes + beforeEach(async () => { + mockNetwork.reset() + nodes = await createComponentsArray({ + number: 2, + connected: false, + init: { + allowedTopics + } + }) + }) + + afterEach(async () => { + await stop(...nodes) + mockNetwork.reset() + }) + + it('should send gossip to non-mesh peers in topic', async function () { + this.timeout(10 * 1000) + const [nodeA, nodeB] = nodes + + // add subscriptions to each node + for (const topic of allTopics) { + nodeA.getPubSub().subscribe(topic) + } + + // every node connected to every other + await Promise.all([ + connectAllPubSubNodes(nodes), + // nodeA should send nodeB all its subscriptions on connection + pEvent(nodeB.getPubSub(), 'subscription-change') + ]) + + const nodeASubscriptions = Array.from((nodeA.getPubSub() as GossipSub)['subscriptions'].keys()) + expect(nodeASubscriptions).deep.equals(allTopics, 'nodeA.subscriptions should be subcribed to all') + + const nodeBTopics = Array.from((nodeB.getPubSub() as GossipSub)['topics'].keys()) + expect(nodeBTopics).deep.equals(allowedTopics, 'nodeB.topics should only contain allowedTopics') + }) +})