Skip to content

Commit

Permalink
Implement an abstraction that opens file for auditing multiple times,…
Browse files Browse the repository at this point in the history
… once for every auditing rayon thread
  • Loading branch information
nazar-pc committed Oct 23, 2023
1 parent d31fe47 commit 3f5e7dc
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 36 deletions.
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,5 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
ulid = { version = "1.0.0", features = ["serde"] }
zeroize = "1.6.0"

[target.'cfg(windows)'.dependencies]
memmap2 = "0.9.0"

[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies]
monoio = { version = "0.1.10-beta.1", features = ["sync"] }
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ use anyhow::anyhow;
use clap::Subcommand;
use criterion::{black_box, BatchSize, Criterion, Throughput};
use futures::FutureExt;
#[cfg(windows)]
use memmap2::Mmap;
use parking_lot::Mutex;
use std::fs::OpenOptions;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_core_primitives::{Record, SolutionRange};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::single_disk_farm::farming::rayon_files::RayonFiles;
use subspace_farmer::single_disk_farm::farming::sync_fallback::SyncPlotAudit;
use subspace_farmer::single_disk_farm::farming::{PlotAudit, PlotAuditOptions};
use subspace_farmer::single_disk_farm::{SingleDiskFarm, SingleDiskFarmSummary};
Expand Down Expand Up @@ -81,20 +80,52 @@ fn audit(disk_farm: PathBuf, sample_size: usize) -> anyhow::Result<()> {
sector_size as u64 * sectors_metadata.len() as u64,
));
{
let plot_file = OpenOptions::new()
let plot = OpenOptions::new()
.read(true)
.open(disk_farm.join(SingleDiskFarm::PLOT_FILE))
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;
#[cfg(windows)]
let plot_mmap = unsafe { Mmap::map(&plot_file)? };

group.bench_function("plot/sync", |b| {
#[cfg(not(windows))]
let plot = &plot_file;
#[cfg(windows)]
let plot = &*plot_mmap;
group.bench_function("plot/sync/single", |b| {
let sync_plot_audit = SyncPlotAudit::new(&plot);

let sync_plot_audit = SyncPlotAudit::new(plot);
b.iter_batched(
rand::random,
|global_challenge| {
let options = PlotAuditOptions::<PosTable> {
public_key: single_disk_farm_info.public_key(),
reward_address: single_disk_farm_info.public_key(),
slot_info: SlotInfo {
slot_number: 0,
global_challenge,
// No solution will be found, pure audit
solution_range: SolutionRange::MIN,
// No solution will be found, pure audit
voting_solution_range: SolutionRange::MIN,
},
sectors_metadata: &sectors_metadata,
kzg: &kzg,
erasure_coding: &erasure_coding,
maybe_sector_being_modified: None,
table_generator: &table_generator,
};

black_box(
sync_plot_audit
.audit(black_box(options))
.now_or_never()
.unwrap(),
)
},
BatchSize::SmallInput,
)
});
}
{
let plot = RayonFiles::open(&disk_farm.join(SingleDiskFarm::PLOT_FILE))
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;

group.bench_function("plot/sync/rayon", |b| {
let sync_plot_audit = SyncPlotAudit::new(&plot);

b.iter_batched(
rand::random,
Expand Down
15 changes: 3 additions & 12 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod plotting;
use crate::identity::{Identity, IdentityError};
use crate::node_client::NodeClient;
use crate::reward_signing::reward_signing;
use crate::single_disk_farm::farming::rayon_files::RayonFiles;
use crate::single_disk_farm::farming::sync_fallback::SyncPlotAudit;
pub use crate::single_disk_farm::farming::FarmingError;
use crate::single_disk_farm::farming::{farming, slot_notification_forwarder, FarmingOptions};
Expand Down Expand Up @@ -971,7 +972,6 @@ impl SingleDiskFarm {
let farming_join_handle = thread::Builder::new()
.name(format!("farming-{disk_farm_index}"))
.spawn({
let plot_file = Arc::clone(&plot_file);
let handle = handle.clone();
let erasure_coding = erasure_coding.clone();
let handlers = Arc::clone(&handlers);
Expand Down Expand Up @@ -1024,17 +1024,8 @@ impl SingleDiskFarm {
}
}

#[cfg(not(windows))]
let plot_audit = &SyncPlotAudit::new(&*plot_file);
#[cfg(windows)]
let plot_mmap = unsafe {
memmap2::Mmap::map(&*plot_file).map_err(FarmingError::from)?
};
// On Windows random read is horrible in terms of performance, memory-mapped I/O helps
// TODO: Remove this once https://internals.rust-lang.org/t/introduce-write-all-at-read-exact-at-on-windows/19649
// or similar exists in standard library
#[cfg(windows)]
let plot_audit = &SyncPlotAudit::new(&*plot_mmap);
let plot = RayonFiles::open(&directory.join(Self::PLOT_FILE))?;
let plot_audit = &SyncPlotAudit::new(&plot);

let farming_options = FarmingOptions {
public_key,
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/single_disk_farm/farming.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[cfg(any(target_os = "linux", target_os = "macos"))]
pub mod monoio;
pub mod rayon_files;
pub mod sync_fallback;

use crate::node_client;
Expand Down
53 changes: 53 additions & 0 deletions crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::fs::{File, OpenOptions};
use std::io;
use std::path::Path;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::ReadAtSync;

/// Wrapper data structure for multiple files to be used with [`rayon`] thread pool, where the same
/// file is opened multiple times, once for each thread.
///
/// Reads are dispatched to the handle whose position matches the index of the calling rayon worker
/// thread.
#[derive(Debug)]
pub struct RayonFiles {
    /// Independent handles to the same file; looked up by rayon worker thread index on read.
    files: Vec<File>,
}

impl ReadAtSync for RayonFiles {
    /// Reads into `buf` at `offset` using the file handle that belongs to the calling rayon
    /// worker thread.
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::Other`] error when [`rayon::current_thread_index`] yields
    /// `None`, or when there is no handle stored at the returned index. Any error from the
    /// underlying file read is passed through unchanged.
    fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
        // The worker index selects which of the opened handles this thread uses.
        let worker_index = match rayon::current_thread_index() {
            Some(index) => index,
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Reads must be called from rayon worker thread",
                ));
            }
        };

        match self.files.get(worker_index) {
            Some(handle) => handle.read_at(buf, offset),
            None => Err(io::Error::new(
                io::ErrorKind::Other,
                "No files entry for this rayon thread",
            )),
        }
    }
}

impl ReadAtSync for &RayonFiles {
    /// Forwards to the [`ReadAtSync`] implementation of [`RayonFiles`], so a shared reference can
    /// be used wherever a `ReadAtSync` value is expected.
    fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
        let inner: &RayonFiles = *self;

        <RayonFiles as ReadAtSync>::read_at(inner, buf, offset)
    }
}

impl RayonFiles {
    /// Opens the file at `path` once per thread of the current [`rayon`] thread pool.
    ///
    /// Every handle is opened read-only and has `advise_random_access` applied both to the open
    /// options and to the resulting file.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error hit while opening or advising any of the handles; handles
    /// opened before the failure are dropped.
    pub fn open(path: &Path) -> io::Result<Self> {
        let handle_count = rayon::current_num_threads();
        let mut files = Vec::with_capacity(handle_count);

        for _ in 0..handle_count {
            let handle = OpenOptions::new()
                .read(true)
                .advise_random_access()
                .open(path)?;
            handle.advise_random_access()?;

            files.push(handle);
        }

        Ok(Self { files })
    }
}

0 comments on commit 3f5e7dc

Please sign in to comment.