
require repair request signature, ping/pong for Testnet, Development clusters (#29351)

(cherry picked from commit 8323024)

# Conflicts:
#	core/src/serve_repair.rs
jbiseda authored and mergify[bot] committed Jan 4, 2023
1 parent 69c15ab commit 16cd1cf
Showing 2 changed files with 115 additions and 5 deletions.
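
In short: repair requests must now be signed, and a requester must answer a ping challenge before being served, with enforcement gated to the Testnet and Development clusters; MainnetBeta and Devnet only record stats for now. A minimal, self-contained sketch of that gating pattern follows (the enum and function names here are illustrative, not the actual types in serve_repair.rs):

// Standalone sketch of the enforcement split (illustrative names, not the
// actual serve_repair types): a failed signature or ping check drops the
// request on Testnet/Development and is only counted elsewhere.

#[derive(Clone, Copy)]
enum ClusterType {
    Testnet,
    MainnetBeta,
    Devnet,
    Development,
}

#[derive(Default)]
struct Stats {
    err_unsigned: usize,
    ping_cache_check_failed: usize,
}

/// Returns true if the request should still be served.
fn should_serve(
    cluster_type: ClusterType,
    signature_ok: bool,
    ping_ok: bool,
    stats: &mut Stats,
) -> bool {
    let enforce = matches!(
        cluster_type,
        ClusterType::Testnet | ClusterType::Development
    );
    if !signature_ok {
        stats.err_unsigned += 1;
        if enforce {
            return false;
        }
    }
    if !ping_ok {
        stats.ping_cache_check_failed += 1;
        if enforce {
            return false;
        }
    }
    true
}

fn main() {
    let mut stats = Stats::default();
    // An unsigned request is dropped on Testnet...
    assert!(!should_serve(ClusterType::Testnet, false, true, &mut stats));
    // ...but still served on MainnetBeta, where it is only counted.
    assert!(should_serve(ClusterType::MainnetBeta, false, true, &mut stats));
    assert_eq!(stats.err_unsigned, 2);
}
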
80 changes: 79 additions & 1 deletion core/src/ancestor_hashes_service.rs
@@ -773,7 +773,7 @@ mod test {
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
},
solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path},
solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path, shred::Nonce},
solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks},
solana_sdk::{
hash::Hash,
@@ -1132,6 +1132,26 @@ mod test {
replay_blockstore_components
}

fn send_ancestor_repair_request(
requester_serve_repair: &ServeRepair,
requester_cluster_info: &ClusterInfo,
responder_info: &ContactInfo,
ancestor_hashes_request_socket: &UdpSocket,
dead_slot: Slot,
nonce: Nonce,
) {
let request_bytes = requester_serve_repair.ancestor_repair_request_bytes(
&requester_cluster_info.keypair(),
&responder_info.id,
dead_slot,
nonce,
);
if let Ok(request_bytes) = request_bytes {
let _ =
ancestor_hashes_request_socket.send_to(&request_bytes, responder_info.serve_repair);
}
}

#[test]
fn test_ancestor_hashes_service_initiate_ancestor_hashes_requests_for_duplicate_slot() {
let dead_slot = MAX_ANCESTOR_RESPONSES as Slot;
@@ -1180,6 +1200,35 @@
);
assert!(ancestor_hashes_request_statuses.is_empty());

// Send a request to generate a ping
send_ancestor_repair_request(
&requester_serve_repair,
&requester_cluster_info,
responder_info,
&ancestor_hashes_request_socket,
dead_slot,
/*nonce*/ 123,
);
// Should have received valid response
let mut response_packet = response_receiver
.recv_timeout(Duration::from_millis(10_000))
.unwrap();
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair);
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
&mut AncestorHashesResponsesStats::default(),
&outstanding_requests,
&requester_blockstore,
&requester_cluster_info.keypair(),
&ancestor_hashes_request_socket,
);
// should have processed a ping packet
assert_eq!(decision, None);

// Add the responder to the eligible list for requests
let responder_id = responder_info.id;
cluster_slots.insert_node_id(dead_slot, responder_id);
@@ -1529,6 +1578,35 @@ mod test {
cluster_slots.insert_node_id(dead_slot, responder_id);
requester_cluster_info.insert_info(responder_info.clone());

// Send a request to generate a ping
send_ancestor_repair_request(
&requester_serve_repair,
requester_cluster_info,
responder_info,
&ancestor_hashes_request_socket,
dead_slot,
/*nonce*/ 123,
);
// Should have received valid response
let mut response_packet = response_receiver
.recv_timeout(Duration::from_millis(10_000))
.unwrap();
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair);
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
&mut AncestorHashesResponsesStats::default(),
&outstanding_requests,
&requester_blockstore,
&requester_cluster_info.keypair(),
&ancestor_hashes_request_socket,
);
// Should have processed a ping packet
assert_eq!(decision, None);

// Simulate getting duplicate confirmed dead slot
ancestor_hashes_replay_update_sender
.send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(
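
The test additions above exercise the new handshake: the first signed request from a peer the responder has not yet verified yields a ping rather than repair data, and verify_and_process_ancestor_response returns None for it. A minimal sketch of that ping gate, with illustrative types standing in for the real ping cache (which also tracks ping tokens and expiry):

// Minimal sketch of the ping gate (illustrative types only): an unverified
// peer gets a ping challenge instead of repair data until its pong is
// recorded; the real cache would also queue the ping packet to send.

use std::collections::HashSet;
use std::net::SocketAddr;

#[derive(Default)]
struct PingCache {
    verified: HashSet<SocketAddr>, // peers whose pong has been verified
}

enum Response {
    Ping,        // challenge returned instead of repair data
    Repair(u64), // the requested slot would be served here
}

impl PingCache {
    fn handle_request(&mut self, from: SocketAddr, slot: u64) -> Response {
        if self.verified.contains(&from) {
            Response::Repair(slot)
        } else {
            Response::Ping
        }
    }

    fn record_pong(&mut self, from: SocketAddr) {
        self.verified.insert(from);
    }
}

fn main() {
    let mut cache = PingCache::default();
    let peer: SocketAddr = "127.0.0.1:8008".parse().unwrap();
    // First contact: answer with a ping, not repair data.
    assert!(matches!(cache.handle_request(peer, 42), Response::Ping));
    // After a valid pong from the peer, the same request is served.
    cache.record_pong(peer);
    assert!(matches!(cache.handle_request(peer, 42), Response::Repair(42)));
}
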
40 changes: 36 additions & 4 deletions core/src/serve_repair.rs
@@ -152,7 +152,6 @@ impl RequestResponse for AncestorHashesRepairType {
#[derive(Default)]
struct ServeRepairStats {
total_requests: usize,
unsigned_requests: usize,
dropped_requests_outbound_bandwidth: usize,
dropped_requests_load_shed: usize,
dropped_requests_low_stake: usize,
@@ -451,9 +450,19 @@ impl ServeRepair {
let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch());
let identity_keypair = self.cluster_info.keypair().clone();
let my_id = identity_keypair.pubkey();
let cluster_type = root_bank.cluster_type();

<<<<<<< HEAD
let max_buffered_packets = if root_bank.cluster_type() != ClusterType::MainnetBeta {
2 * MAX_REQUESTS_PER_ITERATION
=======
let max_buffered_packets = if cluster_type != ClusterType::MainnetBeta {
if self.repair_whitelist.read().unwrap().len() > 0 {
4 * MAX_REQUESTS_PER_ITERATION
} else {
2 * MAX_REQUESTS_PER_ITERATION
}
>>>>>>> 832302485 (require repair request signature, ping/pong for Testnet, Development clusters (#29351))
} else {
MAX_REQUESTS_PER_ITERATION
};
@@ -482,11 +491,30 @@ impl ServeRepair {
}
};

<<<<<<< HEAD
let from_addr = packet.meta.socket_addr();
if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) {
stats.err_malformed += 1;
continue;
}
=======
match cluster_type {
ClusterType::Testnet | ClusterType::Development => {
if !Self::verify_signed_packet(&my_id, packet, &request, stats) {
continue;
}
}
ClusterType::MainnetBeta | ClusterType::Devnet => {
// collect stats for signature verification
let _ = Self::verify_signed_packet(&my_id, packet, &request, stats);
}
}

if request.sender() == &my_id {
stats.self_repair += 1;
continue;
}
>>>>>>> 832302485 (require repair request signature, ping/pong for Testnet, Development clusters (#29351))

if request.supports_signature() {
// collect stats for signature verification
@@ -527,6 +555,7 @@ impl ServeRepair {
response_sender,
stats,
data_budget,
cluster_type,
);

Ok(())
@@ -545,7 +574,6 @@
datapoint_info!(
"serve_repair-requests_received",
("total_requests", stats.total_requests, i64),
("unsigned_requests", stats.unsigned_requests, i64),
(
"dropped_requests_outbound_bandwidth",
stats.dropped_requests_outbound_bandwidth,
@@ -669,6 +697,7 @@ impl ServeRepair {
.unwrap()
}

#[must_use]
fn verify_signed_packet(
my_id: &Pubkey,
packet: &Packet,
@@ -683,7 +712,6 @@
| RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _)
| RepairProtocol::LegacyOrphanWithNonce(_, _, _)
| RepairProtocol::LegacyAncestorHashes(_, _, _) => {
debug_assert!(false); // expecting only signed request types
stats.err_unsigned += 1;
return false;
}
@@ -779,6 +807,7 @@ impl ServeRepair {
response_sender: &PacketBatchSender,
stats: &mut ServeRepairStats,
data_budget: &DataBudget,
cluster_type: ClusterType,
) {
let identity_keypair = self.cluster_info.keypair().clone();
let mut pending_pings = Vec::default();
@@ -792,8 +821,11 @@
pending_pings.push(ping_pkt);
}
if !check {
// collect stats for ping/pong verification
stats.ping_cache_check_failed += 1;
match cluster_type {
ClusterType::Testnet | ClusterType::Development => continue,
ClusterType::MainnetBeta | ClusterType::Devnet => (),
}
}
}
stats.processed += 1;
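
For reference, the sign/verify shape that verify_signed_packet enforces on Testnet and Development: the requester signs the serialized request with its node identity keypair, and the responder checks that signature against the sender pubkey carried in the request. A hedged sketch using solana_sdk primitives only; the stand-in header below is not the real RepairProtocol wire format:

// Sketch of the sign/verify shape only (the byte string stands in for the
// serialized request header, which in the real protocol carries the sender
// and recipient pubkeys, slot, nonce, and a timestamp).

use solana_sdk::signature::{Keypair, Signer};

fn main() {
    let requester = Keypair::new();
    let header = b"repair-request-header";
    // Requester signs with its node identity keypair...
    let signature = requester.sign_message(header);
    // ...and the responder verifies against the sender pubkey embedded in the
    // request before serving it (mandatory on Testnet/Development).
    assert!(signature.verify(requester.pubkey().as_ref(), header));
}
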
