From 3f5e7dccb511325cb12361b0441fb4a33ac7f3e2 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 23 Oct 2023 20:09:53 +0300 Subject: [PATCH] Implement an abstraction that opens file for auditing multiple times, once for every auditing rayon thread --- Cargo.lock | 10 ---- crates/subspace-farmer/Cargo.toml | 3 -- .../bin/subspace-farmer/commands/benchmark.rs | 53 +++++++++++++++---- .../subspace-farmer/src/single_disk_farm.rs | 15 ++---- .../src/single_disk_farm/farming.rs | 1 + .../single_disk_farm/farming/rayon_files.rs | 53 +++++++++++++++++++ 6 files changed, 99 insertions(+), 36 deletions(-) create mode 100644 crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs diff --git a/Cargo.lock b/Cargo.lock index c3f2ecb62f..da07bda1c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6407,15 +6407,6 @@ dependencies = [ "libc", ] -[[package]] -name = "memmap2" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deaba38d7abf1d4cca21cc89e932e542ba2b9258664d2a9ef0e61512039c9375" -dependencies = [ - "libc", -] - [[package]] name = "memoffset" version = "0.6.5" @@ -11519,7 +11510,6 @@ dependencies = [ "hex", "jsonrpsee", "lru 0.11.1", - "memmap2 0.9.0", "mimalloc", "monoio", "parity-scale-codec", diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 59296ee0e4..3027a0d272 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -58,8 +58,5 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } ulid = { version = "1.0.0", features = ["serde"] } zeroize = "1.6.0" -[target.'cfg(windows)'.dependencies] -memmap2 = "0.9.0" - [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies] monoio = { version = "0.1.10-beta.1", features = ["sync"] } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs index 
51975c7450..f1571ba5e8 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs @@ -3,8 +3,6 @@ use anyhow::anyhow; use clap::Subcommand; use criterion::{black_box, BatchSize, Criterion, Throughput}; use futures::FutureExt; -#[cfg(windows)] -use memmap2::Mmap; use parking_lot::Mutex; use std::fs::OpenOptions; use std::num::NonZeroUsize; @@ -12,6 +10,7 @@ use std::path::PathBuf; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::{Record, SolutionRange}; use subspace_erasure_coding::ErasureCoding; +use subspace_farmer::single_disk_farm::farming::rayon_files::RayonFiles; use subspace_farmer::single_disk_farm::farming::sync_fallback::SyncPlotAudit; use subspace_farmer::single_disk_farm::farming::{PlotAudit, PlotAuditOptions}; use subspace_farmer::single_disk_farm::{SingleDiskFarm, SingleDiskFarmSummary}; @@ -81,20 +80,52 @@ fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> { sector_size as u64 * sectors_metadata.len() as u64, )); { - let plot_file = OpenOptions::new() + let plot = OpenOptions::new() .read(true) .open(disk_farm.join(SingleDiskFarm::PLOT_FILE)) .map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?; - #[cfg(windows)] - let plot_mmap = unsafe { Mmap::map(&plot_file)? 
}; - group.bench_function("plot/sync", |b| { - #[cfg(not(windows))] - let plot = &plot_file; - #[cfg(windows)] - let plot = &*plot_mmap; + group.bench_function("plot/sync/single", |b| { + let sync_plot_audit = SyncPlotAudit::new(&plot); - let sync_plot_audit = SyncPlotAudit::new(plot); + b.iter_batched( + rand::random, + |global_challenge| { + let options = PlotAuditOptions::<PosTable> { + public_key: single_disk_farm_info.public_key(), + reward_address: single_disk_farm_info.public_key(), + slot_info: SlotInfo { + slot_number: 0, + global_challenge, + // No solution will be found, pure audit + solution_range: SolutionRange::MIN, + // No solution will be found, pure audit + voting_solution_range: SolutionRange::MIN, + }, + sectors_metadata: &sectors_metadata, + kzg: &kzg, + erasure_coding: &erasure_coding, + maybe_sector_being_modified: None, + table_generator: &table_generator, + }; + + black_box( + sync_plot_audit + .audit(black_box(options)) + .now_or_never() + .unwrap(), + ) + }, + BatchSize::SmallInput, + ) + }); + } + { + let plot = RayonFiles::open(&disk_farm.join(SingleDiskFarm::PLOT_FILE)) + .map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?; + + group.bench_function("plot/sync/rayon", |b| { + let sync_plot_audit = SyncPlotAudit::new(&plot); b.iter_batched( rand::random, diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 62d8b377dd..095cca8f8d 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -6,6 +6,7 @@ mod plotting; use crate::identity::{Identity, IdentityError}; use crate::node_client::NodeClient; use crate::reward_signing::reward_signing; +use crate::single_disk_farm::farming::rayon_files::RayonFiles; use crate::single_disk_farm::farming::sync_fallback::SyncPlotAudit; pub use crate::single_disk_farm::farming::FarmingError; use crate::single_disk_farm::farming::{farming, slot_notification_forwarder, FarmingOptions}; @@ 
-971,7 +972,6 @@ impl SingleDiskFarm { let farming_join_handle = thread::Builder::new() .name(format!("farming-{disk_farm_index}")) .spawn({ - let plot_file = Arc::clone(&plot_file); let handle = handle.clone(); let erasure_coding = erasure_coding.clone(); let handlers = Arc::clone(&handlers); @@ -1024,17 +1024,8 @@ impl SingleDiskFarm { } } - #[cfg(not(windows))] - let plot_audit = &SyncPlotAudit::new(&*plot_file); - #[cfg(windows)] - let plot_mmap = unsafe { - memmap2::Mmap::map(&*plot_file).map_err(FarmingError::from)? - }; - // On Windows random read is horrible in terms of performance, memory-mapped I/O helps - // TODO: Remove this once https://internals.rust-lang.org/t/introduce-write-all-at-read-exact-at-on-windows/19649 - // or similar exists in standard library - #[cfg(windows)] - let plot_audit = &SyncPlotAudit::new(&*plot_mmap); + let plot = RayonFiles::open(&directory.join(Self::PLOT_FILE))?; + let plot_audit = &SyncPlotAudit::new(&plot); let farming_options = FarmingOptions { public_key, diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 153276ace7..672f34f90e 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -1,5 +1,6 @@ #[cfg(any(target_os = "linux", target_os = "macos"))] pub mod monoio; +pub mod rayon_files; pub mod sync_fallback; use crate::node_client; diff --git a/crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs b/crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs new file mode 100644 index 0000000000..b66283771a --- /dev/null +++ b/crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs @@ -0,0 +1,53 @@ +use std::fs::{File, OpenOptions}; +use std::io; +use std::path::Path; +use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; +use subspace_farmer_components::ReadAtSync; + +/// Wrapper data structure for multiple files to be 
used with [`rayon`] thread pool, where the same +/// file is opened multiple times, once for each thread. +pub struct RayonFiles { + files: Vec<File>, +} + +impl ReadAtSync for RayonFiles { + fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { + let thread_index = rayon::current_thread_index().ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "Reads must be called from rayon worker thread", + ) + })?; + let file = self.files.get(thread_index).ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "No files entry for this rayon thread") + })?; + + file.read_at(buf, offset) + } +} + +impl ReadAtSync for &RayonFiles { + fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { + (*self).read_at(buf, offset) + } +} + +impl RayonFiles { + /// Open file at specified path as many times as there are threads in the current [`rayon`] + /// thread pool. + pub fn open(path: &Path) -> io::Result<Self> { + let files = (0..rayon::current_num_threads()) + .map(|_| { + let file = OpenOptions::new() + .read(true) + .advise_random_access() + .open(path)?; + file.advise_random_access()?; + + Ok::<_, io::Error>(file) + }) + .collect::<Result<Vec<_>, _>>()?; + + Ok(Self { files }) + } +}