diff --git a/Cargo.lock b/Cargo.lock index 3311e5e43d..ec039e103c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1887,6 +1887,7 @@ dependencies = [ "ciborium", "clap", "criterion-plot", + "futures", "is-terminal", "itertools", "num-traits", @@ -11407,6 +11408,7 @@ dependencies = [ "blake3", "bytesize", "clap", + "criterion", "derive_more", "event-listener-primitives", "fdlimit", diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index 79df65025a..f601546d90 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -80,7 +80,8 @@ fn erasure_coding_instance() -> &'static ErasureCoding { ERASURE_CODING.get_or_init(|| { ErasureCoding::new( - NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize).unwrap(), + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), ) .unwrap() }) diff --git a/crates/sp-lightclient/src/tests.rs b/crates/sp-lightclient/src/tests.rs index 08dc0d9a4d..dd6dd5ca7a 100644 --- a/crates/sp-lightclient/src/tests.rs +++ b/crates/sp-lightclient/src/tests.rs @@ -45,7 +45,8 @@ fn erasure_coding_instance() -> &'static ErasureCoding { ERASURE_CODING.get_or_init(|| { ErasureCoding::new( - NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize).unwrap(), + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), ) .unwrap() }) diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index 03b594f9cc..b6badd9271 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -52,7 +52,8 @@ pub fn criterion_benchmark(c: &mut Criterion) { let kzg = Kzg::new(kzg::embedded_kzg_settings()); let mut archiver = Archiver::new(kzg.clone()).unwrap(); let erasure_coding = ErasureCoding::new( - NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize).unwrap(), + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), ) .unwrap(); let mut table_generator = PosTable::generator(); diff --git a/crates/subspace-farmer-components/benches/plotting.rs b/crates/subspace-farmer-components/benches/plotting.rs index d11bdb659c..e5d305773b 100644 --- a/crates/subspace-farmer-components/benches/plotting.rs +++ b/crates/subspace-farmer-components/benches/plotting.rs @@ -33,7 +33,8 @@ fn criterion_benchmark(c: &mut Criterion) { let kzg = Kzg::new(kzg::embedded_kzg_settings()); let mut archiver = Archiver::new(kzg.clone()).unwrap(); let erasure_coding = ErasureCoding::new( - NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize).unwrap(), + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), ) .unwrap(); let mut table_generator = PosTable::generator(); diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index bb78b7554d..1d706cfd1e 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -55,7 +55,8 @@ pub fn criterion_benchmark(c: &mut Criterion) { let kzg = Kzg::new(kzg::embedded_kzg_settings()); let mut archiver = Archiver::new(kzg.clone()).unwrap(); let erasure_coding = ErasureCoding::new( - NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize).unwrap(), + 
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), ) .unwrap(); let mut table_generator = PosTable::generator(); diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index 6b1ded117d..00456003c7 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -51,7 +51,8 @@ pub fn criterion_benchmark(c: &mut Criterion) { let kzg = Kzg::new(kzg::embedded_kzg_settings()); let mut archiver = Archiver::new(kzg.clone()).unwrap(); let erasure_coding = ErasureCoding::new( - NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize).unwrap(), + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), ) .unwrap(); let mut table_generator = PosTable::generator(); diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index c45f099fb8..f313b99b8e 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -21,6 +21,7 @@ blake2 = "0.10.6" blake3 = { version = "1.4.1", default-features = false } bytesize = "1.3.0" clap = { version = "4.4.3", features = ["color", "derive"] } +criterion = { version = "0.5.1", default-features = false, features = ["rayon", "async"] } derive_more = "0.99.17" event-listener-primitives = "2.0.1" fdlimit = "0.2" diff --git a/crates/subspace-farmer/README.md b/crates/subspace-farmer/README.md index 7896b85435..ff25e36f10 100644 --- a/crates/subspace-farmer/README.md +++ b/crates/subspace-farmer/README.md @@ -62,6 +62,11 @@ This will connect to local node and will try to solve on every slot notification *NOTE: You need to have a `subspace-node` running before starting farmer, otherwise it will not be able to start* +### Benchmark auditing +``` +target/production/subspace-farmer benchmark audit /path/to/farm +``` + ### Show information about the farm ``` target/production/subspace-farmer info /path/to/farm diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs index 2c7bc73194..5f5ad44665 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs @@ -1,8 +1,8 @@ -mod farm; +pub(crate) mod benchmark; +pub(crate) mod farm; mod info; mod scrub; mod shared; -pub(crate) use farm::farm; pub(crate) use info::info; pub(crate) use scrub::scrub; diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs new file mode 100644 index 0000000000..56ab2af2c1 --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs @@ -0,0 +1,141 @@ +use crate::PosTable; +use anyhow::anyhow; +use clap::Subcommand; +use criterion::async_executor::AsyncExecutor; +use criterion::{black_box, BatchSize, Criterion, Throughput}; +#[cfg(windows)] +use memmap2::Mmap; +use parking_lot::Mutex; +use std::fs::OpenOptions; +use std::future::Future; +use std::num::NonZeroUsize; +use std::path::PathBuf; +use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; +use subspace_core_primitives::{Record, SolutionRange}; +use subspace_erasure_coding::ErasureCoding; +use subspace_farmer::single_disk_farm::farming::{plot_audit, PlotAuditOptions}; +use subspace_farmer::single_disk_farm::{SingleDiskFarm, SingleDiskFarmSummary}; +use 
subspace_farmer_components::sector::sector_size; +use subspace_proof_of_space::Table; +use subspace_rpc_primitives::SlotInfo; +use tokio::runtime::Handle; + +struct TokioAsyncExecutor(Handle); + +impl AsyncExecutor for TokioAsyncExecutor { + fn block_on<T>(&self, future: impl Future<Output = T>) -> T { + tokio::task::block_in_place(|| self.0.block_on(future)) + } +} + +impl TokioAsyncExecutor { + fn new() -> Self { + Self(Handle::current()) + } +} + +/// Arguments for benchmark +#[derive(Debug, Subcommand)] +pub(crate) enum BenchmarkArgs { + /// Audit benchmark + Audit { + /// Disk farm to audit + /// + /// Example: + /// /path/to/directory + disk_farm: PathBuf, + #[arg(long, default_value_t = 10)] + sample_size: usize, + }, +} + +pub(crate) async fn benchmark(benchmark_args: BenchmarkArgs) -> anyhow::Result<()> { + match benchmark_args { + BenchmarkArgs::Audit { + disk_farm, + sample_size, + } => audit(disk_farm, sample_size).await, + } +} + +async fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> { + let (single_disk_farm_info, disk_farm) = match SingleDiskFarm::collect_summary(disk_farm) { + SingleDiskFarmSummary::Found { info, directory } => (info, directory), + SingleDiskFarmSummary::NotFound { directory } => { + return Err(anyhow!( + "No single disk farm info found, make sure {} is a valid path to the farm and \ + the process has permissions to access it", + directory.display() + )); + } + SingleDiskFarmSummary::Error { directory, error } => { + return Err(anyhow!( + "Failed to open single disk farm info, make sure {} is a valid path to the farm \ + and the process has permissions to access it: {error}", + directory.display() + )); + } + }; + + let sector_size = sector_size(single_disk_farm_info.pieces_in_sector()); + let kzg = Kzg::new(embedded_kzg_settings()); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .map_err(|error| anyhow::anyhow!(error))?; + let table_generator = Mutex::new(PosTable::generator()); + + let sectors_metadata = SingleDiskFarm::read_all_sectors_metadata(&disk_farm) + .map_err(|error| anyhow::anyhow!("Failed to read sectors metadata: {error}"))?; + + let plot_file = OpenOptions::new() + .read(true) + .open(disk_farm.join(SingleDiskFarm::PLOT_FILE)) + .map_err(|error| anyhow::anyhow!("Failed to open single disk farm: {error}"))?; + #[cfg(windows)] + let plot_mmap = unsafe { Mmap::map(&plot_file)?
}; + + let mut criterion = Criterion::default().sample_size(sample_size); + criterion + .benchmark_group("audit") + .throughput(Throughput::Bytes( + sector_size as u64 * sectors_metadata.len() as u64, + )) + .bench_function("plot", |b| { + b.to_async(TokioAsyncExecutor::new()).iter_batched( + rand::random, + |global_challenge| { + let options = PlotAuditOptions::<PosTable> { + public_key: single_disk_farm_info.public_key(), + reward_address: single_disk_farm_info.public_key(), + sector_size, + slot_info: SlotInfo { + slot_number: 0, + global_challenge, + // No solution will be found, pure audit + solution_range: SolutionRange::MIN, + // No solution will be found, pure audit + voting_solution_range: SolutionRange::MIN, + }, + sectors_metadata: &sectors_metadata, + kzg: &kzg, + erasure_coding: &erasure_coding, + #[cfg(not(windows))] + plot_file: &plot_file, + #[cfg(windows)] + plot_mmap: &plot_mmap, + maybe_sector_being_modified: None, + table_generator: &table_generator, + }; + + black_box(plot_audit(black_box(options))) + }, + BatchSize::SmallInput, + ) + }); + + criterion.final_summary(); + + Ok(()) +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 3ca932959b..3b59dde084 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -3,8 +3,9 @@ mod dsn; use crate::commands::farm::dsn::configure_dsn; use crate::commands::shared::print_disk_farm_info; use crate::utils::shutdown_signal; -use crate::{DiskFarm, FarmingArgs}; -use anyhow::{anyhow, Result}; +use anyhow::anyhow; +use bytesize::ByteSize; +use clap::{Parser, ValueHint}; use futures::channel::oneshot; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; @@ -12,10 +13,13 @@ use lru::LruCache; use parking_lot::Mutex; use rayon::ThreadPoolBuilder; use std::fs; -use std::num::NonZeroUsize; +use std::net::SocketAddr; +use std::num::{NonZeroU8, NonZeroUsize}; +use std::path::PathBuf; +use std::str::FromStr; use std::sync::Arc; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; -use subspace_core_primitives::{Record, SectorIndex}; +use subspace_core_primitives::{PublicKey, Record, SectorIndex}; use subspace_erasure_coding::ErasureCoding; use subspace_farmer::piece_cache::PieceCache; use subspace_farmer::single_disk_farm::{ @@ -24,11 +28,13 @@ use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter; use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; +use subspace_farmer::utils::ss58::parse_ss58_reward_address; use subspace_farmer::utils::{run_future_in_dedicated_thread, tokio_rayon_spawn_handler}; use subspace_farmer::{Identity, NodeClient, NodeRpcClient}; use subspace_farmer_components::plotting::PlottedSector; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::libp2p::identity::{ed25519, Keypair}; +use subspace_networking::libp2p::Multiaddr; use subspace_networking::utils::piece_provider::PieceProvider; use subspace_proof_of_space::Table; use tempfile::TempDir; @@ -38,9 +44,224 @@ use zeroize::Zeroizing; const RECORDS_ROOTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1_000_000).expect("Not zero; qed"); +fn available_parallelism() -> usize { + match std::thread::available_parallelism() { + Ok(parallelism) => parallelism.get(), +
Err(error) => { + warn!( + %error, + "Unable to identify available parallelism, you might want to configure thread pool sizes with CLI \ + options manually" + ); + + 0 + } + } +} + +/// Arguments for farmer +#[derive(Debug, Parser)] +pub(crate) struct FarmingArgs { + /// One or more farms located at specified paths, each with its own allocated space. + /// + /// In case of multiple disks, it is recommended to specify them individually rather than using + /// RAID 0; that way the farmer will be able to better take advantage of the concurrency of + /// individual drives. + /// + /// Format for each farm is a comma-separated list of strings like this: + /// + /// path=/path/to/directory,size=5T + /// + /// `size` is the max allocated size in human-readable format (e.g. 10GB, 2TiB) or just bytes + /// that the farmer will make sure not to exceed (and will pre-allocate all the space on startup + /// to ensure it will not run out of space at runtime). + disk_farms: Vec<DiskFarm>, + /// WebSocket RPC URL of the Subspace node to connect to + #[arg(long, value_hint = ValueHint::Url, default_value = "ws://127.0.0.1:9944")] + node_rpc_url: String, + /// Address for farming rewards + #[arg(long, value_parser = parse_ss58_reward_address)] + reward_address: PublicKey, + /// Percentage of allocated space dedicated for caching purposes, 99% max + #[arg(long, default_value = "1", value_parser = cache_percentage_parser)] + cache_percentage: NonZeroU8, + /// Sets some flags that are convenient during development, currently `--enable-private-ips`. + #[arg(long)] + dev: bool, + /// Run a temporary farmer with specified plot size in human-readable format (e.g. 10GB, 2TiB) or + /// just bytes (e.g. 4096); this will create a temporary directory for storing farmer data that + /// will be deleted at the end of the process. + #[arg(long, conflicts_with = "disk_farms")] + tmp: Option<ByteSize>, + /// Maximum number of pieces in sector (can override protocol value to something lower). + /// + /// This will make plotting of individual sectors faster and decrease load on CPU during proving, + /// but it will also proportionally increase the amount of disk reads during audits, since every + /// sector needs to be audited and there will be more of them. + /// + /// This is primarily for development and not recommended for regular users. + #[arg(long)] + max_pieces_in_sector: Option<u16>, + /// DSN parameters + #[clap(flatten)] + dsn: DsnArgs, + /// Do not print info about configured farms on startup + #[arg(long)] + no_info: bool, + /// Defines endpoints for the prometheus metrics server. It doesn't start without at least + /// one specified endpoint. Format: 127.0.0.1:8080 + #[arg(long, alias = "metrics-endpoint")] + metrics_endpoints: Vec<SocketAddr>, + /// Defines how many sectors the farmer will download concurrently; allows limiting memory usage + /// of the plotting process. Increasing beyond 2 makes little practical sense due to limited + /// networking concurrency and will likely result in slower plotting overall + #[arg(long, default_value = "2")] + sector_downloading_concurrency: NonZeroUsize, + /// Defines how many sectors the farmer will encode concurrently; should generally never be set + /// to more than 1 because it will most likely result in slower plotting overall + #[arg(long, default_value = "1")] + sector_encoding_concurrency: NonZeroUsize, + /// Allows enabling farming during initial plotting.
Not used by default because plotting is so + /// intense on CPU and memory that farming will likely not work properly, yet it will + /// significantly impact plotting speed, delaying the time when farming can actually work + /// properly. + #[arg(long)] + farm_during_initial_plotting: bool, + /// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some + /// compute-intensive operations during proving), defaults to number of CPU cores available in + /// the system + #[arg(long, default_value_t = available_parallelism())] + farming_thread_pool_size: usize, + /// Size of thread pool used for plotting, defaults to number of CPU cores available in the + /// system. This thread pool is global for all farms and generally doesn't need to be changed. + #[arg(long, default_value_t = available_parallelism())] + plotting_thread_pool_size: usize, + /// Size of thread pool used for replotting, typically smaller pool than for plotting to not + /// affect farming as much, defaults to half of the number of CPU cores available in the system. + /// This thread pool is global for all farms and generally doesn't need to be changed. + #[arg(long, default_value_t = available_parallelism() / 2)] + replotting_thread_pool_size: usize, +} + +fn cache_percentage_parser(s: &str) -> anyhow::Result<NonZeroU8> { + let cache_percentage = NonZeroU8::from_str(s)?; + + if cache_percentage.get() > 99 { + return Err(anyhow::anyhow!("Cache percentage can't exceed 99")); + } + + Ok(cache_percentage) +} + +/// Arguments for DSN +#[derive(Debug, Parser)] +struct DsnArgs { + /// Multiaddrs of bootstrap nodes to connect to on startup, multiple are supported + #[arg(long)] + bootstrap_nodes: Vec<Multiaddr>, + /// Multiaddr to listen on for subspace networking, for instance `/ip4/0.0.0.0/tcp/0`, + /// multiple are supported. + #[arg(long, default_values_t = [ + "/ip4/0.0.0.0/udp/30533/quic-v1".parse::<Multiaddr>().expect("Manual setting"), + "/ip4/0.0.0.0/tcp/30533".parse::<Multiaddr>().expect("Manual setting"), + ])] + listen_on: Vec<Multiaddr>, + /// Determines whether we allow keeping non-global (private, shared, loopback..) addresses in + /// Kademlia DHT. + #[arg(long, default_value_t = false)] + enable_private_ips: bool, + /// Multiaddrs of reserved nodes to maintain a connection to, multiple are supported + #[arg(long)] + reserved_peers: Vec<Multiaddr>, + /// Defines max established incoming connection limit. + #[arg(long, default_value_t = 50)] + in_connections: u32, + /// Defines max established outgoing swarm connection limit. + #[arg(long, default_value_t = 100)] + out_connections: u32, + /// Defines max pending incoming connection limit. + #[arg(long, default_value_t = 50)] + pending_in_connections: u32, + /// Defines max pending outgoing swarm connection limit. + #[arg(long, default_value_t = 100)] + pending_out_connections: u32, + /// Defines target total (in and out) connection number that should be maintained. + #[arg(long, default_value_t = 50)] + target_connections: u32, + /// Known external addresses + #[arg(long, alias = "external-address")] + external_addresses: Vec<Multiaddr>, +} + +#[derive(Debug, Clone)] +pub(crate) struct DiskFarm { + /// Path to directory where data is stored.
+ directory: PathBuf, + /// How much space in bytes can farm use for plots (metadata space is not included) + allocated_plotting_space: u64, +} + +impl FromStr for DiskFarm { + type Err = String; + + fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> { + let parts = s.split(',').collect::<Vec<_>>(); + if parts.len() != 2 { + return Err("Must contain 2 comma-separated components".to_string()); + } + + let mut plot_directory = None; + let mut allocated_plotting_space = None; + + for part in parts { + let part = part.splitn(2, '=').collect::<Vec<_>>(); + if part.len() != 2 { + return Err("Each component must contain = separating key from value".to_string()); + } + + let key = *part.first().expect("Length checked above; qed"); + let value = *part.get(1).expect("Length checked above; qed"); + + match key { + "path" => { + plot_directory.replace( + PathBuf::try_from(value).map_err(|error| { + format!("Failed to parse `path` \"{value}\": {error}") + })?, + ); + } + "size" => { + allocated_plotting_space.replace( + value + .parse::<ByteSize>() + .map_err(|error| { + format!("Failed to parse `size` \"{value}\": {error}") + })? + .as_u64(), + ); + } + key => { + return Err(format!( + "Key \"{key}\" is not supported, only `path` or `size`" + )); + } + } + } + + Ok(DiskFarm { + directory: plot_directory.ok_or({ + "`path` key is required with path to directory where plots will be stored" + })?, + allocated_plotting_space: allocated_plotting_space.ok_or({ + "`size` key is required with max allocated plotting space for the farm" + })?, + }) + } +} + /// Start farming by using multiple replica plot in specified path and connecting to WebSocket /// server at specified address. -pub(crate) async fn farm<PosTable>(farming_args: FarmingArgs) -> Result<(), anyhow::Error> +pub(crate) async fn farm<PosTable>(farming_args: FarmingArgs) -> anyhow::Result<()> where PosTable: Table, { @@ -148,7 +369,8 @@ where let kzg = Kzg::new(embedded_kzg_settings()); let erasure_coding = ErasureCoding::new( - NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize).unwrap(), + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), ) .map_err(|error| anyhow::anyhow!(error))?; // TODO: Consider introducing and using global in-memory segment header cache (this comment is diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs index 7a93886041..f0fe5b7c25 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs @@ -1,4 +1,4 @@ -use crate::DsnArgs; +use crate::commands::farm::DsnArgs; use parking_lot::Mutex; use prometheus_client::registry::Registry; use std::collections::HashSet; diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index b15f5f48d8..37c37ee362 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -1,22 +1,14 @@ #![feature(const_option, type_changing_struct_update)] mod commands; -mod ss58; mod utils; -use bytesize::ByteSize; -use clap::{Parser, ValueHint}; -use ss58::parse_ss58_reward_address; +use clap::Parser; use std::fs; -use std::net::SocketAddr; -use std::num::{NonZeroU8, NonZeroUsize}; use std::path::PathBuf; -use std::str::FromStr; -use subspace_core_primitives::PublicKey; use subspace_farmer::single_disk_farm::SingleDiskFarm; -use
subspace_networking::libp2p::Multiaddr; use subspace_proof_of_space::chia::ChiaTable; -use tracing::{info, warn}; +use tracing::info; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt, EnvFilter}; @@ -26,227 +18,15 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; type PosTable = ChiaTable; -fn available_parallelism() -> usize { - match std::thread::available_parallelism() { - Ok(parallelism) => parallelism.get(), - Err(error) => { - warn!( - %error, - "Unable to identify available parallelism, you might want to configure thread pool sizes with CLI \ - options manually" - ); - - 0 - } - } -} - -/// Arguments for farmer -#[derive(Debug, Parser)] -struct FarmingArgs { - /// One or more farm located at specified path, each with its own allocated space. - /// - /// In case of multiple disks, it is recommended to specify them individually rather than using - /// RAID 0, that way farmer will be able to better take advantage of concurrency of individual - /// drives. - /// - /// Format for each farm is coma-separated list of strings like this: - /// - /// path=/path/to/directory,size=5T - /// - /// `size` is max allocated size in human readable format (e.g. 10GB, 2TiB) or just bytes that - /// farmer will make sure not not exceed (and will pre-allocated all the space on startup to - /// ensure it will not run out of space in runtime). - disk_farms: Vec, - /// WebSocket RPC URL of the Subspace node to connect to - #[arg(long, value_hint = ValueHint::Url, default_value = "ws://127.0.0.1:9944")] - node_rpc_url: String, - /// Address for farming rewards - #[arg(long, value_parser = parse_ss58_reward_address)] - reward_address: PublicKey, - /// Percentage of allocated space dedicated for caching purposes, 99% max - #[arg(long, default_value = "1", value_parser = cache_percentage_parser)] - cache_percentage: NonZeroU8, - /// Sets some flags that are convenient during development, currently `--enable-private-ips`. - #[arg(long)] - dev: bool, - /// Run temporary farmer with specified plot size in human readable format (e.g. 10GB, 2TiB) or - /// just bytes (e.g. 4096), this will create a temporary directory for storing farmer data that - /// will be deleted at the end of the process. - #[arg(long, conflicts_with = "disk_farms")] - tmp: Option, - /// Maximum number of pieces in sector (can override protocol value to something lower). - /// - /// This will make plotting of individual sectors faster, decrease load on CPU proving, but also - /// proportionally increase amount of disk reads during audits since every sector needs to be - /// audited and there will be more of them. - /// - /// This is primarily for development and not recommended to use by regular users. - #[arg(long)] - max_pieces_in_sector: Option, - /// DSN parameters - #[clap(flatten)] - dsn: DsnArgs, - /// Do not print info about configured farms on startup - #[arg(long)] - no_info: bool, - /// Defines endpoints for the prometheus metrics server. It doesn't start without at least - /// one specified endpoint. 
Format: 127.0.0.1:8080 - #[arg(long, alias = "metrics-endpoint")] - metrics_endpoints: Vec, - /// Defines how many sectors farmer will download concurrently, allows to limit memory usage of - /// the plotting process, increasing beyond 2 makes practical sense due to limited networking - /// concurrency and will likely result in slower plotting overall - #[arg(long, default_value = "2")] - sector_downloading_concurrency: NonZeroUsize, - /// Defines how many sectors farmer will encode concurrently, should generally never be set to - /// more than 1 because it will most likely result in slower plotting overall - #[arg(long, default_value = "1")] - sector_encoding_concurrency: NonZeroUsize, - /// Allows to enable farming during initial plotting. Not used by default because plotting is so - /// intense on CPU and memory that farming will likely not work properly, yet it will - /// significantly impact plotting speed, delaying the time when farming can actually work - /// properly. - #[arg(long)] - farm_during_initial_plotting: bool, - /// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some - /// compute-intensive operations during proving), defaults to number of CPU cores available in - /// the system - #[arg(long, default_value_t = available_parallelism())] - farming_thread_pool_size: usize, - /// Size of thread pool used for plotting, defaults to number of CPU cores available in the - /// system. This thread pool is global for all farms and generally doesn't need to be changed. - #[arg(long, default_value_t = available_parallelism())] - plotting_thread_pool_size: usize, - /// Size of thread pool used for replotting, typically smaller pool than for plotting to not - /// affect farming as much, defaults to half of the number of CPU cores available in the system. - /// This thread pool is global for all farms and generally doesn't need to be changed. - #[arg(long, default_value_t = available_parallelism() / 2)] - replotting_thread_pool_size: usize, -} - -fn cache_percentage_parser(s: &str) -> anyhow::Result { - let cache_percentage = NonZeroU8::from_str(s)?; - - if cache_percentage.get() > 99 { - return Err(anyhow::anyhow!("Cache percentage can't exceed 99")); - } - - Ok(cache_percentage) -} - -/// Arguments for DSN -#[derive(Debug, Parser)] -struct DsnArgs { - /// Multiaddrs of bootstrap nodes to connect to on startup, multiple are supported - #[arg(long)] - bootstrap_nodes: Vec, - /// Multiaddr to listen on for subspace networking, for instance `/ip4/0.0.0.0/tcp/0`, - /// multiple are supported. - #[arg(long, default_values_t = [ - "/ip4/0.0.0.0/udp/30533/quic-v1".parse::().expect("Manual setting"), - "/ip4/0.0.0.0/tcp/30533".parse::().expect("Manual setting"), - ])] - listen_on: Vec, - /// Determines whether we allow keeping non-global (private, shared, loopback..) addresses in - /// Kademlia DHT. - #[arg(long, default_value_t = false)] - enable_private_ips: bool, - /// Multiaddrs of reserved nodes to maintain a connection to, multiple are supported - #[arg(long)] - reserved_peers: Vec, - /// Defines max established incoming connection limit. - #[arg(long, default_value_t = 50)] - in_connections: u32, - /// Defines max established outgoing swarm connection limit. - #[arg(long, default_value_t = 100)] - out_connections: u32, - /// Defines max pending incoming connection limit. - #[arg(long, default_value_t = 50)] - pending_in_connections: u32, - /// Defines max pending outgoing swarm connection limit. 
- #[arg(long, default_value_t = 100)] - pending_out_connections: u32, - /// Defines target total (in and out) connection number that should be maintained. - #[arg(long, default_value_t = 50)] - target_connections: u32, - /// Known external addresses - #[arg(long, alias = "external-address")] - external_addresses: Vec, -} - -#[derive(Debug, Clone)] -struct DiskFarm { - /// Path to directory where data is stored. - directory: PathBuf, - /// How much space in bytes can farm use for plots (metadata space is not included) - allocated_plotting_space: u64, -} - -impl FromStr for DiskFarm { - type Err = String; - - fn from_str(s: &str) -> anyhow::Result { - let parts = s.split(',').collect::>(); - if parts.len() != 2 { - return Err("Must contain 2 coma-separated components".to_string()); - } - - let mut plot_directory = None; - let mut allocated_plotting_space = None; - - for part in parts { - let part = part.splitn(2, '=').collect::>(); - if part.len() != 2 { - return Err("Each component must contain = separating key from value".to_string()); - } - - let key = *part.first().expect("Length checked above; qed"); - let value = *part.get(1).expect("Length checked above; qed"); - - match key { - "path" => { - plot_directory.replace( - PathBuf::try_from(value).map_err(|error| { - format!("Failed to parse `path` \"{value}\": {error}") - })?, - ); - } - "size" => { - allocated_plotting_space.replace( - value - .parse::() - .map_err(|error| { - format!("Failed to parse `size` \"{value}\": {error}") - })? - .as_u64(), - ); - } - key => { - return Err(format!( - "Key \"{key}\" is not supported, only `path` or `size`" - )); - } - } - } - - Ok(DiskFarm { - directory: plot_directory.ok_or({ - "`path` key is required with path to directory where plots will be stored" - })?, - allocated_plotting_space: allocated_plotting_space.ok_or({ - "`size` key is required with path to directory where plots will be stored" - })?, - }) - } -} - #[allow(clippy::large_enum_variant)] #[derive(Debug, Parser)] #[clap(about, version)] enum Command { /// Start a farmer, does plotting and farming - Farm(FarmingArgs), + Farm(commands::farm::FarmingArgs), + /// Run various benchmarks + #[clap(subcommand)] + Benchmark(commands::benchmark::BenchmarkArgs), /// Print information about farm and its content Info { /// One or more farm located at specified path. 
@@ -297,6 +77,26 @@ async fn main() -> anyhow::Result<()> { let command = Command::parse(); match command { + Command::Farm(farming_args) => { + commands::farm::farm::<PosTable>(farming_args).await?; + } + Command::Benchmark(benchmark_args) => { + commands::benchmark::benchmark(benchmark_args).await?; + } + Command::Info { disk_farms } => { + if disk_farms.is_empty() { + info!("No farm was specified, so there is nothing to do"); + } else { + commands::info(disk_farms); + } + } + Command::Scrub { disk_farms } => { + if disk_farms.is_empty() { + info!("No farm was specified, so there is nothing to do"); + } else { + commands::scrub(&disk_farms); + } + } Command::Wipe { disk_farms } => { for disk_farm in &disk_farms { if !disk_farm.exists() { @@ -321,23 +121,6 @@ async fn main() -> anyhow::Result<()> { info!("Done"); } } - Command::Farm(farming_args) => { - commands::farm::<PosTable>(farming_args).await?; - } - Command::Info { disk_farms } => { - if disk_farms.is_empty() { - info!("No farm was specified, so there is nothing to do"); - } else { - commands::info(disk_farms); - } - } - Command::Scrub { disk_farms } => { - if disk_farms.is_empty() { - info!("No farm was specified, so there is nothing to do"); - } else { - commands::scrub(&disk_farms); - } - } } Ok(()) } diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 71a7a50f54..c26882eecb 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -1,4 +1,4 @@ -mod farming; +pub mod farming; pub mod piece_cache; pub mod piece_reader; mod plotting; @@ -560,8 +560,8 @@ impl Drop for SingleDiskFarm { } impl SingleDiskFarm { - const PLOT_FILE: &'static str = "plot.bin"; - const METADATA_FILE: &'static str = "metadata.bin"; + pub const PLOT_FILE: &'static str = "plot.bin"; + pub const METADATA_FILE: &'static str = "metadata.bin"; const SUPPORTED_PLOT_VERSION: u8 = 0; /// Create new single disk farm instance @@ -1137,6 +1137,60 @@ impl SingleDiskFarm { } } + /// Read all sectors metadata + pub fn read_all_sectors_metadata( + directory: &Path, + ) -> io::Result<Vec<SectorMetadataChecksummed>> { + let mut metadata_file = OpenOptions::new() + .read(true) + .open(directory.join(Self::METADATA_FILE))?; + + let metadata_size = metadata_file.seek(SeekFrom::End(0))?; + let sector_metadata_size = SectorMetadataChecksummed::encoded_size(); + + let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()]; + metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?; + + let metadata_header = PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref()) + .map_err(|error| { + io::Error::new( + io::ErrorKind::Other, + format!("Failed to decode metadata header: {}", error), + ) + })?; + + if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("Unsupported metadata version {}", metadata_header.version), + )); + } + + let mut sectors_metadata = Vec::<SectorMetadataChecksummed>::with_capacity( + ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64) as usize, + ); + + let mut sector_metadata_bytes = vec![0; sector_metadata_size]; + for sector_index in 0..metadata_header.plotted_sector_count { + metadata_file.read_exact_at( + &mut sector_metadata_bytes, + RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index), + )?; + sectors_metadata.push( + SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()).map_err( + |error| { + io::Error::new( + io::ErrorKind::Other, +
format!("Failed to decode sector metadata: {}", error), + ) + }, + )?, + ); + } + + Ok(sectors_metadata) + } + /// ID of this farm pub fn id(&self) -> &SingleDiskFarmId { self.single_disk_farm_info.id() diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index b8e67e3176..4987b365bf 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -14,7 +14,7 @@ use std::io; use std::sync::Arc; use std::time::Instant; use subspace_core_primitives::crypto::kzg::Kzg; -use subspace_core_primitives::{PosSeed, PublicKey, SectorIndex, SolutionRange}; +use subspace_core_primitives::{PosSeed, PublicKey, SectorIndex, Solution, SolutionRange}; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector; use subspace_farmer_components::proving; @@ -137,84 +137,30 @@ where let start = Instant::now(); let slot = slot_info.slot_number; let sectors_metadata = sectors_metadata.read().await; - let sector_count = sectors_metadata.len(); - debug!(%slot, %sector_count, "Reading sectors"); - - #[cfg(not(windows))] - let sectors = (0..sector_count) - .into_par_iter() - .map(|sector_index| plot_file.offset(sector_index * sector_size)); - // On Windows random read is horrible in terms of performance, memory-mapped I/O helps - // TODO: Remove this once https://internals.rust-lang.org/t/introduce-write-all-at-read-exact-at-on-windows/19649 - // or similar exists in standard library - #[cfg(windows)] - let sectors = plot_mmap.par_chunks_exact(sector_size); + debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors"); let sectors_solutions = { let modifying_sector_guard = modifying_sector_index.read().await; let maybe_sector_being_modified = modifying_sector_guard.as_ref().copied(); - let mut sectors_solutions = sectors_metadata - .par_iter() - .zip(sectors) - .enumerate() - .filter_map(|(sector_index, (sector_metadata, sector))| { - let sector_index = sector_index as u16; - if maybe_sector_being_modified == Some(sector_index) { - // Skip sector that is being modified right now - return None; - } - trace!(%slot, %sector_index, "Auditing sector"); - - let audit_results = audit_sector( - &public_key, - sector_index, - &slot_info.global_challenge, - slot_info.voting_solution_range, - sector, - sector_metadata, - )?; - - Some((sector_index, audit_results.solution_candidates)) - }) - .filter_map(|(sector_index, solution_candidates)| { - let sector_solutions = match solution_candidates.into_solutions( - &reward_address, - &kzg, - &erasure_coding, - |seed: &PosSeed| table_generator.lock().generate_parallel(seed), - ) { - Ok(solutions) => solutions, - Err(error) => { - warn!( - %error, - %sector_index, - "Failed to turn solution candidates into solutions", - ); - - return None; - } - }; - - if sector_solutions.len() == 0 { - return None; - } - - Some((sector_index, sector_solutions)) - }) - .collect::>(); - - sectors_solutions.sort_by(|a, b| { - let a_solution_distance = - a.1.best_solution_distance().unwrap_or(SolutionRange::MAX); - let b_solution_distance = - b.1.best_solution_distance().unwrap_or(SolutionRange::MAX); - - a_solution_distance.cmp(&b_solution_distance) + let sectors_solutions_fut = plot_audit(PlotAuditOptions:: { + public_key: &public_key, + reward_address: &reward_address, + sector_size, + slot_info, + sectors_metadata: §ors_metadata, + kzg: &kzg, + erasure_coding: &erasure_coding, + #[cfg(not(windows))] + plot_file, + 
#[cfg(windows)] + plot_mmap: &plot_mmap, + maybe_sector_being_modified, + table_generator: &table_generator, }); - sectors_solutions + sectors_solutions_fut.await }; 'solutions_processing: for (sector_index, sector_solutions) in sectors_solutions { @@ -242,7 +188,7 @@ where } let response = SolutionResponse { - slot_number: slot_info.slot_number, + slot_number: slot, solution, }; @@ -263,3 +209,132 @@ where Ok(()) } + +/// Plot audit options +pub struct PlotAuditOptions<'a, PosTable> +where + PosTable: Table, +{ + /// Public key of the farm + pub public_key: &'a PublicKey, + /// Reward address to use for solutions + pub reward_address: &'a PublicKey, + /// Sector size in bytes + pub sector_size: usize, + /// Slot info for the audit + pub slot_info: SlotInfo, + /// Metadata of all sectors plotted so far + pub sectors_metadata: &'a [SectorMetadataChecksummed], + /// Kzg instance + pub kzg: &'a Kzg, + /// Erasure coding instance + pub erasure_coding: &'a ErasureCoding, + /// File corresponding to the plot, must have at least `sectors_metadata.len()` sectors of + /// `sector_size` each + #[cfg(not(windows))] + pub plot_file: &'a File, + /// Memory-mapped file corresponding to the plot, must have at least `sectors_metadata.len()` + /// sectors of `sector_size` each + #[cfg(windows)] + pub plot_mmap: &'a Mmap, + /// Optional sector that is currently being modified (for example replotted) and should not be + /// audited + pub maybe_sector_being_modified: Option<SectorIndex>, + /// Proof of space table generator + pub table_generator: &'a Mutex<PosTable::Generator>, +} + +pub async fn plot_audit<PosTable>( + options: PlotAuditOptions<'_, PosTable>, +) -> Vec<( + SectorIndex, + impl ProvableSolutions<Item = Result<Solution<PublicKey, PublicKey>, proving::ProvingError>> + '_, +)> +where + PosTable: Table, +{ + let PlotAuditOptions { + public_key, + reward_address, + sector_size, + slot_info, + sectors_metadata, + kzg, + erasure_coding, + #[cfg(not(windows))] + plot_file, + #[cfg(windows)] + plot_mmap, + maybe_sector_being_modified, + table_generator, + } = options; + + #[cfg(not(windows))] + let sectors = (0..sectors_metadata.len()) + .into_par_iter() + .map(|sector_index| plot_file.offset(sector_index * sector_size)); + // On Windows random read is horrible in terms of performance, memory-mapped I/O helps + // TODO: Remove this once https://internals.rust-lang.org/t/introduce-write-all-at-read-exact-at-on-windows/19649 + // or similar exists in standard library + #[cfg(windows)] + let sectors = plot_mmap.par_chunks_exact(sector_size); + + let mut sectors_solutions = sectors_metadata + .par_iter() + .zip(sectors) + .enumerate() + .filter_map(|(sector_index, (sector_metadata, sector))| { + let sector_index = sector_index as u16; + if maybe_sector_being_modified == Some(sector_index) { + // Skip sector that is being modified right now + return None; + } + trace!(slot = %slot_info.slot_number, %sector_index, "Auditing sector"); + + let audit_results = audit_sector( + public_key, + sector_index, + &slot_info.global_challenge, + slot_info.voting_solution_range, + sector, + sector_metadata, + )?; + + Some((sector_index, audit_results.solution_candidates)) + }) + .filter_map(|(sector_index, solution_candidates)| { + let sector_solutions = match solution_candidates.into_solutions( + reward_address, + kzg, + erasure_coding, + |seed: &PosSeed| table_generator.lock().generate_parallel(seed), + ) { + Ok(solutions) => solutions, + Err(error) => { + warn!( + %error, + %sector_index, + "Failed to turn solution candidates into solutions", + ); + + return None; + } + }; + + if sector_solutions.len() ==
0 { + return None; + } + + Some((sector_index, sector_solutions)) + }) + .collect::>(); + + sectors_solutions.sort_by(|a, b| { + let a_solution_distance = a.1.best_solution_distance().unwrap_or(SolutionRange::MAX); + let b_solution_distance = b.1.best_solution_distance().unwrap_or(SolutionRange::MAX); + + a_solution_distance.cmp(&b_solution_distance) + }); + + sectors_solutions +} diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index 8fa9e7d875..6fd4f0a8ed 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -1,6 +1,7 @@ pub mod farmer_piece_getter; pub mod piece_validator; pub mod readers_and_pieces; +pub mod ss58; #[cfg(test)] mod tests; diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/ss58.rs b/crates/subspace-farmer/src/utils/ss58.rs similarity index 96% rename from crates/subspace-farmer/src/bin/subspace-farmer/ss58.rs rename to crates/subspace-farmer/src/utils/ss58.rs index 4dfad9cbc6..0d9b8a15b1 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/ss58.rs +++ b/crates/subspace-farmer/src/utils/ss58.rs @@ -30,7 +30,7 @@ const CHECKSUM_LEN: usize = 2; /// An error type for SS58 decoding. #[derive(Debug, Error)] -pub(crate) enum Ss58ParsingError { +pub enum Ss58ParsingError { /// Base 58 requirement is violated #[error("Base 58 requirement is violated")] BadBase58, @@ -49,7 +49,7 @@ pub(crate) enum Ss58ParsingError { } /// Some if the string is a properly encoded SS58Check address. -pub(crate) fn parse_ss58_reward_address(s: &str) -> Result { +pub fn parse_ss58_reward_address(s: &str) -> Result { let data = s.from_base58().map_err(|_| Ss58ParsingError::BadBase58)?; if data.len() < 2 { return Err(Ss58ParsingError::BadLength); diff --git a/docs/farming.md b/docs/farming.md index fe2440afc4..73af7327f0 100644 --- a/docs/farming.md +++ b/docs/farming.md @@ -319,11 +319,12 @@ There are extra commands and parameters you can use on farmer or node, use the ` Below are some helpful samples: -- `./FARMER_FILE_NAME info PATH_TO_FARM` : show information about the farm at `PATH_TO_FARM` -- `./FARMER_FILE_NAME scrub PATH_TO_FARM` : Scrub the farm to find and fix farm at `PATH_TO_FARM` corruption -- `./FARMER_FILE_NAME wipe PATH_TO_FARM` : erases everything related to farmer if data were stored in `PATH_TO_FARM` -- `./NODE_FILE_NAME --base-path NODE_DATA_PATH --chain gemini-3f ...` : start node and store data in `NODE_DATA_PATH` instead of default location -- `./NODE_FILE_NAME purge-chain --base-path NODE_DATA_PATH --chain gemini-3f` : erases data related to the node if data were stored in `NODE_DATA_PATH` +- `./FARMER_FILE_NAME benchmark audit PATH_TO_FARM`: benchmark auditing performance of the farm at `PATH_TO_FARM` +- `./FARMER_FILE_NAME info PATH_TO_FARM`: show information about the farm at `PATH_TO_FARM` +- `./FARMER_FILE_NAME scrub PATH_TO_FARM`: Scrub the farm to find and fix farm at `PATH_TO_FARM` corruption +- `./FARMER_FILE_NAME wipe PATH_TO_FARM`: erases everything related to farmer if data were stored in `PATH_TO_FARM` +- `./NODE_FILE_NAME --base-path NODE_DATA_PATH --chain gemini-3f ...`: start node and store data in `NODE_DATA_PATH` instead of default location +- `./NODE_FILE_NAME purge-chain --base-path NODE_DATA_PATH --chain gemini-3f`: erases data related to the node if data were stored in `NODE_DATA_PATH` Examples: ```bash diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index dd1e749768..853d6b9949 100644 --- 
a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -148,7 +148,8 @@ async fn start_farming( let kzg = Kzg::new(embedded_kzg_settings()); let erasure_coding = ErasureCoding::new( - NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize).unwrap(), + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), ) .unwrap();
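The new `benchmark audit` subcommand introduced in this diff can be exercised as below. This is a usage sketch based only on the CLI surface added above (`BenchmarkArgs::Audit` with its `--sample-size` flag defaulting to 10); the binary path mirrors the existing README examples:

```bash
# Benchmark auditing performance of an existing farm
target/production/subspace-farmer benchmark audit /path/to/farm

# Optionally raise Criterion's sample size (default is 10) for more stable numbers
target/production/subspace-farmer benchmark audit --sample-size 30 /path/to/farm
```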
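For context on the `path=...,size=...` farm format parsed by `DiskFarm::from_str` above, a hypothetical `farm` invocation could look like the following; `REWARD_ADDRESS` is a placeholder for an SS58 reward address and the size value reuses the `2TiB` example from the help text:

```bash
# Hypothetical invocation: one farm directory with 2 TiB of allocated plotting space
target/production/subspace-farmer farm \
    --reward-address REWARD_ADDRESS \
    path=/path/to/farm,size=2TiB
```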