Skip to content

Commit

Permalink
Merge pull request #2235 from subspace/networking-tweaks
Browse files Browse the repository at this point in the history
Networking tweaks
  • Loading branch information
nazar-pc authored Nov 16, 2023
2 parents 165c319 + 8ec4cec commit 76e2cd9
Show file tree
Hide file tree
Showing 4 changed files with 286 additions and 65 deletions.
3 changes: 2 additions & 1 deletion crates/subspace-networking/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ use crate::protocols::request_response::request_response_factory::{
use crate::protocols::reserved_peers::{
Behaviour as ReservedPeersBehaviour, Config as ReservedPeersConfig, Event as ReservedPeersEvent,
};
use crate::protocols::subspace_connection_limits::Behaviour as ConnectionLimitsBehaviour;
use crate::PeerInfoProvider;
use derive_more::From;
use libp2p::allow_block_list::{Behaviour as AllowBlockListBehaviour, BlockedPeers};
use libp2p::autonat::{Behaviour as Autonat, Config as AutonatConfig, Event as AutonatEvent};
use libp2p::connection_limits::{Behaviour as ConnectionLimitsBehaviour, ConnectionLimits};
use libp2p::connection_limits::ConnectionLimits;
use libp2p::gossipsub::{
Behaviour as Gossipsub, Config as GossipsubConfig, Event as GossipsubEvent, MessageAuthenticity,
};
Expand Down
156 changes: 92 additions & 64 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use event_listener_primitives::HandlerId;
use futures::channel::mpsc;
use futures::future::Fuse;
use futures::{FutureExt, StreamExt};
use libp2p::autonat::{Event as AutonatEvent, NatStatus};
use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent};
use libp2p::core::ConnectedPoint;
use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
use libp2p::identify::Event as IdentifyEvent;
Expand All @@ -32,6 +32,7 @@ use libp2p::kad::{
InboundRequest, PeerRecord, ProgressStep, PutRecordOk, QueryId, QueryResult, Quorum, Record,
};
use libp2p::metrics::{Metrics, Recorder};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::{DialError, SwarmEvent};
use libp2p::{futures, Multiaddr, PeerId, Swarm, TransportError};
use nohash_hasher::IntMap;
Expand All @@ -41,13 +42,14 @@ use rand::{Rng, SeedableRng};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::net::IpAddr;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::time::Sleep;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, trace, warn};

// Defines a batch size for peer addresses from Kademlia buckets.
const KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE: usize = 20;
Expand Down Expand Up @@ -123,8 +125,8 @@ where
temporary_bans: Arc<Mutex<TemporaryBans>>,
/// Prometheus metrics.
metrics: Option<Metrics>,
/// Mapping from specific peer to number of established connections
established_connections: HashMap<(PeerId, ConnectedPoint), usize>,
/// Mapping from specific peer to ip addresses
peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
/// Defines protocol version for the network peers. Affects network partition.
protocol_version: String,
/// Defines whether we maintain a persistent connection for common peers.
Expand Down Expand Up @@ -219,7 +221,7 @@ where
reserved_peers,
temporary_bans,
metrics,
established_connections: HashMap::new(),
peer_ip_addresses: HashMap::new(),
protocol_version,
general_connection_decision_handler,
special_connection_decision_handler,
Expand Down Expand Up @@ -454,13 +456,24 @@ where
"Connection established [{num_established} from peer]"
);

// TODO: Workaround for https://github.com/libp2p/rust-libp2p/discussions/3418
self.established_connections
.entry((peer_id, endpoint))
.and_modify(|entry| {
*entry += 1;
})
.or_insert(1);
let maybe_remote_ip =
endpoint
.get_remote_address()
.iter()
.find_map(|protocol| match protocol {
Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
Protocol::Ip6(ip) => Some(IpAddr::V6(ip)),
_ => None,
});
if let Some(ip) = maybe_remote_ip {
self.peer_ip_addresses
.entry(peer_id)
.and_modify(|ips| {
ips.insert(ip);
})
.or_insert(HashSet::from([ip]));
}

let num_established_peer_connections = shared
.num_established_peer_connections
.fetch_add(1, Ordering::SeqCst)
Expand All @@ -486,7 +499,6 @@ where
}
SwarmEvent::ConnectionClosed {
peer_id,
endpoint,
num_established,
cause,
..
Expand All @@ -502,28 +514,8 @@ where
"Connection closed with peer {peer_id} [{num_established} from peer]"
);

