Skip to content

Commit

Permalink
Switch remaining key farmer file operations to UnbufferedIoFileWindows
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Mar 6, 2024
1 parent 78f4d16 commit c32f2de
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 36 deletions.
26 changes: 20 additions & 6 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,16 +779,21 @@ impl SingleDiskFarm {
};

let metadata_file_path = directory.join(Self::METADATA_FILE);
let mut metadata_file = OpenOptions::new()
#[cfg(not(windows))]
let metadata_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(&metadata_file_path)?;

#[cfg(not(windows))]
metadata_file.advise_random_access()?;

let metadata_size = metadata_file.seek(SeekFrom::End(0))?;
#[cfg(windows)]
let metadata_file = UnbufferedIoFileWindows::open(&metadata_file_path)?;

let metadata_size = metadata_file.allocated_size()?;
let expected_metadata_size =
RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(target_sector_count);
let metadata_header = if metadata_size == 0 {
Expand Down Expand Up @@ -874,6 +879,7 @@ impl SingleDiskFarm {
Arc::new(RwLock::new(sectors_metadata))
};

#[cfg(not(windows))]
let plot_file = Arc::new(
OpenOptions::new()
.read(true)
Expand All @@ -883,8 +889,12 @@ impl SingleDiskFarm {
.open(directory.join(Self::PLOT_FILE))?,
);

#[cfg(not(windows))]
plot_file.advise_random_access()?;

#[cfg(windows)]
let plot_file = Arc::new(UnbufferedIoFileWindows::open(&metadata_file_path)?);

if plot_file.allocated_size()? != plot_file_size {
// Allocating the whole file (`set_len` below can create a sparse file, which will cause
// writes to fail later)
Expand Down Expand Up @@ -1248,11 +1258,15 @@ impl SingleDiskFarm {
pub fn read_all_sectors_metadata(
directory: &Path,
) -> io::Result<Vec<SectorMetadataChecksummed>> {
let mut metadata_file = OpenOptions::new()
#[cfg(not(windows))]
let metadata_file = OpenOptions::new()
.read(true)
.open(directory.join(Self::METADATA_FILE))?;

let metadata_size = metadata_file.seek(SeekFrom::End(0))?;
#[cfg(windows)]
let metadata_file = UnbufferedIoFileWindows::open(&directory.join(Self::METADATA_FILE))?;

let metadata_size = metadata_file.allocated_size()?;
let sector_metadata_size = SectorMetadataChecksummed::encoded_size();

let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
Expand Down Expand Up @@ -1516,7 +1530,7 @@ impl SingleDiskFarm {
let (metadata_file, mut metadata_header) = {
info!(path = %metadata_file_path.display(), "Checking metadata file");

let mut metadata_file = match OpenOptions::new()
let metadata_file = match OpenOptions::new()
.read(true)
.write(true)
.open(&metadata_file_path)
Expand All @@ -1539,7 +1553,7 @@ impl SingleDiskFarm {
// Error doesn't matter here
let _ = metadata_file.advise_sequential_access();

let metadata_size = match metadata_file.seek(SeekFrom::End(0)) {
let metadata_size = match metadata_file.allocated_size() {
Ok(metadata_size) => metadata_size,
Err(error) => {
return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
Expand Down
11 changes: 11 additions & 0 deletions crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#[cfg(test)]
mod tests;

#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use derive_more::Display;
#[cfg(not(windows))]
use std::fs::{File, OpenOptions};
use std::path::Path;
use std::sync::Arc;
Expand Down Expand Up @@ -44,7 +47,10 @@ pub struct Offset(u32);

#[derive(Debug)]
struct Inner {
#[cfg(not(windows))]
file: File,
#[cfg(windows)]
file: UnbufferedIoFileWindows,
num_elements: u32,
}

Expand All @@ -63,15 +69,20 @@ impl DiskPieceCache {
return Err(DiskPieceCacheError::ZeroCapacity);
}

#[cfg(not(windows))]
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(directory.join(Self::FILE_NAME))?;

#[cfg(not(windows))]
file.advise_random_access()?;

#[cfg(windows)]
let file = UnbufferedIoFileWindows::open(&directory.join(Self::FILE_NAME))?;

let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
if file.allocated_size()? != expected_size {
// Allocating the whole file (`set_len` below can create a sparse file, which will cause
Expand Down
32 changes: 20 additions & 12 deletions crates/subspace-farmer/src/single_disk_farm/piece_reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use async_lock::RwLock;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
#[cfg(not(windows))]
use std::fs::File;
use std::future::Future;
use std::sync::Arc;
Expand Down Expand Up @@ -32,7 +35,8 @@ impl PieceReader {
pub(super) fn new<PosTable>(
public_key: PublicKey,
pieces_in_sector: u16,
plot_file: Arc<File>,
#[cfg(not(windows))] plot_file: Arc<File>,
#[cfg(windows)] plot_file: Arc<UnbufferedIoFileWindows>,
sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
erasure_coding: ErasureCoding,
modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
Expand All @@ -42,15 +46,18 @@ impl PieceReader {
{
let (read_piece_sender, read_piece_receiver) = mpsc::channel(10);

let reading_fut = read_pieces::<PosTable>(
public_key,
pieces_in_sector,
plot_file,
sectors_metadata,
erasure_coding,
modifying_sector_index,
read_piece_receiver,
);
let reading_fut = async move {
read_pieces::<PosTable, _>(
public_key,
pieces_in_sector,
&*plot_file,
sectors_metadata,
erasure_coding,
modifying_sector_index,
read_piece_receiver,
)
.await
};

(Self { read_piece_sender }, reading_fut)
}
Expand Down Expand Up @@ -80,16 +87,17 @@ impl PieceReader {
}

#[allow(clippy::too_many_arguments)]
async fn read_pieces<PosTable>(
async fn read_pieces<PosTable, S>(
public_key: PublicKey,
pieces_in_sector: u16,
plot_file: Arc<File>,
plot_file: S,
sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
erasure_coding: ErasureCoding,
modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
mut read_piece_receiver: mpsc::Receiver<ReadPieceRequest>,
) where
PosTable: Table,
S: ReadAtSync,
{
let mut table_generator = PosTable::generator();

Expand Down
12 changes: 10 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm/plot_cache.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#[cfg(test)]
mod tests;

#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use async_lock::RwLock as AsyncRwLock;
use parking_lot::RwLock;
use std::collections::HashMap;
#[cfg(not(windows))]
use std::fs::File;
use std::sync::{Arc, Weak};
use std::{io, mem};
Expand Down Expand Up @@ -54,15 +57,19 @@ struct CachedPieces {
/// Additional piece cache that exploit part of the plot that does not contain sectors yet
#[derive(Debug, Clone)]
pub struct DiskPlotCache {
#[cfg(not(windows))]
file: Weak<File>,
#[cfg(windows)]
file: Weak<UnbufferedIoFileWindows>,
sectors_metadata: Weak<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
cached_pieces: Arc<RwLock<CachedPieces>>,
sector_size: u64,
}

impl DiskPlotCache {
pub(crate) fn new(
file: &Arc<File>,
#[cfg(not(windows))] file: &Arc<File>,
#[cfg(windows)] file: &Arc<UnbufferedIoFileWindows>,
sectors_metadata: &Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
target_sector_count: SectorIndex,
sector_size: usize,
Expand Down Expand Up @@ -240,7 +247,8 @@ impl DiskPlotCache {
}

fn read_piece_internal(
file: &File,
#[cfg(not(windows))] file: &File,
#[cfg(windows)] file: &UnbufferedIoFileWindows,
offset: u32,
element: &mut [u8],
) -> Result<Option<PieceIndex>, DiskPlotCacheError> {
Expand Down
24 changes: 21 additions & 3 deletions crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use crate::single_disk_farm::plot_cache::{DiskPlotCache, MaybePieceStoredResult};
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use rand::prelude::*;
use std::assert_matches::assert_matches;
#[cfg(not(windows))]
use std::fs::OpenOptions;
use std::num::NonZeroU64;
use std::sync::Arc;
use subspace_core_primitives::{HistorySize, Piece, PieceIndex, Record, SectorIndex};
use subspace_farmer_components::file_ext::FileExt;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
use tempfile::tempfile;
use tempfile::tempdir;

const FAKE_SECTOR_SIZE: usize = 2 * 1024 * 1024;
const TARGET_SECTOR_COUNT: SectorIndex = 5;
Expand All @@ -22,7 +26,21 @@ fn basic() {
history_size: HistorySize::new(NonZeroU64::MIN),
});

let file = Arc::new(tempfile().unwrap());
let tempdir = tempdir().unwrap();
#[cfg(not(windows))]
let file = Arc::new(
OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(tempdir.path().join("plot.bin"))
.unwrap(),
);

#[cfg(windows)]
let file = Arc::new(UnbufferedIoFileWindows::open(&tempdir.path().join("plot.bin")).unwrap());

file.preallocate(FAKE_SECTOR_SIZE as u64 * u64::from(TARGET_SECTOR_COUNT))
.unwrap();

Expand Down
5 changes: 5 additions & 0 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use crate::single_disk_farm::{
BackgroundTaskError, Handlers, PlotMetadataHeader, SectorUpdate, RESERVED_PLOT_METADATA,
};
Expand Down Expand Up @@ -146,7 +148,10 @@ pub(super) struct PlottingOptions<'a, NC, PG> {
pub(super) sector_metadata_size: usize,
pub(super) metadata_header: PlotMetadataHeader,
pub(super) plot_file: Arc<File>,
#[cfg(not(windows))]
pub(super) metadata_file: File,
#[cfg(windows)]
pub(super) metadata_file: UnbufferedIoFileWindows,
pub(super) sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) piece_getter: &'a PG,
pub(super) kzg: &'a Kzg,
Expand Down
Loading

0 comments on commit c32f2de

Please sign in to comment.