From cc85e31abe595df736d8ceacb35af8c22d67d086 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sun, 16 Jun 2024 07:36:47 +0300 Subject: [PATCH] Fix farming cluster forwarders and identification intervals --- .../subspace-farmer/commands/cluster/cache.rs | 8 +- .../commands/cluster/controller.rs | 3 +- .../commands/cluster/farmer.rs | 8 +- .../src/bin/subspace-farmer/main.rs | 1 + crates/subspace-farmer/src/cluster/cache.rs | 14 +++- .../subspace-farmer/src/cluster/controller.rs | 74 ++++++++++++------- crates/subspace-farmer/src/cluster/farmer.rs | 14 +++- 7 files changed, 77 insertions(+), 45 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs index 6d22d845a57..87e13f20cef 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs @@ -188,12 +188,8 @@ pub(super) async fn cache( nats_client, &caches, &cache_group, - // Only one of the tasks needs to send periodic broadcast - if index == 0 { - CACHE_IDENTIFICATION_BROADCAST_INTERVAL - } else { - Duration::MAX - }, + CACHE_IDENTIFICATION_BROADCAST_INTERVAL, + index, ) .await }), diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs index 8159d71973b..efb10cb786f 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs @@ -196,7 +196,7 @@ pub(super) async fn controller( )?; let mut controller_services = (0..service_instances.get()) - .map(|_| { + .map(|index| { let nats_client = nats_client.clone(); let node_client = node_client.clone(); let piece_getter = piece_getter.clone(); @@ -211,6 +211,7 @@ pub(super) async fn controller( &piece_getter, &farmer_cache, &instance, + index, ) .await }), diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs index 2455d4bd19b..612792cf857 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs @@ -367,12 +367,8 @@ where tokio::spawn(farmer_service( nats_client.clone(), farms.as_slice(), - // Only one of the tasks needs to send periodic broadcast - if index == 0 { - FARMER_IDENTIFICATION_BROADCAST_INTERVAL - } else { - Duration::MAX - }, + FARMER_IDENTIFICATION_BROADCAST_INTERVAL, + index, )), true, ) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index 8d6556f4a74..12d5e682846 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -1,5 +1,6 @@ #![feature( const_option, + duration_constructors, extract_if, hash_extract_if, let_chains, diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index 35a11719024..78e445d6239 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -199,6 +199,7 @@ pub async fn cache_service( caches: &[C], cache_group: &str, identification_broadcast_interval: Duration, + index: usize, ) -> anyhow::Result<()> where C: PieceCache, @@ -219,7 +220,7 @@ where .collect::>(); select! { - result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => { + result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval, index).fuse() => { result }, result = write_piece_responder(&nats_client, &caches_details).fuse() => { @@ -247,6 +248,7 @@ async fn identify_responder( caches_details: &[CacheDetails<'_, C>], cache_group: &str, identification_broadcast_interval: Duration, + index: usize, ) -> anyhow::Result<()> where C: PieceCache, @@ -262,7 +264,15 @@ where })? .fuse(); // Also send periodic updates in addition to the subscription response - let mut interval = tokio::time::interval(identification_broadcast_interval); + let mut interval = tokio::time::interval( + // Only one of the tasks needs to send periodic broadcast + if index == 0 { + identification_broadcast_interval + } else { + // TODO: Change to `Duration::MAX` after upgrade to tokio 1.38.1+ + Duration::from_days(365) + }, + ); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { diff --git a/crates/subspace-farmer/src/cluster/controller.rs b/crates/subspace-farmer/src/cluster/controller.rs index b614bbd8eb5..237be75b95e 100644 --- a/crates/subspace-farmer/src/cluster/controller.rs +++ b/crates/subspace-farmer/src/cluster/controller.rs @@ -492,39 +492,57 @@ pub async fn controller_service( piece_getter: &PG, farmer_cache: &FarmerCache, instance: &str, + index: usize, ) -> anyhow::Result<()> where NC: NodeClient, PG: PieceGetter + Sync, { - select! { - result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => { - result - }, - result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => { - result - }, - result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => { - result - }, - result = solution_response_forwarder(nats_client, node_client, instance).fuse() => { - result - }, - result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => { - result - }, - result = farmer_app_info_responder(nats_client, node_client).fuse() => { - result - }, - result = segment_headers_responder(nats_client, node_client).fuse() => { - result - }, - result = find_piece_responder(nats_client, farmer_cache).fuse() => { - result - }, - result = piece_responder(nats_client, piece_getter).fuse() => { - result - }, + if index == 0 { + select! { + result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => { + result + }, + result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => { + result + }, + result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => { + result + }, + result = solution_response_forwarder(nats_client, node_client, instance).fuse() => { + result + }, + result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => { + result + }, + result = farmer_app_info_responder(nats_client, node_client).fuse() => { + result + }, + result = segment_headers_responder(nats_client, node_client).fuse() => { + result + }, + result = find_piece_responder(nats_client, farmer_cache).fuse() => { + result + }, + result = piece_responder(nats_client, piece_getter).fuse() => { + result + }, + } + } else { + select! { + result = farmer_app_info_responder(nats_client, node_client).fuse() => { + result + }, + result = segment_headers_responder(nats_client, node_client).fuse() => { + result + }, + result = find_piece_responder(nats_client, farmer_cache).fuse() => { + result + }, + result = piece_responder(nats_client, piece_getter).fuse() => { + result + }, + } } } diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index 664b083ea10..171ce27d15b 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -353,6 +353,7 @@ pub fn farmer_service( nats_client: NatsClient, farms: &[F], identification_broadcast_interval: Duration, + index: usize, ) -> impl Future> + Send + 'static where F: Farm, @@ -470,7 +471,7 @@ where async move { select! { - result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval).fuse() => { + result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval, index).fuse() => { result }, result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => { @@ -489,6 +490,7 @@ async fn identify_responder( nats_client: &NatsClient, farms_details: &[FarmDetails], identification_broadcast_interval: Duration, + index: usize, ) -> anyhow::Result<()> { let mut subscription = nats_client .subscribe_to_broadcasts::( @@ -505,7 +507,15 @@ async fn identify_responder( })? .fuse(); // Also send periodic updates in addition to the subscription response - let mut interval = tokio::time::interval(identification_broadcast_interval); + let mut interval = tokio::time::interval( + // Only one of the tasks needs to send periodic broadcast + if index == 0 { + identification_broadcast_interval + } else { + // TODO: Change to `Duration::MAX` after upgrade to tokio 1.38.1+ + Duration::from_days(365) + }, + ); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); let mut last_identification = Instant::now();