Skip to content

Commit

Permalink
Fix connected peers bombarding peers with connection attempts (#2208)
Browse files Browse the repository at this point in the history
* Tweak networking logs a bit

* Fix connected peers bombarding peers with connection attempts
  • Loading branch information
nazar-pc authored Nov 9, 2023
1 parent 53dca16 commit d4eba10
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 13 deletions.
6 changes: 5 additions & 1 deletion crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ where
peer_id,
endpoint,
num_established,
cause,
..
} => {
let shared = match self.shared_weak.upgrade() {
Expand All @@ -522,7 +523,10 @@ where
return;
}
};
debug!("Connection closed with peer {peer_id} [{num_established} from peer]");
debug!(
?cause,
"Connection closed with peer {peer_id} [{num_established} from peer]"
);

// TODO: Workaround for https://github.com/libp2p/rust-libp2p/discussions/3418
{
Expand Down
49 changes: 38 additions & 11 deletions crates/subspace-networking/src/protocols/connected_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ use tracing::{debug, trace};
/// Represents different states of a peer permanent connection.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ConnectionState {
/// Indicates that a connection is expected to be established to this peer.
Preparing { peer_address: Multiaddr },

/// Indicates that a connection attempt to a peer is in progress.
Connecting { peer_address: Multiaddr },

Expand All @@ -76,7 +79,7 @@ impl ConnectionState {
/// Returns active connection ID if any.
fn connection_id(&self) -> Option<ConnectionId> {
match self {
ConnectionState::Connecting { .. } => None,
ConnectionState::Preparing { .. } | ConnectionState::Connecting { .. } => None,
ConnectionState::Deciding { connection_id } => Some(*connection_id),
ConnectionState::Permanent { connection_id } => Some(*connection_id),
ConnectionState::NotInterested => None,
Expand All @@ -86,6 +89,7 @@ impl ConnectionState {
/// Converts [`ConnectionState`] to a string with information loss.
fn stringify(&self) -> String {
match self {
ConnectionState::Preparing { .. } => "ToConnect".to_string(),
ConnectionState::Connecting { .. } => "Connecting".to_string(),
ConnectionState::Deciding { .. } => "Deciding".to_string(),
ConnectionState::Permanent { .. } => "Permanent".to_string(),
Expand Down Expand Up @@ -199,7 +203,7 @@ impl<Instance> Behaviour<Instance> {
let default_keep_alive_until = KeepAlive::Until(default_until);
let keep_alive = if let Some(connection_state) = self.known_peers.get_mut(peer_id) {
match connection_state {
ConnectionState::Connecting { .. } => {
ConnectionState::Preparing { .. } | ConnectionState::Connecting { .. } => {
// Connection attempt was successful.
*connection_state = ConnectionState::Deciding { connection_id };

Expand Down Expand Up @@ -393,7 +397,7 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {
.unwrap_or(false);

if known_connection_closed {
trace!(%peer_id, ?connection_id, "Known connection closed.");
trace!(%peer_id, ?connection_id, "Known connection closed");
self.known_peers.remove(&peer_id);
self.wake();
}
Expand All @@ -403,12 +407,17 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {
let old_peer_decision = self.known_peers.remove(&peer_id);

if old_peer_decision.is_some() {
trace!(%peer_id, ?old_peer_decision, ?connection_id, "Known peer disconnected.");
trace!(
%peer_id,
?old_peer_decision,
?connection_id,
"Known peer disconnected"
);
self.wake();
}
};
}
FromSwarm::DialFailure(DialFailure { peer_id, .. }) => {
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let Some(peer_id) = peer_id {
let other_connections = self
.known_peers
Expand All @@ -419,7 +428,12 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {
let old_peer_decision = self.known_peers.remove(&peer_id);

if old_peer_decision.is_some() {
debug!(%peer_id, ?old_peer_decision, "Dialing error to known peer.");
debug!(
%peer_id,
?old_peer_decision,
?error,
"Dialing error to known peer"
);
}
}

Expand Down Expand Up @@ -464,17 +478,24 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {
// Check decision statuses.
for (peer_id, connection_state) in self.known_peers.iter_mut() {
match connection_state {
ConnectionState::Connecting {
ConnectionState::Preparing {
peer_address: address,
} => {
debug!(%peer_id, "Dialing a new peer.");
debug!(%peer_id, "Dialing a new peer");

let dial_opts = DialOpts::peer_id(*peer_id).addresses(vec![address.clone()]);

*connection_state = ConnectionState::Connecting {
peer_address: address.clone(),
};

return Poll::Ready(ToSwarm::Dial {
opts: dial_opts.build(),
});
}
ConnectionState::Connecting { .. } => {
// Waiting for connection to be established.
}
ConnectionState::Deciding { .. } => {
// The decision time is limited by the connection timeout.
}
Expand All @@ -492,7 +513,7 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {

// Request new peer addresses.
if self.peer_cache.is_empty() {
trace!("Requesting new peers for connected-peers protocol....");
trace!("Requesting new peers for connected-peers protocol...");

return Poll::Ready(ToSwarm::GenerateEvent(
Event::NewDialingCandidatesRequested(PhantomData),
Expand All @@ -508,7 +529,9 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {

for (peer_id, address) in peer_addresses {
self.known_peers.entry(peer_id).or_insert_with(|| {
ConnectionState::Connecting {
cx.waker().wake_by_ref();

ConnectionState::Preparing {
peer_address: address,
}
});
Expand All @@ -525,7 +548,11 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {

let stats = self.gather_stats();

debug!(instance=%self.config.log_target, ?stats, "Connected peers protocol statistics.");
debug!(
instance = %self.config.log_target,
?stats,
"Connected peers protocol statistics",
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl ConnectionHandler for Handler {
match error {
ConnectionHandlerUpgrErr::NegotiationFailed
| ConnectionHandlerUpgrErr::Apply(..) => {
debug!("Peer-info protocol dial upgrade failed.");
debug!(?error, "Peer-info protocol dial upgrade failed");
}
e => {
self.error = Some(PeerInfoError::Other { error: Box::new(e) });
Expand Down

0 comments on commit d4eba10

Please sign in to comment.