From 97d9494cd02fde1d52bd6b5454dc13416a99bb4f Mon Sep 17 00:00:00 2001 From: Jack May Date: Tue, 23 Apr 2019 14:48:36 -0700 Subject: [PATCH 1/2] Cleanup bench-exchange --- bench-exchange/src/bench.rs | 362 ++++++++++++++++++------------------ bench-exchange/src/cli.rs | 29 ++- bench-exchange/src/main.rs | 6 +- 3 files changed, 211 insertions(+), 186 deletions(-) diff --git a/bench-exchange/src/bench.rs b/bench-exchange/src/bench.rs index 6d20f55d7717c0..4936b962773520 100644 --- a/bench-exchange/src/bench.rs +++ b/bench-exchange/src/bench.rs @@ -34,7 +34,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; // TODO Chunk length as specified results in a bunch of failures, divide by 10 helps... // Assume 4MB network buffers, and 512 byte packets -const CHUNK_LEN: usize = 4 * 1024 * 1024 / 512 / 10; +const FUND_CHUNK_LEN: usize = 4 * 1024 * 1024 / 512; // Maximum system transfers per transaction const MAX_TRANSFERS_PER_TX: u64 = 4; @@ -48,9 +48,10 @@ pub struct Config { pub identity: Keypair, pub threads: usize, pub duration: Duration, - pub trade_delay: u64, + pub transfer_delay: u64, pub fund_amount: u64, pub batch_size: usize, + pub chunk_size: usize, pub account_groups: usize, } @@ -60,9 +61,10 @@ impl Default for Config { identity: Keypair::new(), threads: 4, duration: Duration::new(u64::max_value(), 0), - trade_delay: 0, + transfer_delay: 0, fund_amount: 100_000, batch_size: 10, + chunk_size: 10, account_groups: 100, } } @@ -73,9 +75,9 @@ pub struct SampleStats { /// Maximum TPS reported by this node pub tps: f32, /// Total time taken for those txs - pub tx_time: Duration, + pub elapsed: Duration, /// Total transactions reported by this node - pub tx_count: u64, + pub txs: u64, } pub fn do_bench_exchange(clients: Vec, config: Config) @@ -86,9 +88,10 @@ where identity, threads, duration, - trade_delay, + transfer_delay, fund_amount, batch_size, + chunk_size, account_groups, } = config; let accounts_in_groups = batch_size * account_groups; @@ -116,12 +119,12 @@ where .map(|keypair| keypair.pubkey()) .collect(); - info!("Fund trader accounts"); + println!("Fund trader accounts"); fund_keys(client, &identity, &trader_signers, fund_amount); - info!("Fund swapper accounts"); + println!("Fund swapper accounts"); fund_keys(client, &identity, &swapper_signers, fund_amount); - info!("Create {:?} source token accounts", src_pubkeys.len()); + println!("Create {:?} source token accounts", src_pubkeys.len()); create_token_accounts(client, &trader_signers, &src_pubkeys); info!("Create {:?} profit token accounts", profit_pubkeys.len()); create_token_accounts(client, &swapper_signers, &profit_pubkeys); @@ -129,30 +132,21 @@ where // Collect the max transaction rate and total tx count seen (single node only) let sample_stats = Arc::new(RwLock::new(Vec::new())); let sample_period = 1; // in seconds - info!("Sampling clients for tps every {} s", sample_period); - - let sample_threads: Vec<_> = clients - .iter() - .map(|client| { - let exit_signal = exit_signal.clone(); - let sample_stats = sample_stats.clone(); - let client = client.clone(); - Builder::new() - .name("solana-exchange-sample".to_string()) - .spawn(move || sample_tx_count(&exit_signal, &sample_stats, sample_period, &client)) - .unwrap() - }) - .collect(); + println!("Sampling clients for tps every {} s", sample_period); + println!( + "Requesting and swapping trades with {} ms delay per thread...", + transfer_delay + ); let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new())); let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0)); - let total_tx_sent_count = Arc::new(AtomicUsize::new(0)); + let total_txs_sent_count = Arc::new(AtomicUsize::new(0)); let s_threads: Vec<_> = (0..threads) .map(|_| { let exit_signal = exit_signal.clone(); let shared_txs = shared_txs.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); - let total_tx_sent_count = total_tx_sent_count.clone(); + let total_txs_sent_count = total_txs_sent_count.clone(); let client = clients[0].clone(); Builder::new() .name("solana-exchange-transfer".to_string()) @@ -161,7 +155,7 @@ where &exit_signal, &shared_txs, &shared_tx_active_thread_count, - &total_tx_sent_count, + &total_txs_sent_count, &client, ) }) @@ -187,6 +181,7 @@ where &swapper_signers, &profit_pubkeys, batch_size, + chunk_size, account_groups, &client, ) @@ -210,8 +205,10 @@ where &shared_tx_active_thread_count, &trader_signers, &src_pubkeys, - trade_delay, + &dst_pubkeys, + transfer_delay, batch_size, + chunk_size, account_groups, &client, ) @@ -219,23 +216,43 @@ where .unwrap() }; - info!("Requesting and swapping trades"); + let sample_threads: Vec<_> = clients + .iter() + .map(|client| { + let exit_signal = exit_signal.clone(); + let sample_stats = sample_stats.clone(); + let client = client.clone(); + Builder::new() + .name("solana-exchange-sample".to_string()) + .spawn(move || sample_txs(&exit_signal, &sample_stats, sample_period, &client)) + .unwrap() + }) + .collect(); + sleep(duration); + println!("Stopping threads"); exit_signal.store(true, Ordering::Relaxed); + println!("wait for trader thread"); let _ = trader_thread.join(); + println!("waiting for swapper thread"); let _ = swapper_thread.join(); + println!("wait for tx threads"); for t in s_threads { let _ = t.join(); } + println!("wait for sample threads"); for t in sample_threads { let _ = t.join(); } - compute_and_report_stats(&sample_stats, total_tx_sent_count.load(Ordering::Relaxed)); + compute_and_report_stats( + &sample_stats, + total_txs_sent_count.load(Ordering::Relaxed) as u64, + ); } -fn sample_tx_count( +fn sample_txs( exit_signal: &Arc, sample_stats: &Arc>>, sample_period: u64, @@ -244,49 +261,48 @@ fn sample_tx_count( T: Client, { let mut max_tps = 0.0; - let mut total_tx_time; - let mut total_tx_count; + let mut total_elapsed; + let mut total_txs; let mut now = Instant::now(); let start_time = now; - let mut initial_tx_count = client.get_transaction_count().expect("transaction count"); - let first_tx_count = initial_tx_count; + let initial_txs = client.get_transaction_count().expect("transaction count"); + let mut last_txs = initial_txs; loop { - let mut tx_count = client.get_transaction_count().expect("transaction count"); - let duration = now.elapsed(); + total_elapsed = start_time.elapsed(); + let elapsed = now.elapsed(); now = Instant::now(); - if tx_count < initial_tx_count { - println!( - "expected tx_count({}) >= initial_tx_count({})", - tx_count, initial_tx_count - ); - tx_count = initial_tx_count; + let mut txs = client.get_transaction_count().expect("transaction count"); + + if txs < last_txs { + error!("expected txs({}) >= last_txs({})", txs, last_txs); + txs = last_txs; } - let sample = tx_count - initial_tx_count; - initial_tx_count = tx_count; + total_txs = txs - initial_txs; + let sample_txs = txs - last_txs; + last_txs = txs; - let tps = sample as f32 / duration_as_s(&duration); + let tps = sample_txs as f32 / duration_as_s(&elapsed); if tps > max_tps { max_tps = tps; } - total_tx_time = start_time.elapsed(); - total_tx_count = tx_count - first_tx_count; - trace!( + + info!( "Sampler {:9.2} TPS, Transactions: {:6}, Total transactions: {} over {} s", tps, - sample, - total_tx_count, - total_tx_time.as_secs(), + sample_txs, + total_txs, + total_elapsed.as_secs(), ); if exit_signal.load(Ordering::Relaxed) { let stats = SampleStats { tps: max_tps, - tx_time: total_tx_time, - tx_count: total_tx_count, + elapsed: total_elapsed, + txs: total_txs, }; sample_stats.write().unwrap().push(stats); - break; + return; } sleep(Duration::from_secs(sample_period)); } @@ -296,7 +312,7 @@ fn do_tx_transfers( exit_signal: &Arc, shared_txs: &SharedTransactions, shared_tx_thread_count: &Arc, - total_tx_sent_count: &Arc, + total_txs_sent_count: &Arc, client: &Arc, ) where T: Client, @@ -308,49 +324,45 @@ fn do_tx_transfers( let mut shared_txs_wl = shared_txs.write().unwrap(); txs = shared_txs_wl.pop_front(); } - match txs { - Some(txs0) => { - let n = txs0.len(); - - shared_tx_thread_count.fetch_add(1, Ordering::Relaxed); - let now = Instant::now(); - for tx in txs0 { - client.async_send_transaction(tx).expect("Transfer"); - } - let duration = now.elapsed(); - shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); - - total_tx_sent_count.fetch_add(n, Ordering::Relaxed); - stats.total += n as u64; - stats.sent_ns += duration_as_ns(&duration); - let rate = n as f32 / duration_as_s(&duration); - if rate > stats.sent_peak_rate { - stats.sent_peak_rate = rate; - } - trace!(" tx {:?} sent {:.2}/s", n, rate); - - solana_metrics::submit( - influxdb::Point::new("bench-exchange") - .add_tag("op", influxdb::Value::String("do_tx_transfers".to_string())) - .add_field( - "duration", - influxdb::Value::Integer(duration_as_ms(&duration) as i64), - ) - .add_field("count", influxdb::Value::Integer(n as i64)) - .to_owned(), - ); + if let Some(txs0) = txs { + let n = txs0.len(); + + shared_tx_thread_count.fetch_add(1, Ordering::Relaxed); + let now = Instant::now(); + for tx in txs0 { + client.async_send_transaction(tx).expect("Transfer"); } - None => { - if exit_signal.load(Ordering::Relaxed) { - info!( - " Thread Transferred {} Txs, avg {:.2}/s peak {:.2}/s", - stats.total, - (stats.total as f64 / stats.sent_ns as f64) * 1_000_000_000_f64, - stats.sent_peak_rate, - ); - break; - } + let duration = now.elapsed(); + shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); + + total_txs_sent_count.fetch_add(n, Ordering::Relaxed); + stats.total += n as u64; + stats.sent_ns += duration_as_ns(&duration); + let rate = n as f32 / duration_as_s(&duration); + if rate > stats.sent_peak_rate { + stats.sent_peak_rate = rate; } + trace!(" tx {:?} sent {:.2}/s", n, rate); + + solana_metrics::submit( + influxdb::Point::new("bench-exchange") + .add_tag("op", influxdb::Value::String("do_tx_transfers".to_string())) + .add_field( + "duration", + influxdb::Value::Integer(duration_as_ms(&duration) as i64), + ) + .add_field("count", influxdb::Value::Integer(n as i64)) + .to_owned(), + ); + } + if exit_signal.load(Ordering::Relaxed) { + println!( + " Thread Transferred {} Txs, avg {:.2}/s peak {:.2}/s", + stats.total, + (stats.total as f64 / stats.sent_ns as f64) * 1_000_000_000_f64, + stats.sent_peak_rate, + ); + return; } } } @@ -379,6 +391,7 @@ fn swapper( signers: &[Arc], profit_pubkeys: &[Pubkey], batch_size: usize, + chunk_size: usize, account_groups: usize, client: &Arc, ) where @@ -387,7 +400,6 @@ fn swapper( let mut stats = Stats::default(); let mut order_book = OrderBook::default(); let mut account_group: usize = 0; - let mut one_more_time = true; let mut blockhash = client .get_recent_blockhash() .expect("Failed to get blockhash"); @@ -401,7 +413,10 @@ fn swapper( == 0 { tries += 1; - if tries > 10 { + if tries > 30 { + if exit_signal.load(Ordering::Relaxed) { + break 'outer; + } debug!("Give up waiting, dump batch"); continue 'outer; } @@ -493,7 +508,7 @@ fn swapper( .to_owned(), ); - let chunks: Vec<_> = to_swap_txs.chunks(CHUNK_LEN).collect(); + let chunks: Vec<_> = to_swap_txs.chunks(chunk_size).collect(); { let mut shared_txs_wl = shared_txs.write().unwrap(); for chunk in chunks { @@ -507,29 +522,27 @@ fn swapper( } if exit_signal.load(Ordering::Relaxed) { - if !one_more_time { - info!("{} Swaps with batch size {}", stats.total, batch_size); - info!( - " Keygen avg {:.2}/s peak {:.2}/s", - (stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64, - stats.keygen_peak_rate - ); - info!( - " Signed avg {:.2}/s peak {:.2}/s", - (stats.total as f64 / stats.sign_ns as f64) * 1_000_000_000_f64, - stats.sign_peak_rate - ); - assert_eq!( - order_book.get_num_outstanding().0 + order_book.get_num_outstanding().1, - 0 - ); - break; - } - // Grab any outstanding trades - sleep(Duration::from_secs(2)); - one_more_time = false; + break 'outer; } } + println!( + "{} Swaps, batch size {}, chunk size {}", + stats.total, batch_size, chunk_size + ); + println!( + " Keygen avg {:.2}/s peak {:.2}/s", + (stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64, + stats.keygen_peak_rate + ); + println!( + " Signed avg {:.2}/s peak {:.2}/s", + (stats.total as f64 / stats.sign_ns as f64) * 1_000_000_000_f64, + stats.sign_peak_rate + ); + assert_eq!( + order_book.get_num_outstanding().0 + order_book.get_num_outstanding().1, + 0 + ); } #[allow(clippy::too_many_arguments)] @@ -542,6 +555,7 @@ fn trader( srcs: &[Pubkey], delay: u64, batch_size: usize, + chunk_size: usize, account_groups: usize, client: &Arc, ) where @@ -563,8 +577,6 @@ fn trader( let now = Instant::now(); let trade_keys = generate_keypairs(batch_size as u64); - stats.total += batch_size as u64; - let mut trades = vec![]; let mut trade_infos = vec![]; let start = account_group * batch_size as usize; @@ -604,7 +616,7 @@ fn trader( } trace!("sw {:?} keypairs {:.2} /s", batch_size, rate); - trades.chunks(CHUNK_LEN).for_each(|chunk| { + trades.chunks(chunk_size).for_each(|chunk| { let now = Instant::now(); // Don't get a blockhash every time @@ -654,16 +666,13 @@ fn trader( .to_owned(), ); - let chunks: Vec<_> = trades_txs.chunks(CHUNK_LEN).collect(); { let mut shared_txs_wl = shared_txs .write() .expect("Failed to send tx to transfer threads"); - for chunk in chunks { - shared_txs_wl.push_back(chunk.to_vec()); - } + stats.total += chunk_size as u64; + shared_txs_wl.push_back(trades_txs); } - if delay > 0 { sleep(Duration::from_millis(delay)); } @@ -678,13 +687,16 @@ fn trader( } if exit_signal.load(Ordering::Relaxed) { - info!("{} Trades with batch size {}", stats.total, batch_size); - info!( + println!( + "{} Trades with batch size {} chunk size {}", + stats.total, batch_size, chunk_size + ); + println!( " Keygen avg {:.2}/s peak {:.2}/s", (stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64, stats.keygen_peak_rate ); - info!( + println!( " Signed avg {:.2}/s peak {:.2}/s", (stats.total as f64 / stats.sign_ns as f64) * 1_000_000_000_f64, stats.sign_peak_rate @@ -705,7 +717,7 @@ where return true; } Err(e) => { - info!("error: {:?}", e); + println!("error: {:?}", e); } } } @@ -718,7 +730,7 @@ pub fn fund_keys(client: &Client, source: &Keypair, dests: &[Arc], lamp let mut funded: Vec<(&Keypair, u64)> = vec![(source, total)]; let mut notfunded: Vec<&Arc> = dests.iter().collect(); - info!( + println!( " Funding {} keys with {} lamports each", dests.len(), lamports @@ -754,7 +766,7 @@ pub fn fund_keys(client: &Client, source: &Keypair, dests: &[Arc], lamp } } - to_fund.chunks(CHUNK_LEN).for_each(|chunk| { + to_fund.chunks(FUND_CHUNK_LEN).for_each(|chunk| { #[allow(clippy::clone_double_ref)] // sigh let mut to_fund_txs: Vec<_> = chunk .par_iter() @@ -833,7 +845,7 @@ pub fn create_token_accounts(client: &Client, signers: &[Arc], accounts let mut notfunded: Vec<(&Arc, &Pubkey)> = signers.iter().zip(accounts).collect(); while !notfunded.is_empty() { - notfunded.chunks(CHUNK_LEN).for_each(|chunk| { + notfunded.chunks(FUND_CHUNK_LEN).for_each(|chunk| { let mut to_create_txs: Vec<_> = chunk .par_iter() .map(|(signer, new)| { @@ -912,47 +924,43 @@ pub fn create_token_accounts(client: &Client, signers: &[Arc], accounts } } -fn compute_and_report_stats(maxes: &Arc>>, total_tx_send_count: usize) { - let mut max_tx_count = 0; - let mut max_tx_time = Duration::new(0, 0); - info!("| Max TPS | Total Transactions"); - info!("+---------------+--------------------"); +fn compute_and_report_stats(maxes: &Arc>>, total_txs_sent: u64) { + let mut max_txs = 0; + let mut max_elapsed = Duration::new(0, 0); + println!("| Max TPS | Total Transactions"); + println!("+---------------+--------------------"); for stats in maxes.read().unwrap().iter() { - let maybe_flag = match stats.tx_count { + let maybe_flag = match stats.txs { 0 => "!!!!!", _ => "", }; - info!("| {:13.2} | {} {}", stats.tps, stats.tx_count, maybe_flag); + println!("| {:13.2} | {} {}", stats.tps, stats.txs, maybe_flag); - if stats.tx_time > max_tx_time { - max_tx_time = stats.tx_time; + if stats.elapsed > max_elapsed { + max_elapsed = stats.elapsed; } - if stats.tx_count > max_tx_count { - max_tx_count = stats.tx_count; + if stats.txs > max_txs { + max_txs = stats.txs; } } - info!("+---------------+--------------------"); - - if max_tx_count > total_tx_send_count as u64 { - error!( - "{} more transactions sampled ({}) then were sent ({})", - max_tx_count - total_tx_send_count as u64, - max_tx_count, - total_tx_send_count - ); - } else { - info!( - "{} txs dropped ({:.2}%)", - total_tx_send_count as u64 - max_tx_count, - (total_tx_send_count as u64 - max_tx_count) as f64 / total_tx_send_count as f64 - * 100_f64 + println!("+---------------+--------------------"); + + if max_txs >= total_txs_sent { + println!( + "Warning: Average TPS might be under reported, there were no txs sent for a portion of the duration" ); + max_txs = total_txs_sent; } - info!( - "\tAverage TPS: {}", - max_tx_count as f32 / max_tx_time.as_secs() as f32 + println!( + "{} txs outstanding when test ended (lag) ({:.2}%)", + total_txs_sent - max_txs, + (total_txs_sent - max_txs) as f64 / total_txs_sent as f64 * 100_f64 + ); + println!( + "\tAverage TPS: {:.2}", + max_txs as f32 / max_elapsed.as_secs() as f32 ); } @@ -972,7 +980,7 @@ pub fn airdrop_lamports(client: &Client, drone_addr: &SocketAddr, id: &Keypair, let amount_to_drop = amount - balance; - info!( + println!( "Airdropping {:?} lamports from {} for {}", amount_to_drop, drone_addr, @@ -1055,12 +1063,13 @@ mod tests { let mut config = Config::default(); config.identity = Keypair::new(); - config.threads = 4; - config.duration = Duration::from_secs(5); + config.threads = 1; + config.duration = Duration::from_secs(30); config.fund_amount = 100_000; - config.trade_delay = 0; - config.batch_size = 10; - config.account_groups = 100; + config.transfer_delay = 50; + config.batch_size = 1000; + config.chunk_size = 250; + config.account_groups = 10; let Config { fund_amount, batch_size, @@ -1088,7 +1097,7 @@ mod tests { run_local_drone(drone_keypair, addr_sender, Some(1_000_000_000_000)); let drone_addr = addr_receiver.recv_timeout(Duration::from_secs(2)).unwrap(); - info!("Connecting to the cluster"); + println!("Connecting to the cluster"); let nodes = discover_nodes(&cluster.entry_point_info.gossip, NUM_NODES).unwrap_or_else(|err| { error!("Failed to discover {} nodes: {:?}", NUM_NODES, err); @@ -1125,12 +1134,13 @@ mod tests { let mut config = Config::default(); config.identity = identity; - config.threads = 4; - config.duration = Duration::from_secs(5); + config.threads = 1; + config.duration = Duration::from_secs(30); config.fund_amount = 100_000; - config.trade_delay = 1; - config.batch_size = 10; - config.account_groups = 100; + config.transfer_delay = 0; + config.batch_size = 100; + config.chunk_size = 100; + config.account_groups = 50; do_bench_exchange(clients, config); } diff --git a/bench-exchange/src/cli.rs b/bench-exchange/src/cli.rs index 17a4655f3589a7..7f166f21fc2bf3 100644 --- a/bench-exchange/src/cli.rs +++ b/bench-exchange/src/cli.rs @@ -13,9 +13,10 @@ pub struct Config { pub threads: usize, pub num_nodes: usize, pub duration: Duration, - pub trade_delay: u64, + pub transfer_delay: u64, pub fund_amount: u64, pub batch_size: usize, + pub chunk_size: usize, pub account_groups: usize, } @@ -28,9 +29,10 @@ impl Default for Config { num_nodes: 1, threads: 4, duration: Duration::new(u64::max_value(), 0), - trade_delay: 0, + transfer_delay: 0, fund_amount: 100_000, batch_size: 100, + chunk_size: 100, account_groups: 100, } } @@ -95,13 +97,13 @@ pub fn build_args<'a, 'b>() -> App<'a, 'b> { .help("Seconds to run benchmark, then exit; default is forever"), ) .arg( - Arg::with_name("trade-delay") - .long("trade-delay") + Arg::with_name("transfer-delay") + .long("transfer-delay") .value_name("") .takes_value(true) .required(false) .default_value("0") - .help("Delay between trade requests in milliseconds"), + .help("Delay between each chunk"), ) .arg( Arg::with_name("fund-amount") @@ -119,7 +121,16 @@ pub fn build_args<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .required(false) .default_value("1000") - .help("Number of bulk trades to submit between trade delays"), + .help("Number of transactions before the signer rolls over"), + ) + .arg( + Arg::with_name("chunk-size") + .long("chunk-size") + .value_name("") + .takes_value(true) + .required(false) + .default_value("1000") + .help("Number of transactions to generate and send at a time"), ) .arg( Arg::with_name("account-groups") @@ -161,12 +172,14 @@ pub fn extract_args<'a>(matches: &ArgMatches<'a>) -> Config { value_t!(matches.value_of("num-nodes"), usize).expect("Failed to parse num-nodes"); let duration = value_t!(matches.value_of("duration"), u64).expect("Failed to parse duration"); args.duration = Duration::from_secs(duration); - args.trade_delay = - value_t!(matches.value_of("trade-delay"), u64).expect("Failed to parse trade-delay"); + args.transfer_delay = + value_t!(matches.value_of("transfer-delay"), u64).expect("Failed to parse transfer-delay"); args.fund_amount = value_t!(matches.value_of("fund-amount"), u64).expect("Failed to parse fund-amount"); args.batch_size = value_t!(matches.value_of("batch-size"), usize).expect("Failed to parse batch-size"); + args.chunk_size = + value_t!(matches.value_of("chunk-size"), usize).expect("Failed to parse chunk-size"); args.account_groups = value_t!(matches.value_of("account-groups"), usize) .expect("Failed to parse account-groups"); diff --git a/bench-exchange/src/main.rs b/bench-exchange/src/main.rs index 13b83f0068ff44..ae7d972719bbc9 100644 --- a/bench-exchange/src/main.rs +++ b/bench-exchange/src/main.rs @@ -21,9 +21,10 @@ fn main() { threads, num_nodes, duration, - trade_delay, + transfer_delay, fund_amount, batch_size, + chunk_size, account_groups, .. } = cli_config; @@ -54,9 +55,10 @@ fn main() { identity, threads, duration, - trade_delay, + transfer_delay, fund_amount, batch_size, + chunk_size, account_groups, }; From 8a379cd32bbcd3cf53887b9a88277397c571b36c Mon Sep 17 00:00:00 2001 From: Jack May Date: Tue, 23 Apr 2019 15:41:52 -0700 Subject: [PATCH 2/2] nudge --- bench-exchange/src/bench.rs | 73 +++++++++++++++++++------------------ bench-exchange/src/cli.rs | 6 +-- bench-exchange/src/main.rs | 3 +- 3 files changed, 42 insertions(+), 40 deletions(-) diff --git a/bench-exchange/src/bench.rs b/bench-exchange/src/bench.rs index 4936b962773520..3caee8834dc325 100644 --- a/bench-exchange/src/bench.rs +++ b/bench-exchange/src/bench.rs @@ -99,7 +99,8 @@ where let clients: Vec<_> = clients.into_iter().map(Arc::new).collect(); let client = clients[0].as_ref(); - let total_keys = accounts_in_groups as u64 * 4; + const NUM_KEYPAIR_GROUPS: u64 = 4; + let total_keys = accounts_in_groups as u64 * NUM_KEYPAIR_GROUPS; info!("Generating {:?} keys", total_keys); let mut keypairs = generate_keypairs(total_keys); let trader_signers: Vec<_> = keypairs @@ -119,12 +120,12 @@ where .map(|keypair| keypair.pubkey()) .collect(); - println!("Fund trader accounts"); + info!("Fund trader accounts"); fund_keys(client, &identity, &trader_signers, fund_amount); - println!("Fund swapper accounts"); + info!("Fund swapper accounts"); fund_keys(client, &identity, &swapper_signers, fund_amount); - println!("Create {:?} source token accounts", src_pubkeys.len()); + info!("Create {:?} source token accounts", src_pubkeys.len()); create_token_accounts(client, &trader_signers, &src_pubkeys); info!("Create {:?} profit token accounts", profit_pubkeys.len()); create_token_accounts(client, &swapper_signers, &profit_pubkeys); @@ -132,8 +133,8 @@ where // Collect the max transaction rate and total tx count seen (single node only) let sample_stats = Arc::new(RwLock::new(Vec::new())); let sample_period = 1; // in seconds - println!("Sampling clients for tps every {} s", sample_period); - println!( + info!("Sampling clients for tps every {} s", sample_period); + info!( "Requesting and swapping trades with {} ms delay per thread...", transfer_delay ); @@ -205,7 +206,6 @@ where &shared_tx_active_thread_count, &trader_signers, &src_pubkeys, - &dst_pubkeys, transfer_delay, batch_size, chunk_size, @@ -231,17 +231,17 @@ where sleep(duration); - println!("Stopping threads"); + info!("Stopping threads"); exit_signal.store(true, Ordering::Relaxed); - println!("wait for trader thread"); + info!("Wait for trader thread"); let _ = trader_thread.join(); - println!("waiting for swapper thread"); + info!("Waiting for swapper thread"); let _ = swapper_thread.join(); - println!("wait for tx threads"); + info!("Wait for tx threads"); for t in s_threads { let _ = t.join(); } - println!("wait for sample threads"); + info!("Wait for sample threads"); for t in sample_threads { let _ = t.join(); } @@ -356,7 +356,7 @@ fn do_tx_transfers( ); } if exit_signal.load(Ordering::Relaxed) { - println!( + info!( " Thread Transferred {} Txs, avg {:.2}/s peak {:.2}/s", stats.total, (stats.total as f64 / stats.sent_ns as f64) * 1_000_000_000_f64, @@ -417,7 +417,7 @@ fn swapper( if exit_signal.load(Ordering::Relaxed) { break 'outer; } - debug!("Give up waiting, dump batch"); + error!("Give up waiting, dump batch"); continue 'outer; } debug!("{} waiting for trades batch to clear", tries); @@ -525,16 +525,16 @@ fn swapper( break 'outer; } } - println!( + info!( "{} Swaps, batch size {}, chunk size {}", stats.total, batch_size, chunk_size ); - println!( + info!( " Keygen avg {:.2}/s peak {:.2}/s", (stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64, stats.keygen_peak_rate ); - println!( + info!( " Signed avg {:.2}/s peak {:.2}/s", (stats.total as f64 / stats.sign_ns as f64) * 1_000_000_000_f64, stats.sign_peak_rate @@ -687,16 +687,16 @@ fn trader( } if exit_signal.load(Ordering::Relaxed) { - println!( + info!( "{} Trades with batch size {} chunk size {}", stats.total, batch_size, chunk_size ); - println!( + info!( " Keygen avg {:.2}/s peak {:.2}/s", (stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64, stats.keygen_peak_rate ); - println!( + info!( " Signed avg {:.2}/s peak {:.2}/s", (stats.total as f64 / stats.sign_ns as f64) * 1_000_000_000_f64, stats.sign_peak_rate @@ -717,7 +717,7 @@ where return true; } Err(e) => { - println!("error: {:?}", e); + info!("error: {:?}", e); } } } @@ -730,7 +730,7 @@ pub fn fund_keys(client: &Client, source: &Keypair, dests: &[Arc], lamp let mut funded: Vec<(&Keypair, u64)> = vec![(source, total)]; let mut notfunded: Vec<&Arc> = dests.iter().collect(); - println!( + info!( " Funding {} keys with {} lamports each", dests.len(), lamports @@ -927,8 +927,8 @@ pub fn create_token_accounts(client: &Client, signers: &[Arc], accounts fn compute_and_report_stats(maxes: &Arc>>, total_txs_sent: u64) { let mut max_txs = 0; let mut max_elapsed = Duration::new(0, 0); - println!("| Max TPS | Total Transactions"); - println!("+---------------+--------------------"); + info!("| Max TPS | Total Transactions"); + info!("+---------------+--------------------"); for stats in maxes.read().unwrap().iter() { let maybe_flag = match stats.txs { @@ -936,7 +936,7 @@ fn compute_and_report_stats(maxes: &Arc>>, total_txs_s _ => "", }; - println!("| {:13.2} | {} {}", stats.tps, stats.txs, maybe_flag); + info!("| {:13.2} | {} {}", stats.tps, stats.txs, maybe_flag); if stats.elapsed > max_elapsed { max_elapsed = stats.elapsed; @@ -945,20 +945,20 @@ fn compute_and_report_stats(maxes: &Arc>>, total_txs_s max_txs = stats.txs; } } - println!("+---------------+--------------------"); + info!("+---------------+--------------------"); if max_txs >= total_txs_sent { - println!( + info!( "Warning: Average TPS might be under reported, there were no txs sent for a portion of the duration" ); max_txs = total_txs_sent; } - println!( + info!( "{} txs outstanding when test ended (lag) ({:.2}%)", total_txs_sent - max_txs, (total_txs_sent - max_txs) as f64 / total_txs_sent as f64 * 100_f64 ); - println!( + info!( "\tAverage TPS: {:.2}", max_txs as f32 / max_elapsed.as_secs() as f32 ); @@ -980,7 +980,7 @@ pub fn airdrop_lamports(client: &Client, drone_addr: &SocketAddr, id: &Keypair, let amount_to_drop = amount - balance; - println!( + info!( "Airdropping {:?} lamports from {} for {}", amount_to_drop, drone_addr, @@ -1066,7 +1066,7 @@ mod tests { config.threads = 1; config.duration = Duration::from_secs(30); config.fund_amount = 100_000; - config.transfer_delay = 50; + config.transfer_delay = 40; config.batch_size = 1000; config.chunk_size = 250; config.account_groups = 10; @@ -1097,7 +1097,7 @@ mod tests { run_local_drone(drone_keypair, addr_sender, Some(1_000_000_000_000)); let drone_addr = addr_receiver.recv_timeout(Duration::from_secs(2)).unwrap(); - println!("Connecting to the cluster"); + info!("Connecting to the cluster"); let nodes = discover_nodes(&cluster.entry_point_info.gossip, NUM_NODES).unwrap_or_else(|err| { error!("Failed to discover {} nodes: {:?}", NUM_NODES, err); @@ -1114,11 +1114,12 @@ mod tests { exit(1); } + const NUM_SIGNERS: u64 = 2; airdrop_lamports( &clients[0], &drone_addr, &config.identity, - fund_amount * (accounts_in_groups + 1) as u64 * 2, + fund_amount * (accounts_in_groups + 1) as u64 * NUM_SIGNERS, ); do_bench_exchange(clients, config); @@ -1138,9 +1139,9 @@ mod tests { config.duration = Duration::from_secs(30); config.fund_amount = 100_000; config.transfer_delay = 0; - config.batch_size = 100; - config.chunk_size = 100; - config.account_groups = 50; + config.batch_size = 1000; + config.chunk_size = 500; + config.account_groups = 10; do_bench_exchange(clients, config); } diff --git a/bench-exchange/src/cli.rs b/bench-exchange/src/cli.rs index 7f166f21fc2bf3..5d41f030ba07db 100644 --- a/bench-exchange/src/cli.rs +++ b/bench-exchange/src/cli.rs @@ -76,7 +76,7 @@ pub fn build_args<'a, 'b>() -> App<'a, 'b> { .value_name("") .takes_value(true) .required(false) - .default_value("4") + .default_value("1") .help("Number of threads submitting transactions"), ) .arg( @@ -129,7 +129,7 @@ pub fn build_args<'a, 'b>() -> App<'a, 'b> { .value_name("") .takes_value(true) .required(false) - .default_value("1000") + .default_value("500") .help("Number of transactions to generate and send at a time"), ) .arg( @@ -138,7 +138,7 @@ pub fn build_args<'a, 'b>() -> App<'a, 'b> { .value_name("") .takes_value(true) .required(false) - .default_value("100") + .default_value("10") .help("Number of account groups to cycle for each batch"), ) } diff --git a/bench-exchange/src/main.rs b/bench-exchange/src/main.rs index ae7d972719bbc9..3cdfbf584479e4 100644 --- a/bench-exchange/src/main.rs +++ b/bench-exchange/src/main.rs @@ -44,11 +44,12 @@ fn main() { info!("Funding keypair: {}", identity.pubkey()); let accounts_in_groups = batch_size * account_groups; + const NUM_SIGNERS: u64 = 2; airdrop_lamports( &clients[0], &drone_addr, &identity, - fund_amount * (accounts_in_groups + 1) as u64 * 2, + fund_amount * (accounts_in_groups + 1) as u64 * NUM_SIGNERS, ); let config = Config {