From cdcd65c7c1b8dbb025787e8887ac36976f2eef14 Mon Sep 17 00:00:00 2001
From: Stephen Akridge
Date: Sun, 31 May 2020 09:14:52 -0700
Subject: [PATCH 1/2] More cluster info metrics for push request/response counts

---
 core/src/cluster_info.rs | 38 ++++++++++++++++++++++++++++++++++++--
 1 file changed, 36 insertions(+), 2 deletions(-)

diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index f100093d661606..ed34c3f6bc013b 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -214,6 +214,7 @@ struct GossipStats {
     repair_peers: Counter,
     new_push_requests: Counter,
     new_push_requests2: Counter,
+    new_push_requests_num: Counter,
     process_pull_response: Counter,
     process_pull_response_count: Counter,
     process_pull_response_len: Counter,
@@ -236,6 +237,7 @@ struct GossipStats {
     skip_push_message_shred_version: Counter,
     push_message_count: Counter,
     push_message_value_count: Counter,
+    push_response_count: Counter,
 }
 
 pub struct ClusterInfo {
@@ -1362,7 +1364,7 @@ impl ClusterInfo {
         let (_, push_messages) = self
             .time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
             .new_push_messages(timestamp());
-        push_messages
+        let messages: Vec<_> = push_messages
             .into_iter()
             .filter_map(|(peer, messages)| {
                 let peer_label = CrdsValueLabel::ContactInfo(peer);
@@ -1377,7 +1379,11 @@ impl ClusterInfo {
                     .into_iter()
                     .map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
             })
-            .collect()
+            .collect();
+        self.stats
+            .new_push_requests_num
+            .add_relaxed(messages.len() as u64);
+        messages
     }
 
     fn gossip_request(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
@@ -1891,6 +1897,9 @@ impl ClusterInfo {
             return None;
         }
         let mut packets = to_packets_with_destination(recycler.clone(), &rsp);
+        me.stats
+            .push_response_count
+            .add_relaxed(packets.packets.len() as u64);
         if !packets.is_empty() {
             let pushes: Vec<_> = me.new_push_requests();
             inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
@@ -1982,6 +1991,11 @@ impl ClusterInfo {
             ),
             ("all_tvu_peers", self.stats.all_tvu_peers.clear(), i64),
             ("tvu_peers", self.stats.tvu_peers.clear(), i64),
+            (
+                "new_push_requests_num",
+                self.stats.new_push_requests_num.clear(),
+                i64
+            ),
         );
         datapoint_info!(
             "cluster_info_stats2",
@@ -2008,6 +2022,26 @@ impl ClusterInfo {
                 self.stats.process_pull_response_count.clear(),
                 i64
            ),
+            (
+                "process_pull_resp_success",
+                self.stats.process_pull_response_success.clear(),
+                i64
+            ),
+            (
+                "process_pull_resp_timeout",
+                self.stats.process_pull_response_timeout.clear(),
+                i64
+            ),
+            (
+                "process_pull_resp_fail",
+                self.stats.process_pull_response_fail.clear(),
+                i64
+            ),
+            (
+                "push_response_count",
+                self.stats.push_response_count.clear(),
+                i64
+            ),
         );
         datapoint_info!(
             "cluster_info_stats3",
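A note on the `Counter` type these hunks rely on: its definition is not part of this diff. A minimal sketch consistent with how the patch uses it (hot gossip paths accumulate with `add_relaxed`, and the reporting code drains the value with `clear` inside each `datapoint_info!` tuple) might look like the following; this is an illustration, not the actual definition from cluster_info.rs.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Sketch of a gossip-stats counter consistent with its use in the patch:
// writers bump it with relaxed ordering (a metric needs no synchronization
// beyond atomicity), and the reporting thread swaps it back to zero so each
// datapoint covers exactly one reporting interval.
#[derive(Default)]
struct Counter(AtomicU64);

impl Counter {
    fn add_relaxed(&self, x: u64) {
        self.0.fetch_add(x, Ordering::Relaxed);
    }
    fn clear(&self) -> u64 {
        self.0.swap(0, Ordering::Relaxed)
    }
}
```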
From 68f3142d25bc0a4dcd16a2763e75671780b8c27b Mon Sep 17 00:00:00 2001
From: Stephen Akridge
Date: Sun, 31 May 2020 13:03:30 -0700
Subject: [PATCH 2/2] Cache staked peers for the epoch

---
 core/src/retransmit_stage.rs | 88 ++++++++++++++++++++++++++++++++----
 1 file changed, 80 insertions(+), 8 deletions(-)

diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index f3c0fbac816bda..10234141853bce 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -3,6 +3,7 @@
 use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
     cluster_slots::ClusterSlots,
+    contact_info::ContactInfo,
     repair_service::DuplicateSlotsResetSender,
     repair_service::RepairInfo,
     result::{Error, Result},
@@ -18,8 +19,9 @@ use solana_ledger::{
 use solana_measure::measure::Measure;
 use solana_metrics::inc_new_counter_error;
 use solana_perf::packet::Packets;
-use solana_sdk::clock::Slot;
+use solana_sdk::clock::{Epoch, Slot};
 use solana_sdk::epoch_schedule::EpochSchedule;
+use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::timestamp;
 use solana_streamer::streamer::PacketReceiver;
 use std::{
@@ -44,6 +46,8 @@ struct RetransmitStats {
     total_packets: AtomicU64,
     total_batches: AtomicU64,
     total_time: AtomicU64,
+    epoch_fetch: AtomicU64,
+    epoch_cache_update: AtomicU64,
     repair_total: AtomicU64,
     discard_total: AtomicU64,
     retransmit_total: AtomicU64,
@@ -65,6 +69,8 @@ fn update_retransmit_stats(
     peers_len: usize,
     packets_by_slot: HashMap<Slot, usize>,
     packets_by_source: HashMap<String, usize>,
+    epoch_fetch: u64,
+    epoch_cache_update: u64,
 ) {
     stats.total_time.fetch_add(total_time, Ordering::Relaxed);
     stats
@@ -83,6 +89,10 @@ fn update_retransmit_stats(
         .compute_turbine_peers_total
         .fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
     stats.total_batches.fetch_add(1, Ordering::Relaxed);
+    stats.epoch_fetch.fetch_add(epoch_fetch, Ordering::Relaxed);
+    stats
+        .epoch_cache_update
+        .fetch_add(epoch_cache_update, Ordering::Relaxed);
     {
         let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
         for (slot, count) in packets_by_slot {
@@ -107,6 +117,16 @@ fn update_retransmit_stats(
             stats.total_time.swap(0, Ordering::Relaxed) as i64,
             i64
         ),
+        (
+            "epoch_fetch",
+            stats.epoch_fetch.swap(0, Ordering::Relaxed) as i64,
+            i64
+        ),
+        (
+            "epoch_cache_update",
+            stats.epoch_cache_update.swap(0, Ordering::Relaxed) as i64,
+            i64
+        ),
         (
             "total_batches",
             stats.total_batches.swap(0, Ordering::Relaxed) as i64,
@@ -148,6 +168,14 @@ fn update_retransmit_stats(
     }
 }
 
+#[derive(Default)]
+struct EpochStakesCache {
+    epoch: Epoch,
+    stakes: Option<Arc<HashMap<Pubkey, u64>>>,
+    peers: Vec<ContactInfo>,
+    stakes_and_index: Vec<(u64, usize)>,
+}
+
 fn retransmit(
     bank_forks: &Arc<RwLock<BankForks>>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
@@ -156,6 +184,8 @@ fn retransmit(
     sock: &UdpSocket,
     id: u32,
     stats: &Arc<RetransmitStats>,
+    epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
+    last_peer_update: &Arc<AtomicU64>,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
     let r_lock = r.lock().unwrap();
@@ -172,12 +202,42 @@ fn retransmit(
     }
     drop(r_lock);
 
+    let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
     let r_bank = bank_forks.read().unwrap().working_bank();
     let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
+    epoch_fetch.stop();
+
+    let mut epoch_cache_update = Measure::start("retransmit_epoch_cache_update");
+    let mut r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
+    if r_epoch_stakes_cache.epoch != bank_epoch {
+        drop(r_epoch_stakes_cache);
+        let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
+        if w_epoch_stakes_cache.epoch != bank_epoch {
+            let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
+            let stakes = stakes.map(Arc::new);
+            w_epoch_stakes_cache.stakes = stakes;
+            w_epoch_stakes_cache.epoch = bank_epoch;
+        }
+        drop(w_epoch_stakes_cache);
+        r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
+    }
+
+    let now = timestamp();
+    let last = last_peer_update.load(Ordering::Relaxed);
+    if now - last > 1000 && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
+    {
+        drop(r_epoch_stakes_cache);
+        let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
+        let (peers, stakes_and_index) =
+            cluster_info.sorted_retransmit_peers_and_stakes(w_epoch_stakes_cache.stakes.clone());
+        w_epoch_stakes_cache.peers = peers;
+        w_epoch_stakes_cache.stakes_and_index = stakes_and_index;
+        drop(w_epoch_stakes_cache);
+        r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
+    }
     let mut peers_len = 0;
-    let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
-    let stakes = stakes.map(Arc::new);
-    let (peers, stakes_and_index) = cluster_info.sorted_retransmit_peers_and_stakes(stakes);
+    epoch_cache_update.stop();
+
     let my_id = cluster_info.id();
     let mut discard_total = 0;
     let mut repair_total = 0;
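The hunk above replaces a per-batch recomputation of the staked-node map with a cache refreshed only on an epoch boundary. A minimal sketch of the locking discipline, with a hypothetical helper name (`cached_stakes`) and a placeholder `Stakes` type standing in for the real `HashMap<Pubkey, u64>`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

type Epoch = u64;
type Stakes = HashMap<u64, u64>; // placeholder for HashMap<Pubkey, u64>

#[derive(Default)]
struct EpochStakesCache {
    epoch: Epoch,
    stakes: Option<Arc<Stakes>>,
}

// Double-checked locking: the common case touches only the read lock. On an
// epoch change the caller drops the read lock, takes the write lock, and
// re-checks the epoch, because another retransmit thread may have refreshed
// the cache while this one was between locks.
fn cached_stakes(
    cache: &RwLock<EpochStakesCache>,
    bank_epoch: Epoch,
    compute: impl FnOnce() -> Option<Stakes>,
) -> Option<Arc<Stakes>> {
    {
        let r = cache.read().unwrap();
        if r.epoch == bank_epoch {
            return r.stakes.clone(); // fast path, read lock only
        }
    } // read lock released here
    let mut w = cache.write().unwrap();
    if w.epoch != bank_epoch {
        // Still stale after re-checking under the write lock, so this
        // thread does the expensive recomputation exactly once per epoch.
        w.stakes = compute().map(Arc::new);
        w.epoch = bank_epoch;
    }
    w.stakes.clone()
}
```

The patch inlines this logic in `retransmit()` rather than factoring out a helper, and re-takes the read lock afterwards so the rest of the batch can borrow `peers` and `stakes_and_index` directly from the guard.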
@@ -202,8 +262,8 @@ fn retransmit(
         let mut compute_turbine_peers = Measure::start("turbine_start");
         let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
             &my_id,
-            &peers,
-            &stakes_and_index,
+            &r_epoch_stakes_cache.peers,
+            &r_epoch_stakes_cache.stakes_and_index,
             packet.meta.seed,
         );
         peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
@@ -216,8 +276,14 @@ fn retransmit(
 
         let (neighbors, children) =
             compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
-        let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
-        let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
+        let neighbors: Vec<_> = neighbors
+            .into_iter()
+            .map(|index| &r_epoch_stakes_cache.peers[index])
+            .collect();
+        let children: Vec<_> = children
+            .into_iter()
+            .map(|index| &r_epoch_stakes_cache.peers[index])
+            .collect();
         compute_turbine_peers.stop();
         compute_turbine_peers_total += compute_turbine_peers.as_us();
 
@@ -258,6 +324,8 @@ fn retransmit(
         peers_len,
         packets_by_slot,
         packets_by_source,
+        epoch_fetch.as_us(),
+        epoch_cache_update.as_us(),
     );
 
     Ok(())
@@ -287,6 +355,8 @@ pub fn retransmitter(
             let r = r.clone();
             let cluster_info = cluster_info.clone();
             let stats = stats.clone();
+            let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default()));
+            let last_peer_update = Arc::new(AtomicU64::new(0));
 
             Builder::new()
                 .name("solana-retransmitter".to_string())
@@ -301,6 +371,8 @@ pub fn retransmitter(
                         &sockets[s],
                         s as u32,
                         &stats,
+                        &epoch_stakes_cache,
+                        &last_peer_update,
                     ) {
                         match e {
                             Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
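One more detail worth noting from patch 2: the sorted peer list is rebuilt at most once per second, gated by `compare_and_swap` on a timestamp. In this patch the cache and the timestamp are created inside the per-socket loop of `retransmitter()`, so each retransmit thread keeps its own copy and the `Arc` mainly serves to move them into the thread closure; the CAS gate would also stay correct if they were later shared across threads. A sketch of that gate, using a hypothetical `should_refresh` helper (timestamps in milliseconds, as returned by `solana_sdk::timing::timestamp()`):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// At most one caller per interval wins the right to rebuild: the
// load/compare_and_swap pair succeeds only for the thread that still sees
// the old `last` value; everyone else skips the rebuild instead of queuing
// on the write lock. (`compare_and_swap` matches the 2020-era API used in
// the patch; current Rust would use `compare_exchange`.)
fn should_refresh(last_update: &AtomicU64, now_ms: u64, interval_ms: u64) -> bool {
    let last = last_update.load(Ordering::Relaxed);
    now_ms.saturating_sub(last) > interval_ms
        && last_update.compare_and_swap(last, now_ms, Ordering::Relaxed) == last
}
```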