Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
v1.17: disables turbine and repair QUIC endpoints on mainnet-beta (ba…
Browse files Browse the repository at this point in the history
…ckport of #34523) (#34526)

* disables turbine and repair QUIC endpoints on mainnet-beta (#34523)

On mainnet-beta, respective QUIC endpoint are unnecessary for now until
testnet has fully migrated to QUIC. The commit disables turbine and
repair QUIC endpoints on mainnet-beta.

(cherry picked from commit 4feadbd)

# Conflicts:
#	core/src/validator.rs

* resolves mergify merge conflicts

---------

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
  • Loading branch information
mergify[bot] and behzadnouri authored Dec 19, 2023
1 parent f7a655f commit 907b904
Showing 1 changed file with 78 additions and 54 deletions.
132 changes: 78 additions & 54 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ use {
clock::Slot,
epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
exit::Exit,
genesis_config::GenesisConfig,
genesis_config::{ClusterType, GenesisConfig},
hash::Hash,
pubkey::Pubkey,
shred_version::compute_shred_version,
Expand Down Expand Up @@ -465,12 +465,12 @@ pub struct Validator {
ledger_metric_report_service: LedgerMetricReportService,
accounts_background_service: AccountsBackgroundService,
accounts_hash_verifier: AccountsHashVerifier,
turbine_quic_endpoint: Endpoint,
turbine_quic_endpoint: Option<Endpoint>,
turbine_quic_endpoint_runtime: Option<TokioRuntime>,
turbine_quic_endpoint_join_handle: solana_turbine::quic_endpoint::AsyncTryJoinHandle,
repair_quic_endpoint: Endpoint,
turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
repair_quic_endpoint: Option<Endpoint>,
repair_quic_endpoint_runtime: Option<TokioRuntime>,
repair_quic_endpoint_join_handle: repair::quic_endpoint::AsyncTryJoinHandle,
repair_quic_endpoint_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
}

impl Validator {
Expand Down Expand Up @@ -1153,58 +1153,74 @@ impl Validator {
// Outside test-validator crate, we always need a tokio runtime (and
// the respective handle) to initialize the turbine QUIC endpoint.
let current_runtime_handle = tokio::runtime::Handle::try_current();
let turbine_quic_endpoint_runtime = current_runtime_handle.is_err().then(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("solTurbineQuic")
.build()
.unwrap()
});
let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
&& genesis_config.cluster_type != ClusterType::MainnetBeta)
.then(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("solTurbineQuic")
.build()
.unwrap()
});
let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
let (
turbine_quic_endpoint,
turbine_quic_endpoint_sender,
turbine_quic_endpoint_join_handle,
) = solana_turbine::quic_endpoint::new_quic_endpoint(
turbine_quic_endpoint_runtime
.as_ref()
.map(TokioRuntime::handle)
.unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
&identity_keypair,
node.sockets.tvu_quic,
node.info
.tvu(Protocol::QUIC)
.expect("Operator must spin up node with valid QUIC TVU address")
.ip(),
turbine_quic_endpoint_sender,
bank_forks.clone(),
)
.unwrap();

// Repair quic endpoint.
let repair_quic_endpoint_runtime = current_runtime_handle.is_err().then(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("solRepairQuic")
.build()
.unwrap()
});
let (repair_quic_endpoint, repair_quic_endpoint_sender, repair_quic_endpoint_join_handle) =
repair::quic_endpoint::new_quic_endpoint(
repair_quic_endpoint_runtime
) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
let (sender, _receiver) = tokio::sync::mpsc::channel(1);
(None, sender, None)
} else {
solana_turbine::quic_endpoint::new_quic_endpoint(
turbine_quic_endpoint_runtime
.as_ref()
.map(TokioRuntime::handle)
.unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
&identity_keypair,
node.sockets.serve_repair_quic,
node.sockets.tvu_quic,
node.info
.serve_repair(Protocol::QUIC)
.expect("Operator must spin up node with valid QUIC serve-repair address")
.tvu(Protocol::QUIC)
.expect("Operator must spin up node with valid QUIC TVU address")
.ip(),
repair_quic_endpoint_sender,
turbine_quic_endpoint_sender,
bank_forks.clone(),
)
.unwrap();
.map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
.unwrap()
};

// Repair quic endpoint.
let repair_quic_endpoint_runtime = (current_runtime_handle.is_err()
&& genesis_config.cluster_type != ClusterType::MainnetBeta)
.then(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("solRepairQuic")
.build()
.unwrap()
});
let (repair_quic_endpoint, repair_quic_endpoint_sender, repair_quic_endpoint_join_handle) =
if genesis_config.cluster_type == ClusterType::MainnetBeta {
let (sender, _receiver) = tokio::sync::mpsc::channel(1);
(None, sender, None)
} else {
repair::quic_endpoint::new_quic_endpoint(
repair_quic_endpoint_runtime
.as_ref()
.map(TokioRuntime::handle)
.unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
&identity_keypair,
node.sockets.serve_repair_quic,
node.info
.serve_repair(Protocol::QUIC)
.expect("Operator must spin up node with valid QUIC serve-repair address")
.ip(),
repair_quic_endpoint_sender,
bank_forks.clone(),
)
.map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
.unwrap()
};

let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
Expand Down Expand Up @@ -1460,14 +1476,18 @@ impl Validator {
}

self.gossip_service.join().expect("gossip_service");
repair::quic_endpoint::close_quic_endpoint(&self.repair_quic_endpoint);
if let Some(repair_quic_endpoint) = &self.repair_quic_endpoint {
repair::quic_endpoint::close_quic_endpoint(repair_quic_endpoint);
}
self.serve_repair_service
.join()
.expect("serve_repair_service");
self.repair_quic_endpoint_runtime
.map(|runtime| runtime.block_on(self.repair_quic_endpoint_join_handle))
.transpose()
.unwrap();
if let Some(repair_quic_endpoint_join_handle) = self.repair_quic_endpoint_join_handle {
self.repair_quic_endpoint_runtime
.map(|runtime| runtime.block_on(repair_quic_endpoint_join_handle))
.transpose()
.unwrap();
};
self.stats_reporter_service
.join()
.expect("stats_reporter_service");
Expand All @@ -1480,13 +1500,17 @@ impl Validator {
self.accounts_hash_verifier
.join()
.expect("accounts_hash_verifier");
solana_turbine::quic_endpoint::close_quic_endpoint(&self.turbine_quic_endpoint);
if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
}
self.tpu.join().expect("tpu");
self.tvu.join().expect("tvu");
self.turbine_quic_endpoint_runtime
.map(|runtime| runtime.block_on(self.turbine_quic_endpoint_join_handle))
.transpose()
.unwrap();
if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
self.turbine_quic_endpoint_runtime
.map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
.transpose()
.unwrap();
}
self.completed_data_sets_service
.join()
.expect("completed_data_sets_service");
Expand Down

0 comments on commit 907b904

Please sign in to comment.