Skip to content

Commit

Permalink
Define BankingTracer::create_channels() (#4041)
Browse files Browse the repository at this point in the history
* Define BankingTracer::create_channels()

* Make create_channels() take a bool
  • Loading branch information
ryoqun authored Dec 13, 2024
1 parent 3a6d593 commit d11072e
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 48 deletions.
15 changes: 11 additions & 4 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use {
solana_client::connection_cache::ConnectionCache,
solana_core::{
banking_stage::BankingStage,
banking_trace::{BankingPacketBatch, BankingTracer, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT},
banking_trace::{
BankingPacketBatch, BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
},
validator::BlockProductionMethod,
},
solana_gossip::cluster_info::{ClusterInfo, Node},
Expand Down Expand Up @@ -440,9 +442,14 @@ fn main() {
BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
)))
.unwrap();
let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();
let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) = banking_tracer.create_channel_gossip_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = banking_tracer.create_channels(false);
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
Expand Down
13 changes: 9 additions & 4 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![feature(test)]

use {
solana_core::validator::BlockProductionMethod,
solana_core::{banking_trace::Channels, validator::BlockProductionMethod},
solana_vote_program::{vote_state::TowerSync, vote_transaction::new_tower_sync_transaction},
};

Expand Down Expand Up @@ -211,9 +211,14 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
genesis_config.ticks_per_slot = 10_000;

let banking_tracer = BankingTracer::new_disabled();
let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();
let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) = banking_tracer.create_channel_gossip_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = banking_tracer.create_channels(false);

let mut bank = Bank::new_for_benches(&genesis_config);
// Allow arbitrary transaction processing time for the purposes of this bench
Expand Down
26 changes: 21 additions & 5 deletions core/benches/banking_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
for_test::{
drop_and_clean_temp_dir_unless_suppressed, sample_packet_batch, terminate_tracer,
},
receiving_loop_with_minimized_sender_overhead, BankingPacketBatch, BankingTracer,
receiving_loop_with_minimized_sender_overhead, BankingPacketBatch, BankingTracer, Channels,
TraceError, TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
},
std::{
Expand Down Expand Up @@ -35,7 +35,11 @@ fn black_box_packet_batch(packet_batch: BankingPacketBatch) -> TracerThreadResul
fn bench_banking_tracer_main_thread_overhead_noop_baseline(bencher: &mut Bencher) {
let exit = Arc::<AtomicBool>::default();
let tracer = BankingTracer::new_disabled();
let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
..
} = tracer.create_channels(false);

let exit_for_dummy_thread = exit.clone();
let dummy_main_thread = thread::spawn(move || {
Expand Down Expand Up @@ -64,7 +68,11 @@ fn bench_banking_tracer_main_thread_overhead_under_peak_write(bencher: &mut Benc
BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
)))
.unwrap();
let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
..
} = tracer.create_channels(false);

let exit_for_dummy_thread = exit.clone();
let dummy_main_thread = thread::spawn(move || {
Expand Down Expand Up @@ -101,7 +109,11 @@ fn bench_banking_tracer_main_thread_overhead_under_sustained_write(bencher: &mut
1024 * 1024, // cause more frequent trace file rotation
)))
.unwrap();
let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
..
} = tracer.create_channels(false);

