Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix connected peers bombarding peers with connection attempts #2208

Merged
merged 2 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ where
peer_id,
endpoint,
num_established,
cause,
..
} => {
let shared = match self.shared_weak.upgrade() {
Expand All @@ -522,7 +523,10 @@ where
return;
}
};
debug!("Connection closed with peer {peer_id} [{num_established} from peer]");
debug!(
?cause,
"Connection closed with peer {peer_id} [{num_established} from peer]"
);

// TODO: Workaround for https://github.com/libp2p/rust-libp2p/discussions/3418
{
Expand Down
49 changes: 38 additions & 11 deletions crates/subspace-networking/src/protocols/connected_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ use tracing::{debug, trace};
/// Represents different states of a peer permanent connection.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ConnectionState {
/// Indicates that a connection is expected to be established to this peer.
Preparing { peer_address: Multiaddr },

/// Indicates that a connection attempt to a peer is in progress.
Connecting { peer_address: Multiaddr },

Expand All @@ -76,7 +79,7 @@ impl ConnectionState {
/// Returns active connection ID if any.
fn connection_id(&self) -> Option<ConnectionId> {
match self {
ConnectionState::Connecting { .. } => None,
ConnectionState::Preparing { .. } | ConnectionState::Connecting { .. } => None,
ConnectionState::Deciding { connection_id } => Some(*connection_id),
ConnectionState::Permanent { connection_id } => Some(*connection_id),
ConnectionState::NotInterested => None,
Expand All @@ -86,6 +89,7 @@ impl ConnectionState {
/// Converts [`ConnectionState`] to a string with information loss.
fn stringify(&self) -> String {
match self {
ConnectionState::Preparing { .. } => "ToConnect".to_string(),
ConnectionState::Connecting { .. } => "Connecting".to_string(),
ConnectionState::Deciding { .. } => "Deciding".to_string(),
ConnectionState::Permanent { .. } => "Permanent".to_string(),
Expand Down Expand Up @@ -199,7 +203,7 @@ impl<Instance> Behaviour<Instance> {
let default_keep_alive_until = KeepAlive::Until(default_until);
let keep_alive = if let Some(connection_state) = self.known_peers.get_mut(peer_id) {
match connection_state {
ConnectionState::Connecting { .. } => {
ConnectionState::Preparing { .. } | ConnectionState::Connecting { .. } => {
// Connection attempt was successful.
*connection_state = ConnectionState::Deciding { connection_id };

Expand Down Expand Up @@ -393,7 +397,7 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {
.unwrap_or(false);

if known_connection_closed {
trace!(%peer_id, ?connection_id, "Known connection closed.");
trace!(%peer_id, ?connection_id, "Known connection closed");
self.known_peers.remove(&peer_id);
self.wake();
}
Expand All @@ -403,12 +407,17 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {
let old_peer_decision = self.known_peers.remove(&peer_id);

if old_peer_decision.is_some() {
trace!(%peer_id, ?old_peer_decision, ?connection_id, "Known peer disconnected.");
trace!(
%peer_id,
?old_peer_decision,
?connection_id,
"Known peer disconnected"
);
self.wake();
}
};
}
FromSwarm::DialFailure(DialFailure { peer_id, .. }) => {
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let Some(peer_id) = peer_id {
let other_connections = self
.known_peers
Expand All @@ -419,7 +428,12 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {
let old_peer_decision = self.known_peers.remove(&peer_id);

if old_peer_decision.is_some() {
debug!(%peer_id, ?old_peer_decision, "Dialing error to known peer.");
debug!(
%peer_id,
?old_peer_decision,
?error,
"Dialing error to known peer"
);
}
}

Expand Down Expand Up @@ -464,17 +478,24 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {
// Check decision statuses.
for (peer_id, connection_state) in self.known_peers.iter_mut() {
match connection_state {
ConnectionState::Connecting {
ConnectionState::Preparing {
peer_address: address,
} => {
debug!(%peer_id, "Dialing a new peer.");
debug!(%peer_id, "Dialing a new peer");

let dial_opts = DialOpts::peer_id(*peer_id).addresses(vec![address.clone()]);

*connection_state = ConnectionState::Connecting {
peer_address: address.clone(),
};

return Poll::Ready(ToSwarm::Dial {
opts: dial_opts.build(),
});
}
ConnectionState::Connecting { .. } => {
// Waiting for connection to be established.
}
ConnectionState::Deciding { .. } => {
// The decision time is limited by the connection timeout.
}
Expand All @@ -492,7 +513,7 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {

// Request new peer addresses.
if self.peer_cache.is_empty() {
trace!("Requesting new peers for connected-peers protocol....");
trace!("Requesting new peers for connected-peers protocol...");

return Poll::Ready(ToSwarm::GenerateEvent(
Event::NewDialingCandidatesRequested(PhantomData),
Expand All @@ -508,7 +529,9 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {

for (peer_id, address) in peer_addresses {
self.known_peers.entry(peer_id).or_insert_with(|| {
ConnectionState::Connecting {
cx.waker().wake_by_ref();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, will it work while we process the current poll()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be VERY surprised if it didn't. This is how futures work and how this should work as well.


ConnectionState::Preparing {
peer_address: address,
}
});
Expand All @@ -525,7 +548,11 @@ impl<Instance: 'static + Send> NetworkBehaviour for Behaviour<Instance> {

let stats = self.gather_stats();

debug!(instance=%self.config.log_target, ?stats, "Connected peers protocol statistics.");
debug!(
instance = %self.config.log_target,
?stats,
"Connected peers protocol statistics",
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl ConnectionHandler for Handler {
match error {
ConnectionHandlerUpgrErr::NegotiationFailed
| ConnectionHandlerUpgrErr::Apply(..) => {
debug!("Peer-info protocol dial upgrade failed.");
debug!(?error, "Peer-info protocol dial upgrade failed");
}
e => {
self.error = Some(PeerInfoError::Other { error: Box::new(e) });
Expand Down
Loading