// TODO: Workaround for https://github.com/libp2p/rust-libp2p/discussions/3418
{
match self.established_connections.entry((peer_id, endpoint)) {
Entry::Vacant(_) => {
// Nothing to do here, we are not aware of the connection being closed
warn!(
?peer_id,
"Connection closed, but it is not known as open connection, \
this is likely a bug in libp2p: \
https://github.com/libp2p/rust-libp2p/discussions/3418"
);
return;
}
Entry::Occupied(mut entry) => {
let value = entry.get_mut();
if *value == 1 {
entry.remove_entry();
} else {
*value -= 1;
}
}
};
if num_established == 0 {
self.peer_ip_addresses.remove(&peer_id);
}
let num_established_peer_connections = shared
.num_established_peer_connections
Expand Down Expand Up @@ -551,24 +543,15 @@ where
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
if let Some(peer_id) = &peer_id {
// Create or extend temporary ban, but only if we are not offline
if let Some(shared) = self.shared_weak.upgrade() {
// One peer is possibly a node peer is connected to, hence expecting more
// than one for online status
let other_connections_exist = shared
.num_established_peer_connections
.load(Ordering::Relaxed)
> 1;
let should_ban_temporarily =
self.should_temporary_ban_on_dial_error(peer_id, &error);

trace!(%should_ban_temporarily, %other_connections_exist, "Temporary bans conditions.");

if other_connections_exist && should_ban_temporarily {
self.temporary_bans.lock().create_or_extend(peer_id);
debug!(%peer_id, ?error, "Peer was temporarily banned.");
}
};
let should_ban_temporarily =
self.should_temporary_ban_on_dial_error(peer_id, &error);

trace!(%should_ban_temporarily, "Temporary bans conditions.");

if should_ban_temporarily {
self.temporary_bans.lock().create_or_extend(peer_id);
debug!(%peer_id, ?error, "Peer was temporarily banned.");
}
}

debug!(
Expand Down Expand Up @@ -605,13 +588,13 @@ where
trace!(%address, "External address candidate");
}
SwarmEvent::ExternalAddrConfirmed { address } => {
info!(%address, "Confirmed external address");
debug!(%address, "Confirmed external address");

let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
self.swarm.behaviour_mut().identify.push(connected_peers);
}
SwarmEvent::ExternalAddrExpired { address } => {
info!(%address, "External address expired");
debug!(%address, "External address expired");

let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
self.swarm.behaviour_mut().identify.push(connected_peers);
Expand All @@ -623,6 +606,11 @@ where
}

fn should_temporary_ban_on_dial_error(&self, peer_id: &PeerId, error: &DialError) -> bool {
// TODO: Replace with banning of addresses rather peer IDs if this helps
if true {
return false;
}

// Ban temporarily only peers without active connections.
if self.swarm.is_connected(peer_id) {
return false;
Expand Down Expand Up @@ -1187,17 +1175,57 @@ where
);
}

if let AutonatEvent::StatusChanged { old, new } = event {
info!(?old, ?new, "Public address status changed.");
match event {
AutonatEvent::InboundProbe(_inbound_probe_event) => {
// We do not care about this event
}
AutonatEvent::OutboundProbe(outbound_probe_event) => {
match outbound_probe_event {
OutboundProbeEvent::Request { peer, .. } => {
// For outbound probe request add peer to allow list to ensure they can dial us back and not hit
// global incoming connection limit
self.swarm
.behaviour_mut()
.connection_limits
// We expect a single successful dial from this peer
.add_to_incoming_allow_list(
peer,
self.peer_ip_addresses
.get(&peer)
.iter()
.flat_map(|ip_addresses| ip_addresses.iter())
.copied(),
1,
);
}
OutboundProbeEvent::Response { peer, .. } => {
self.swarm
.behaviour_mut()
.connection_limits
.remove_from_incoming_allow_list(&peer, Some(1));
}
OutboundProbeEvent::Error { peer, .. } => {
if let Some(peer) = peer {
self.swarm
.behaviour_mut()
.connection_limits
.remove_from_incoming_allow_list(&peer, Some(1));
}
}
}
}
AutonatEvent::StatusChanged { old, new } => {
debug!(?old, ?new, "Public address status changed.");

// TODO: Remove block once https://github.com/libp2p/rust-libp2p/issues/4863 is resolved
if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new) {
self.swarm.remove_external_address(&old_address);
// Trigger potential mode change manually
self.swarm.behaviour_mut().kademlia.set_mode(None);
// TODO: Remove block once https://github.com/libp2p/rust-libp2p/issues/4863 is resolved
if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new) {
self.swarm.remove_external_address(&old_address);
// Trigger potential mode change manually
self.swarm.behaviour_mut().kademlia.set_mode(None);

let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
self.swarm.behaviour_mut().identify.push(connected_peers);
let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
self.swarm.behaviour_mut().identify.push(connected_peers);
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-networking/src/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub(crate) mod connected_peers;
pub mod peer_info;
pub mod request_response;
pub(crate) mod reserved_peers;
pub(crate) mod subspace_connection_limits;
Loading

0 comments on commit 76e2cd9

Please sign in to comment.