From 0011a5e720e2e61beb75b3384cd788c72548284a Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 31 Oct 2023 19:01:04 +0100 Subject: [PATCH] Revert "Prevent concurrent IWANT of the same message (#943)" This reverts commit c6aa085e98e7526cb8d4415cb9a7f886e6dcab30. --- libp2p/protocols/pubsub/gossipsub.nim | 6 +- .../protocols/pubsub/gossipsub/behavior.nim | 16 +-- libp2p/protocols/pubsub/gossipsub/types.nim | 7 -- tests/pubsub/testgossipinternal.nim | 98 ------------------- 4 files changed, 2 insertions(+), 125 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 794ea3e7e1..81e10f8372 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -77,8 +77,7 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = behaviourPenaltyDecay: 0.999, disconnectBadPeers: false, enablePX: false, - bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps - iwantTimeout: 3 * GossipSubHeartbeatInterval + bandwidthEstimatebps: 100_000_000 # 100 Mbps or 12.5 MBps ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -410,9 +409,6 @@ method rpcHandler*(g: GossipSub, let msgId = msgIdResult.get msgIdSalted = msgId & g.seenSalt - g.outstandingIWANTs.withValue(msgId, iwantRequest): - if iwantRequest.peer.peerId == peer.peerId: - g.outstandingIWANTs.del(msgId) # addSeen adds salt to msgId to avoid # remote attacking the hash function diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 008e197448..983262fa0c 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -254,8 +254,7 @@ proc handleIHave*(g: GossipSub, if not g.hasSeen(msgId): if peer.iHaveBudget <= 0: break - elif msgId notin res.messageIds and msgId notin g.outstandingIWANTs: - g.outstandingIWANTs[msgId] = IWANTRequest(messageId: msgId, peer: peer, timestamp: Moment.now()) + elif msgId notin res.messageIds: res.messageIds.add(msgId) dec peer.iHaveBudget trace "requested message via ihave", messageID=msgId @@ -301,17 +300,6 @@ proc handleIWant*(g: GossipSub, messages.add(msg) return messages -proc checkIWANTTimeouts(g: GossipSub, timeoutDuration: Duration) {.raises: [].} = - let currentTime = Moment.now() - var idsToRemove = newSeq[MessageId]() - for msgId, request in g.outstandingIWANTs.pairs(): - if currentTime - request.timestamp > timeoutDuration: - trace "IWANT request timed out", messageID=msgId, peer=request.peer - request.peer.behaviourPenalty += 0.1 - idsToRemove.add(msgId) - for msgId in idsToRemove: - g.outstandingIWANTs.del(msgId) - proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} = libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics) libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics) @@ -717,5 +705,3 @@ proc heartbeat*(g: GossipSub) {.async.} = for trigger in g.heartbeatEvents: trace "firing heartbeat event", instance = cast[int](g) trigger.fire() - - checkIWANTTimeouts(g, g.parameters.iwantTimeout) diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index ca79b290e8..b5b512db09 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -143,7 +143,6 @@ type enablePX*: bool bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely - iwantTimeout*: Duration BackoffTable* = Table[string, Table[PeerId, Moment]] ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]] @@ -178,7 +177,6 @@ type routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange heartbeatEvents*: seq[AsyncEvent] - outstandingIWANTs*: Table[MessageId, IWANTRequest] MeshMetrics* = object # scratch buffers for metrics @@ -189,8 +187,3 @@ type lowPeersTopics*: int64 # npeers < dlow healthyPeersTopics*: int64 # npeers >= dlow underDoutTopics*: int64 - - IWANTRequest* = object - messageId*: MessageId - peer*: PubSubPeer - timestamp*: Moment diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 909e1d613a..e574ce611c 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -727,101 +727,3 @@ suite "GossipSub internal": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() - - asyncTest "two IHAVEs should generate only one IWANT": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - var iwantCount = 0 - - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = - check false - - proc handler2(topic: string, data: seq[byte]) {.async.} = discard - - let topic = "foobar" - var conns = newSeq[Connection]() - gossipSub.subscribe(topic, handler2) - - # Setup two connections and two peers - var ihaveMessageId: string - var firstPeer: PubSubPeer - let seqno = @[0'u8, 1, 2, 3] - for i in 0..<2: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - if isNil(firstPeer): - firstPeer = peer - ihaveMessageId = byteutils.toHex(seqno) & $firstPeer.peerId - peer.handler = handler - - # Simulate that each peer sends an IHAVE message to our node - let msg = ControlIHave( - topicID: topic, - messageIDs: @[ihaveMessageId.toBytes()] - ) - let iwants = gossipSub.handleIHave(peer, @[msg]) - if iwants.messageIds.len > 0: - iwantCount += 1 - - # Verify that our node responds with only one IWANT message - check: iwantCount == 1 - check: gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes()) - - # Simulate that our node receives the RPCMsg in response to the IWANT - let actualMessageData = "Hello, World!".toBytes - let rpcMsg = RPCMsg( - messages: @[Message( - fromPeer: firstPeer.peerId, - seqno: seqno, - data: actualMessageData - )] - ) - await gossipSub.rpcHandler(firstPeer, rpcMsg) - - check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes()) - - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - - asyncTest "handle unanswered IWANT messages": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - gossipSub.parameters.heartbeatInterval = 50.milliseconds - gossipSub.parameters.iwantTimeout = 10.milliseconds - await gossipSub.start() - - proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard - proc handler2(topic: string, data: seq[byte]) {.async.} = discard - - let topic = "foobar" - var conns = newSeq[Connection]() - gossipSub.subscribe(topic, handler2) - - # Setup a connection and a peer - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler - - # Simulate that the peer sends an IHAVE message to our node - let ihaveMessageId = @[0'u8, 1, 2, 3] - let ihaveMsg = ControlIHave( - topicID: topic, - messageIDs: @[ihaveMessageId] - ) - discard gossipSub.handleIHave(peer, @[ihaveMsg]) - - check: gossipSub.outstandingIWANTs.contains(ihaveMessageId) - check: peer.behaviourPenalty == 0.0 - - await sleepAsync(60.milliseconds) - - check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId) - check: peer.behaviourPenalty == 0.1 - - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop()