Skip to content

Commit

Permalink
disconnect peer
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Mar 21, 2024
1 parent 1734160 commit 8c9a6ae
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
8 changes: 5 additions & 3 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,19 @@ method onNewPeer*(g: GossipSub, peer: PubSubPeer) =

method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
case event.kind
of PubSubPeerEventKind.Connected:
of PubSubPeerEventKind.StreamOpened:
discard
of PubSubPeerEventKind.Disconnected:
# If a send connection is lost, it's better to remove peer from the mesh -
of PubSubPeerEventKind.StreamClosed:
# If a send stream is lost, it's better to remove peer from the mesh -
# if it gets reestablished, the peer will be readded to the mesh, and if it
# doesn't, well.. then we hope the peer is going away!
for topic, peers in p.mesh.mpairs():
p.pruned(peer, topic)
peers.excl(peer)
for _, peers in p.fanout.mpairs():
peers.excl(peer)
of PubSubPeerEventKind.DisconnectionRequested:
asyncSpawn p.disconnectPeer(peer) # this should unsubscribePeer the peer too

procCall FloodSub(p).onPubSubPeerEvent(peer, event)

Expand Down
7 changes: 5 additions & 2 deletions libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,14 @@ method onNewPeer(p: PubSub, peer: PubSubPeer) {.base, gcsafe.} = discard
method onPubSubPeerEvent*(p: PubSub, peer: PubSubPeer, event: PubSubPeerEvent) {.base, gcsafe.} =
# Peer event is raised for the send connection in particular
case event.kind
of PubSubPeerEventKind.Connected:
of PubSubPeerEventKind.StreamOpened:
if p.topics.len > 0:
p.sendSubs(peer, toSeq(p.topics.keys), true)
of PubSubPeerEventKind.Disconnected:
of PubSubPeerEventKind.StreamClosed:
discard
of PubSubPeerEventKind.DisconnectionRequested:
discard


method getOrCreatePeer*(
p: PubSub,
Expand Down
18 changes: 11 additions & 7 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ when defined(pubsubpeer_queue_metrics):
declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])

declareCounter(libp2p_pubsub_disconnects_over_non_priority_queue_limit, "number of peers disconnected due to over non-prio queue capacity")

type
PeerRateLimitError* = object of CatchableError

Expand All @@ -43,8 +45,9 @@ type
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].}

PubSubPeerEventKind* {.pure.} = enum
Connected
Disconnected
StreamOpened
StreamClosed
DisconnectionRequested # tells gossipsub that the transport connection to the peer should be closed

PubSubPeerEvent* = object
kind*: PubSubPeerEventKind
Expand Down Expand Up @@ -182,7 +185,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
debug "exiting pubsub read loop",
conn, peer = p, closed = conn.closed

proc disconnect(p: PubSubPeer) {.async.} =
proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} =
if p.sendConn != nil:
trace "Removing send connection", p, conn = p.sendConn
await p.sendConn.close()
Expand All @@ -193,7 +196,7 @@ proc disconnect(p: PubSubPeer) {.async.} =

try:
if p.onEvent != nil:
p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.Disconnected))
p.onEvent(p, PubSubPeerEvent(kind: event))
except CancelledError as exc:
raise exc
except CatchableError as exc:
Expand Down Expand Up @@ -222,11 +225,11 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
p.address = if p.sendConn.observedAddr.isSome: some(p.sendConn.observedAddr.get) else: none(MultiAddress)

if p.onEvent != nil:
p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.Connected))
p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.StreamOpened))

await handle(p, newConn)
finally:
await p.disconnect()
await p.closeSendConn(PubSubPeerEventKind.StreamClosed)

proc connectImpl(p: PubSubPeer) {.async.} =
try:
Expand Down Expand Up @@ -342,7 +345,8 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
f
else:
if len(p.rpcmessagequeue.nonPriorityQueue) == p.maxNumElementInNonPriorityQueue:
p.disconnect()
libp2p_pubsub_disconnects_over_non_priority_queue_limit.inc()
p.closeSendConn(PubSubPeerEventKind.DisconnectionRequested)
else:
let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
when defined(pubsubpeer_queue_metrics):
Expand Down

0 comments on commit 8c9a6ae

Please sign in to comment.