Skip to content

Commit

Permalink
Fix farming cluster forwarders and identification intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Jun 16, 2024
1 parent 744d52b commit cc85e31
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,8 @@ pub(super) async fn cache(
nats_client,
&caches,
&cache_group,
// Only one of the tasks needs to send periodic broadcast
if index == 0 {
CACHE_IDENTIFICATION_BROADCAST_INTERVAL
} else {
Duration::MAX
},
CACHE_IDENTIFICATION_BROADCAST_INTERVAL,
index,
)
.await
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ pub(super) async fn controller(
)?;

let mut controller_services = (0..service_instances.get())
.map(|_| {
.map(|index| {
let nats_client = nats_client.clone();
let node_client = node_client.clone();
let piece_getter = piece_getter.clone();
Expand All @@ -211,6 +211,7 @@ pub(super) async fn controller(
&piece_getter,
&farmer_cache,
&instance,
index,
)
.await
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,8 @@ where
tokio::spawn(farmer_service(
nats_client.clone(),
farms.as_slice(),
// Only one of the tasks needs to send periodic broadcast
if index == 0 {
FARMER_IDENTIFICATION_BROADCAST_INTERVAL
} else {
Duration::MAX
},
FARMER_IDENTIFICATION_BROADCAST_INTERVAL,
index,
)),
true,
)
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![feature(
const_option,
duration_constructors,
extract_if,
hash_extract_if,
let_chains,
Expand Down
14 changes: 12 additions & 2 deletions crates/subspace-farmer/src/cluster/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ pub async fn cache_service<C>(
caches: &[C],
cache_group: &str,
identification_broadcast_interval: Duration,
index: usize,
) -> anyhow::Result<()>
where
C: PieceCache,
Expand All @@ -219,7 +220,7 @@ where
.collect::<Vec<_>>();

select! {
result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => {
result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval, index).fuse() => {
result
},
result = write_piece_responder(&nats_client, &caches_details).fuse() => {
Expand Down Expand Up @@ -247,6 +248,7 @@ async fn identify_responder<C>(
caches_details: &[CacheDetails<'_, C>],
cache_group: &str,
identification_broadcast_interval: Duration,
index: usize,
) -> anyhow::Result<()>
where
C: PieceCache,
Expand All @@ -262,7 +264,15 @@ where
})?
.fuse();
// Also send periodic updates in addition to the subscription response
let mut interval = tokio::time::interval(identification_broadcast_interval);
let mut interval = tokio::time::interval(
// Only one of the tasks needs to send periodic broadcast
if index == 0 {
identification_broadcast_interval
} else {
// TODO: Change to `Duration::MAX` after upgrade to tokio 1.38.1+
Duration::from_days(365)
},
);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

loop {
Expand Down
74 changes: 46 additions & 28 deletions crates/subspace-farmer/src/cluster/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,39 +492,57 @@ pub async fn controller_service<NC, PG>(
piece_getter: &PG,
farmer_cache: &FarmerCache,
instance: &str,
index: usize,
) -> anyhow::Result<()>
where
NC: NodeClient,
PG: PieceGetter + Sync,
{
select! {
result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
result
},
result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => {
result
},
result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
result
},
result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
result
},
result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => {
result
},
result = farmer_app_info_responder(nats_client, node_client).fuse() => {
result
},
result = segment_headers_responder(nats_client, node_client).fuse() => {
result
},
result = find_piece_responder(nats_client, farmer_cache).fuse() => {
result
},
result = piece_responder(nats_client, piece_getter).fuse() => {
result
},
if index == 0 {
select! {
result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
result
},
result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => {
result
},
result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
result
},
result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
result
},
result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => {
result
},
result = farmer_app_info_responder(nats_client, node_client).fuse() => {
result
},
result = segment_headers_responder(nats_client, node_client).fuse() => {
result
},
result = find_piece_responder(nats_client, farmer_cache).fuse() => {
result
},
result = piece_responder(nats_client, piece_getter).fuse() => {
result
},
}
} else {
select! {
result = farmer_app_info_responder(nats_client, node_client).fuse() => {
result
},
result = segment_headers_responder(nats_client, node_client).fuse() => {
result
},
result = find_piece_responder(nats_client, farmer_cache).fuse() => {
result
},
result = piece_responder(nats_client, piece_getter).fuse() => {
result
},
}
}
}

Expand Down
14 changes: 12 additions & 2 deletions crates/subspace-farmer/src/cluster/farmer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ pub fn farmer_service<F>(
nats_client: NatsClient,
farms: &[F],
identification_broadcast_interval: Duration,
index: usize,
) -> impl Future<Output = anyhow::Result<()>> + Send + 'static
where
F: Farm,
Expand Down Expand Up @@ -470,7 +471,7 @@ where

async move {
select! {
result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval).fuse() => {
result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval, index).fuse() => {
result
},
result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
Expand All @@ -489,6 +490,7 @@ async fn identify_responder(
nats_client: &NatsClient,
farms_details: &[FarmDetails],
identification_broadcast_interval: Duration,
index: usize,
) -> anyhow::Result<()> {
let mut subscription = nats_client
.subscribe_to_broadcasts::<ClusterControllerFarmerIdentifyBroadcast>(
Expand All @@ -505,7 +507,15 @@ async fn identify_responder(
})?
.fuse();
// Also send periodic updates in addition to the subscription response
let mut interval = tokio::time::interval(identification_broadcast_interval);
let mut interval = tokio::time::interval(
// Only one of the tasks needs to send periodic broadcast
if index == 0 {
identification_broadcast_interval
} else {
// TODO: Change to `Duration::MAX` after upgrade to tokio 1.38.1+
Duration::from_days(365)
},
);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

let mut last_identification = Instant::now();
Expand Down

0 comments on commit cc85e31

Please sign in to comment.