Skip to content

Commit

Permalink
fix: PR feedback adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
maschad committed Jan 29, 2024
1 parent fb50dba commit ca7d3d3
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 29 deletions.
40 changes: 20 additions & 20 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
/**
* If true, will utilize the libp2p connection manager tagging system to prune/graft connections to peers, defaults to false
*/
taggingEnabled?: boolean
tagMeshPeers?: boolean
}

export interface GossipsubMessage {
Expand Down Expand Up @@ -1479,9 +1479,9 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
const now = Date.now()
let doPX = this.opts.doPX

if (this.opts?.taggingEnabled ?? false) {
if (this.opts?.tagMeshPeers ?? false) {
for (const { topicID } of graft) {
if (!topicID) {
if (topicID == null) {
continue
}
try {
Expand Down Expand Up @@ -1594,18 +1594,6 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
continue
}

if (this.opts?.taggingEnabled ?? false) {
try {
await this.components.peerStore.merge(peerIdFromString(id), {
tags: {
[topicID]: undefined
}
})
} catch (err) {
this.log.error('Error untagging peer %s with topic %s', id, topicID, err)
}
}

const peersInMesh = this.mesh.get(topicID)
if (peersInMesh == null) {
return
Expand Down Expand Up @@ -1638,6 +1626,18 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
continue
}
await this.pxConnect(peers)

if (this.opts?.tagMeshPeers ?? false) {
try {
await this.components.peerStore.merge(peerIdFromString(id), {
tags: {
[topicID]: undefined
}
})
} catch (err) {
this.log.error('Error untagging peer %s with topic %s', id, topicID, err)
}
}
}
}
}
Expand Down Expand Up @@ -1887,7 +1887,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
// - peer_added_to_mesh()
})

if (this.opts?.taggingEnabled ?? false) {
if (this.opts?.tagMeshPeers ?? false) {
Array.from(toAdd).map(async (id) => {
try {
await this.components.peerStore.merge(peerIdFromString(id), {
Expand All @@ -1898,7 +1898,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
}
})
} catch (e) {
this.log('Failed to add topic tag to peer %s', id)
this.log('Failed to tag peer %s with topic %s', id, topic, e)
}
})
}
Expand Down Expand Up @@ -2303,7 +2303,9 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
const onUnsubscribe = true
const prune = [await this.makePrune(id, topic, this.opts.doPX, onUnsubscribe)]

if (this.opts.taggingEnabled ?? false) {
this.sendRpc(id, { control: { prune } })

if (this.opts.tagMeshPeers ?? false) {
try {
await this.components.peerStore.merge(peerIdFromString(id), {
tags: {
Expand All @@ -2314,8 +2316,6 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.log.error('Error untagging peer %s with topic %s', id, topic, err)
}
}

this.sendRpc(id, { control: { prune } })
}

/**
Expand Down
20 changes: 11 additions & 9 deletions test/gossip.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe('gossip', () => {
},
maxInboundDataLength: 4000000,
allowPublishToZeroPeers: false,
taggingEnabled: true
tagMeshPeers: true
}
})
})
Expand Down Expand Up @@ -110,9 +110,9 @@ describe('gossip', () => {

const twoNodes = [nodeA, nodeB]

const subscriptionPromises = twoNodes.map(async (n) => await pEvent(n.pubsub, 'subscription-change'))
const subscriptionPromises = twoNodes.map(async (n) => pEvent(n.pubsub, 'subscription-change'))
// add subscriptions to each node
twoNodes.forEach((n) => n.pubsub.subscribe(topic))
twoNodes.forEach((n) => { n.pubsub.subscribe(topic) })

// every node connected to every other
await connectAllPubSubNodes(twoNodes)
Expand All @@ -121,7 +121,7 @@ describe('gossip', () => {
await Promise.all(subscriptionPromises)

// await mesh rebalancing
await Promise.all(twoNodes.map(async (n) => await pEvent(n.pubsub, 'gossipsub:heartbeat')))
await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat')))

let peerInfo
try {
Expand All @@ -132,8 +132,10 @@ describe('gossip', () => {
peerInfo = await nodeB.components.peerStore.get(nodeA.components.peerId)
}
}

expect(peerInfo!.tags.get(topic)?.value).to.equal(100)
if (peerInfo == null) {
throw new Error('Peer info not found')
}
expect(peerInfo.tags.get(topic)?.value).to.equal(100)
})

it('should remove the tags upon pruning', async function () {
Expand All @@ -144,9 +146,9 @@ describe('gossip', () => {

const twoNodes = [nodeA, nodeB]

const subscriptionPromises = nodes.map(async (n) => await pEvent(n.pubsub, 'subscription-change'))
const subscriptionPromises = nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change'))
// add subscriptions to each node
twoNodes.forEach((n) => n.pubsub.subscribe(topic))
twoNodes.forEach((n) => { n.pubsub.subscribe(topic) })

// every node connected to every other
await connectAllPubSubNodes(nodes)
Expand All @@ -155,7 +157,7 @@ describe('gossip', () => {
await Promise.all(subscriptionPromises)

// await mesh rebalancing
await Promise.all(twoNodes.map(async (n) => await pEvent(n.pubsub, 'gossipsub:heartbeat')))
await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat')))

nodeA.pubsub.unsubscribe(topic)

Expand Down

0 comments on commit ca7d3d3

Please sign in to comment.