let exit_for_dummy_thread = exit.clone();
let dummy_main_thread = thread::spawn(move || {
Expand Down Expand Up @@ -142,7 +154,11 @@ fn bench_banking_tracer_background_thread_throughput(bencher: &mut Bencher) {

let (tracer, tracer_thread) =
BankingTracer::new(Some((&path, exit.clone(), 50 * 1024 * 1024))).unwrap();
let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
..
} = tracer.create_channels(false);

let dummy_main_thread = thread::spawn(move || {
receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>(
Expand Down
16 changes: 11 additions & 5 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use {
crate::{
banking_stage::{BankingStage, LikeClusterInfo},
banking_trace::{
BankingPacketBatch, BankingTracer, ChannelLabel, TimedTracedEvent, TracedEvent,
TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, BASENAME,
BankingPacketBatch, BankingTracer, ChannelLabel, Channels, TimedTracedEvent,
TracedEvent, TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
BASENAME,
},
validator::BlockProductionMethod,
},
Expand Down Expand Up @@ -753,9 +754,14 @@ impl BankingSimulator {
BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
);

let (non_vote_sender, non_vote_receiver) = retracer.create_channel_non_vote();
let (tpu_vote_sender, tpu_vote_receiver) = retracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) = retracer.create_channel_gossip_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = retracer.create_channels(false);

let connection_cache = Arc::new(ConnectionCache::new("connection_cache_sim"));
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
Expand Down
62 changes: 41 additions & 21 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ impl BankingStage {
mod tests {
use {
super::*,
crate::banking_trace::{BankingPacketBatch, BankingTracer},
crate::banking_trace::{BankingPacketBatch, BankingTracer, Channels},
crossbeam_channel::{unbounded, Receiver},
itertools::Itertools,
solana_entry::entry::{self, Entry, EntrySlice},
Expand Down Expand Up @@ -863,10 +863,14 @@ mod tests {
let genesis_config = create_genesis_config(2).genesis_config;
let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let banking_tracer = BankingTracer::new_disabled();
let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();
let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) =
banking_tracer.create_channel_gossip_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = banking_tracer.create_channels(false);
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let blockstore = Arc::new(
Expand Down Expand Up @@ -915,10 +919,14 @@ mod tests {
let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let start_hash = bank.last_blockhash();
let banking_tracer = BankingTracer::new_disabled();
let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();
let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) =
banking_tracer.create_channel_gossip_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = banking_tracer.create_channels(false);
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let blockstore = Arc::new(
Expand Down Expand Up @@ -993,10 +1001,14 @@ mod tests {
let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let start_hash = bank.last_blockhash();
let banking_tracer = BankingTracer::new_disabled();
let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();
let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) =
banking_tracer.create_channel_gossip_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = banking_tracer.create_channels(false);
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let blockstore = Arc::new(
Expand Down Expand Up @@ -1127,7 +1139,14 @@ mod tests {
..
} = create_slow_genesis_config(2);
let banking_tracer = BankingTracer::new_disabled();
let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = banking_tracer.create_channels(false);

// Process a batch that includes a transaction that receives two lamports.
let alice = Keypair::new();
Expand Down Expand Up @@ -1157,9 +1176,6 @@ mod tests {
.send(BankingPacketBatch::new(packet_batches))
.unwrap();

let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) =
banking_tracer.create_channel_gossip_vote();
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
Expand Down Expand Up @@ -1350,10 +1366,14 @@ mod tests {
let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let start_hash = bank.last_blockhash();
let banking_tracer = BankingTracer::new_disabled();
let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();
let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) =
banking_tracer.create_channel_gossip_vote();
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = banking_tracer.create_channels(false);
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let blockstore = Arc::new(
Expand Down
87 changes: 84 additions & 3 deletions core/src/banking_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,15 @@ pub fn receiving_loop_with_minimized_sender_overhead<T, E, const SLEEP_MS: u64>(
Ok(())
}

pub struct Channels {
pub non_vote_sender: BankingPacketSender,
pub non_vote_receiver: BankingPacketReceiver,
pub tpu_vote_sender: BankingPacketSender,
pub tpu_vote_receiver: BankingPacketReceiver,
pub gossip_vote_sender: BankingPacketSender,
pub gossip_vote_receiver: BankingPacketReceiver,
}

impl BankingTracer {
pub fn new(
maybe_config: Option<(&PathBuf, Arc<AtomicBool>, DirByteLimit)>,
Expand Down Expand Up @@ -214,22 +223,85 @@ impl BankingTracer {
self.active_tracer.is_some()
}

pub fn create_channels(&self, unify_channels: bool) -> Channels {
if unify_channels {
// Returning the same channel is needed when unified scheduler supports block
// production because unified scheduler doesn't distinguish them and treats them as
// unified as the single source of incoming transactions. This is to reduce the number
// of recv operation per loop and load balance evenly as much as possible there.
let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote();
// Tap into some private helper fns so that banking trace labelling works as before.
let (tpu_vote_sender, tpu_vote_receiver) =
self.create_unified_channel_tpu_vote(&non_vote_sender, &non_vote_receiver);
let (gossip_vote_sender, gossip_vote_receiver) =
self.create_unified_channel_gossip_vote(&non_vote_sender, &non_vote_receiver);

Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
}
} else {
let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote();
let (tpu_vote_sender, tpu_vote_receiver) = self.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) = self.create_channel_gossip_vote();

Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
}
}
}

fn create_channel(&self, label: ChannelLabel) -> (BankingPacketSender, BankingPacketReceiver) {
Self::channel(label, self.active_tracer.as_ref().cloned())
}

pub fn create_channel_non_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
fn create_channel_non_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
self.create_channel(ChannelLabel::NonVote)
}

pub fn create_channel_tpu_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
fn create_channel_tpu_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
self.create_channel(ChannelLabel::TpuVote)
}

pub fn create_channel_gossip_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
fn create_channel_gossip_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
self.create_channel(ChannelLabel::GossipVote)
}

fn create_unified_channel_tpu_vote(
&self,
sender: &TracedSender,
receiver: &BankingPacketReceiver,
) -> (BankingPacketSender, BankingPacketReceiver) {
Self::channel_inner(
ChannelLabel::TpuVote,
self.active_tracer.as_ref().cloned(),
sender.sender.clone(),
receiver.clone(),
)
}

fn create_unified_channel_gossip_vote(
&self,
sender: &TracedSender,
receiver: &BankingPacketReceiver,
) -> (BankingPacketSender, BankingPacketReceiver) {
Self::channel_inner(
ChannelLabel::GossipVote,
self.active_tracer.as_ref().cloned(),
sender.sender.clone(),
receiver.clone(),
)
}

pub fn hash_event(&self, slot: Slot, blockhash: &Hash, bank_hash: &Hash) {
self.trace_event(|| {
TimedTracedEvent(
Expand Down Expand Up @@ -258,6 +330,15 @@ impl BankingTracer {
active_tracer: Option<ActiveTracer>,
) -> (TracedSender, Receiver<BankingPacketBatch>) {
let (sender, receiver) = unbounded();
Self::channel_inner(label, active_tracer, sender, receiver)
}

fn channel_inner(
label: ChannelLabel,
active_tracer: Option<ActiveTracer>,
sender: Sender<BankingPacketBatch>,
receiver: BankingPacketReceiver,
) -> (TracedSender, Receiver<BankingPacketBatch>) {
(TracedSender::new(label, sender, active_tracer), receiver)
}

Expand Down
Loading

0 comments on commit d11072e

Please sign in to comment.