diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 9e78cc418344d7..aafd8e55ddc659 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -14,6 +14,7 @@ use { histogram::Histogram, itertools::Itertools, retain_mut::RetainMut, + solana_client::connection_cache::send_wire_transaction_batch, solana_entry::entry::hash_transactions, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::blockstore_processor::TransactionStatusSender, @@ -51,8 +52,8 @@ use { transaction::{ self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction, }, + transport::TransportError, }, - solana_streamer::sendmmsg::{batch_send, SendPktsError}, solana_transaction_status::token_balances::{ collect_token_balances, TransactionTokenBalancesSet, }, @@ -60,7 +61,7 @@ use { cmp, collections::HashMap, env, - net::{SocketAddr, UdpSocket}, + net::SocketAddr, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, RwLock, @@ -482,11 +483,10 @@ impl BankingStage { /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns /// the number of successfully forwarded packets in second part of tuple fn forward_buffered_packets( - socket: &std::net::UdpSocket, tpu_forwards: &std::net::SocketAddr, packets: Vec<&Packet>, data_budget: &DataBudget, - ) -> (std::io::Result<()>, usize) { + ) -> (std::result::Result<(), TransportError>, usize) { const INTERVAL_MS: u64 = 100; const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; @@ -502,18 +502,35 @@ impl BankingStage { .iter() .filter_map(|p| { if !p.meta.forwarded() && data_budget.take(p.meta.size) { - Some((&p.data[..p.meta.size], tpu_forwards)) + Some(&p.data[..p.meta.size]) } else { None } }) .collect(); + // TODO: see https://github.com/solana-labs/solana/issues/23819 + // fix this so returns the correct number of succeeded packets + // when there's an error sending the batch. This was left as-is for now + // in favor of shipping Quic support, which was considered higher-priority if !packet_vec.is_empty() { inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec.len()); - if let Err(SendPktsError::IoError(ioerr, num_failed)) = batch_send(socket, &packet_vec) - { - return (Err(ioerr), packet_vec.len().saturating_sub(num_failed)); + + let mut measure = Measure::start("banking_stage-forward-us"); + + let res = send_wire_transaction_batch(&packet_vec, tpu_forwards); + + measure.stop(); + inc_new_counter_info!( + "banking_stage-forward-us", + measure.as_us() as usize, + 1000, + 1000 + ); + + if let Err(err) = res { + inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1); + return (Err(err), 0); } } @@ -763,7 +780,6 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] fn process_buffered_packets( my_pubkey: &Pubkey, - socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &ClusterInfo, buffered_packet_batches: &mut UnprocessedPacketBatches, @@ -843,7 +859,6 @@ impl BankingStage { cluster_info, buffered_packet_batches, poh_recorder, - socket, false, data_budget, slot_metrics_tracker, @@ -862,7 +877,6 @@ impl BankingStage { cluster_info, buffered_packet_batches, poh_recorder, - socket, true, data_budget, slot_metrics_tracker, @@ -884,7 +898,6 @@ impl BankingStage { cluster_info: &ClusterInfo, buffered_packet_batches: &mut UnprocessedPacketBatches, poh_recorder: &Arc>, - socket: &UdpSocket, hold: bool, data_budget: &DataBudget, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, @@ -910,7 +923,7 @@ impl BankingStage { Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter()); let forwardable_packets_len = forwardable_packets.len(); let (_forward_result, sucessful_forwarded_packets_count) = - Self::forward_buffered_packets(socket, &addr, forwardable_packets, data_budget); + Self::forward_buffered_packets(&addr, forwardable_packets, data_budget); let failed_forwarded_packets_count = forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count); @@ -955,7 +968,6 @@ impl BankingStage { cost_model: Arc>, ) { let recorder = poh_recorder.lock().unwrap().recorder(); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit); let mut banking_stage_stats = BankingStageStats::new(id); let qos_service = QosService::new(cost_model, id); @@ -967,7 +979,6 @@ impl BankingStage { |_| { Self::process_buffered_packets( &my_pubkey, - &socket, poh_recorder, cluster_info, &mut buffered_packet_batches, @@ -3830,7 +3841,6 @@ mod tests { let local_node = Node::new_localhost_with_pubkey(validator_pubkey); let cluster_info = new_test_cluster_info(local_node.info); - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let recv_socket = &local_node.sockets.tpu_forwards[0]; let test_cases = vec![ @@ -3852,7 +3862,6 @@ mod tests { &cluster_info, &mut unprocessed_packet_batches, &poh_recorder, - &send_socket, true, &data_budget, &mut LeaderSlotMetricsTracker::new(0), @@ -3930,7 +3939,6 @@ mod tests { let local_node = Node::new_localhost_with_pubkey(validator_pubkey); let cluster_info = new_test_cluster_info(local_node.info); - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let recv_socket = &local_node.sockets.tpu_forwards[0]; let test_cases = vec![ @@ -3964,7 +3972,6 @@ mod tests { &cluster_info, &mut unprocessed_packet_batches, &poh_recorder, - &send_socket, hold, &DataBudget::default(), &mut LeaderSlotMetricsTracker::new(0),