This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit 4f1bc56
Very rough implementation of forwarding via TpuConnection
ryleung-solana committed Mar 22, 2022
1 parent 89ba3ff commit 4f1bc56
Showing 10 changed files with 178 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions client/Cargo.toml
@@ -41,6 +41,7 @@ solana-faucet = { path = "../faucet", version = "=1.11.0" }
 solana-measure = { path = "../measure", version = "=1.11.0" }
 solana-net-utils = { path = "../net-utils", version = "=1.11.0" }
 solana-sdk = { path = "../sdk", version = "=1.11.0" }
+solana-streamer = { path = "../streamer", version = "=1.11.0" }
 solana-transaction-status = { path = "../transaction-status", version = "=1.11.0" }
 solana-version = { path = "../version", version = "=1.11.0" }
 solana-vote-program = { path = "../programs/vote", version = "=1.11.0" }
9 changes: 9 additions & 0 deletions client/src/connection_cache.rs
@@ -3,6 +3,7 @@ use {
         quic_client::QuicTpuConnection, tpu_connection::TpuConnection, udp_client::UdpTpuConnection,
     },
     lazy_static::lazy_static,
+    solana_sdk::transport::TransportError,
     std::{
         collections::{hash_map::Entry, BTreeMap, HashMap},
         net::{SocketAddr, UdpSocket},
@@ -101,6 +102,14 @@ pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sy
     conn
 }

+pub fn batch_send_single_target(
+    packets: &[&[u8]],
+    addr: &SocketAddr,
+) -> Result<(), TransportError> {
+    let conn = get_connection(addr);
+    conn.send_wire_transaction_batch(packets)
+}
+
 #[cfg(test)]
 mod tests {
     use {
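Note (not part of the commit): the new batch_send_single_target helper ties the connection cache to the slice-of-slices batch API. A minimal caller-side sketch of how it might be used, assuming the caller serializes with bincode as the repo code does; the forward_serialized name is illustrative:

use {
    solana_client::connection_cache::batch_send_single_target,
    solana_sdk::{transaction::VersionedTransaction, transport::TransportError},
    std::net::SocketAddr,
};

// Serialize a batch of transactions and forward all of them to one TPU
// address through whatever connection (UDP or QUIC) the cache hands back.
fn forward_serialized(
    transactions: &[VersionedTransaction],
    tpu_forwards: &SocketAddr,
) -> Result<(), TransportError> {
    let wire: Vec<Vec<u8>> = transactions
        .iter()
        .map(|tx| bincode::serialize(tx).expect("serialize transaction"))
        .collect();
    let refs: Vec<&[u8]> = wire.iter().map(|buf| buf.as_slice()).collect();
    batch_send_single_target(&refs, tpu_forwards)
}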
11 changes: 4 additions & 7 deletions client/src/quic_client.rs
@@ -70,12 +70,9 @@ impl TpuConnection for QuicTpuConnection {
         Ok(())
     }

-    fn send_wire_transaction_batch(
-        &self,
-        wire_transaction_batch: &[Vec<u8>],
-    ) -> TransportResult<()> {
+    fn send_wire_transaction_batch(&self, buffers: &[&[u8]]) -> TransportResult<()> {
         let _guard = self.client.runtime.enter();
-        let send_batch = self.client.send_batch(wire_transaction_batch);
+        let send_batch = self.client.send_batch(buffers);
         self.client.runtime.block_on(send_batch)?;
         Ok(())
     }
@@ -163,7 +160,7 @@ impl QuicClient {
         Ok(())
     }

-    pub async fn send_batch(&self, buffers: &[Vec<u8>]) -> Result<(), ClientErrorKind> {
+    pub async fn send_batch(&self, buffers: &[&[u8]]) -> Result<(), ClientErrorKind> {
         // Start off by "testing" the connection by sending the first transaction
         // This will also connect to the server if not already connected
         // and reconnect and retry if the first send attempt failed
@@ -178,7 +175,7 @@
         if buffers.is_empty() {
             return Ok(());
         }
-        let connection = self._send_buffer(&buffers[0]).await?;
+        let connection = self._send_buffer(buffers[0]).await?;

         // Used to avoid dereferencing the Arc multiple times below
         // by just getting a reference to the NewConnection once
2 changes: 1 addition & 1 deletion client/src/thin_client.rs
@@ -613,7 +613,7 @@ impl<C: 'static + TpuConnection> AsyncClient for ThinClient<C> {
     fn async_send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> {
         let batch: Vec<VersionedTransaction> = transactions.into_iter().map(Into::into).collect();
         self.tpu_connection()
-            .par_serialize_and_send_transaction_batch(&batch)?;
+            .par_serialize_and_send_transaction_batch(&batch[..])?;
         Ok(())
     }

24 changes: 9 additions & 15 deletions client/src/tpu_connection.rs
@@ -1,5 +1,5 @@
 use {
-    rayon::iter::{IntoParallelRefIterator, ParallelIterator},
+    rayon::iter::{IntoParallelIterator, ParallelIterator},
     solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
     std::net::{SocketAddr, UdpSocket},
 };
@@ -24,22 +24,16 @@ pub trait TpuConnection {

     fn par_serialize_and_send_transaction_batch(
         &self,
-        transaction_batch: &[VersionedTransaction],
+        transactions: &[VersionedTransaction],
     ) -> TransportResult<()> {
-        let wire_transaction_batch: Vec<_> = transaction_batch
-            .par_iter()
+        let buffers = transactions
+            .into_par_iter()
             .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
-            .collect();
-        self.send_wire_transaction_batch(&wire_transaction_batch)
-    }
+            .collect::<Vec<_>>();
+
-    fn send_wire_transaction_batch(
-        &self,
-        wire_transaction_batch: &[Vec<u8>],
-    ) -> TransportResult<()> {
-        for wire_transaction in wire_transaction_batch {
-            self.send_wire_transaction(wire_transaction)?;
-        }
-        Ok(())
+        let buffer_refs = buffers.iter().map(|buf| &buf[..]).collect::<Vec<_>>();
+        self.send_wire_transaction_batch(&buffer_refs)
     }

+    fn send_wire_transaction_batch(&self, buffers: &[&[u8]]) -> TransportResult<()>;
 }
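Note (not part of the commit): the reworked default method collects owned Vec<u8> buffers first and only then builds the &[&[u8]] view, because the owned buffers must outlive the borrowed slices handed to send_wire_transaction_batch; the import swap to IntoParallelIterator is what lets into_par_iter() run over &[VersionedTransaction] by reference. A standalone sketch of that two-step borrowing pattern, using plain Rust with illustrative names and no Solana types:

// Borrow each owned buffer as a byte slice; the returned Vec is only valid
// while `owned` is alive, which is why the trait method binds the owned
// collection to a local before building the reference view.
fn to_slice_refs(owned: &[Vec<u8>]) -> Vec<&[u8]> {
    owned.iter().map(|buf| &buf[..]).collect()
}

fn main() {
    let owned: Vec<Vec<u8>> = vec![vec![1, 2, 3], vec![4, 5]];
    let refs: Vec<&[u8]> = to_slice_refs(&owned);
    assert_eq!(refs, vec![&[1u8, 2, 3][..], &[4u8, 5][..]]);
}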
12 changes: 10 additions & 2 deletions client/src/udp_client.rs
@@ -3,7 +3,9 @@
 use {
     crate::tpu_connection::TpuConnection,
+    core::iter::repeat,
     solana_sdk::transport::Result as TransportResult,
+    solana_streamer::sendmmsg::batch_send,
     std::net::{SocketAddr, UdpSocket},
 };

@@ -24,8 +26,14 @@ impl TpuConnection for UdpTpuConnection {
         &self.addr
     }

-    fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> {
-        self.socket.send_to(wire_transaction, self.addr)?;
+    fn send_wire_transaction(&self, data: &[u8]) -> TransportResult<()> {
+        self.socket.send_to(data, self.addr)?;
         Ok(())
     }
+
+    fn send_wire_transaction_batch(&self, buffers: &[&[u8]]) -> TransportResult<()> {
+        let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect();
+        batch_send(&self.socket, &pkts)?;
+        Ok(())
+    }
 }
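Note (not part of the commit): batch_send from solana_streamer::sendmmsg pairs every buffer with the same destination and submits the batch at the syscall level where the platform supports it. The observable effect is the same as the simplified stand-in below, which is not the library's implementation, just a semantic sketch:

use std::net::{SocketAddr, UdpSocket};

// Semantic stand-in for batch-sending to a single target: one datagram per
// buffer, all aimed at the same address.
fn send_batch_to_one_target(
    socket: &UdpSocket,
    buffers: &[&[u8]],
    target: &SocketAddr,
) -> std::io::Result<()> {
    for buf in buffers {
        socket.send_to(buf, target)?;
    }
    Ok(())
}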
29 changes: 12 additions & 17 deletions core/src/banking_stage.rs
@@ -14,6 +14,7 @@ use {
     histogram::Histogram,
     itertools::Itertools,
     retain_mut::RetainMut,
+    solana_client::connection_cache::batch_send_single_target,
     solana_entry::entry::hash_transactions,
     solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
     solana_ledger::blockstore_processor::TransactionStatusSender,
@@ -51,16 +52,16 @@ use {
         transaction::{
             self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction,
         },
+        transport::TransportError,
     },
-    solana_streamer::sendmmsg::{batch_send, SendPktsError},
     solana_transaction_status::token_balances::{
         collect_token_balances, TransactionTokenBalancesSet,
     },
     std::{
         cmp,
         collections::HashMap,
         env,
-        net::{SocketAddr, UdpSocket},
+        net::SocketAddr,
         sync::{
             atomic::{AtomicU64, AtomicUsize, Ordering},
             Arc, Mutex, RwLock,
@@ -482,11 +483,10 @@ impl BankingStage {
     /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
     /// the number of successfully forwarded packets in second part of tuple
     fn forward_buffered_packets(
-        socket: &std::net::UdpSocket,
         tpu_forwards: &std::net::SocketAddr,
         packets: Vec<&Packet>,
         data_budget: &DataBudget,
-    ) -> (std::io::Result<()>, usize) {
+    ) -> (std::result::Result<(), TransportError>, usize) {
         const INTERVAL_MS: u64 = 100;
         const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
         const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
@@ -502,18 +502,21 @@
             .iter()
             .filter_map(|p| {
                 if !p.meta.forwarded() && data_budget.take(p.meta.size) {
-                    Some((&p.data[..p.meta.size], tpu_forwards))
+                    Some(&p.data[..p.meta.size])
                 } else {
                     None
                 }
             })
             .collect();

+        // TODO: see https://github.com/solana-labs/solana/issues/23819
+        // fix this so returns the correct number of succeeded packets
+        // when there's an error sending the batch. This was left as-is for now
+        // in favor of shipping Quic support, which was considered higher-priority
         if !packet_vec.is_empty() {
             inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec.len());
-            if let Err(SendPktsError::IoError(ioerr, num_failed)) = batch_send(socket, &packet_vec)
-            {
-                return (Err(ioerr), packet_vec.len().saturating_sub(num_failed));
+            if let Err(err) = batch_send_single_target(&packet_vec, tpu_forwards) {
+                return (Err(err), 0);
             }
         }

@@ -763,7 +766,6 @@ impl BankingStage {
     #[allow(clippy::too_many_arguments)]
     fn process_buffered_packets(
         my_pubkey: &Pubkey,
-        socket: &std::net::UdpSocket,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         cluster_info: &ClusterInfo,
         buffered_packet_batches: &mut UnprocessedPacketBatches,
@@ -843,7 +845,6 @@
                             cluster_info,
                             buffered_packet_batches,
                             poh_recorder,
-                            socket,
                             false,
                             data_budget,
                             slot_metrics_tracker,
@@ -862,7 +863,6 @@
                             cluster_info,
                             buffered_packet_batches,
                             poh_recorder,
-                            socket,
                             true,
                             data_budget,
                             slot_metrics_tracker,
@@ -884,7 +884,6 @@
         cluster_info: &ClusterInfo,
         buffered_packet_batches: &mut UnprocessedPacketBatches,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
-        socket: &UdpSocket,
         hold: bool,
         data_budget: &DataBudget,
         slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
@@ -910,7 +909,7 @@
             Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
         let forwardable_packets_len = forwardable_packets.len();
         let (_forward_result, sucessful_forwarded_packets_count) =
-            Self::forward_buffered_packets(socket, &addr, forwardable_packets, data_budget);
+            Self::forward_buffered_packets(&addr, forwardable_packets, data_budget);
         let failed_forwarded_packets_count =
             forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count);

@@ -955,7 +954,6 @@
         cost_model: Arc<RwLock<CostModel>>,
     ) {
         let recorder = poh_recorder.lock().unwrap().recorder();
-        let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
         let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit);
         let mut banking_stage_stats = BankingStageStats::new(id);
         let qos_service = QosService::new(cost_model, id);
@@ -967,7 +965,6 @@
             |_| {
                 Self::process_buffered_packets(
                     &my_pubkey,
-                    &socket,
                     poh_recorder,
                     cluster_info,
                     &mut buffered_packet_batches,
@@ -3928,7 +3925,6 @@ mod tests {

         let local_node = Node::new_localhost_with_pubkey(validator_pubkey);
         let cluster_info = new_test_cluster_info(local_node.info);
-        let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
         let recv_socket = &local_node.sockets.tpu_forwards[0];

         let test_cases = vec![
@@ -3962,7 +3958,6 @@
                 &cluster_info,
                 &mut unprocessed_packet_batches,
                 &poh_recorder,
-                &send_socket,
                 hold,
                 &DataBudget::default(),
                 &mut LeaderSlotMetricsTracker::new(0),
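Note (not part of the commit): forward_buffered_packets keeps its rate limit, but the error path through batch_send_single_target now reports 0 forwarded packets on failure, which the in-code TODO tracks under issue 23819. The budget constants shown above work out as follows; the 1200-byte figure is presumably a per-packet size assumption, and this block only rechecks the arithmetic:

// Constants copied from forward_buffered_packets above.
const INTERVAL_MS: u64 = 100;
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;

fn main() {
    // 10_000 packets/s at an assumed 1200 bytes each = 12 MB/s,
    // i.e. 1.2 MB of budget replenished every 100 ms interval.
    assert_eq!(MAX_BYTES_PER_SECOND, 12_000_000);
    assert_eq!(MAX_BYTES_PER_INTERVAL, 1_200_000);
}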