Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Farmer Library - Integration tests for farming (paritytech#145)
Browse files Browse the repository at this point in the history
* MockRPC is introduced, farming integration tests are introduced
  • Loading branch information
ozgunozerk authored Nov 24, 2021
1 parent 5cac7d8 commit c877423
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 1 deletion.
2 changes: 2 additions & 0 deletions crates/subspace-farmer/src/farming.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! A farming process, that is interruptable (via dropping it)
//! and possible to wait on (custom `wait` method)
#[cfg(test)]
mod tests;

use crate::commitments::Commitments;
use crate::identity::Identity;
Expand Down
135 changes: 135 additions & 0 deletions crates/subspace-farmer/src/farming/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use crate::commitments::Commitments;
use crate::farming::Farming;
use crate::identity::Identity;
use crate::mock_rpc::MockRpc;
use crate::plot::Plot;
use std::sync::Arc;
use subspace_core_primitives::{Piece, Salt, Tag, TAG_SIZE};
use subspace_rpc_primitives::SlotInfo;
use tempfile::TempDir;
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

fn init() {
let _ = env_logger::builder().is_test(true).try_init();
}

async fn farming_simulator(slots: Vec<SlotInfo>, tags: Vec<Tag>) {
init();

let base_directory = TempDir::new().unwrap();

let piece: Piece = [9u8; 4096].into();
let salt: Salt = slots[0].salt; // the first slots salt should be used for the initial commitments
let index = 0;

let plot = Plot::open_or_create(&base_directory).await.unwrap();

let commitments = Commitments::new(base_directory.path().join("commitments").into())
.await
.unwrap();

plot.write_many(Arc::new(vec![piece]), index).await.unwrap();
commitments.create(salt, plot.clone()).await.unwrap();

let identity =
Identity::open_or_create(&base_directory).expect("Could not open/create identity!");

let (_metadata_sender, metadata_recv) = mpsc::channel(10);
let (_block_sender, block_recv) = mpsc::channel(10);
let (_new_head_sender, new_head_recv) = mpsc::channel(10);
let (slot_sender, slot_recv) = mpsc::channel(10);
let (solution_sender, mut solution_recv) = mpsc::channel(1);

let client = MockRpc::new(
metadata_recv,
block_recv,
new_head_recv,
slot_recv,
solution_sender,
);

// start the farming task
let farming_instance =
Farming::start(plot.clone(), commitments.clone(), client, identity.clone());

// send only the first slot, so that farmer can start
slot_sender.send(slots[0].clone()).await.unwrap();

for index in 0..slots.len() - 1 {
// race between receiving a solution, and waiting for 1 sec
tokio::select! {
Some(solution) = solution_recv.recv() => {
if let Some(solution) = solution.maybe_solution {
if solution.tag != tags[index] {
panic!("Wrong Tag! The expected value was: {:?}", tags[index]);
}
} else {
panic!("Solution was None!")
}
},
_ = sleep(Duration::from_secs(1)) => {},
}
// commitment in the background cannot keep up with the speed, so putting a little delay in here
// commitment usually takes around 0.002-0.003 second on my machine (M1 iMac), putting 100 microseconds here to be safe
sleep(Duration::from_millis(100)).await;
slot_sender.send(slots[index + 1].clone()).await.unwrap();
}

// let the farmer know we are done by closing the channels
drop(slot_sender);

if let Err(e) = farming_instance.wait().await {
panic!("Panicked with error...{:?}", e);
}
}

#[tokio::test(flavor = "multi_thread")]
async fn farming_happy_path() {
let slot_info = SlotInfo {
slot_number: 3,
global_challenge: [1; TAG_SIZE],
salt: [1, 1, 1, 1, 1, 1, 1, 1],
next_salt: None,
solution_range: u64::MAX,
};
let slots = vec![slot_info];

let correct_tag: Tag = [23, 245, 162, 52, 107, 135, 192, 210];
let tags = vec![correct_tag];

farming_simulator(slots, tags).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn farming_salt_change() {
let first_slot = SlotInfo {
slot_number: 1,
global_challenge: [1; TAG_SIZE],
salt: [1, 1, 1, 1, 1, 1, 1, 1],
next_salt: Some([1, 1, 1, 1, 1, 1, 1, 2]),
solution_range: u64::MAX,
};
let second_slot = SlotInfo {
slot_number: 2,
global_challenge: [1; TAG_SIZE],
salt: [1, 1, 1, 1, 1, 1, 1, 1],
next_salt: Some([1, 1, 1, 1, 1, 1, 1, 2]),
solution_range: u64::MAX,
};
let third_slot = SlotInfo {
slot_number: 3,
global_challenge: [1; TAG_SIZE],
salt: [1, 1, 1, 1, 1, 1, 1, 2],
next_salt: None,
solution_range: u64::MAX,
};
let slots = vec![first_slot, second_slot, third_slot];

let first_tag: Tag = [23, 245, 162, 52, 107, 135, 192, 210];
let second_tag: Tag = [23, 245, 162, 52, 107, 135, 192, 210];
let third_tag: Tag = [255, 69, 97, 5, 186, 24, 136, 245];
let tags = vec![first_tag, second_tag, third_tag];

farming_simulator(slots, tags).await;
}
3 changes: 3 additions & 0 deletions crates/subspace-farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub(crate) mod rpc;
pub(crate) mod ws_rpc;
pub mod ws_rpc_server;

#[cfg(test)]
mod mock_rpc;

pub use commitments::{CommitmentError, Commitments};
pub use farming::Farming;
pub use identity::Identity;
Expand Down
3 changes: 3 additions & 0 deletions crates/subspace-farmer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ mod utils;
mod ws_rpc;
mod ws_rpc_server;

#[cfg(test)]
mod mock_rpc;

use anyhow::Result;
use clap::{Parser, ValueHint};
use env_logger::Env;
Expand Down
82 changes: 82 additions & 0 deletions crates/subspace-farmer/src/mock_rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use crate::rpc::{Error as MockError, NewHead, RpcClient};
use async_trait::async_trait;
use std::sync::Arc;
use subspace_rpc_primitives::{
EncodedBlockWithObjectMapping, FarmerMetadata, SlotInfo, SolutionResponse,
};
use tokio::sync::{mpsc, Mutex};

/// `MockRpc` wrapper.
#[derive(Debug)]
pub struct MockRpc {
metadata_recv: Arc<Mutex<mpsc::Receiver<FarmerMetadata>>>,
block_recv: Arc<Mutex<mpsc::Receiver<EncodedBlockWithObjectMapping>>>,
new_head_recv: Arc<Mutex<mpsc::Receiver<NewHead>>>,
slot_recv: Arc<Mutex<mpsc::Receiver<SlotInfo>>>,
solution_sender: mpsc::Sender<SolutionResponse>,
}

impl MockRpc {
/// Create a new instance of [`MockRPC`].
pub(crate) fn new(
metadata_recv: mpsc::Receiver<FarmerMetadata>,
block_recv: mpsc::Receiver<EncodedBlockWithObjectMapping>,
new_head_recv: mpsc::Receiver<NewHead>,
slot_recv: mpsc::Receiver<SlotInfo>,
solution_sender: mpsc::Sender<SolutionResponse>,
) -> Self {
MockRpc {
metadata_recv: Arc::new(Mutex::new(metadata_recv)),
block_recv: Arc::new(Mutex::new(block_recv)),
new_head_recv: Arc::new(Mutex::new(new_head_recv)),
slot_recv: Arc::new(Mutex::new(slot_recv)),
solution_sender,
}
}
}

#[async_trait]
impl RpcClient for MockRpc {
async fn farmer_metadata(&self) -> Result<FarmerMetadata, MockError> {
Ok(self.metadata_recv.lock().await.try_recv()?)
}

async fn block_by_number(
&self,
_block_number: u32,
) -> Result<Option<EncodedBlockWithObjectMapping>, MockError> {
Ok(Some(self.block_recv.lock().await.try_recv()?))
}

async fn subscribe_new_head(&self) -> Result<mpsc::Receiver<NewHead>, MockError> {
let (sender, receiver) = mpsc::channel(10);
let new_head_r = self.new_head_recv.clone();
tokio::spawn(async move {
while let Some(new_head) = new_head_r.lock().await.recv().await {
let _ = sender.send(new_head);
}
});

Ok(receiver)
}

async fn subscribe_slot_info(&self) -> Result<mpsc::Receiver<SlotInfo>, MockError> {
let (sender, receiver) = mpsc::channel(10);
let slot_r = self.slot_recv.clone();
tokio::spawn(async move {
while let Some(slot_info) = slot_r.lock().await.recv().await {
let _ = sender.send(slot_info).await;
}
});

Ok(receiver)
}

async fn submit_solution_response(
&self,
solution_response: SolutionResponse,
) -> Result<(), MockError> {
let _ = self.solution_sender.send(solution_response).await;
Ok(())
}
}
1 change: 0 additions & 1 deletion crates/subspace-farmer/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
}

// Listen for new blocks produced on the network
// receiver.fuse();
loop {
tokio::select! {
_ = &mut stop_receiver => {
Expand Down

0 comments on commit c877423

Please sign in to comment.