Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

limit repairs to top staked requests in batch #28673

Merged
merged 2 commits into from
Nov 17, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 93 additions & 59 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use {
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::Slot,
genesis_config::ClusterType,
hash::{Hash, HASH_BYTES},
packet::PACKET_DATA_SIZE,
pubkey::{Pubkey, PUBKEY_BYTES},
Expand All @@ -44,7 +45,7 @@ use {
streamer::{PacketBatchReceiver, PacketBatchSender},
},
std::{
collections::{HashMap, HashSet},
collections::HashSet,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -153,6 +154,7 @@ struct ServeRepairStats {
unsigned_requests: usize,
dropped_requests_outbound_bandwidth: usize,
dropped_requests_load_shed: usize,
dropped_requests_low_stake: usize,
total_dropped_response_packets: usize,
total_response_packets: usize,
total_response_bytes_staked: usize,
Expand All @@ -168,6 +170,7 @@ struct ServeRepairStats {
ancestor_hashes: usize,
ping_cache_check_failed: usize,
pings_sent: usize,
decode_time_us: u64,
err_time_skew: usize,
err_malformed: usize,
err_sig_verify: usize,
Expand Down Expand Up @@ -442,10 +445,22 @@ impl ServeRepair {
const MAX_REQUESTS_PER_ITERATION: usize = 1024;
let mut total_requests = reqs_v[0].len();

let socket_addr_space = *self.cluster_info.socket_addr_space();
let root_bank = self.bank_forks.read().unwrap().root_bank();
let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch());
let identity_keypair = self.cluster_info.keypair().clone();
let my_id = identity_keypair.pubkey();

let max_buffered_packets = if root_bank.cluster_type() == ClusterType::Testnet {
2 * MAX_REQUESTS_PER_ITERATION
} else {
MAX_REQUESTS_PER_ITERATION
};

let mut dropped_requests = 0;
while let Ok(more) = requests_receiver.try_recv() {
total_requests += more.len();
if total_requests > MAX_REQUESTS_PER_ITERATION {
if total_requests > max_buffered_packets {
dropped_requests += more.len();
} else {
reqs_v.push(more);
Expand All @@ -455,20 +470,72 @@ impl ServeRepair {
stats.dropped_requests_load_shed += dropped_requests;
stats.total_requests += total_requests;

let root_bank = self.bank_forks.read().unwrap().root_bank();
let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch());
for reqs in reqs_v {
self.handle_packets(
ping_cache,
recycler,
blockstore,
reqs,
response_sender,
stats,
data_budget,
&epoch_staked_nodes,
);
let decode_start = Instant::now();
let mut decoded_reqs = Vec::default();
for batch in reqs_v {
for packet in &batch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe instead of nested for loops and double indentation something like:

for packet in reqs_v.iter().flatten() {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

let request: RepairProtocol = match packet.deserialize_slice(..) {
Ok(request) => request,
Err(_) => {
stats.err_malformed += 1;
continue;
}
};

let from_addr = packet.meta.socket_addr();
if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) {
stats.err_malformed += 1;
continue;
}

if request.supports_signature() {
// collect stats for signature verification
Self::verify_signed_packet(&my_id, packet, &request, stats);
} else {
stats.unsigned_requests += 1;
}

if request.sender() == &my_id {
stats.self_repair += 1;
continue;
}

let stake = match epoch_staked_nodes
.as_ref()
.map(|nodes| nodes.get(request.sender()))
.unwrap_or_default()
{
Some(stake) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if epoch_staked_nodes includes any zeros — probably not.
Nevertheless, I guess it wouldn't hurt to be explicit here rather than rely on that presumption.
Something like:

let stake = epoch_staked_nodes
  .as_ref()
  .and_then(|stakes| stakes.get(request.sender()).copied())
  .unwrap_or_default();
if stake == 0 {
   // ...
} else {
   //...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

stats.handle_requests_staked += 1;
*stake
}
None => {
stats.handle_requests_unstaked += 1;
0
}
};

decoded_reqs.push((request, from_addr, stake));
}
}
stats.decode_time_us += decode_start.elapsed().as_micros() as u64;

if decoded_reqs.len() > MAX_REQUESTS_PER_ITERATION {
stats.dropped_requests_low_stake += decoded_reqs.len() - MAX_REQUESTS_PER_ITERATION;
decoded_reqs.sort_by(|(_, _, s1), (_, _, s2)| s2.cmp(s1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decoded_reqs.sort_by_key(|(_, _, stake)| Reverse(*stake))

https://doc.rust-lang.org/std/cmp/struct.Reverse.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

decoded_reqs.truncate(MAX_REQUESTS_PER_ITERATION);
}

self.handle_packets(
ping_cache,
recycler,
blockstore,
decoded_reqs,
response_sender,
stats,
data_budget,
);

Ok(())
}

Expand Down Expand Up @@ -496,6 +563,11 @@ impl ServeRepair {
stats.dropped_requests_load_shed,
i64
),
(
"dropped_requests_low_stake",
stats.dropped_requests_low_stake,
i64
),
(
"total_dropped_response_packets",
stats.total_dropped_response_packets,
Expand Down Expand Up @@ -539,6 +611,7 @@ impl ServeRepair {
i64
),
("pings_sent", stats.pings_sent, i64),
("decode_time_us", stats.decode_time_us, i64),
("err_time_skew", stats.err_time_skew, i64),
("err_malformed", stats.err_malformed, i64),
("err_sig_verify", stats.err_sig_verify, i64),
Expand Down Expand Up @@ -709,54 +782,16 @@ impl ServeRepair {
ping_cache: &mut PingCache,
recycler: &PacketBatchRecycler,
blockstore: &Blockstore,
packet_batch: PacketBatch,
requests: Vec<(RepairProtocol, SocketAddr, /*stake*/ u64)>,
response_sender: &PacketBatchSender,
stats: &mut ServeRepairStats,
data_budget: &DataBudget,
epoch_staked_nodes: &Option<Arc<HashMap<Pubkey, u64>>>,
) {
let identity_keypair = self.cluster_info.keypair().clone();
let my_id = identity_keypair.pubkey();
let socket_addr_space = *self.cluster_info.socket_addr_space();
let mut pending_pings = Vec::default();

// iter over the packets
for (i, packet) in packet_batch.iter().enumerate() {
let request: RepairProtocol = match packet.deserialize_slice(..) {
Ok(request) => request,
Err(_) => {
stats.err_malformed += 1;
continue;
}
};

let from_addr = packet.meta.socket_addr();
if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) {
stats.err_malformed += 1;
continue;
}

let staked = epoch_staked_nodes
.as_ref()
.map(|nodes| nodes.contains_key(request.sender()))
.unwrap_or_default();
match staked {
true => stats.handle_requests_staked += 1,
false => stats.handle_requests_unstaked += 1,
}

if request.sender() == &my_id {
stats.self_repair += 1;
continue;
}

if request.supports_signature() {
// collect stats for signature verification
Self::verify_signed_packet(&my_id, packet, &request, stats);
} else {
stats.unsigned_requests += 1;
}

let requests_len = requests.len();
for (i, (request, from_addr, stake)) in requests.into_iter().enumerate() {
if !matches!(&request, RepairProtocol::Pong(_)) {
let (check, ping_pkt) =
Self::check_ping_cache(ping_cache, &request, &from_addr, &identity_keypair);
Expand All @@ -768,7 +803,6 @@ impl ServeRepair {
stats.ping_cache_check_failed += 1;
}
}

stats.processed += 1;
let rsp = match Self::handle_repair(
recycler, &from_addr, blockstore, request, stats, ping_cache,
Expand All @@ -780,12 +814,12 @@ impl ServeRepair {
let num_response_bytes = rsp.iter().map(|p| p.meta.size).sum();
if data_budget.take(num_response_bytes) && response_sender.send(rsp).is_ok() {
stats.total_response_packets += num_response_packets;
match staked {
match stake > 0 {
true => stats.total_response_bytes_staked += num_response_bytes,
false => stats.total_response_bytes_unstaked += num_response_bytes,
}
} else {
stats.dropped_requests_outbound_bandwidth += packet_batch.len() - i;
stats.dropped_requests_outbound_bandwidth += requests_len - i;
stats.total_dropped_response_packets += num_response_packets;
break;
}
Expand Down