Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
More cluster stats and add epoch stakes cache in retransmit stage (#1…
Browse files Browse the repository at this point in the history
…0345)

* More cluster info metrics for push request/response counts

* Cache staked peers for the epoch
  • Loading branch information
sakridge authored Jun 1, 2020
1 parent 08ad7d1 commit ef37b82
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 10 deletions.
38 changes: 36 additions & 2 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ struct GossipStats {
repair_peers: Counter,
new_push_requests: Counter,
new_push_requests2: Counter,
new_push_requests_num: Counter,
process_pull_response: Counter,
process_pull_response_count: Counter,
process_pull_response_len: Counter,
Expand All @@ -236,6 +237,7 @@ struct GossipStats {
skip_push_message_shred_version: Counter,
push_message_count: Counter,
push_message_value_count: Counter,
push_response_count: Counter,
}

pub struct ClusterInfo {
Expand Down Expand Up @@ -1362,7 +1364,7 @@ impl ClusterInfo {
let (_, push_messages) = self
.time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
.new_push_messages(timestamp());
push_messages
let messages: Vec<_> = push_messages
.into_iter()
.filter_map(|(peer, messages)| {
let peer_label = CrdsValueLabel::ContactInfo(peer);
Expand All @@ -1377,7 +1379,11 @@ impl ClusterInfo {
.into_iter()
.map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
})
.collect()
.collect();
self.stats
.new_push_requests_num
.add_relaxed(messages.len() as u64);
messages
}

fn gossip_request(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
Expand Down Expand Up @@ -1891,6 +1897,9 @@ impl ClusterInfo {
return None;
}
let mut packets = to_packets_with_destination(recycler.clone(), &rsp);
me.stats
.push_response_count
.add_relaxed(packets.packets.len() as u64);
if !packets.is_empty() {
let pushes: Vec<_> = me.new_push_requests();
inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
Expand Down Expand Up @@ -1982,6 +1991,11 @@ impl ClusterInfo {
),
("all_tvu_peers", self.stats.all_tvu_peers.clear(), i64),
("tvu_peers", self.stats.tvu_peers.clear(), i64),
(
"new_push_requests_num",
self.stats.new_push_requests2.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats2",
Expand All @@ -2008,6 +2022,26 @@ impl ClusterInfo {
self.stats.process_pull_response_count.clear(),
i64
),
(
"process_pull_resp_success",
self.stats.process_pull_response_success.clear(),
i64
),
(
"process_pull_resp_timeout",
self.stats.process_pull_response_timeout.clear(),
i64
),
(
"process_pull_resp_fail",
self.stats.process_pull_response_fail.clear(),
i64
),
(
"push_response_count",
self.stats.push_response_count.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats3",
Expand Down
88 changes: 80 additions & 8 deletions core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::{
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
cluster_slots::ClusterSlots,
contact_info::ContactInfo,
repair_service::DuplicateSlotsResetSender,
repair_service::RepairInfo,
result::{Error, Result},
Expand All @@ -18,8 +19,9 @@ use solana_ledger::{
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_error;
use solana_perf::packet::Packets;
use solana_sdk::clock::Slot;
use solana_sdk::clock::{Epoch, Slot};
use solana_sdk::epoch_schedule::EpochSchedule;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use solana_streamer::streamer::PacketReceiver;
use std::{
Expand All @@ -44,6 +46,8 @@ struct RetransmitStats {
total_packets: AtomicU64,
total_batches: AtomicU64,
total_time: AtomicU64,
epoch_fetch: AtomicU64,
epoch_cache_update: AtomicU64,
repair_total: AtomicU64,
discard_total: AtomicU64,
retransmit_total: AtomicU64,
Expand All @@ -65,6 +69,8 @@ fn update_retransmit_stats(
peers_len: usize,
packets_by_slot: HashMap<Slot, usize>,
packets_by_source: HashMap<String, usize>,
epoch_fetch: u64,
epoch_cach_update: u64,
) {
stats.total_time.fetch_add(total_time, Ordering::Relaxed);
stats
Expand All @@ -83,6 +89,10 @@ fn update_retransmit_stats(
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
stats.total_batches.fetch_add(1, Ordering::Relaxed);
stats.epoch_fetch.fetch_add(epoch_fetch, Ordering::Relaxed);
stats
.epoch_cache_update
.fetch_add(epoch_cach_update, Ordering::Relaxed);
{
let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
for (slot, count) in packets_by_slot {
Expand All @@ -107,6 +117,16 @@ fn update_retransmit_stats(
stats.total_time.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"epoch_fetch",
stats.epoch_fetch.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"epoch_cache_update",
stats.epoch_cache_update.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_batches",
stats.total_batches.swap(0, Ordering::Relaxed) as i64,
Expand Down Expand Up @@ -148,6 +168,14 @@ fn update_retransmit_stats(
}
}

#[derive(Default)]
struct EpochStakesCache {
epoch: Epoch,
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
peers: Vec<ContactInfo>,
stakes_and_index: Vec<(u64, usize)>,
}

fn retransmit(
bank_forks: &Arc<RwLock<BankForks>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
Expand All @@ -156,6 +184,8 @@ fn retransmit(
sock: &UdpSocket,
id: u32,
stats: &Arc<RetransmitStats>,
epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
last_peer_update: &Arc<AtomicU64>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let r_lock = r.lock().unwrap();
Expand All @@ -172,12 +202,42 @@ fn retransmit(
}
drop(r_lock);

let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
let r_bank = bank_forks.read().unwrap().working_bank();
let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
epoch_fetch.stop();

let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
let mut r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
if r_epoch_stakes_cache.epoch != bank_epoch {
drop(r_epoch_stakes_cache);
let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
if w_epoch_stakes_cache.epoch != bank_epoch {
let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
let stakes = stakes.map(Arc::new);
w_epoch_stakes_cache.stakes = stakes;
w_epoch_stakes_cache.epoch = bank_epoch;
}
drop(w_epoch_stakes_cache);
r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
}

let now = timestamp();
let last = last_peer_update.load(Ordering::Relaxed);
if now - last > 1000 && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
{
drop(r_epoch_stakes_cache);
let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
let (peers, stakes_and_index) =
cluster_info.sorted_retransmit_peers_and_stakes(w_epoch_stakes_cache.stakes.clone());
w_epoch_stakes_cache.peers = peers;
w_epoch_stakes_cache.stakes_and_index = stakes_and_index;
drop(w_epoch_stakes_cache);
r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
}
let mut peers_len = 0;
let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
let stakes = stakes.map(Arc::new);
let (peers, stakes_and_index) = cluster_info.sorted_retransmit_peers_and_stakes(stakes);
epoch_cache_update.stop();

let my_id = cluster_info.id();
let mut discard_total = 0;
let mut repair_total = 0;
Expand All @@ -202,8 +262,8 @@ fn retransmit(
let mut compute_turbine_peers = Measure::start("turbine_start");
let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
&my_id,
&peers,
&stakes_and_index,
&r_epoch_stakes_cache.peers,
&r_epoch_stakes_cache.stakes_and_index,
packet.meta.seed,
);
peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
Expand All @@ -216,8 +276,14 @@ fn retransmit(

let (neighbors, children) =
compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
let neighbors: Vec<_> = neighbors
.into_iter()
.map(|index| &r_epoch_stakes_cache.peers[index])
.collect();
let children: Vec<_> = children
.into_iter()
.map(|index| &r_epoch_stakes_cache.peers[index])
.collect();
compute_turbine_peers.stop();
compute_turbine_peers_total += compute_turbine_peers.as_us();

Expand Down Expand Up @@ -258,6 +324,8 @@ fn retransmit(
peers_len,
packets_by_slot,
packets_by_source,
epoch_fetch.as_us(),
epoch_cache_update.as_us(),
);

Ok(())
Expand Down Expand Up @@ -287,6 +355,8 @@ pub fn retransmitter(
let r = r.clone();
let cluster_info = cluster_info.clone();
let stats = stats.clone();
let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default()));
let last_peer_update = Arc::new(AtomicU64::new(0));

Builder::new()
.name("solana-retransmitter".to_string())
Expand All @@ -301,6 +371,8 @@ pub fn retransmitter(
&sockets[s],
s as u32,
&stats,
&epoch_stakes_cache,
&last_peer_update,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Expand Down

0 comments on commit ef37b82

Please sign in to comment.