Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

require repair request signature, ping/pong for Testnet, Development clusters #29351

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 79 additions & 1 deletion core/src/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ mod test {
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
},
solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path},
solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path, shred::Nonce},
solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks},
solana_sdk::{
hash::Hash,
Expand Down Expand Up @@ -1141,6 +1141,26 @@ mod test {
replay_blockstore_components
}

/// Test helper that builds an ancestor-hashes repair request for `dead_slot`
/// and sends it to the responder over `ancestor_hashes_request_socket`.
///
/// The request bytes come from `ServeRepair::ancestor_repair_request_bytes`,
/// called with the requester's keypair, the responder's id, `dead_slot` and
/// `nonce`. The datagram is addressed to `responder_info.serve_repair`.
///
/// This is strictly best effort: if the request cannot be built, nothing is
/// sent, and any error from the socket send is discarded.
fn send_ancestor_repair_request(
    requester_serve_repair: &ServeRepair,
    requester_cluster_info: &ClusterInfo,
    responder_info: &ContactInfo,
    ancestor_hashes_request_socket: &UdpSocket,
    dead_slot: Slot,
    nonce: Nonce,
) {
    let requester_keypair = requester_cluster_info.keypair();
    let payload = match requester_serve_repair.ancestor_repair_request_bytes(
        &requester_keypair,
        &responder_info.id,
        dead_slot,
        nonce,
    ) {
        Ok(bytes) => bytes,
        // Could not build the request; there is nothing to send.
        Err(_) => return,
    };
    // The send result is intentionally ignored.
    let _ = ancestor_hashes_request_socket.send_to(&payload, responder_info.serve_repair);
}

#[test]
fn test_ancestor_hashes_service_initiate_ancestor_hashes_requests_for_duplicate_slot() {
let dead_slot = MAX_ANCESTOR_RESPONSES as Slot;
Expand Down Expand Up @@ -1189,6 +1209,35 @@ mod test {
);
assert!(ancestor_hashes_request_statuses.is_empty());

// Send a request to generate a ping
send_ancestor_repair_request(
&requester_serve_repair,
&requester_cluster_info,
responder_info,
&ancestor_hashes_request_socket,
dead_slot,
/*nonce*/ 123,
);
// Should have received valid response
let mut response_packet = response_receiver
.recv_timeout(Duration::from_millis(10_000))
.unwrap();
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair);
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
&mut AncestorHashesResponsesStats::default(),
&outstanding_requests,
&requester_blockstore,
&requester_cluster_info.keypair(),
&ancestor_hashes_request_socket,
);
// Should have processed a ping packet
assert_eq!(decision, None);

// Add the responder to the eligible list for requests
let responder_id = responder_info.id;
cluster_slots.insert_node_id(dead_slot, responder_id);
Expand Down Expand Up @@ -1541,6 +1590,35 @@ mod test {
cluster_slots.insert_node_id(dead_slot, responder_id);
requester_cluster_info.insert_info(responder_info.clone());

// Send a request to generate a ping
send_ancestor_repair_request(
&requester_serve_repair,
requester_cluster_info,
responder_info,
&ancestor_hashes_request_socket,
dead_slot,
/*nonce*/ 123,
);
// Should have received valid response
let mut response_packet = response_receiver
.recv_timeout(Duration::from_millis(10_000))
.unwrap();
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair);
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
&mut AncestorHashesResponsesStats::default(),
&outstanding_requests,
&requester_blockstore,
&requester_cluster_info.keypair(),
&ancestor_hashes_request_socket,
);
// Should have processed a ping packet
assert_eq!(decision, None);

// Simulate getting duplicate confirmed dead slot
ancestor_hashes_replay_update_sender
.send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(
Expand Down
29 changes: 19 additions & 10 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ impl RequestResponse for AncestorHashesRepairType {
#[derive(Default)]
struct ServeRepairStats {
total_requests: usize,
unsigned_requests: usize,
dropped_requests_outbound_bandwidth: usize,
dropped_requests_load_shed: usize,
dropped_requests_low_stake: usize,
Expand Down Expand Up @@ -468,8 +467,9 @@ impl ServeRepair {
let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch());
let identity_keypair = self.cluster_info.keypair().clone();
let my_id = identity_keypair.pubkey();
let cluster_type = root_bank.cluster_type();

let max_buffered_packets = if root_bank.cluster_type() != ClusterType::MainnetBeta {
let max_buffered_packets = if cluster_type != ClusterType::MainnetBeta {
if self.repair_whitelist.read().unwrap().len() > 0 {
4 * MAX_REQUESTS_PER_ITERATION
} else {
Expand Down Expand Up @@ -512,11 +512,16 @@ impl ServeRepair {
continue;
}

if request.supports_signature() {
// collect stats for signature verification
Self::verify_signed_packet(&my_id, packet, &request, stats);
} else {
stats.unsigned_requests += 1;
match cluster_type {
ClusterType::Testnet | ClusterType::Development => {
if !Self::verify_signed_packet(&my_id, packet, &request, stats) {
continue;
}
}
ClusterType::MainnetBeta | ClusterType::Devnet => {
// collect stats for signature verification
let _ = Self::verify_signed_packet(&my_id, packet, &request, stats);
}
}

if request.sender() == &my_id {
Expand Down Expand Up @@ -564,6 +569,7 @@ impl ServeRepair {
response_sender,
stats,
data_budget,
cluster_type,
);

Ok(())
Expand All @@ -582,7 +588,6 @@ impl ServeRepair {
datapoint_info!(
"serve_repair-requests_received",
("total_requests", stats.total_requests, i64),
("unsigned_requests", stats.unsigned_requests, i64),
(
"dropped_requests_outbound_bandwidth",
stats.dropped_requests_outbound_bandwidth,
Expand Down Expand Up @@ -707,6 +712,7 @@ impl ServeRepair {
.unwrap()
}

#[must_use]
fn verify_signed_packet(
my_id: &Pubkey,
packet: &Packet,
Expand All @@ -721,7 +727,6 @@ impl ServeRepair {
| RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _)
| RepairProtocol::LegacyOrphanWithNonce(_, _, _)
| RepairProtocol::LegacyAncestorHashes(_, _, _) => {
debug_assert!(false); // expecting only signed request types
stats.err_unsigned += 1;
return false;
}
Expand Down Expand Up @@ -817,6 +822,7 @@ impl ServeRepair {
response_sender: &PacketBatchSender,
stats: &mut ServeRepairStats,
data_budget: &DataBudget,
cluster_type: ClusterType,
) {
let identity_keypair = self.cluster_info.keypair().clone();
let mut pending_pings = Vec::default();
Expand All @@ -839,8 +845,11 @@ impl ServeRepair {
pending_pings.push(ping_pkt);
}
if !check {
// collect stats for ping/pong verification
stats.ping_cache_check_failed += 1;
match cluster_type {
ClusterType::Testnet | ClusterType::Development => continue,
ClusterType::MainnetBeta | ClusterType::Devnet => (),
}
}
}
stats.processed += 1;
Expand Down