From 8cba6cca762bc2fd036302e9f5d005af88a0bbf8 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 21 Oct 2021 02:15:03 +0000 Subject: [PATCH] rpc-send-tx-svc server-side retry knobs (backport #20818) (#20830) * rpc-send-tx-svc: add with_config constructor (cherry picked from commit fe098b5ddcdb0009feded79613319ae0b8a16a3d) # Conflicts: # Cargo.lock # core/Cargo.toml # replica-node/Cargo.toml # rpc/src/rpc_service.rs # rpc/src/send_transaction_service.rs # validator/Cargo.toml * rpc-send-tx-svc: server-side retry knobs (cherry picked from commit 2744a2128c2c0cfca5bd5b587c92497e0a4a4d4f) Co-authored-by: Trent Nelson --- core/src/validator.rs | 10 +-- local-cluster/src/validator_configs.rs | 3 +- rpc/src/rpc_service.rs | 17 ++-- rpc/src/send_transaction_service.rs | 114 ++++++++++++++++++------- validator/src/main.rs | 56 +++++++++--- 5 files changed, 141 insertions(+), 59 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index b5aad60d0c17cc..505f9a22076806 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -45,6 +45,7 @@ use { poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, poh_service::{self, PohService}, }, + solana_rpc::send_transaction_service, solana_rpc::{ max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ @@ -138,8 +139,7 @@ pub struct ValidatorConfig { pub contact_debug_interval: u64, pub contact_save_interval: u64, pub bpf_jit: bool, - pub send_transaction_retry_ms: u64, - pub send_transaction_leader_forward_count: u64, + pub send_transaction_service_config: send_transaction_service::Config, pub no_poh_speed_test: bool, pub poh_pinned_cpu_core: usize, pub poh_hashes_per_batch: u64, @@ -197,8 +197,7 @@ impl Default for ValidatorConfig { contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS, bpf_jit: false, - send_transaction_retry_ms: 2000, - 
send_transaction_leader_forward_count: 2, + send_transaction_service_config: send_transaction_service::Config::default(), no_poh_speed_test: true, poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE, poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH, @@ -592,8 +591,7 @@ impl Validator { config.trusted_validators.clone(), rpc_override_health_check.clone(), optimistically_confirmed_bank.clone(), - config.send_transaction_retry_ms, - config.send_transaction_leader_forward_count, + config.send_transaction_service_config.clone(), max_slots.clone(), leader_schedule_cache.clone(), max_complete_transaction_status_slot, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 1d3927ed71f5b9..7b8747d7f771ee 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -43,8 +43,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { contact_debug_interval: config.contact_debug_interval, contact_save_interval: config.contact_save_interval, bpf_jit: config.bpf_jit, - send_transaction_retry_ms: config.send_transaction_retry_ms, - send_transaction_leader_forward_count: config.send_transaction_leader_forward_count, + send_transaction_service_config: config.send_transaction_service_config.clone(), no_poh_speed_test: config.no_poh_speed_test, poh_pinned_cpu_core: config.poh_pinned_cpu_core, account_indexes: config.account_indexes.clone(), diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index ca38192a261f09..531ca3733528f6 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -6,7 +6,7 @@ use { optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, rpc::{rpc_deprecated_v1_7::*, rpc_full::*, rpc_minimal::*, rpc_obsolete_v1_7::*, *}, rpc_health::*, - send_transaction_service::{LeaderInfo, SendTransactionService}, + send_transaction_service::{self, LeaderInfo, SendTransactionService}, }, jsonrpc_core::{futures::prelude::*, 
MetaIoHandler}, jsonrpc_http_server::{ @@ -280,8 +280,7 @@ impl JsonRpcService { trusted_validators: Option<HashSet<Pubkey>>, override_health_check: Arc<AtomicBool>, optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>, - send_transaction_retry_ms: u64, - send_transaction_leader_forward_count: u64, + send_transaction_service_config: send_transaction_service::Config, max_slots: Arc<MaxSlots>, leader_schedule_cache: Arc<LeaderScheduleCache>, current_transaction_status_slot: Arc<AtomicU64>, @@ -378,13 +377,12 @@ impl JsonRpcService { let leader_info = poh_recorder.map(|recorder| LeaderInfo::new(cluster_info.clone(), recorder)); - let _send_transaction_service = Arc::new(SendTransactionService::new( + let _send_transaction_service = Arc::new(SendTransactionService::new_with_config( tpu_address, &bank_forks, leader_info, receiver, - send_transaction_retry_ms, - send_transaction_leader_forward_count, + send_transaction_service_config, )); #[cfg(test)] @@ -540,8 +538,11 @@ mod tests { None, Arc::new(AtomicBool::new(false)), optimistically_confirmed_bank, - 1000, - 1, + send_transaction_service::Config { + retry_rate_ms: 1000, + leader_forward_count: 1, + ..send_transaction_service::Config::default() + }, Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), Arc::new(AtomicU64::default()), diff --git a/rpc/src/send_transaction_service.rs b/rpc/src/send_transaction_service.rs index 3162eab3566544..03df7195cfb210 100644 --- a/rpc/src/send_transaction_service.rs +++ b/rpc/src/send_transaction_service.rs @@ -23,6 +23,12 @@ use { /// Maximum size of the transaction queue const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but maybe it needs to be bigger one day +/// Default retry interval +const DEFAULT_RETRY_RATE_MS: u64 = 2_000; +/// Default number of leaders to forward transactions to +const DEFAULT_LEADER_FORWARD_COUNT: u64 = 2; +/// Default max number of times the service will retry broadcast +const DEFAULT_SERVICE_MAX_RETRIES: usize = usize::MAX; pub struct SendTransactionService { thread: JoinHandle<()>, @@ -108,6
+114,25 @@ struct ProcessTransactionsResult { retained: u64, } +#[derive(Clone, Debug)] +pub struct Config { + pub retry_rate_ms: u64, + pub leader_forward_count: u64, + pub default_max_retries: Option<usize>, + pub service_max_retries: usize, +} + +impl Default for Config { + fn default() -> Self { + Self { + retry_rate_ms: DEFAULT_RETRY_RATE_MS, + leader_forward_count: DEFAULT_LEADER_FORWARD_COUNT, + default_max_retries: None, + service_max_retries: DEFAULT_SERVICE_MAX_RETRIES, + } + } +} + impl SendTransactionService { pub fn new( tpu_address: SocketAddr, @@ -116,14 +141,28 @@ impl SendTransactionService { receiver: Receiver<TransactionInfo>, retry_rate_ms: u64, leader_forward_count: u64, + ) -> Self { + let config = Config { + retry_rate_ms, + leader_forward_count, + ..Config::default() + }; + Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config) + } + + pub fn new_with_config( + tpu_address: SocketAddr, + bank_forks: &Arc<RwLock<BankForks>>, + leader_info: Option<LeaderInfo>, + receiver: Receiver<TransactionInfo>, + config: Config, ) -> Self { let thread = Self::retry_thread( tpu_address, receiver, bank_forks.clone(), leader_info, - retry_rate_ms, - leader_forward_count, + config, ); Self { thread } } @@ -133,8 +172,7 @@ impl SendTransactionService { receiver: Receiver<TransactionInfo>, bank_forks: Arc<RwLock<BankForks>>, mut leader_info: Option<LeaderInfo>, - retry_rate_ms: u64, - leader_forward_count: u64, + config: Config, ) -> JoinHandle<()> { let mut last_status_check = Instant::now(); let mut last_leader_refresh = Instant::now(); @@ -148,13 +186,13 @@ impl SendTransactionService { Builder::new() .name("send-tx-sv2".to_string()) .spawn(move || loop { - match receiver.recv_timeout(Duration::from_millis(1000.min(retry_rate_ms))) { + match receiver.recv_timeout(Duration::from_millis(1000.min(config.retry_rate_ms))) { Err(RecvTimeoutError::Disconnected) => break, Err(RecvTimeoutError::Timeout) => {} Ok(transaction_info) => { - let addresses = leader_info - .as_ref() - .map(|leader_info| leader_info.get_leader_tpus(leader_forward_count)); + let
addresses = leader_info.as_ref().map(|leader_info| { + leader_info.get_leader_tpus(config.leader_forward_count) + }); let addresses = addresses .map(|address_list| { if address_list.is_empty() { @@ -179,7 +217,7 @@ impl SendTransactionService { } } - if last_status_check.elapsed().as_millis() as u64 >= retry_rate_ms { + if last_status_check.elapsed().as_millis() as u64 >= config.retry_rate_ms { if !transactions.is_empty() { datapoint_info!( "send_transaction_service-queue-size", @@ -200,7 +238,7 @@ impl SendTransactionService { &tpu_address, &mut transactions, &leader_info, - leader_forward_count, + &config, ); } last_status_check = Instant::now(); @@ -222,7 +260,7 @@ impl SendTransactionService { tpu_address: &SocketAddr, transactions: &mut HashMap<Signature, TransactionInfo>, leader_info: &Option<LeaderInfo>, - leader_forward_count: u64, + config: &Config, ) -> ProcessTransactionsResult { let mut result = ProcessTransactionsResult::default(); @@ -253,7 +291,13 @@ impl SendTransactionService { inc_new_counter_info!("send_transaction_service-expired", 1); return false; } - if let Some(max_retries) = transaction_info.max_retries { + + let max_retries = transaction_info + .max_retries + .or(config.default_max_retries) + .map(|max_retries| max_retries.min(config.service_max_retries)); + + if let Some(max_retries) = max_retries { if transaction_info.retries >= max_retries { info!("Dropping transaction due to max retries: {}", signature); result.max_retries_elapsed += 1; @@ -270,9 +314,9 @@ impl SendTransactionService { result.retried += 1; transaction_info.retries += 1; inc_new_counter_info!("send_transaction_service-retry", 1); - let addresses = leader_info - .as_ref() - .map(|leader_info| leader_info.get_leader_tpus(leader_forward_count)); + let addresses = leader_info.as_ref().map(|leader_info| { + leader_info.get_leader_tpus(config.leader_forward_count) + }); let addresses = addresses .map(|address_list| { if address_list.is_empty() { @@ -372,7 +416,10 @@ mod test { let bank_forks =
Arc::new(RwLock::new(BankForks::new(bank))); let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let tpu_address = "127.0.0.1:0".parse().unwrap(); - let leader_forward_count = 1; + let config = Config { + leader_forward_count: 1, + ..Config::default() + }; let root_bank = Arc::new(Bank::new_from_parent( &bank_forks.read().unwrap().working_bank(), @@ -418,7 +465,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -447,7 +494,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -476,7 +523,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -505,7 +552,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -535,7 +582,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -575,7 +622,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -593,7 +640,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -614,7 +661,10 @@ mod test { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let tpu_address = "127.0.0.1:0".parse().unwrap(); - let leader_forward_count = 1; + let config = Config { + leader_forward_count: 1, + ..Config::default() + }; let root_bank = Arc::new(Bank::new_from_parent( &bank_forks.read().unwrap().working_bank(), @@ -673,7 +723,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( 
@@ -701,7 +751,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -731,7 +781,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -759,7 +809,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -788,7 +838,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert!(transactions.is_empty()); assert_eq!( @@ -817,7 +867,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -847,7 +897,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -875,7 +925,7 @@ mod test { &tpu_address, &mut transactions, &None, - leader_forward_count, + &config, ); assert_eq!(transactions.len(), 0); assert_eq!( diff --git a/validator/src/main.rs b/validator/src/main.rs index 2668f20ea33706..368fa2d6a9e4ac 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -36,7 +36,7 @@ use { solana_ledger::blockstore_db::BlockstoreRecoveryMode, solana_perf::recycler::enable_recycler_warming, solana_poh::poh_service, - solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig}, + solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig, send_transaction_service}, solana_runtime::{ accounts_db::{ AccountShrinkThreshold, DEFAULT_ACCOUNTS_SHRINK_OPTIMIZE_TOTAL_SPACE, @@ -1062,11 +1062,15 @@ pub fn main() { PubSubConfig::default().queue_capacity_items.to_string(); let default_rpc_pubsub_queue_capacity_bytes = PubSubConfig::default().queue_capacity_bytes.to_string(); - let default_rpc_send_transaction_retry_ms = ValidatorConfig::default() - .send_transaction_retry_ms + let 
default_send_transaction_service_config = send_transaction_service::Config::default(); + let default_rpc_send_transaction_retry_ms = default_send_transaction_service_config + .retry_rate_ms .to_string(); - let default_rpc_send_transaction_leader_forward_count = ValidatorConfig::default() - .send_transaction_leader_forward_count + let default_rpc_send_transaction_leader_forward_count = default_send_transaction_service_config + .leader_forward_count + .to_string(); + let default_rpc_send_transaction_service_max_retries = default_send_transaction_service_config + .service_max_retries .to_string(); let default_rpc_threads = num_cpus::get().to_string(); let default_max_snapshot_to_retain = &DEFAULT_MAX_SNAPSHOTS_TO_RETAIN.to_string(); @@ -1721,6 +1725,23 @@ pub fn main() { .default_value(&default_rpc_send_transaction_leader_forward_count) .help("The number of upcoming leaders to which to forward transactions sent via rpc service."), ) + .arg( + Arg::with_name("rpc_send_transaction_default_max_retries") + .long("rpc-send-default-max-retries") + .value_name("NUMBER") + .takes_value(true) + .validator(is_parsable::<usize>) + .help("The maximum number of transaction broadcast retries when unspecified by the request, otherwise retried until expiration."), + ) + .arg( + Arg::with_name("rpc_send_transaction_service_max_retries") + .long("rpc-send-service-max-retries") + .value_name("NUMBER") + .takes_value(true) + .validator(is_parsable::<usize>) + .default_value(&default_rpc_send_transaction_service_max_retries) + .help("The maximum number of transaction broadcast retries, regardless of requested value."), + ) .arg( Arg::with_name("rpc_scan_and_fix_roots") .long("rpc-scan-and-fix-roots") @@ -2423,12 +2444,25 @@ pub fn main() { debug_keys, contact_debug_interval, bpf_jit: !matches.is_present("no_bpf_jit"), - send_transaction_retry_ms: value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64), - send_transaction_leader_forward_count: value_t_or_exit!( - matches, -
"rpc_send_transaction_leader_forward_count", - u64 - ), + send_transaction_service_config: send_transaction_service::Config { + retry_rate_ms: value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64), + leader_forward_count: value_t_or_exit!( + matches, + "rpc_send_transaction_leader_forward_count", + u64 + ), + default_max_retries: value_t!( + matches, + "rpc_send_transaction_default_max_retries", + usize + ) + .ok(), + service_max_retries: value_t_or_exit!( + matches, + "rpc_send_transaction_service_max_retries", + usize + ), + }, no_poh_speed_test: matches.is_present("no_poh_speed_test"), poh_pinned_cpu_core: value_of(&matches, "poh_pinned_cpu_core") .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),