Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Implement forwarding via TpuConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
ryleung-solana committed Mar 24, 2022
1 parent 82945ba commit 084c4ab
Showing 1 changed file with 26 additions and 19 deletions.
45 changes: 26 additions & 19 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use {
histogram::Histogram,
itertools::Itertools,
retain_mut::RetainMut,
solana_client::connection_cache::send_wire_transaction_batch,
solana_entry::entry::hash_transactions,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_ledger::blockstore_processor::TransactionStatusSender,
Expand Down Expand Up @@ -51,16 +52,16 @@ use {
transaction::{
self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction,
},
transport::TransportError,
},
solana_streamer::sendmmsg::{batch_send, SendPktsError},
solana_transaction_status::token_balances::{
collect_token_balances, TransactionTokenBalancesSet,
},
std::{
cmp,
collections::HashMap,
env,
net::{SocketAddr, UdpSocket},
net::SocketAddr,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
Expand Down Expand Up @@ -482,11 +483,10 @@ impl BankingStage {
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
/// the number of successfully forwarded packets in second part of tuple
fn forward_buffered_packets(
socket: &std::net::UdpSocket,
tpu_forwards: &std::net::SocketAddr,
packets: Vec<&Packet>,
data_budget: &DataBudget,
) -> (std::io::Result<()>, usize) {
) -> (std::result::Result<(), TransportError>, usize) {
const INTERVAL_MS: u64 = 100;
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
Expand All @@ -502,18 +502,35 @@ impl BankingStage {
.iter()
.filter_map(|p| {
if !p.meta.forwarded() && data_budget.take(p.meta.size) {
Some((&p.data[..p.meta.size], tpu_forwards))
Some(&p.data[..p.meta.size])
} else {
None
}
})
.collect();

// TODO: see https://github.com/solana-labs/solana/issues/23819
// fix this so returns the correct number of succeeded packets
// when there's an error sending the batch. This was left as-is for now
// in favor of shipping Quic support, which was considered higher-priority
if !packet_vec.is_empty() {
inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec.len());
if let Err(SendPktsError::IoError(ioerr, num_failed)) = batch_send(socket, &packet_vec)
{
return (Err(ioerr), packet_vec.len().saturating_sub(num_failed));

let mut measure = Measure::start("banking_stage-forward-us");

let res = send_wire_transaction_batch(&packet_vec, tpu_forwards);

measure.stop();
inc_new_counter_info!(
"banking_stage-forward-us",
measure.as_us() as usize,
1000,
1000
);

if let Err(err) = res {
inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1);
return (Err(err), 0);
}
}

Expand Down Expand Up @@ -763,7 +780,6 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)]
fn process_buffered_packets(
my_pubkey: &Pubkey,
socket: &std::net::UdpSocket,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &ClusterInfo,
buffered_packet_batches: &mut UnprocessedPacketBatches,
Expand Down Expand Up @@ -843,7 +859,6 @@ impl BankingStage {
cluster_info,
buffered_packet_batches,
poh_recorder,
socket,
false,
data_budget,
slot_metrics_tracker,
Expand All @@ -862,7 +877,6 @@ impl BankingStage {
cluster_info,
buffered_packet_batches,
poh_recorder,
socket,
true,
data_budget,
slot_metrics_tracker,
Expand All @@ -884,7 +898,6 @@ impl BankingStage {
cluster_info: &ClusterInfo,
buffered_packet_batches: &mut UnprocessedPacketBatches,
poh_recorder: &Arc<Mutex<PohRecorder>>,
socket: &UdpSocket,
hold: bool,
data_budget: &DataBudget,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
Expand All @@ -910,7 +923,7 @@ impl BankingStage {
Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
let forwardable_packets_len = forwardable_packets.len();
let (_forward_result, sucessful_forwarded_packets_count) =
Self::forward_buffered_packets(socket, &addr, forwardable_packets, data_budget);
Self::forward_buffered_packets(&addr, forwardable_packets, data_budget);
let failed_forwarded_packets_count =
forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count);

Expand Down Expand Up @@ -955,7 +968,6 @@ impl BankingStage {
cost_model: Arc<RwLock<CostModel>>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit);
let mut banking_stage_stats = BankingStageStats::new(id);
let qos_service = QosService::new(cost_model, id);
Expand All @@ -967,7 +979,6 @@ impl BankingStage {
|_| {
Self::process_buffered_packets(
&my_pubkey,
&socket,
poh_recorder,
cluster_info,
&mut buffered_packet_batches,
Expand Down Expand Up @@ -3830,7 +3841,6 @@ mod tests {

let local_node = Node::new_localhost_with_pubkey(validator_pubkey);
let cluster_info = new_test_cluster_info(local_node.info);
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let recv_socket = &local_node.sockets.tpu_forwards[0];

let test_cases = vec![
Expand All @@ -3852,7 +3862,6 @@ mod tests {
&cluster_info,
&mut unprocessed_packet_batches,
&poh_recorder,
&send_socket,
true,
&data_budget,
&mut LeaderSlotMetricsTracker::new(0),
Expand Down Expand Up @@ -3930,7 +3939,6 @@ mod tests {

let local_node = Node::new_localhost_with_pubkey(validator_pubkey);
let cluster_info = new_test_cluster_info(local_node.info);
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let recv_socket = &local_node.sockets.tpu_forwards[0];

let test_cases = vec![
Expand Down Expand Up @@ -3964,7 +3972,6 @@ mod tests {
&cluster_info,
&mut unprocessed_packet_batches,
&poh_recorder,
&send_socket,
hold,
&DataBudget::default(),
&mut LeaderSlotMetricsTracker::new(0),
Expand Down

0 comments on commit 084c4ab

Please sign in to comment.