Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Evict inactive peers from SyncingEngine #13829

Merged
merged 7 commits into from
Apr 21, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ use std::{
Arc,
},
task::Poll,
time::{Duration, Instant},
};

/// Interval at which we perform time based maintenance
Expand All @@ -75,12 +76,19 @@ const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100)
/// Maximum number of known block hashes to keep for a peer.
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead

/// If the block announces stream to peer has been inactive for two minutes meaning local node
/// has not sent or received block announcements to/from the peer, report the node for inactivity,
/// disconnect it and attempt to establish connection to some other peer.
const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(2 * 60);

mod rep {
use sc_peerset::ReputationChange as Rep;
/// Peer has different genesis.
pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
/// Peer send us a block announcement that failed at validation.
pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
/// Block announce substream with the peer has been inactive too long
pub const INACTIVE_SUBSTREAM: Rep = Rep::new(-(1 << 10), "Inactive block announce substream");
}

struct Metrics {
Expand Down Expand Up @@ -160,6 +168,10 @@ pub struct Peer<B: BlockT> {
pub known_blocks: LruHashSet<B::Hash>,
/// Notification sink.
sink: NotificationsSink,
/// Instant when the last notification was sent to peer.
last_notification_sent: Instant,
/// Instant when the last notification was received from peer.
last_notification_received: Instant,
}

pub struct SyncingEngine<B: BlockT, Client> {
Expand Down Expand Up @@ -200,6 +212,9 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// All connected peers. Contains both full and light node peers.
peers: HashMap<PeerId, Peer<B>>,

/// Evicted peers
evicted: HashSet<PeerId>,

/// List of nodes for which we perform additional logging because they are important for the
/// user.
important_peers: HashSet<PeerId>,
Expand Down Expand Up @@ -344,6 +359,7 @@ where
chain_sync,
network_service,
peers: HashMap::new(),
evicted: HashSet::new(),
block_announce_data_cache: LruCache::new(cache_capacity),
block_announce_protocol_name,
num_connected: num_connected.clone(),
Expand Down Expand Up @@ -507,6 +523,7 @@ where
},
};
peer.known_blocks.insert(hash);
peer.last_notification_received = Instant::now();

if peer.info.roles.is_full() {
let is_best = match announce.state.unwrap_or(BlockState::Best) {
Expand Down Expand Up @@ -557,6 +574,7 @@ where
data: Some(data.clone()),
};

peer.last_notification_sent = Instant::now();
peer.sink.send_sync_notification(message.encode());
}
}
Expand Down Expand Up @@ -590,6 +608,33 @@ where
self.tick_timeout.reset(TICK_TIMEOUT);
}

// go over all connected peers and check if any of them have been idle for a while. Idle in
// this case means that we haven't sent or received block announcements to/from this peer.
// If that is the case, because of #5685, it could be that the block announces substream is
// not actually open and and this peer is just wasting a slot and is should be replaced with
// some other node that is willing to send us block announcements.
for (id, peer) in self.peers.iter() {
altonen marked this conversation as resolved.
Show resolved Hide resolved
// because of a delay between disconnecting a peer in `SyncingEngine` and getting the
// response back from `Protocol`, a peer might be reported and disconnect multiple
// times. To prevent this from happening (until the underlying issue is fixed), keep
// track of evicted peers and report and disconnect them only once.
if self.evicted.contains(id) {
altonen marked this conversation as resolved.
Show resolved Hide resolved
continue
}

let last_received_late =
peer.last_notification_received.elapsed() > INACTIVITY_EVICT_THRESHOLD;
let last_sent_late = peer.last_notification_sent.elapsed() > INACTIVITY_EVICT_THRESHOLD;
altonen marked this conversation as resolved.
Show resolved Hide resolved

if last_received_late && last_sent_late {
log::debug!(target: "sync", "evict peer {id} since it has been idling for too long");
self.network_service.report_peer(*id, rep::INACTIVE_SUBSTREAM);
self.network_service
.disconnect_peer(*id, self.block_announce_protocol_name.clone());
self.evicted.insert(*id);
}
}

while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) {
match event {
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
Expand Down Expand Up @@ -683,6 +728,7 @@ where
},
},
sc_network::SyncEvent::NotificationStreamClosed { remote } => {
self.evicted.remove(&remote);
if self.on_sync_peer_disconnected(remote).is_err() {
log::trace!(
target: "sync",
Expand Down Expand Up @@ -835,6 +881,8 @@ where
NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
),
sink,
last_notification_sent: Instant::now(),
last_notification_received: Instant::now(),
};

let req = if peer.info.roles.is_full() {
Expand Down