diff --git a/src/index.ts b/src/index.ts index 8e717db9..576337e2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -183,7 +183,7 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit { /** * If true, will utilize the libp2p connection manager tagging system to prune/graft connections to peers, defaults to false */ - taggingEnabled?: boolean + tagMeshPeers?: boolean } export interface GossipsubMessage { @@ -1479,9 +1479,9 @@ export class GossipSub extends TypedEventEmitter implements Pub const now = Date.now() let doPX = this.opts.doPX - if (this.opts?.taggingEnabled ?? false) { + if (this.opts?.tagMeshPeers ?? false) { for (const { topicID } of graft) { - if (!topicID) { + if (topicID == null) { continue } try { @@ -1594,18 +1594,6 @@ export class GossipSub extends TypedEventEmitter implements Pub continue } - if (this.opts?.taggingEnabled ?? false) { - try { - await this.components.peerStore.merge(peerIdFromString(id), { - tags: { - [topicID]: undefined - } - }) - } catch (err) { - this.log.error('Error untagging peer %s with topic %s', id, topicID, err) - } - } - const peersInMesh = this.mesh.get(topicID) if (peersInMesh == null) { return @@ -1638,6 +1626,18 @@ export class GossipSub extends TypedEventEmitter implements Pub continue } await this.pxConnect(peers) + + if (this.opts?.tagMeshPeers ?? false) { + try { + await this.components.peerStore.merge(peerIdFromString(id), { + tags: { + [topicID]: undefined + } + }) + } catch (err) { + this.log.error('Error untagging peer %s with topic %s', id, topicID, err) + } + } } } } @@ -1887,7 +1887,7 @@ export class GossipSub extends TypedEventEmitter implements Pub // - peer_added_to_mesh() }) - if (this.opts?.taggingEnabled ?? false) { + if (this.opts?.tagMeshPeers ?? false) { Array.from(toAdd).map(async (id) => { try { await this.components.peerStore.merge(peerIdFromString(id), { @@ -1898,7 +1898,7 @@ export class GossipSub extends TypedEventEmitter implements Pub } }) } catch (e) { - this.log('Failed to add topic tag to peer %s', id) + this.log('Failed to tag peer %s with topic %s', id, topic, e) } }) } @@ -2303,7 +2303,9 @@ export class GossipSub extends TypedEventEmitter implements Pub const onUnsubscribe = true const prune = [await this.makePrune(id, topic, this.opts.doPX, onUnsubscribe)] - if (this.opts.taggingEnabled ?? false) { + this.sendRpc(id, { control: { prune } }) + + if (this.opts.tagMeshPeers ?? false) { try { await this.components.peerStore.merge(peerIdFromString(id), { tags: { @@ -2314,8 +2316,6 @@ export class GossipSub extends TypedEventEmitter implements Pub this.log.error('Error untagging peer %s with topic %s', id, topic, err) } } - - this.sendRpc(id, { control: { prune } }) } /** diff --git a/test/gossip.spec.ts b/test/gossip.spec.ts index 85884bff..a292bfdc 100644 --- a/test/gossip.spec.ts +++ b/test/gossip.spec.ts @@ -29,7 +29,7 @@ describe('gossip', () => { }, maxInboundDataLength: 4000000, allowPublishToZeroPeers: false, - taggingEnabled: true + tagMeshPeers: true } }) }) @@ -110,9 +110,9 @@ describe('gossip', () => { const twoNodes = [nodeA, nodeB] - const subscriptionPromises = twoNodes.map(async (n) => await pEvent(n.pubsub, 'subscription-change')) + const subscriptionPromises = twoNodes.map(async (n) => pEvent(n.pubsub, 'subscription-change')) // add subscriptions to each node - twoNodes.forEach((n) => n.pubsub.subscribe(topic)) + twoNodes.forEach((n) => { n.pubsub.subscribe(topic) }) // every node connected to every other await connectAllPubSubNodes(twoNodes) @@ -121,7 +121,7 @@ describe('gossip', () => { await Promise.all(subscriptionPromises) // await mesh rebalancing - await Promise.all(twoNodes.map(async (n) => await pEvent(n.pubsub, 'gossipsub:heartbeat'))) + await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) let peerInfo try { @@ -132,8 +132,10 @@ describe('gossip', () => { peerInfo = await nodeB.components.peerStore.get(nodeA.components.peerId) } } - - expect(peerInfo!.tags.get(topic)?.value).to.equal(100) + if (peerInfo == null) { + throw new Error('Peer info not found') + } + expect(peerInfo.tags.get(topic)?.value).to.equal(100) }) it('should remove the tags upon pruning', async function () { @@ -144,9 +146,9 @@ describe('gossip', () => { const twoNodes = [nodeA, nodeB] - const subscriptionPromises = nodes.map(async (n) => await pEvent(n.pubsub, 'subscription-change')) + const subscriptionPromises = nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change')) // add subscriptions to each node - twoNodes.forEach((n) => n.pubsub.subscribe(topic)) + twoNodes.forEach((n) => { n.pubsub.subscribe(topic) }) // every node connected to every other await connectAllPubSubNodes(nodes) @@ -155,7 +157,7 @@ describe('gossip', () => { await Promise.all(subscriptionPromises) // await mesh rebalancing - await Promise.all(twoNodes.map(async (n) => await pEvent(n.pubsub, 'gossipsub:heartbeat'))) + await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) nodeA.pubsub.unsubscribe(topic)