diff --git a/beacon_node/lighthouse_network/src/behaviour/mod.rs b/beacon_node/lighthouse_network/src/behaviour/mod.rs index 9c9e094db62..ac7ca421dbc 100644 --- a/beacon_node/lighthouse_network/src/behaviour/mod.rs +++ b/beacon_node/lighthouse_network/src/behaviour/mod.rs @@ -31,7 +31,6 @@ use libp2p::{ }, identify::{Identify, IdentifyConfig, IdentifyEvent}, swarm::{ - dial_opts::{DialOpts, PeerCondition}, AddressScore, NetworkBehaviour, NetworkBehaviourAction as NBAction, NetworkBehaviourEventProcess, PollParameters, }, @@ -130,8 +129,6 @@ pub enum BehaviourEvent { /// Internal type to pass messages from sub-behaviours to the poll of the global behaviour to be /// specified as an NBAction. enum InternalBehaviourMessage { - /// Dial a Peer. - DialPeer(PeerId), /// The socket has been updated. SocketUpdated(Multiaddr), } @@ -836,9 +833,7 @@ impl Behaviour { self.discovery.remove_cached_enr(&peer_id); // For any dial event, inform the peer manager let enr = self.discovery_mut().enr_of_peer(&peer_id); - self.peer_manager.inject_dialing(&peer_id, enr); - self.internal_events - .push_back(InternalBehaviourMessage::DialPeer(peer_id)); + self.peer_manager.dial_peer(&peer_id, enr); } } @@ -1154,9 +1149,7 @@ where debug!(self.log, "Dialing discovered peer"; "peer_id" => %peer_id); // For any dial event, inform the peer manager let enr = self.discovery_mut().enr_of_peer(&peer_id); - self.peer_manager.inject_dialing(&peer_id, enr); - self.internal_events - .push_back(InternalBehaviourMessage::DialPeer(peer_id)); + self.peer_manager.dial_peer(&peer_id, enr); } } } @@ -1214,16 +1207,6 @@ where // Handle internal events first if let Some(event) = self.internal_events.pop_front() { match event { - InternalBehaviourMessage::DialPeer(peer_id) => { - // Submit the event - let handler = self.new_handler(); - return Poll::Ready(NBAction::Dial { - opts: DialOpts::peer_id(peer_id) - .condition(PeerCondition::Disconnected) - .build(), - handler, - }); - } InternalBehaviourMessage::SocketUpdated(address) => { return Poll::Ready(NBAction::ReportObservedAddr { address, diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 55b3884454d..b83f03c097a 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -12,6 +12,7 @@ use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult}; use rand::seq::SliceRandom; use slog::{debug, error, trace, warn}; use smallvec::SmallVec; +use std::collections::VecDeque; use std::{ sync::Arc, time::{Duration, Instant}, @@ -71,6 +72,8 @@ pub struct PeerManager { status_peers: HashSetDelay, /// The target number of peers we would like to connect to. target_peers: usize, + /// Peers queued to be dialed. + peers_to_dial: VecDeque<(PeerId, Option)>, /// A collection of sync committee subnets that we need to stay subscribed to. /// Sync committee subnets are longer term (256 epochs). Hence, we need to re-run /// discovery queries for subnet peers if we disconnect from existing sync @@ -135,6 +138,7 @@ impl PeerManager { Ok(PeerManager { network_globals, events: SmallVec::new(), + peers_to_dial: Default::default(), inbound_ping_peers: HashSetDelay::new(Duration::from_secs(ping_interval_inbound)), outbound_ping_peers: HashSetDelay::new(Duration::from_secs(ping_interval_outbound)), status_peers: HashSetDelay::new(Duration::from_secs(status_interval)), @@ -360,8 +364,8 @@ impl PeerManager { /* Notifications from the Swarm */ // A peer is being dialed. - pub fn inject_dialing(&mut self, peer_id: &PeerId, enr: Option) { - self.inject_peer_connection(peer_id, ConnectingType::Dialing, enr); + pub fn dial_peer(&mut self, peer_id: &PeerId, enr: Option) { + self.peers_to_dial.push_back((*peer_id, enr)); } /// Reports if a peer is banned or not. diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index 3bda64f0b13..a19c6db657e 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -3,6 +3,7 @@ use std::task::{Context, Poll}; use futures::StreamExt; use libp2p::core::connection::ConnectionId; use libp2p::core::ConnectedPoint; +use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::handler::DummyConnectionHandler; use libp2p::swarm::{ ConnectionHandler, DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters, @@ -16,7 +17,7 @@ use crate::rpc::GoodbyeReason; use crate::types::SyncState; use super::peerdb::BanResult; -use super::{PeerManager, PeerManagerEvent, ReportSource}; +use super::{ConnectingType, PeerManager, PeerManagerEvent, ReportSource}; impl NetworkBehaviour for PeerManager { type ConnectionHandler = DummyConnectionHandler; @@ -99,6 +100,17 @@ impl NetworkBehaviour for PeerManager { self.events.shrink_to_fit(); } + if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_front() { + self.inject_peer_connection(&peer_id, ConnectingType::Dialing, maybe_enr); + let handler = self.new_handler(); + return Poll::Ready(NetworkBehaviourAction::Dial { + opts: DialOpts::peer_id(peer_id) + .condition(PeerCondition::Disconnected) + .build(), + handler, + }); + } + Poll::Pending }