diff --git a/crates/subspace-networking/src/behavior.rs b/crates/subspace-networking/src/behavior.rs index aeae2315fb..5706b1e175 100644 --- a/crates/subspace-networking/src/behavior.rs +++ b/crates/subspace-networking/src/behavior.rs @@ -15,11 +15,12 @@ use crate::protocols::request_response::request_response_factory::{ use crate::protocols::reserved_peers::{ Behaviour as ReservedPeersBehaviour, Config as ReservedPeersConfig, Event as ReservedPeersEvent, }; +use crate::protocols::subspace_connection_limits::Behaviour as ConnectionLimitsBehaviour; use crate::PeerInfoProvider; use derive_more::From; use libp2p::allow_block_list::{Behaviour as AllowBlockListBehaviour, BlockedPeers}; use libp2p::autonat::{Behaviour as Autonat, Config as AutonatConfig, Event as AutonatEvent}; -use libp2p::connection_limits::{Behaviour as ConnectionLimitsBehaviour, ConnectionLimits}; +use libp2p::connection_limits::ConnectionLimits; use libp2p::gossipsub::{ Behaviour as Gossipsub, Config as GossipsubConfig, Event as GossipsubEvent, MessageAuthenticity, }; diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 357c4d148f..71c75fdb2c 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -22,7 +22,7 @@ use event_listener_primitives::HandlerId; use futures::channel::mpsc; use futures::future::Fuse; use futures::{FutureExt, StreamExt}; -use libp2p::autonat::{Event as AutonatEvent, NatStatus}; +use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent}; use libp2p::core::ConnectedPoint; use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash}; use libp2p::identify::Event as IdentifyEvent; @@ -32,6 +32,7 @@ use libp2p::kad::{ InboundRequest, PeerRecord, ProgressStep, PutRecordOk, QueryId, QueryResult, Quorum, Record, }; use libp2p::metrics::{Metrics, Recorder}; +use libp2p::multiaddr::Protocol; use libp2p::swarm::{DialError, SwarmEvent}; use libp2p::{futures, Multiaddr, PeerId, Swarm, TransportError}; use nohash_hasher::IntMap; @@ -41,13 +42,14 @@ use rand::{Rng, SeedableRng}; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; +use std::net::IpAddr; use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::atomic::Ordering; use std::sync::{Arc, Weak}; use std::time::Duration; use tokio::time::Sleep; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, trace, warn}; // Defines a batch size for peer addresses from Kademlia buckets. const KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE: usize = 20; @@ -123,8 +125,8 @@ where temporary_bans: Arc>, /// Prometheus metrics. metrics: Option, - /// Mapping from specific peer to number of established connections - established_connections: HashMap<(PeerId, ConnectedPoint), usize>, + /// Mapping from specific peer to ip addresses + peer_ip_addresses: HashMap>, /// Defines protocol version for the network peers. Affects network partition. protocol_version: String, /// Defines whether we maintain a persistent connection for common peers. @@ -219,7 +221,7 @@ where reserved_peers, temporary_bans, metrics, - established_connections: HashMap::new(), + peer_ip_addresses: HashMap::new(), protocol_version, general_connection_decision_handler, special_connection_decision_handler, @@ -454,13 +456,24 @@ where "Connection established [{num_established} from peer]" ); - // TODO: Workaround for https://github.com/libp2p/rust-libp2p/discussions/3418 - self.established_connections - .entry((peer_id, endpoint)) - .and_modify(|entry| { - *entry += 1; - }) - .or_insert(1); + let maybe_remote_ip = + endpoint + .get_remote_address() + .iter() + .find_map(|protocol| match protocol { + Protocol::Ip4(ip) => Some(IpAddr::V4(ip)), + Protocol::Ip6(ip) => Some(IpAddr::V6(ip)), + _ => None, + }); + if let Some(ip) = maybe_remote_ip { + self.peer_ip_addresses + .entry(peer_id) + .and_modify(|ips| { + ips.insert(ip); + }) + .or_insert(HashSet::from([ip])); + } + let num_established_peer_connections = shared .num_established_peer_connections .fetch_add(1, Ordering::SeqCst) @@ -486,7 +499,6 @@ where } SwarmEvent::ConnectionClosed { peer_id, - endpoint, num_established, cause, .. @@ -502,28 +514,8 @@ where "Connection closed with peer {peer_id} [{num_established} from peer]" ); - // TODO: Workaround for https://github.com/libp2p/rust-libp2p/discussions/3418 - { - match self.established_connections.entry((peer_id, endpoint)) { - Entry::Vacant(_) => { - // Nothing to do here, we are not aware of the connection being closed - warn!( - ?peer_id, - "Connection closed, but it is not known as open connection, \ - this is likely a bug in libp2p: \ - https://github.com/libp2p/rust-libp2p/discussions/3418" - ); - return; - } - Entry::Occupied(mut entry) => { - let value = entry.get_mut(); - if *value == 1 { - entry.remove_entry(); - } else { - *value -= 1; - } - } - }; + if num_established == 0 { + self.peer_ip_addresses.remove(&peer_id); } let num_established_peer_connections = shared .num_established_peer_connections @@ -551,24 +543,15 @@ where } SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { if let Some(peer_id) = &peer_id { - // Create or extend temporary ban, but only if we are not offline - if let Some(shared) = self.shared_weak.upgrade() { - // One peer is possibly a node peer is connected to, hence expecting more - // than one for online status - let other_connections_exist = shared - .num_established_peer_connections - .load(Ordering::Relaxed) - > 1; - let should_ban_temporarily = - self.should_temporary_ban_on_dial_error(peer_id, &error); - - trace!(%should_ban_temporarily, %other_connections_exist, "Temporary bans conditions."); - - if other_connections_exist && should_ban_temporarily { - self.temporary_bans.lock().create_or_extend(peer_id); - debug!(%peer_id, ?error, "Peer was temporarily banned."); - } - }; + let should_ban_temporarily = + self.should_temporary_ban_on_dial_error(peer_id, &error); + + trace!(%should_ban_temporarily, "Temporary bans conditions."); + + if should_ban_temporarily { + self.temporary_bans.lock().create_or_extend(peer_id); + debug!(%peer_id, ?error, "Peer was temporarily banned."); + } } debug!( @@ -605,13 +588,13 @@ where trace!(%address, "External address candidate"); } SwarmEvent::ExternalAddrConfirmed { address } => { - info!(%address, "Confirmed external address"); + debug!(%address, "Confirmed external address"); let connected_peers = self.swarm.connected_peers().copied().collect::>(); self.swarm.behaviour_mut().identify.push(connected_peers); } SwarmEvent::ExternalAddrExpired { address } => { - info!(%address, "External address expired"); + debug!(%address, "External address expired"); let connected_peers = self.swarm.connected_peers().copied().collect::>(); self.swarm.behaviour_mut().identify.push(connected_peers); @@ -623,6 +606,11 @@ where } fn should_temporary_ban_on_dial_error(&self, peer_id: &PeerId, error: &DialError) -> bool { + // TODO: Replace with banning of addresses rather peer IDs if this helps + if true { + return false; + } + // Ban temporarily only peers without active connections. if self.swarm.is_connected(peer_id) { return false; @@ -1187,17 +1175,57 @@ where ); } - if let AutonatEvent::StatusChanged { old, new } = event { - info!(?old, ?new, "Public address status changed."); + match event { + AutonatEvent::InboundProbe(_inbound_probe_event) => { + // We do not care about this event + } + AutonatEvent::OutboundProbe(outbound_probe_event) => { + match outbound_probe_event { + OutboundProbeEvent::Request { peer, .. } => { + // For outbound probe request add peer to allow list to ensure they can dial us back and not hit + // global incoming connection limit + self.swarm + .behaviour_mut() + .connection_limits + // We expect a single successful dial from this peer + .add_to_incoming_allow_list( + peer, + self.peer_ip_addresses + .get(&peer) + .iter() + .flat_map(|ip_addresses| ip_addresses.iter()) + .copied(), + 1, + ); + } + OutboundProbeEvent::Response { peer, .. } => { + self.swarm + .behaviour_mut() + .connection_limits + .remove_from_incoming_allow_list(&peer, Some(1)); + } + OutboundProbeEvent::Error { peer, .. } => { + if let Some(peer) = peer { + self.swarm + .behaviour_mut() + .connection_limits + .remove_from_incoming_allow_list(&peer, Some(1)); + } + } + } + } + AutonatEvent::StatusChanged { old, new } => { + debug!(?old, ?new, "Public address status changed."); - // TODO: Remove block once https://github.com/libp2p/rust-libp2p/issues/4863 is resolved - if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new) { - self.swarm.remove_external_address(&old_address); - // Trigger potential mode change manually - self.swarm.behaviour_mut().kademlia.set_mode(None); + // TODO: Remove block once https://github.com/libp2p/rust-libp2p/issues/4863 is resolved + if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new) { + self.swarm.remove_external_address(&old_address); + // Trigger potential mode change manually + self.swarm.behaviour_mut().kademlia.set_mode(None); - let connected_peers = self.swarm.connected_peers().copied().collect::>(); - self.swarm.behaviour_mut().identify.push(connected_peers); + let connected_peers = self.swarm.connected_peers().copied().collect::>(); + self.swarm.behaviour_mut().identify.push(connected_peers); + } } } } diff --git a/crates/subspace-networking/src/protocols.rs b/crates/subspace-networking/src/protocols.rs index ec72d2dd89..aeec941298 100644 --- a/crates/subspace-networking/src/protocols.rs +++ b/crates/subspace-networking/src/protocols.rs @@ -2,3 +2,4 @@ pub(crate) mod connected_peers; pub mod peer_info; pub mod request_response; pub(crate) mod reserved_peers; +pub(crate) mod subspace_connection_limits; diff --git a/crates/subspace-networking/src/protocols/subspace_connection_limits.rs b/crates/subspace-networking/src/protocols/subspace_connection_limits.rs new file mode 100644 index 0000000000..8684a0e18c --- /dev/null +++ b/crates/subspace-networking/src/protocols/subspace_connection_limits.rs @@ -0,0 +1,191 @@ +use libp2p::connection_limits::{Behaviour as ConnectionLimitsBehaviour, ConnectionLimits}; +use libp2p::core::Endpoint; +use libp2p::multiaddr::Protocol; +use libp2p::swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, +}; +use libp2p::{Multiaddr, PeerId}; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; +use std::net::IpAddr; +use std::task::{Context, Poll}; + +// TODO: Upstream these capabilities +pub(crate) struct Behaviour { + inner: ConnectionLimitsBehaviour, + /// For every peer ID store both their expected IP addresses as well as number of incoming connection attempts + /// allowed before this allow list entry no longer has an effect + incoming_allow_list: HashMap, usize)>, + /// For every peer ID store number of outgoing connection attempts allowed before this allow list entry no longer + /// has an effect + outgoing_allow_list: HashMap, +} + +impl Behaviour { + pub(crate) fn new(limits: ConnectionLimits) -> Self { + Self { + inner: ConnectionLimitsBehaviour::new(limits), + incoming_allow_list: HashMap::default(), + outgoing_allow_list: HashMap::default(), + } + } + + /// Add to allow list some attempts of incoming connections from specified peer ID that will bypass global limits + pub(crate) fn add_to_incoming_allow_list( + &mut self, + peer: PeerId, + ip_addresses: IpAddresses, + add_attempts: usize, + ) where + IpAddresses: Iterator, + { + match self.incoming_allow_list.entry(peer) { + Entry::Occupied(mut entry) => { + let (existing_ip_addresses, attempts) = entry.get_mut(); + existing_ip_addresses.extend(ip_addresses); + *attempts = attempts.saturating_add(add_attempts); + } + Entry::Vacant(entry) => { + entry.insert((ip_addresses.collect(), add_attempts)); + } + } + } + + /// Remove some (or all) attempts of incoming connections from specified peer ID + pub(crate) fn remove_from_incoming_allow_list( + &mut self, + peer: &PeerId, + remove_attempts: Option, + ) { + if let Some(remove_attempts) = remove_attempts { + if let Some((_ip_addresses, attempts)) = self.incoming_allow_list.get_mut(peer) { + *attempts = attempts.saturating_sub(remove_attempts); + + if *attempts == 0 { + self.incoming_allow_list.remove(peer); + } + } + } else { + self.incoming_allow_list.remove(peer); + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = ::ConnectionHandler; + type ToSwarm = ::ToSwarm; + + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + // PeerId is not yet known at this point, so we check against IP address instead + if let Some(ip_address) = remote_addr.iter().find_map(|protocol| match protocol { + Protocol::Ip4(ip) => Some(IpAddr::V4(ip)), + Protocol::Ip6(ip) => Some(IpAddr::V6(ip)), + _ => None, + }) { + if self + .incoming_allow_list + .values() + .any(|(ip_addresses, _attempts)| ip_addresses.contains(&ip_address)) + { + return Ok(()); + } + } + + self.inner + .handle_pending_inbound_connection(connection_id, local_addr, remote_addr) + } + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + if let Some((_ip_addresses, attempts)) = self.incoming_allow_list.get_mut(&peer) { + *attempts -= 1; + + if *attempts == 0 { + self.incoming_allow_list.remove(&peer); + } + + return Ok(Self::ConnectionHandler {}); + } + + self.inner.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + maybe_peer: Option, + addresses: &[Multiaddr], + effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + if let Some(peer) = &maybe_peer { + if self.incoming_allow_list.contains_key(peer) { + return Ok(Vec::new()); + } + } + + self.inner.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + if let Some(attempts) = self.outgoing_allow_list.get_mut(&peer) { + *attempts -= 1; + + if *attempts == 0 { + self.outgoing_allow_list.remove(&peer); + } + + return Ok(Self::ConnectionHandler {}); + } + + self.inner + .handle_established_outbound_connection(connection_id, peer, addr, role_override) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + self.inner.on_swarm_event(event) + } + + fn on_connection_handler_event( + &mut self, + id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + self.inner + .on_connection_handler_event(id, connection_id, event) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + self.inner.poll(cx) + } +}