From 16cd1cf5434b25ba182cedf151899e203202cc9d Mon Sep 17 00:00:00 2001 From: Jeff Biseda Date: Wed, 4 Jan 2023 14:54:19 -0800 Subject: [PATCH] require repair request signature, ping/pong for Testnet, Development clusters (#29351) (cherry picked from commit 832302485e61cc0f359b91f6c8f0c383caaf6629) # Conflicts: # core/src/serve_repair.rs --- core/src/ancestor_hashes_service.rs | 80 ++++++++++++++++++++++++++++- core/src/serve_repair.rs | 40 +++++++++++++-- 2 files changed, 115 insertions(+), 5 deletions(-) diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index a040e23cb4f7ee..4844ac73c90486 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -773,7 +773,7 @@ mod test { cluster_info::{ClusterInfo, Node}, contact_info::ContactInfo, }, - solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path}, + solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path, shred::Nonce}, solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks}, solana_sdk::{ hash::Hash, @@ -1132,6 +1132,26 @@ mod test { replay_blockstore_components } + fn send_ancestor_repair_request( + requester_serve_repair: &ServeRepair, + requester_cluster_info: &ClusterInfo, + responder_info: &ContactInfo, + ancestor_hashes_request_socket: &UdpSocket, + dead_slot: Slot, + nonce: Nonce, + ) { + let request_bytes = requester_serve_repair.ancestor_repair_request_bytes( + &requester_cluster_info.keypair(), + &responder_info.id, + dead_slot, + nonce, + ); + if let Ok(request_bytes) = request_bytes { + let _ = + ancestor_hashes_request_socket.send_to(&request_bytes, responder_info.serve_repair); + } + } + #[test] fn test_ancestor_hashes_service_initiate_ancestor_hashes_requests_for_duplicate_slot() { let dead_slot = MAX_ANCESTOR_RESPONSES as Slot; @@ -1180,6 +1200,35 @@ mod test { ); assert!(ancestor_hashes_request_statuses.is_empty()); + // Send a request to generate a ping + send_ancestor_repair_request( + &requester_serve_repair, + &requester_cluster_info, + responder_info, + &ancestor_hashes_request_socket, + dead_slot, + /*nonce*/ 123, + ); + // Should have received valid response + let mut response_packet = response_receiver + .recv_timeout(Duration::from_millis(10_000)) + .unwrap(); + let packet = &mut response_packet[0]; + packet + .meta_mut() + .set_socket_addr(&responder_info.serve_repair); + let decision = AncestorHashesService::verify_and_process_ancestor_response( + packet, + &ancestor_hashes_request_statuses, + &mut AncestorHashesResponsesStats::default(), + &outstanding_requests, + &requester_blockstore, + &requester_cluster_info.keypair(), + &ancestor_hashes_request_socket, + ); + // should have processed a ping packet + assert_eq!(decision, None); + // Add the responder to the eligible list for requests let responder_id = responder_info.id; cluster_slots.insert_node_id(dead_slot, responder_id); @@ -1529,6 +1578,35 @@ mod test { cluster_slots.insert_node_id(dead_slot, responder_id); requester_cluster_info.insert_info(responder_info.clone()); + // Send a request to generate a ping + send_ancestor_repair_request( + &requester_serve_repair, + requester_cluster_info, + responder_info, + &ancestor_hashes_request_socket, + dead_slot, + /*nonce*/ 123, + ); + // Should have received valid response + let mut response_packet = response_receiver + .recv_timeout(Duration::from_millis(10_000)) + .unwrap(); + let packet = &mut response_packet[0]; + packet + .meta_mut() + .set_socket_addr(&responder_info.serve_repair); + let decision = AncestorHashesService::verify_and_process_ancestor_response( + packet, + &ancestor_hashes_request_statuses, + &mut AncestorHashesResponsesStats::default(), + &outstanding_requests, + &requester_blockstore, + &requester_cluster_info.keypair(), + &ancestor_hashes_request_socket, + ); + // Should have processed a ping packet + assert_eq!(decision, None); + // Simulate getting duplicate confirmed dead slot ancestor_hashes_replay_update_sender .send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed( diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index a11f48dd1a4995..99243856dff3ea 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -152,7 +152,6 @@ impl RequestResponse for AncestorHashesRepairType { #[derive(Default)] struct ServeRepairStats { total_requests: usize, - unsigned_requests: usize, dropped_requests_outbound_bandwidth: usize, dropped_requests_load_shed: usize, dropped_requests_low_stake: usize, @@ -451,9 +450,19 @@ impl ServeRepair { let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch()); let identity_keypair = self.cluster_info.keypair().clone(); let my_id = identity_keypair.pubkey(); + let cluster_type = root_bank.cluster_type(); +<<<<<<< HEAD let max_buffered_packets = if root_bank.cluster_type() != ClusterType::MainnetBeta { 2 * MAX_REQUESTS_PER_ITERATION +======= + let max_buffered_packets = if cluster_type != ClusterType::MainnetBeta { + if self.repair_whitelist.read().unwrap().len() > 0 { + 4 * MAX_REQUESTS_PER_ITERATION + } else { + 2 * MAX_REQUESTS_PER_ITERATION + } +>>>>>>> 832302485 (require repair request signature, ping/pong for Testnet, Development clusters (#29351)) } else { MAX_REQUESTS_PER_ITERATION }; @@ -482,11 +491,30 @@ impl ServeRepair { } }; +<<<<<<< HEAD let from_addr = packet.meta.socket_addr(); if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) { stats.err_malformed += 1; continue; } +======= + match cluster_type { + ClusterType::Testnet | ClusterType::Development => { + if !Self::verify_signed_packet(&my_id, packet, &request, stats) { + continue; + } + } + ClusterType::MainnetBeta | ClusterType::Devnet => { + // collect stats for signature verification + let _ = Self::verify_signed_packet(&my_id, packet, &request, stats); + } + } + + if request.sender() == &my_id { + stats.self_repair += 1; + continue; + } +>>>>>>> 832302485 (require repair request signature, ping/pong for Testnet, Development clusters (#29351)) if request.supports_signature() { // collect stats for signature verification @@ -527,6 +555,7 @@ impl ServeRepair { response_sender, stats, data_budget, + cluster_type, ); Ok(()) @@ -545,7 +574,6 @@ impl ServeRepair { datapoint_info!( "serve_repair-requests_received", ("total_requests", stats.total_requests, i64), - ("unsigned_requests", stats.unsigned_requests, i64), ( "dropped_requests_outbound_bandwidth", stats.dropped_requests_outbound_bandwidth, @@ -669,6 +697,7 @@ impl ServeRepair { .unwrap() } + #[must_use] fn verify_signed_packet( my_id: &Pubkey, packet: &Packet, @@ -683,7 +712,6 @@ impl ServeRepair { | RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _) | RepairProtocol::LegacyOrphanWithNonce(_, _, _) | RepairProtocol::LegacyAncestorHashes(_, _, _) => { - debug_assert!(false); // expecting only signed request types stats.err_unsigned += 1; return false; } @@ -779,6 +807,7 @@ impl ServeRepair { response_sender: &PacketBatchSender, stats: &mut ServeRepairStats, data_budget: &DataBudget, + cluster_type: ClusterType, ) { let identity_keypair = self.cluster_info.keypair().clone(); let mut pending_pings = Vec::default(); @@ -792,8 +821,11 @@ impl ServeRepair { pending_pings.push(ping_pkt); } if !check { - // collect stats for ping/pong verification stats.ping_cache_check_failed += 1; + match cluster_type { + ClusterType::Testnet | ClusterType::Development => continue, + ClusterType::MainnetBeta | ClusterType::Devnet => (), + } } } stats.processed += 1;