Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

More cluster stats and add epoch stakes cache in retransmit stage #10345

Merged
merged 2 commits into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ struct GossipStats {
repair_peers: Counter,
new_push_requests: Counter,
new_push_requests2: Counter,
new_push_requests_num: Counter,
process_pull_response: Counter,
process_pull_response_count: Counter,
process_pull_response_len: Counter,
Expand All @@ -236,6 +237,7 @@ struct GossipStats {
skip_push_message_shred_version: Counter,
push_message_count: Counter,
push_message_value_count: Counter,
push_response_count: Counter,
}

pub struct ClusterInfo {
Expand Down Expand Up @@ -1362,7 +1364,7 @@ impl ClusterInfo {
let (_, push_messages) = self
.time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
.new_push_messages(timestamp());
push_messages
let messages: Vec<_> = push_messages
.into_iter()
.filter_map(|(peer, messages)| {
let peer_label = CrdsValueLabel::ContactInfo(peer);
Expand All @@ -1377,7 +1379,11 @@ impl ClusterInfo {
.into_iter()
.map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
})
.collect()
.collect();
self.stats
.new_push_requests_num
.add_relaxed(messages.len() as u64);
messages
}

fn gossip_request(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
Expand Down Expand Up @@ -1891,6 +1897,9 @@ impl ClusterInfo {
return None;
}
let mut packets = to_packets_with_destination(recycler.clone(), &rsp);
me.stats
.push_response_count
.add_relaxed(packets.packets.len() as u64);
if !packets.is_empty() {
let pushes: Vec<_> = me.new_push_requests();
inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
Expand Down Expand Up @@ -1982,6 +1991,11 @@ impl ClusterInfo {
),
("all_tvu_peers", self.stats.all_tvu_peers.clear(), i64),
("tvu_peers", self.stats.tvu_peers.clear(), i64),
(
"new_push_requests_num",
self.stats.new_push_requests2.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats2",
Expand All @@ -2008,6 +2022,26 @@ impl ClusterInfo {
self.stats.process_pull_response_count.clear(),
i64
),
(
"process_pull_resp_success",
self.stats.process_pull_response_success.clear(),
i64
),
(
"process_pull_resp_timeout",
self.stats.process_pull_response_timeout.clear(),
i64
),
(
"process_pull_resp_fail",
self.stats.process_pull_response_fail.clear(),
i64
),
(
"push_response_count",
self.stats.push_response_count.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats3",
Expand Down
88 changes: 80 additions & 8 deletions core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::{
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
cluster_slots::ClusterSlots,
contact_info::ContactInfo,
repair_service::DuplicateSlotsResetSender,
repair_service::RepairInfo,
result::{Error, Result},
Expand All @@ -18,8 +19,9 @@ use solana_ledger::{
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_error;
use solana_perf::packet::Packets;
use solana_sdk::clock::Slot;
use solana_sdk::clock::{Epoch, Slot};
use solana_sdk::epoch_schedule::EpochSchedule;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use solana_streamer::streamer::PacketReceiver;
use std::{
Expand All @@ -44,6 +46,8 @@ struct RetransmitStats {
total_packets: AtomicU64,
total_batches: AtomicU64,
total_time: AtomicU64,
epoch_fetch: AtomicU64,
epoch_cache_update: AtomicU64,
repair_total: AtomicU64,
discard_total: AtomicU64,
retransmit_total: AtomicU64,
Expand All @@ -65,6 +69,8 @@ fn update_retransmit_stats(
peers_len: usize,
packets_by_slot: HashMap<Slot, usize>,
packets_by_source: HashMap<String, usize>,
epoch_fetch: u64,
epoch_cach_update: u64,
) {
stats.total_time.fetch_add(total_time, Ordering::Relaxed);
stats
Expand All @@ -83,6 +89,10 @@ fn update_retransmit_stats(
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
stats.total_batches.fetch_add(1, Ordering::Relaxed);
stats.epoch_fetch.fetch_add(epoch_fetch, Ordering::Relaxed);
stats
.epoch_cache_update
.fetch_add(epoch_cach_update, Ordering::Relaxed);
{
let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
for (slot, count) in packets_by_slot {
Expand All @@ -107,6 +117,16 @@ fn update_retransmit_stats(
stats.total_time.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"epoch_fetch",
stats.epoch_fetch.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"epoch_cache_update",
stats.epoch_cache_update.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_batches",
stats.total_batches.swap(0, Ordering::Relaxed) as i64,
Expand Down Expand Up @@ -148,6 +168,14 @@ fn update_retransmit_stats(
}
}

#[derive(Default)]
struct EpochStakesCache {
epoch: Epoch,
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
peers: Vec<ContactInfo>,
stakes_and_index: Vec<(u64, usize)>,
}

fn retransmit(
bank_forks: &Arc<RwLock<BankForks>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
Expand All @@ -156,6 +184,8 @@ fn retransmit(
sock: &UdpSocket,
id: u32,
stats: &Arc<RetransmitStats>,
epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
last_peer_update: &Arc<AtomicU64>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let r_lock = r.lock().unwrap();
Expand All @@ -172,12 +202,42 @@ fn retransmit(
}
drop(r_lock);

let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
let r_bank = bank_forks.read().unwrap().working_bank();
let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
epoch_fetch.stop();

let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
let mut r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
if r_epoch_stakes_cache.epoch != bank_epoch {
drop(r_epoch_stakes_cache);
let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
if w_epoch_stakes_cache.epoch != bank_epoch {
let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
let stakes = stakes.map(Arc::new);
w_epoch_stakes_cache.stakes = stakes;
w_epoch_stakes_cache.epoch = bank_epoch;
}
drop(w_epoch_stakes_cache);
r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
}

let now = timestamp();
let last = last_peer_update.load(Ordering::Relaxed);
if now - last > 1000 && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
{
drop(r_epoch_stakes_cache);
let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
let (peers, stakes_and_index) =
cluster_info.sorted_retransmit_peers_and_stakes(w_epoch_stakes_cache.stakes.clone());
w_epoch_stakes_cache.peers = peers;
w_epoch_stakes_cache.stakes_and_index = stakes_and_index;
drop(w_epoch_stakes_cache);
r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
}
let mut peers_len = 0;
let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
let stakes = stakes.map(Arc::new);
let (peers, stakes_and_index) = cluster_info.sorted_retransmit_peers_and_stakes(stakes);
epoch_cache_update.stop();

let my_id = cluster_info.id();
let mut discard_total = 0;
let mut repair_total = 0;
Expand All @@ -202,8 +262,8 @@ fn retransmit(
let mut compute_turbine_peers = Measure::start("turbine_start");
let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
&my_id,
&peers,
&stakes_and_index,
&r_epoch_stakes_cache.peers,
&r_epoch_stakes_cache.stakes_and_index,
packet.meta.seed,
);
peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
Expand All @@ -216,8 +276,14 @@ fn retransmit(

let (neighbors, children) =
compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
let neighbors: Vec<_> = neighbors
.into_iter()
.map(|index| &r_epoch_stakes_cache.peers[index])
.collect();
let children: Vec<_> = children
.into_iter()
.map(|index| &r_epoch_stakes_cache.peers[index])
.collect();
compute_turbine_peers.stop();
compute_turbine_peers_total += compute_turbine_peers.as_us();

Expand Down Expand Up @@ -258,6 +324,8 @@ fn retransmit(
peers_len,
packets_by_slot,
packets_by_source,
epoch_fetch.as_us(),
epoch_cache_update.as_us(),
);

Ok(())
Expand Down Expand Up @@ -287,6 +355,8 @@ pub fn retransmitter(
let r = r.clone();
let cluster_info = cluster_info.clone();
let stats = stats.clone();
let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default()));
let last_peer_update = Arc::new(AtomicU64::new(0));

Builder::new()
.name("solana-retransmitter".to_string())
Expand All @@ -301,6 +371,8 @@ pub fn retransmitter(
&sockets[s],
s as u32,
&stats,
&epoch_stakes_cache,
&last_peer_update,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Expand Down