Skip to content

Commit

Permalink
Async wrapper for sector write in plotting
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Jun 14, 2024
1 parent faa7a3f commit 46730d6
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 21 deletions.
4 changes: 2 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -884,8 +884,8 @@ impl SingleDiskFarm {
node_client: &node_client,
pieces_in_sector,
sector_size,
plot_file: &plot_file,
metadata_file,
plot_file,
metadata_file: Arc::new(metadata_file),
handlers: &handlers,
global_mutex: &global_mutex,
plotter,
Expand Down
62 changes: 43 additions & 19 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::sector::SectorMetadataChecksummed;
use thiserror::Error;
use tokio::sync::watch;
use tokio::task;
use tracing::{debug, info, info_span, trace, warn, Instrument};

const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
Expand Down Expand Up @@ -89,13 +90,13 @@ pub(super) struct SectorPlottingOptions<'a, NC, P> {
pub(super) pieces_in_sector: u16,
pub(super) sector_size: usize,
#[cfg(not(windows))]
pub(super) plot_file: &'a File,
pub(super) plot_file: Arc<File>,
#[cfg(windows)]
pub(super) plot_file: &'a UnbufferedIoFileWindows,
pub(super) plot_file: Arc<UnbufferedIoFileWindows>,
#[cfg(not(windows))]
pub(super) metadata_file: File,
pub(super) metadata_file: Arc<File>,
#[cfg(windows)]
pub(super) metadata_file: UnbufferedIoFileWindows,
pub(super) metadata_file: Arc<UnbufferedIoFileWindows>,
pub(super) handlers: &'a Handlers,
pub(super) global_mutex: &'a AsyncMutex<()>,
pub(super) plotter: P,
Expand Down Expand Up @@ -167,7 +168,7 @@ where
&mut metadata_header,
sectors_metadata,
sectors_being_modified,
&sector_plotting_options.metadata_file
Arc::clone(&sector_plotting_options.metadata_file)
).await?;
}
}
Expand All @@ -179,7 +180,7 @@ where
&mut metadata_header,
sectors_metadata,
sectors_being_modified,
&sector_plotting_options.metadata_file
Arc::clone(&sector_plotting_options.metadata_file)
).await?;
}
}
Expand All @@ -193,8 +194,8 @@ async fn process_plotting_result(
metadata_header: &mut PlotMetadataHeader,
sectors_metadata: &AsyncRwLock<Vec<SectorMetadataChecksummed>>,
sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>,
#[cfg(not(windows))] metadata_file: &File,
#[cfg(windows)] metadata_file: &UnbufferedIoFileWindows,
#[cfg(not(windows))] metadata_file: Arc<File>,
#[cfg(windows)] metadata_file: Arc<UnbufferedIoFileWindows>,
) -> Result<(), PlottingError> {
let SectorPlottingResult {
sector_index,
Expand All @@ -218,7 +219,13 @@ async fn process_plotting_result(

if sector_index + 1 > metadata_header.plotted_sector_count {
metadata_header.plotted_sector_count = sector_index + 1;
metadata_file.write_all_at(&metadata_header.encode(), 0)?;

let encoded_metadata_header = metadata_header.encode();
let write_fut =
task::spawn_blocking(move || metadata_file.write_all_at(&encoded_metadata_header, 0));
write_fut.await.map_err(|error| {
PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
})??;
}

if last_queued {
Expand Down Expand Up @@ -454,10 +461,10 @@ where
async fn plot_single_sector_internal(
sector_index: SectorIndex,
sector_size: usize,
#[cfg(not(windows))] plot_file: &File,
#[cfg(windows)] plot_file: &UnbufferedIoFileWindows,
#[cfg(not(windows))] metadata_file: &File,
#[cfg(windows)] metadata_file: &UnbufferedIoFileWindows,
#[cfg(not(windows))] plot_file: &Arc<File>,
#[cfg(windows)] plot_file: &Arc<UnbufferedIoFileWindows>,
#[cfg(not(windows))] metadata_file: &Arc<File>,
#[cfg(windows)] metadata_file: &Arc<UnbufferedIoFileWindows>,
handlers: &Handlers,
sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>,
global_mutex: &AsyncMutex<()>,
Expand Down Expand Up @@ -544,8 +551,16 @@ async fn plot_single_sector_internal(
))));
}
};
plot_file.write_all_at(&sector_chunk, sector_write_offset)?;
sector_write_offset += sector_chunk.len() as u64;

let write_fut = task::spawn_blocking({
let plot_file = Arc::clone(plot_file);

move || plot_file.write_all_at(&sector_chunk, sector_write_offset)
});
write_fut.await.map_err(|error| {
PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
})??;
}
drop(sector);

Expand All @@ -558,11 +573,20 @@ async fn plot_single_sector_internal(
}
{
let encoded_sector_metadata = plotted_sector.sector_metadata.encode();
metadata_file.write_all_at(
&encoded_sector_metadata,
RESERVED_PLOT_METADATA
+ (u64::from(sector_index) * encoded_sector_metadata.len() as u64),
)?;
let write_fut = task::spawn_blocking({
let metadata_file = Arc::clone(metadata_file);

move || {
metadata_file.write_all_at(
&encoded_sector_metadata,
RESERVED_PLOT_METADATA
+ (u64::from(sector_index) * encoded_sector_metadata.len() as u64),
)
}
});
write_fut.await.map_err(|error| {
PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
})??;
}

handlers.sector_update.call_simple(&(
Expand Down

0 comments on commit 46730d6

Please sign in to comment.