Skip to content
This repository was archived by the owner on Feb 21, 2024. It is now read-only.

Commit dabf765

Browse files
authored
Merge pull request paritytech#91 from subspace/farmer-lib-phase3b
Farmer lib phase3b
2 parents 6f6b859 + eabf873 commit dabf765

File tree

7 files changed

+253
-177
lines changed

7 files changed

+253
-177
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/subspace-farmer/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ include = [
1616
[dependencies]
1717
anyhow = "1.0.44"
1818
async-lock = "2.4.0"
19+
async-oneshot = "0.5.0"
1920
async-std = "1.9.0"
2021
clap = { version = "3.0.0-beta.5", features = ["color", "derive"] }
2122
dirs = "4.0.0"

crates/subspace-farmer/src/commands/farm.rs

+27-177
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,18 @@
11
use crate::commitments::Commitments;
2-
use crate::common::Salt;
2+
use crate::farming::Farming;
33
use crate::identity::Identity;
44
use crate::object_mappings::ObjectMappings;
55
use crate::plot::Plot;
6-
use crate::rpc::{
7-
EncodedBlockWithObjectMapping, FarmerMetadata, ProposedProofOfReplicationResponse, RpcClient,
8-
SlotInfo, Solution,
9-
};
6+
use crate::rpc::{EncodedBlockWithObjectMapping, FarmerMetadata, RpcClient};
107
use anyhow::{anyhow, Result};
11-
use futures::future;
12-
use futures::future::Either;
13-
use log::{debug, error, info, trace};
8+
use log::{debug, error, info};
149
use std::path::PathBuf;
1510
use std::sync::atomic::{AtomicU32, Ordering};
1611
use std::sync::Arc;
17-
use std::time::Instant;
1812
use subspace_archiving::archiver::{ArchivedSegment, BlockArchiver, ObjectArchiver};
1913
use subspace_archiving::pre_genesis_data;
2014
use subspace_core_primitives::objects::{GlobalObject, PieceObject, PieceObjectMapping};
21-
use subspace_core_primitives::{crypto, Sha256Hash};
15+
use subspace_core_primitives::Sha256Hash;
2216
use subspace_solving::SubspaceCodec;
2317

2418
/// Start farming by using plot in specified path and connecting to WebSocket server at specified
@@ -45,32 +39,29 @@ pub async fn farm(base_directory: PathBuf, ws_server: &str) -> Result<()> {
4539

4640
let identity = Identity::open_or_create(&base_directory)?;
4741

48-
match future::select(
49-
{
50-
let client = client.clone();
51-
let plot = plot.clone();
52-
let commitments = commitments.clone();
53-
let public_key = identity.public_key();
54-
55-
Box::pin(async move {
56-
background_plotting(client, plot, commitments, object_mappings, &public_key).await
57-
})
58-
},
59-
Box::pin(
60-
async move { subscribe_to_slot_info(&client, &plot, &commitments, &identity).await },
61-
),
62-
)
63-
.await
64-
{
65-
Either::Left((result, _)) => match result {
66-
Ok(()) => {
67-
info!("Background plotting finished successfully");
68-
69-
Ok(())
70-
}
71-
Err(error) => Err(anyhow!("Background plotting error: {}", error)),
72-
},
73-
Either::Right((result, _)) => result.map_err(Into::into),
42+
// start the farming task
43+
// right now the instance is unused, however, if we want to call stop the process
44+
// we can just drop the instance, and it will be stopped magically :)
45+
let _farming_instance = Farming::start(
46+
plot.clone(),
47+
commitments.clone(),
48+
client.clone(),
49+
identity.clone(),
50+
);
51+
52+
// start the background plotting
53+
// NOTE: THIS WILL CHANGE IN THE UPCOMING PR
54+
let public_key = identity.public_key();
55+
let plotting_result =
56+
background_plotting(client, plot, commitments, object_mappings, &public_key).await;
57+
58+
match plotting_result {
59+
Ok(()) => {
60+
info!("Background plotting shutdown gracefully");
61+
62+
Ok(())
63+
}
64+
Err(error) => Err(anyhow!("Background plotting error: {}", error)),
7465
}
7566
}
7667

@@ -383,144 +374,3 @@ fn create_global_object_mapping(
383374
})
384375
.collect()
385376
}
386-
387-
async fn subscribe_to_slot_info(
388-
client: &RpcClient,
389-
plot: &Plot,
390-
commitments: &Commitments,
391-
identity: &Identity,
392-
) -> Result<()> {
393-
let farmer_public_key_hash = crypto::sha256_hash(&identity.public_key());
394-
395-
info!("Subscribing to slot info");
396-
let mut new_slots = client.subscribe_slot_info().await?;
397-
398-
let mut salts = Salts::default();
399-
400-
while let Some(slot_info) = new_slots.next().await? {
401-
debug!("New slot: {:?}", slot_info);
402-
403-
update_commitments(plot, commitments, &mut salts, &slot_info);
404-
405-
let local_challenge =
406-
subspace_solving::derive_local_challenge(slot_info.challenge, &farmer_public_key_hash);
407-
408-
let solution = match commitments
409-
.find_by_range(local_challenge, slot_info.solution_range, slot_info.salt)
410-
.await
411-
{
412-
Some((tag, piece_index)) => {
413-
let encoding = plot.read(piece_index).await?;
414-
let solution = Solution::new(
415-
identity.public_key().to_bytes(),
416-
piece_index,
417-
encoding.to_vec(),
418-
identity.sign(&tag).to_bytes().to_vec(),
419-
tag,
420-
);
421-
debug!("Solution found");
422-
trace!("Solution found: {:?}", solution);
423-
424-
Some(solution)
425-
}
426-
None => {
427-
debug!("Solution not found");
428-
None
429-
}
430-
};
431-
432-
client
433-
.propose_proof_of_replication(ProposedProofOfReplicationResponse {
434-
slot_number: slot_info.slot_number,
435-
solution,
436-
secret_key: identity.secret_key().to_bytes().into(),
437-
})
438-
.await?;
439-
}
440-
441-
Ok(())
442-
}
443-
444-
#[derive(Default)]
445-
struct Salts {
446-
current: Option<Salt>,
447-
next: Option<Salt>,
448-
}
449-
450-
/// Compare salts in `slot_info` to those known from `salts` and start update plot commitments
451-
/// accordingly if necessary (in background)
452-
fn update_commitments(
453-
plot: &Plot,
454-
commitments: &Commitments,
455-
salts: &mut Salts,
456-
slot_info: &SlotInfo,
457-
) {
458-
// Check if current salt has changed
459-
if salts.current != Some(slot_info.salt) {
460-
salts.current.replace(slot_info.salt);
461-
462-
if salts.next != Some(slot_info.salt) {
463-
// If previous `salts.next` is the same as current (expected behavior), need to re-commit
464-
465-
tokio::spawn({
466-
let salt = slot_info.salt;
467-
let plot = plot.clone();
468-
let commitments = commitments.clone();
469-
470-
async move {
471-
let started = Instant::now();
472-
info!(
473-
"Salt updated to {}, recommitting in background",
474-
hex::encode(salt)
475-
);
476-
477-
if let Err(error) = commitments.create(salt, plot).await {
478-
error!(
479-
"Failed to create commitment for {}: {}",
480-
hex::encode(salt),
481-
error
482-
);
483-
} else {
484-
info!(
485-
"Finished recommitment for {} in {} seconds",
486-
hex::encode(salt),
487-
started.elapsed().as_secs_f32()
488-
);
489-
}
490-
}
491-
});
492-
}
493-
}
494-
495-
if let Some(new_next_salt) = slot_info.next_salt {
496-
if salts.next != Some(new_next_salt) {
497-
salts.next.replace(new_next_salt);
498-
499-
tokio::spawn({
500-
let plot = plot.clone();
501-
let commitments = commitments.clone();
502-
503-
async move {
504-
let started = Instant::now();
505-
info!(
506-
"Salt will update to {} soon, recommitting in background",
507-
hex::encode(new_next_salt)
508-
);
509-
if let Err(error) = commitments.create(new_next_salt, plot).await {
510-
error!(
511-
"Recommitting salt in background failed for {}: {}",
512-
hex::encode(new_next_salt),
513-
error
514-
);
515-
return;
516-
}
517-
info!(
518-
"Finished recommitment in background for {} in {} seconds",
519-
hex::encode(new_next_salt),
520-
started.elapsed().as_secs_f32()
521-
);
522-
}
523-
});
524-
}
525-
}
526-
}

0 commit comments

Comments
 (0)