Skip to content

Commit

Permalink
require repair request signature, ping/pong for Testnet, Development …
Browse files Browse the repository at this point in the history
…clusters (solana-labs#29351)
  • Loading branch information
jbiseda authored and gnapoli23 committed Jan 10, 2023
1 parent bc310cf commit 216b216
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 11 deletions.
80 changes: 79 additions & 1 deletion core/src/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ mod test {
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
},
solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path},
solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path, shred::Nonce},
solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks},
solana_sdk::{
hash::Hash,
Expand Down Expand Up @@ -1141,6 +1141,26 @@ mod test {
replay_blockstore_components
}

/// Test helper: builds an ancestor-hashes repair request for `dead_slot` and
/// sends it over `ancestor_hashes_request_socket` to the responder's
/// `serve_repair` address.
///
/// The request bytes are produced by
/// `ServeRepair::ancestor_repair_request_bytes` using the requester's keypair,
/// the responder's id, the slot and the supplied `nonce`.
///
/// This is strictly best-effort: if the request cannot be built, nothing is
/// sent, and any socket error from the send is discarded. Callers observe
/// failure indirectly (e.g. by timing out while waiting for a response).
fn send_ancestor_repair_request(
    requester_serve_repair: &ServeRepair,
    requester_cluster_info: &ClusterInfo,
    responder_info: &ContactInfo,
    ancestor_hashes_request_socket: &UdpSocket,
    dead_slot: Slot,
    nonce: Nonce,
) {
    let requester_keypair = requester_cluster_info.keypair();
    let serialized = requester_serve_repair.ancestor_repair_request_bytes(
        &requester_keypair,
        &responder_info.id,
        dead_slot,
        nonce,
    );

    // Nothing to send if the request could not be constructed.
    let payload = match serialized {
        Ok(bytes) => bytes,
        Err(_) => return,
    };

    // The outcome of the send is intentionally ignored.
    let _ = ancestor_hashes_request_socket.send_to(&payload, responder_info.serve_repair);
}

#[test]
fn test_ancestor_hashes_service_initiate_ancestor_hashes_requests_for_duplicate_slot() {
let dead_slot = MAX_ANCESTOR_RESPONSES as Slot;
Expand Down Expand Up @@ -1189,6 +1209,35 @@ mod test {
);
assert!(ancestor_hashes_request_statuses.is_empty());

// Send a request to generate a ping
send_ancestor_repair_request(
&requester_serve_repair,
&requester_cluster_info,
responder_info,
&ancestor_hashes_request_socket,
dead_slot,
/*nonce*/ 123,
);
// Should have received valid response
let mut response_packet = response_receiver
.recv_timeout(Duration::from_millis(10_000))
.unwrap();
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair);
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
&mut AncestorHashesResponsesStats::default(),
&outstanding_requests,
&requester_blockstore,
&requester_cluster_info.keypair(),
&ancestor_hashes_request_socket,
);
// Should have processed a ping packet
assert_eq!(decision, None);

// Add the responder to the eligible list for requests
let responder_id = responder_info.id;
cluster_slots.insert_node_id(dead_slot, responder_id);
Expand Down Expand Up @@ -1541,6 +1590,35 @@ mod test {
cluster_slots.insert_node_id(dead_slot, responder_id);
requester_cluster_info.insert_info(responder_info.clone());

// Send a request to generate a ping
send_ancestor_repair_request(
&requester_serve_repair,
requester_cluster_info,
responder_info,
&ancestor_hashes_request_socket,
dead_slot,
/*nonce*/ 123,
);
// Should have received valid response
let mut response_packet = response_receiver
.recv_timeout(Duration::from_millis(10_000))
.unwrap();
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair);
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
&mut AncestorHashesResponsesStats::default(),
&outstanding_requests,
&requester_blockstore,
&requester_cluster_info.keypair(),
&ancestor_hashes_request_socket,
);
// Should have processed a ping packet
assert_eq!(decision, None);

// Simulate getting duplicate confirmed dead slot
ancestor_hashes_replay_update_sender
.send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(
Expand Down
29 changes: 19 additions & 10 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ impl RequestResponse for AncestorHashesRepairType {
#[derive(Default)]
struct ServeRepairStats {
total_requests: usize,
unsigned_requests: usize,
dropped_requests_outbound_bandwidth: usize,
dropped_requests_load_shed: usize,
dropped_requests_low_stake: usize,
Expand Down Expand Up @@ -468,8 +467,9 @@ impl ServeRepair {
let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch());
let identity_keypair = self.cluster_info.keypair().clone();
let my_id = identity_keypair.pubkey();
let cluster_type = root_bank.cluster_type();

let max_buffered_packets = if root_bank.cluster_type() != ClusterType::MainnetBeta {
let max_buffered_packets = if cluster_type != ClusterType::MainnetBeta {
if self.repair_whitelist.read().unwrap().len() > 0 {
4 * MAX_REQUESTS_PER_ITERATION
} else {
Expand Down Expand Up @@ -512,11 +512,16 @@ impl ServeRepair {
continue;
}

if request.supports_signature() {
// collect stats for signature verification
Self::verify_signed_packet(&my_id, packet, &request, stats);
} else {
stats.unsigned_requests += 1;
match cluster_type {
ClusterType::Testnet | ClusterType::Development => {
if !Self::verify_signed_packet(&my_id, packet, &request, stats) {
continue;
}
}
ClusterType::MainnetBeta | ClusterType::Devnet => {
// collect stats for signature verification
let _ = Self::verify_signed_packet(&my_id, packet, &request, stats);
}
}

if request.sender() == &my_id {
Expand Down Expand Up @@ -564,6 +569,7 @@ impl ServeRepair {
response_sender,
stats,
data_budget,
cluster_type,
);

Ok(())
Expand All @@ -582,7 +588,6 @@ impl ServeRepair {
datapoint_info!(
"serve_repair-requests_received",
("total_requests", stats.total_requests, i64),
("unsigned_requests", stats.unsigned_requests, i64),
(
"dropped_requests_outbound_bandwidth",
stats.dropped_requests_outbound_bandwidth,
Expand Down Expand Up @@ -707,6 +712,7 @@ impl ServeRepair {
.unwrap()
}

#[must_use]
fn verify_signed_packet(
my_id: &Pubkey,
packet: &Packet,
Expand All @@ -721,7 +727,6 @@ impl ServeRepair {
| RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _)
| RepairProtocol::LegacyOrphanWithNonce(_, _, _)
| RepairProtocol::LegacyAncestorHashes(_, _, _) => {
debug_assert!(false); // expecting only signed request types
stats.err_unsigned += 1;
return false;
}
Expand Down Expand Up @@ -817,6 +822,7 @@ impl ServeRepair {
response_sender: &PacketBatchSender,
stats: &mut ServeRepairStats,
data_budget: &DataBudget,
cluster_type: ClusterType,
) {
let identity_keypair = self.cluster_info.keypair().clone();
let mut pending_pings = Vec::default();
Expand All @@ -839,8 +845,11 @@ impl ServeRepair {
pending_pings.push(ping_pkt);
}
if !check {
// collect stats for ping/pong verification
stats.ping_cache_check_failed += 1;
match cluster_type {
ClusterType::Testnet | ClusterType::Development => continue,
ClusterType::MainnetBeta | ClusterType::Devnet => (),
}
}
}
stats.processed += 1;
Expand Down

0 comments on commit 216b216

Please sign in to comment.