diff --git a/core/src/validator.rs b/core/src/validator.rs index 5f4a3121232573..13c454631625a0 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -108,7 +108,7 @@ use { clock::Slot, epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET, exit::Exit, - genesis_config::GenesisConfig, + genesis_config::{ClusterType, GenesisConfig}, hash::Hash, pubkey::Pubkey, shred_version::compute_shred_version, @@ -471,12 +471,12 @@ pub struct Validator { blockstore_metric_report_service: BlockstoreMetricReportService, accounts_background_service: AccountsBackgroundService, accounts_hash_verifier: AccountsHashVerifier, - turbine_quic_endpoint: Endpoint, + turbine_quic_endpoint: Option, turbine_quic_endpoint_runtime: Option, - turbine_quic_endpoint_join_handle: solana_turbine::quic_endpoint::AsyncTryJoinHandle, - repair_quic_endpoint: Endpoint, + turbine_quic_endpoint_join_handle: Option, + repair_quic_endpoint: Option, repair_quic_endpoint_runtime: Option, - repair_quic_endpoint_join_handle: repair::quic_endpoint::AsyncTryJoinHandle, + repair_quic_endpoint_join_handle: Option, } impl Validator { @@ -1170,58 +1170,74 @@ impl Validator { // Outside test-validator crate, we always need a tokio runtime (and // the respective handle) to initialize the turbine QUIC endpoint. let current_runtime_handle = tokio::runtime::Handle::try_current(); - let turbine_quic_endpoint_runtime = current_runtime_handle.is_err().then(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .thread_name("solTurbineQuic") - .build() - .unwrap() - }); + let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err() + && genesis_config.cluster_type != ClusterType::MainnetBeta) + .then(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("solTurbineQuic") + .build() + .unwrap() + }); let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded(); let ( turbine_quic_endpoint, turbine_quic_endpoint_sender, turbine_quic_endpoint_join_handle, - ) = solana_turbine::quic_endpoint::new_quic_endpoint( - turbine_quic_endpoint_runtime - .as_ref() - .map(TokioRuntime::handle) - .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()), - &identity_keypair, - node.sockets.tvu_quic, - node.info - .tvu(Protocol::QUIC) - .map_err(|err| format!("Invalid QUIC TVU address: {err:?}"))? - .ip(), - turbine_quic_endpoint_sender, - bank_forks.clone(), - ) - .unwrap(); - - // Repair quic endpoint. - let repair_quic_endpoint_runtime = current_runtime_handle.is_err().then(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .thread_name("solRepairQuic") - .build() - .unwrap() - }); - let (repair_quic_endpoint, repair_quic_endpoint_sender, repair_quic_endpoint_join_handle) = - repair::quic_endpoint::new_quic_endpoint( - repair_quic_endpoint_runtime + ) = if genesis_config.cluster_type == ClusterType::MainnetBeta { + let (sender, _receiver) = tokio::sync::mpsc::channel(1); + (None, sender, None) + } else { + solana_turbine::quic_endpoint::new_quic_endpoint( + turbine_quic_endpoint_runtime .as_ref() .map(TokioRuntime::handle) .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()), &identity_keypair, - node.sockets.serve_repair_quic, + node.sockets.tvu_quic, node.info - .serve_repair(Protocol::QUIC) - .map_err(|err| format!("Invalid QUIC serve-repair address: {err:?}"))? + .tvu(Protocol::QUIC) + .map_err(|err| format!("Invalid QUIC TVU address: {err:?}"))? .ip(), - repair_quic_endpoint_sender, + turbine_quic_endpoint_sender, bank_forks.clone(), ) - .unwrap(); + .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle))) + .unwrap() + }; + + // Repair quic endpoint. + let repair_quic_endpoint_runtime = (current_runtime_handle.is_err() + && genesis_config.cluster_type != ClusterType::MainnetBeta) + .then(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("solRepairQuic") + .build() + .unwrap() + }); + let (repair_quic_endpoint, repair_quic_endpoint_sender, repair_quic_endpoint_join_handle) = + if genesis_config.cluster_type == ClusterType::MainnetBeta { + let (sender, _receiver) = tokio::sync::mpsc::channel(1); + (None, sender, None) + } else { + repair::quic_endpoint::new_quic_endpoint( + repair_quic_endpoint_runtime + .as_ref() + .map(TokioRuntime::handle) + .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()), + &identity_keypair, + node.sockets.serve_repair_quic, + node.info + .serve_repair(Protocol::QUIC) + .map_err(|err| format!("Invalid QUIC serve-repair address: {err:?}"))? + .ip(), + repair_quic_endpoint_sender, + bank_forks.clone(), + ) + .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle))) + .unwrap() + }; let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority; let tower = match process_blockstore.process_to_create_tower() { @@ -1514,14 +1530,18 @@ impl Validator { } self.gossip_service.join().expect("gossip_service"); - repair::quic_endpoint::close_quic_endpoint(&self.repair_quic_endpoint); + if let Some(repair_quic_endpoint) = &self.repair_quic_endpoint { + repair::quic_endpoint::close_quic_endpoint(repair_quic_endpoint); + } self.serve_repair_service .join() .expect("serve_repair_service"); - self.repair_quic_endpoint_runtime - .map(|runtime| runtime.block_on(self.repair_quic_endpoint_join_handle)) - .transpose() - .unwrap(); + if let Some(repair_quic_endpoint_join_handle) = self.repair_quic_endpoint_join_handle { + self.repair_quic_endpoint_runtime + .map(|runtime| runtime.block_on(repair_quic_endpoint_join_handle)) + .transpose() + .unwrap(); + }; self.stats_reporter_service .join() .expect("stats_reporter_service"); @@ -1534,13 +1554,17 @@ impl Validator { self.accounts_hash_verifier .join() .expect("accounts_hash_verifier"); - solana_turbine::quic_endpoint::close_quic_endpoint(&self.turbine_quic_endpoint); + if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint { + solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint); + } self.tpu.join().expect("tpu"); self.tvu.join().expect("tvu"); - self.turbine_quic_endpoint_runtime - .map(|runtime| runtime.block_on(self.turbine_quic_endpoint_join_handle)) - .transpose() - .unwrap(); + if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle { + self.turbine_quic_endpoint_runtime + .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle)) + .transpose() + .unwrap(); + } self.completed_data_sets_service .join() .expect("completed_data_sets_service");