Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GossipSub: Limit flood publishing #911

Merged
merged 12 commits into from
Jul 31, 2023
22 changes: 15 additions & 7 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -488,20 +488,28 @@ method publish*(g: GossipSub,

var peers: HashSet[PubSubPeer]

# add always direct peers
peers.incl(g.explicit.getOrDefault(topic))

if topic in g.topics: # if we're subscribed use the mesh
peers.incl(g.mesh.getOrDefault(topic))

if g.parameters.floodPublish:
let
msgSize = data.len
bandwidth = 25_000_000 #TODO replace with bandwidth estimate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect we need to make this a (debug) parameter until we have something better - ie in nimbus-eth2, this would be a hidden command-line parameter that we pull out in case of weird behavior - hidden, because obviously it's something that should go away at some point

msToTransmit = max(msgSize div (bandwidth div 1000), 1)
maxFloodPublish =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for a min?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the messages are that big (2mb), it will effectively disable flood publish
The idea of this PR is that flood publish shouldn't last longer than one heartbeat, since after that we will be busy responding to IWANT requests

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider the case where we're not subscribed to the topic - sending to 0 peers seems a bit harsh then...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, that won't happen before >50mb, but still good to cover it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought a bit more about this, and I think we want to put the minimum at dmin at least ..

it feels very risky to send only to one peer - if that peer is slow, it'll delay the message for all peers by a heartbeat and it relies a bit too heavily on the IHAVE/IWANT mechanism to recover, ie there's no redundancy...

I know there's a cost here for sending big messages from slow peers, but I think the risk for normal/highbandwidth peers is more real and costly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The minimum is currently dLow (it will be caught by the if peers.len < g.parameters.dLow: below)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aren't fanout peers populated only when we're not subscribed? also as such, there might not be enough of them in the fanout table either?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also use them when we can't publish to enough peers (ie bad mesh), and they are replenished when < dLow

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also use them when we can't publish to enough peers (ie bad mesh), and they are replenished when < dLow

so sending to g.gossipsub[topic] as a last resort wouldn't be appropriate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the same as using the fanout, the fanout is only a small cache of gossipsub to have more stable diffusion routes, basically

(g.parameters.heartbeatInterval.milliseconds div msToTransmit)
# With flood publishing enabled, the mesh is used when propagating messages from other peers,
# but a peer's own messages will always be published to all known peers in the topic.
# but a peer's own messages will always be published to all known peers in the topic, limited
# to the amount of peers we can send it to in one heartbeat
for peer in g.gossipsub.getOrDefault(topic):
if peers.len >= maxFloodPublish: break
if peer.score >= g.parameters.publishThreshold:
trace "publish: including flood/high score peer", peer
peers.incl(peer)

# add always direct peers
peers.incl(g.explicit.getOrDefault(topic))

if topic in g.topics: # if we're subscribed use the mesh
peers.incl(g.mesh.getOrDefault(topic))

if peers.len < g.parameters.dLow and g.parameters.floodPublish == false:
# not subscribed or bad mesh, send to fanout peers
# disable for floodPublish, since we already sent to every good peer
Expand Down
42 changes: 42 additions & 0 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,48 @@ suite "GossipSub":

await allFuturesThrowing(nodesFut.concat())

asyncTest "e2e - GossipSub floodPublish limit":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"

let
nodes = generateNodes(
20,
gossip = true)

await allFuturesThrowing(
nodes.mapIt(it.switch.start())
)

var gossip1: GossipSub = GossipSub(nodes[0])
gossip1.parameters.floodPublish = true
gossip1.parameters.heartbeatInterval = milliseconds(700)

for node in nodes[1..^1]:
node.subscribe("foobar", handler)
await node.switch.connect(nodes[0].peerInfo.peerId, nodes[0].peerInfo.addrs)

block setup:
for i in 0..<50:
if (await nodes[0].publish("foobar", ("Hello!" & $i).toBytes())) == 19:
break setup
await sleepAsync(10.milliseconds)
check false

check (await nodes[0].publish("foobar", newSeq[byte](1_000_000))) == 17

# Now try with a mesh
gossip1.subscribe("foobar", handler)
checkExpiring: gossip1.mesh.peers("foobar") > 5

# use a different length so that the message is not equal to the last
check (await nodes[0].publish("foobar", newSeq[byte](1_000_001))) == 17

await allFuturesThrowing(
nodes.mapIt(it.switch.stop())
)

asyncTest "e2e - GossipSub with multiple peers":
var runs = 10

Expand Down