From fe098b5ddcdb0009feded79613319ae0b8a16a3d Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Tue, 19 Oct 2021 17:11:46 -0600 Subject: [PATCH] rpc-send-tx-svc: add with_config constructor --- Cargo.lock | 3 + core/Cargo.toml | 1 + core/src/validator.rs | 10 +- local-cluster/src/validator_configs.rs | 3 +- replica-node/Cargo.toml | 1 + replica-node/src/replica_node.rs | 8 +- rpc/src/rpc_service.rs | 17 +-- .../src/send_transaction_service.rs | 100 ++++++++++++------ validator/Cargo.toml | 1 + validator/src/main.rs | 24 +++-- 10 files changed, 109 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f7967c8eff2c67..5f170c472db924 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4664,6 +4664,7 @@ dependencies = [ "solana-rpc", "solana-runtime", "solana-sdk", + "solana-send-transaction-service", "solana-stake-program", "solana-streamer", "solana-transaction-status", @@ -5423,6 +5424,7 @@ dependencies = [ "solana-rpc", "solana-runtime", "solana-sdk", + "solana-send-transaction-service", "solana-streamer", "solana-validator", "solana-version", @@ -5836,6 +5838,7 @@ dependencies = [ "solana-rpc", "solana-runtime", "solana-sdk", + "solana-send-transaction-service", "solana-streamer", "solana-version", "solana-vote-program", diff --git a/core/Cargo.toml b/core/Cargo.toml index 63604b019e19bf..97ff30bf2cb5ed 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -54,6 +54,7 @@ solana-runtime = { path = "../runtime", version = "=1.9.0" } solana-sdk = { path = "../sdk", version = "=1.9.0" } solana-frozen-abi = { path = "../frozen-abi", version = "=1.9.0" } solana-frozen-abi-macro = { path = "../frozen-abi/macro", version = "=1.9.0" } +solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.9.0" } solana-streamer = { path = "../streamer", version = "=1.9.0" } solana-transaction-status = { path = "../transaction-status", version = "=1.9.0" } solana-vote-program = { path = "../programs/vote", version = "=1.9.0" } diff --git a/core/src/validator.rs b/core/src/validator.rs index fd3c1223fface1..b0eda391760263 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -88,6 +88,7 @@ use { signature::{Keypair, Signer}, timing::timestamp, }, + solana_send_transaction_service::send_transaction_service, solana_streamer::socket::SocketAddrSpace, solana_vote_program::vote_state::VoteState, std::{ @@ -148,8 +149,7 @@ pub struct ValidatorConfig { pub contact_debug_interval: u64, pub contact_save_interval: u64, pub bpf_jit: bool, - pub send_transaction_retry_ms: u64, - pub send_transaction_leader_forward_count: u64, + pub send_transaction_service_config: send_transaction_service::Config, pub no_poh_speed_test: bool, pub poh_pinned_cpu_core: usize, pub poh_hashes_per_batch: u64, @@ -209,8 +209,7 @@ impl Default for ValidatorConfig { contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS, bpf_jit: false, - send_transaction_retry_ms: 2000, - send_transaction_leader_forward_count: 2, + send_transaction_service_config: send_transaction_service::Config::default(), no_poh_speed_test: true, poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE, poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH, @@ -616,8 +615,7 @@ impl Validator { config.trusted_validators.clone(), rpc_override_health_check.clone(), optimistically_confirmed_bank.clone(), - config.send_transaction_retry_ms, - config.send_transaction_leader_forward_count, + config.send_transaction_service_config.clone(), max_slots.clone(), leader_schedule_cache.clone(), max_complete_transaction_status_slot, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 6d42b4a8e78544..7a5082ceaabc92 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -44,8 +44,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { contact_debug_interval: config.contact_debug_interval, contact_save_interval: config.contact_save_interval, bpf_jit: config.bpf_jit, - send_transaction_retry_ms: config.send_transaction_retry_ms, - send_transaction_leader_forward_count: config.send_transaction_leader_forward_count, + send_transaction_service_config: config.send_transaction_service_config.clone(), no_poh_speed_test: config.no_poh_speed_test, poh_pinned_cpu_core: config.poh_pinned_cpu_core, account_indexes: config.account_indexes.clone(), diff --git a/replica-node/Cargo.toml b/replica-node/Cargo.toml index 7ddce4db720c2b..787fe497aa190e 100644 --- a/replica-node/Cargo.toml +++ b/replica-node/Cargo.toml @@ -25,6 +25,7 @@ solana-rpc = { path = "../rpc", version = "=1.9.0" } solana-replica-lib = { path = "../replica-lib", version = "=1.9.0" } solana-runtime = { path = "../runtime", version = "=1.9.0" } solana-sdk = { path = "../sdk", version = "=1.9.0" } +solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.9.0" } solana-streamer = { path = "../streamer", version = "=1.9.0" } solana-version = { path = "../version", version = "=1.9.0" } solana-validator = { path = "../validator", version = "=1.9.0" } diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs index 720692f0d7ab35..9797a583121a0a 100644 --- a/replica-node/src/replica_node.rs +++ b/replica-node/src/replica_node.rs @@ -26,6 +26,7 @@ use { snapshot_config::SnapshotConfig, snapshot_package::SnapshotType, snapshot_utils, }, solana_sdk::{clock::Slot, exit::Exit, genesis_config::GenesisConfig, hash::Hash}, + solana_send_transaction_service::send_transaction_service, solana_streamer::socket::SocketAddrSpace, std::{ fs, @@ -237,8 +238,11 @@ fn start_client_rpc_services( None, rpc_override_health_check, optimistically_confirmed_bank.clone(), - 0, - 0, + send_transaction_service::Config { + retry_rate_ms: 0, + leader_forward_count: 0, + ..send_transaction_service::Config::default() + }, max_slots, leader_schedule_cache.clone(), max_complete_transaction_status_slot, diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 256d5cb8452c3b..0a602b8c41f551 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -34,7 +34,7 @@ use { exit::Exit, genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH, hash::Hash, native_token::lamports_to_sol, pubkey::Pubkey, }, - solana_send_transaction_service::send_transaction_service::SendTransactionService, + solana_send_transaction_service::send_transaction_service::{self, SendTransactionService}, std::{ collections::HashSet, net::SocketAddr, @@ -297,8 +297,7 @@ impl JsonRpcService { trusted_validators: Option>, override_health_check: Arc, optimistically_confirmed_bank: Arc>, - send_transaction_retry_ms: u64, - send_transaction_leader_forward_count: u64, + send_transaction_service_config: send_transaction_service::Config, max_slots: Arc, leader_schedule_cache: Arc, current_transaction_status_slot: Arc, @@ -395,13 +394,12 @@ impl JsonRpcService { let leader_info = poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder)); - let _send_transaction_service = Arc::new(SendTransactionService::new( + let _send_transaction_service = Arc::new(SendTransactionService::new_with_config( tpu_address, &bank_forks, leader_info, receiver, - send_transaction_retry_ms, - send_transaction_leader_forward_count, + send_transaction_service_config, )); #[cfg(test)] @@ -557,8 +555,11 @@ mod tests { None, Arc::new(AtomicBool::new(false)), optimistically_confirmed_bank, - 1000, - 1, + send_transaction_service::Config { + retry_rate_ms: 1000, + leader_forward_count: 1, + ..send_transaction_service::Config::default() + }, Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), Arc::new(AtomicU64::default()), diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 468ee944867603..c3d8201619da98 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -18,6 +18,10 @@ use { /// Maximum size of the transaction queue const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but maybe it needs to be bigger one day +/// Default retry interval +const DEFAULT_RETRY_RATE_MS: u64 = 2_000; +/// Default number of leaders to forward transactions to +const DEFAULT_LEADER_FORWARD_COUNT: u64 = 2; pub struct SendTransactionService { thread: JoinHandle<()>, @@ -61,6 +65,21 @@ struct ProcessTransactionsResult { retained: u64, } +#[derive(Clone, Debug)] +pub struct Config { + pub retry_rate_ms: u64, + pub leader_forward_count: u64, +} + +impl Default for Config { + fn default() -> Self { + Self { + retry_rate_ms: DEFAULT_RETRY_RATE_MS, + leader_forward_count: DEFAULT_LEADER_FORWARD_COUNT, + } + } +} + impl SendTransactionService { pub fn new( tpu_address: SocketAddr, @@ -69,14 +88,28 @@ impl SendTransactionService { receiver: Receiver, retry_rate_ms: u64, leader_forward_count: u64, + ) -> Self { + let config = Config { + retry_rate_ms, + leader_forward_count, + ..Config::default() + }; + Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config) + } + + pub fn new_with_config( + tpu_address: SocketAddr, + bank_forks: &Arc>, + leader_info: Option, + receiver: Receiver, + config: Config, ) -> Self { let thread = Self::retry_thread( tpu_address, receiver, bank_forks.clone(), leader_info, - retry_rate_ms, - leader_forward_count, + config, ); Self { thread } } @@ -86,8 +119,7 @@ impl SendTransactionService { receiver: Receiver, bank_forks: Arc>, mut leader_info: Option, - retry_rate_ms: u64, - leader_forward_count: u64, + config: Config, ) -> JoinHandle<()> { let mut last_status_check = Instant::now(); let mut last_leader_refresh = Instant::now(); @@ -101,13 +133,13 @@ impl SendTransactionService { Builder::new() .name("send-tx-sv2".to_string()) .spawn(move || loop { - match receiver.recv_timeout(Duration::from_millis(1000.min(retry_rate_ms))) { + match receiver.recv_timeout(Duration::from_millis(1000.min(config.retry_rate_ms))) { Err(RecvTimeoutError::Disconnected) => break, Err(RecvTimeoutError::Timeout) => {} Ok(transaction_info) => { - let addresses = leader_info - .as_ref() - .map(|leader_info| leader_info.get_leader_tpus(leader_forward_count)); + let addresses = leader_info.as_ref().map(|leader_info| { + leader_info.get_leader_tpus(config.leader_forward_count) + }); let addresses = addresses .map(|address_list| { if address_list.is_empty() { @@ -132,7 +164,7 @@ impl SendTransactionService { } } - if last_status_check.elapsed().as_millis() as u64 >= retry_rate_ms { + if last_status_check.elapsed().as_millis() as u64 >= config.retry_rate_ms { if !transactions.is_empty() { datapoint_info!( "send_transaction_service-queue-size", @@ -153,7 +185,7 @@ impl SendTransactionService { &tpu_address, &mut transactions, &leader_info, - leader_forward_count, + &config, ); } last_status_check = Instant::now(); @@ -175,7 +207,7 @@ impl SendTransactionService { tpu_address: &SocketAddr, transactions: &mut HashMap, leader_info: &Option, - leader_forward_count: u64, + config: &Config, ) -> ProcessTransactionsResult { let mut result = ProcessTransactionsResult::default(); @@ -223,9 +255,9 @@ impl SendTransactionService { result.retried += 1; transaction_info.retries += 1; inc_new_counter_info!("send_transaction_service-retry", 1); - let addresses = leader_info - .as_ref() - .map(|leader_info| leader_info.get_leader_tpus(leader_forward_count)); + let addresses = leader_info.as_ref().map(|leader_info| { + leader_info.get_leader_tpus(config.leader_forward_count) + }); let addresses = addresses .map(|address_list| { if address_list.is_empty() { @@ -318,7 +350,10 @@ mod test { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let tpu_address = "127.0.0.1:0".parse().unwrap(); - let leader_forward_count = 1; + let config = Config { + leader_forward_count: 1, + ..Config::default() + }; let root_bank = Arc::new(Bank::new_from_parent( &bank_forks.read().unwrap().working_bank(), @@ -364,7 +399,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -393,7 +428,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -422,7 +457,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -451,7 +486,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -481,7 +516,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -521,7 +556,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -539,7 +574,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -560,7 +595,10 @@ mod test { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let tpu_address = "127.0.0.1:0".parse().unwrap(); - let leader_forward_count = 1; + let config = Config { + leader_forward_count: 1, + ..Config::default() + }; let root_bank = Arc::new(Bank::new_from_parent( &bank_forks.read().unwrap().working_bank(), @@ -619,7 +657,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -647,7 +685,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -677,7 +715,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -705,7 +743,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -734,7 +772,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -763,7 +801,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -793,7 +831,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -821,7 +859,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 0); assert_eq!( diff --git a/validator/Cargo.toml b/validator/Cargo.toml index b57c64d0628382..cebd1136a791c0 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -44,6 +44,7 @@ solana-replica-lib = { path = "../replica-lib", version = "=1.9.0" } solana-rpc = { path = "../rpc", version = "=1.9.0" } solana-runtime = { path = "../runtime", version = "=1.9.0" } solana-sdk = { path = "../sdk", version = "=1.9.0" } +solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.9.0" } solana-streamer = { path = "../streamer", version = "=1.9.0" } solana-version = { path = "../version", version = "=1.9.0" } solana-vote-program = { path = "../programs/vote", version = "=1.9.0" } diff --git a/validator/src/main.rs b/validator/src/main.rs index 8eff4fe4dd5a8b..887d770f3dcf4a 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -59,6 +59,7 @@ use { pubkey::Pubkey, signature::{Keypair, Signer}, }, + solana_send_transaction_service::send_transaction_service, solana_streamer::socket::SocketAddrSpace, solana_validator::{ admin_rpc_service, bootstrap, dashboard::Dashboard, ledger_lockfile, lock_ledger, @@ -422,11 +423,12 @@ pub fn main() { PubSubConfig::default().queue_capacity_items.to_string(); let default_rpc_pubsub_queue_capacity_bytes = PubSubConfig::default().queue_capacity_bytes.to_string(); - let default_rpc_send_transaction_retry_ms = ValidatorConfig::default() - .send_transaction_retry_ms + let default_send_transaction_service_config = send_transaction_service::Config::default(); + let default_rpc_send_transaction_retry_ms = default_send_transaction_service_config + .retry_rate_ms .to_string(); - let default_rpc_send_transaction_leader_forward_count = ValidatorConfig::default() - .send_transaction_leader_forward_count + let default_rpc_send_transaction_leader_forward_count = default_send_transaction_service_config + .leader_forward_count .to_string(); let default_rpc_threads = num_cpus::get().to_string(); let default_accountsdb_repl_threads = num_cpus::get().to_string(); @@ -2093,12 +2095,14 @@ pub fn main() { debug_keys, contact_debug_interval, bpf_jit: !matches.is_present("no_bpf_jit"), - send_transaction_retry_ms: value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64), - send_transaction_leader_forward_count: value_t_or_exit!( - matches, - "rpc_send_transaction_leader_forward_count", - u64 - ), + send_transaction_service_config: send_transaction_service::Config { + retry_rate_ms: value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64), + leader_forward_count: value_t_or_exit!( + matches, + "rpc_send_transaction_leader_forward_count", + u64 + ), + }, no_poh_speed_test: matches.is_present("no_poh_speed_test"), poh_pinned_cpu_core: value_of(&matches, "poh_pinned_cpu_core") .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),