Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(gossipsub): pubsubpeer is created with wrong gossipsub version #1116

Merged
merged 4 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -862,8 +862,13 @@ method initPubSub*(g: GossipSub) {.raises: [InitializationError].} =
# init gossip stuff
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)

method getOrCreatePeer*(g: GossipSub, peerId: PeerId, protos: seq[string]): PubSubPeer =
let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
method getOrCreatePeer*(
g: GossipSub,
peerId: PeerId,
protosToDial: seq[string],
protoNegotiated: string = "",
): PubSubPeer =
let peer = procCall PubSub(g).getOrCreatePeer(peerId, protosToDial, protoNegotiated)
g.parameters.overheadRateLimit.withValue(overheadRateLimit):
peer.overheadRateLimitOpt =
Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
Expand Down
11 changes: 7 additions & 4 deletions libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -348,19 +348,22 @@ method onPubSubPeerEvent*(
discard

method getOrCreatePeer*(
p: PubSub, peerId: PeerId, protos: seq[string]
p: PubSub, peerId: PeerId, protosToDial: seq[string], protoNegotiated: string = ""
): PubSubPeer {.base, gcsafe.} =
p.peers.withValue(peerId, peer):
if peer[].codec == "":
peer[].codec = protoNegotiated
return peer[]

proc getConn(): Future[Connection] {.async.} =
return await p.switch.dial(peerId, protos)
return await p.switch.dial(peerId, protosToDial)

proc onEvent(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
p.onPubSubPeerEvent(peer, event)

# create new pubsub peer
let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize)
let pubSubPeer =
PubSubPeer.new(peerId, getConn, onEvent, protoNegotiated, p.maxMessageSize)
debug "created new pubsub peer", peerId

p.peers[peerId] = pubSubPeer
Expand Down Expand Up @@ -425,7 +428,7 @@ method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} =
# call pubsub rpc handler
p.rpcHandler(peer, data)

let peer = p.getOrCreatePeer(conn.peerId, @[proto])
let peer = p.getOrCreatePeer(conn.peerId, @[], proto)

try:
peer.handler = handler
Expand Down
36 changes: 36 additions & 0 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1038,3 +1038,39 @@ suite "GossipSub":
check currentRateLimitHits() == rateLimitHits + 2

await stopNodes(nodes)

asyncTest "Peer must send right gosspipsub version":
func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
ok(newSeq[byte](10))
let node0 = generateNodes(1, gossip = true, msgIdProvider = dumbMsgIdProvider)[0]
let node1 = generateNodes(
1,
gossip = true,
msgIdProvider = dumbMsgIdProvider,
gossipSubVersion = GossipSubCodec_10,
)[0]

let nodesFut = await allFinished(node0.switch.start(), node1.switch.start())

await node0.switch.connect(
node1.switch.peerInfo.peerId, node1.switch.peerInfo.addrs
)

proc handler(topic: string, data: seq[byte]) {.async.} =
discard

node0.subscribe("foobar", handler)
node1.subscribe("foobar", handler)
await waitSubGraph(@[node0, node1], "foobar")

var gossip0: GossipSub = GossipSub(node0)
var gossip1: GossipSub = GossipSub(node1)

checkUntilTimeout:
gossip0.mesh.getOrDefault("foobar").toSeq[0].codec == GossipSubCodec_10
checkUntilTimeout:
gossip1.mesh.getOrDefault("foobar").toSeq[0].codec == GossipSubCodec_10

await allFuturesThrowing(node0.switch.stop(), node1.switch.stop())

await allFuturesThrowing(nodesFut.concat())
3 changes: 3 additions & 0 deletions tests/pubsub/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ proc generateNodes*(
enablePX: bool = false,
overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] =
Opt.none(tuple[bytes: int, interval: Duration]),
gossipSubVersion: string = "",
): seq[PubSub] =
for i in 0 ..< num:
let switch = newStandardSwitch(
Expand Down Expand Up @@ -100,6 +101,8 @@ proc generateNodes*(
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0
g.topicParams.mgetOrPut("bar", TopicParams.init()).topicWeight = 1.0
if gossipSubVersion != "":
g.codecs = @[gossipSubVersion]
g.PubSub
else:
FloodSub.init(
Expand Down
Loading