1
1
use crate :: commitments:: Commitments ;
2
- use crate :: common :: Salt ;
2
+ use crate :: farming :: Farming ;
3
3
use crate :: identity:: Identity ;
4
4
use crate :: object_mappings:: ObjectMappings ;
5
5
use crate :: plot:: Plot ;
6
- use crate :: rpc:: {
7
- EncodedBlockWithObjectMapping , FarmerMetadata , ProposedProofOfReplicationResponse , RpcClient ,
8
- SlotInfo , Solution ,
9
- } ;
6
+ use crate :: rpc:: { EncodedBlockWithObjectMapping , FarmerMetadata , RpcClient } ;
10
7
use anyhow:: { anyhow, Result } ;
11
- use futures:: future;
12
- use futures:: future:: Either ;
13
- use log:: { debug, error, info, trace} ;
8
+ use log:: { debug, error, info} ;
14
9
use std:: path:: PathBuf ;
15
10
use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
16
11
use std:: sync:: Arc ;
17
- use std:: time:: Instant ;
18
12
use subspace_archiving:: archiver:: { ArchivedSegment , BlockArchiver , ObjectArchiver } ;
19
13
use subspace_archiving:: pre_genesis_data;
20
14
use subspace_core_primitives:: objects:: { GlobalObject , PieceObject , PieceObjectMapping } ;
21
- use subspace_core_primitives:: { crypto , Sha256Hash } ;
15
+ use subspace_core_primitives:: Sha256Hash ;
22
16
use subspace_solving:: SubspaceCodec ;
23
17
24
18
/// Start farming by using plot in specified path and connecting to WebSocket server at specified
@@ -45,32 +39,29 @@ pub async fn farm(base_directory: PathBuf, ws_server: &str) -> Result<()> {
45
39
46
40
let identity = Identity :: open_or_create ( & base_directory) ?;
47
41
48
- match future:: select (
49
- {
50
- let client = client. clone ( ) ;
51
- let plot = plot. clone ( ) ;
52
- let commitments = commitments. clone ( ) ;
53
- let public_key = identity. public_key ( ) ;
54
-
55
- Box :: pin ( async move {
56
- background_plotting ( client, plot, commitments, object_mappings, & public_key) . await
57
- } )
58
- } ,
59
- Box :: pin (
60
- async move { subscribe_to_slot_info ( & client, & plot, & commitments, & identity) . await } ,
61
- ) ,
62
- )
63
- . await
64
- {
65
- Either :: Left ( ( result, _) ) => match result {
66
- Ok ( ( ) ) => {
67
- info ! ( "Background plotting finished successfully" ) ;
68
-
69
- Ok ( ( ) )
70
- }
71
- Err ( error) => Err ( anyhow ! ( "Background plotting error: {}" , error) ) ,
72
- } ,
73
- Either :: Right ( ( result, _) ) => result. map_err ( Into :: into) ,
42
+ // start the farming task
43
+ // right now the instance is unused, however, if we want to call stop the process
44
+ // we can just drop the instance, and it will be stopped magically :)
45
+ let _farming_instance = Farming :: start (
46
+ plot. clone ( ) ,
47
+ commitments. clone ( ) ,
48
+ client. clone ( ) ,
49
+ identity. clone ( ) ,
50
+ ) ;
51
+
52
+ // start the background plotting
53
+ // NOTE: THIS WILL CHANGE IN THE UPCOMING PR
54
+ let public_key = identity. public_key ( ) ;
55
+ let plotting_result =
56
+ background_plotting ( client, plot, commitments, object_mappings, & public_key) . await ;
57
+
58
+ match plotting_result {
59
+ Ok ( ( ) ) => {
60
+ info ! ( "Background plotting shutdown gracefully" ) ;
61
+
62
+ Ok ( ( ) )
63
+ }
64
+ Err ( error) => Err ( anyhow ! ( "Background plotting error: {}" , error) ) ,
74
65
}
75
66
}
76
67
@@ -383,144 +374,3 @@ fn create_global_object_mapping(
383
374
} )
384
375
. collect ( )
385
376
}
386
-
387
- async fn subscribe_to_slot_info (
388
- client : & RpcClient ,
389
- plot : & Plot ,
390
- commitments : & Commitments ,
391
- identity : & Identity ,
392
- ) -> Result < ( ) > {
393
- let farmer_public_key_hash = crypto:: sha256_hash ( & identity. public_key ( ) ) ;
394
-
395
- info ! ( "Subscribing to slot info" ) ;
396
- let mut new_slots = client. subscribe_slot_info ( ) . await ?;
397
-
398
- let mut salts = Salts :: default ( ) ;
399
-
400
- while let Some ( slot_info) = new_slots. next ( ) . await ? {
401
- debug ! ( "New slot: {:?}" , slot_info) ;
402
-
403
- update_commitments ( plot, commitments, & mut salts, & slot_info) ;
404
-
405
- let local_challenge =
406
- subspace_solving:: derive_local_challenge ( slot_info. challenge , & farmer_public_key_hash) ;
407
-
408
- let solution = match commitments
409
- . find_by_range ( local_challenge, slot_info. solution_range , slot_info. salt )
410
- . await
411
- {
412
- Some ( ( tag, piece_index) ) => {
413
- let encoding = plot. read ( piece_index) . await ?;
414
- let solution = Solution :: new (
415
- identity. public_key ( ) . to_bytes ( ) ,
416
- piece_index,
417
- encoding. to_vec ( ) ,
418
- identity. sign ( & tag) . to_bytes ( ) . to_vec ( ) ,
419
- tag,
420
- ) ;
421
- debug ! ( "Solution found" ) ;
422
- trace ! ( "Solution found: {:?}" , solution) ;
423
-
424
- Some ( solution)
425
- }
426
- None => {
427
- debug ! ( "Solution not found" ) ;
428
- None
429
- }
430
- } ;
431
-
432
- client
433
- . propose_proof_of_replication ( ProposedProofOfReplicationResponse {
434
- slot_number : slot_info. slot_number ,
435
- solution,
436
- secret_key : identity. secret_key ( ) . to_bytes ( ) . into ( ) ,
437
- } )
438
- . await ?;
439
- }
440
-
441
- Ok ( ( ) )
442
- }
443
-
444
- #[ derive( Default ) ]
445
- struct Salts {
446
- current : Option < Salt > ,
447
- next : Option < Salt > ,
448
- }
449
-
450
- /// Compare salts in `slot_info` to those known from `salts` and start update plot commitments
451
- /// accordingly if necessary (in background)
452
- fn update_commitments (
453
- plot : & Plot ,
454
- commitments : & Commitments ,
455
- salts : & mut Salts ,
456
- slot_info : & SlotInfo ,
457
- ) {
458
- // Check if current salt has changed
459
- if salts. current != Some ( slot_info. salt ) {
460
- salts. current . replace ( slot_info. salt ) ;
461
-
462
- if salts. next != Some ( slot_info. salt ) {
463
- // If previous `salts.next` is the same as current (expected behavior), need to re-commit
464
-
465
- tokio:: spawn ( {
466
- let salt = slot_info. salt ;
467
- let plot = plot. clone ( ) ;
468
- let commitments = commitments. clone ( ) ;
469
-
470
- async move {
471
- let started = Instant :: now ( ) ;
472
- info ! (
473
- "Salt updated to {}, recommitting in background" ,
474
- hex:: encode( salt)
475
- ) ;
476
-
477
- if let Err ( error) = commitments. create ( salt, plot) . await {
478
- error ! (
479
- "Failed to create commitment for {}: {}" ,
480
- hex:: encode( salt) ,
481
- error
482
- ) ;
483
- } else {
484
- info ! (
485
- "Finished recommitment for {} in {} seconds" ,
486
- hex:: encode( salt) ,
487
- started. elapsed( ) . as_secs_f32( )
488
- ) ;
489
- }
490
- }
491
- } ) ;
492
- }
493
- }
494
-
495
- if let Some ( new_next_salt) = slot_info. next_salt {
496
- if salts. next != Some ( new_next_salt) {
497
- salts. next . replace ( new_next_salt) ;
498
-
499
- tokio:: spawn ( {
500
- let plot = plot. clone ( ) ;
501
- let commitments = commitments. clone ( ) ;
502
-
503
- async move {
504
- let started = Instant :: now ( ) ;
505
- info ! (
506
- "Salt will update to {} soon, recommitting in background" ,
507
- hex:: encode( new_next_salt)
508
- ) ;
509
- if let Err ( error) = commitments. create ( new_next_salt, plot) . await {
510
- error ! (
511
- "Recommitting salt in background failed for {}: {}" ,
512
- hex:: encode( new_next_salt) ,
513
- error
514
- ) ;
515
- return ;
516
- }
517
- info ! (
518
- "Finished recommitment in background for {} in {} seconds" ,
519
- hex:: encode( new_next_salt) ,
520
- started. elapsed( ) . as_secs_f32( )
521
- ) ;
522
- }
523
- } ) ;
524
- }
525
- }
526
- }
0 commit comments