Implement plot cache, intercept downloaded pieces and store in plot cache
nazar-pc committed Feb 23, 2024
1 parent b3526bc commit ce41784
Showing 9 changed files with 608 additions and 37 deletions.
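For orientation: the hunks below only show call sites of the new `DiskPlotCache`; the `plot_cache.rs` module itself is not expanded in this view. The following is a minimal, hedged sketch of the interface those call sites imply. Only the method names, the `MaybePieceStoredResult` variants, and the rough signatures are taken from the diff; the placeholder types, struct contents, and the error type are assumptions for illustration.

// Sketch only: the real module lives in
// crates/subspace-farmer/src/single_disk_farm/plot_cache.rs, which is not expanded here.
// `Piece`, `PieceIndex` and `RecordKey` are placeholders standing in for the
// subspace_core_primitives / libp2p types used by the real code.
pub struct Piece;
pub struct PieceIndex;
pub struct RecordKey;

/// Result of checking whether a piece may already be stored in a plot cache,
/// matched on in `FarmerCache::maybe_store_additional_piece` below.
pub enum MaybePieceStoredResult {
    /// Definitely not stored and no room to store it here
    No,
    /// Not stored yet, but there is room for it
    Vacant,
    /// Already (maybe) stored
    Yes,
}

/// Cheaply cloneable handle to otherwise unused plot space used as an extra piece cache.
#[derive(Clone)]
pub struct DiskPlotCache {
    // Shared state (plot file handle, sector metadata, element map) would sit behind an
    // `Arc` in the real implementation.
}

impl DiskPlotCache {
    /// Constructed from the plot file, sectors metadata, target sector count and sector
    /// size; see the `DiskPlotCache::new(...)` call in single_disk_farm.rs below.
    pub fn new() -> Self {
        Self {}
    }

    /// Cheap check used before attempting to store a piece.
    pub fn is_piece_maybe_stored(&self, _key: &RecordKey) -> MaybePieceStoredResult {
        MaybePieceStoredResult::Vacant
    }

    /// Try to store a piece; `Ok(false)` means "no space left here, try another cache".
    pub fn try_store_piece(
        &self,
        _piece_index: PieceIndex,
        _piece: &Piece,
    ) -> Result<bool, std::io::Error> {
        Ok(false)
    }

    /// Read a previously stored piece back, if it is still present.
    pub fn read_piece(&self, _key: &RecordKey) -> Option<Piece> {
        None
    }
}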
@@ -632,6 +632,10 @@ where
.iter()
.map(|single_disk_farm| single_disk_farm.piece_cache())
.collect(),
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.plot_cache())
.collect(),
)
.await;
drop(farmer_cache);
150 changes: 128 additions & 22 deletions crates/subspace-farmer/src/farmer_cache.rs
@@ -3,6 +3,7 @@ mod tests;

use crate::node_client::NodeClient;
use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset};
use crate::single_disk_farm::plot_cache::{DiskPlotCache, MaybePieceStoredResult};
use crate::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop};
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::oneshot;
@@ -11,6 +12,7 @@ use futures::{select, FutureExt, StreamExt};
use parking_lot::RwLock;
use std::collections::{HashMap, VecDeque};
use std::num::NonZeroU16;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, mem};
@@ -762,6 +764,10 @@ pub struct FarmerCache {
peer_id: PeerId,
/// Individual dedicated piece caches
piece_caches: Arc<RwLock<Vec<DiskPieceCacheState>>>,
/// Additional piece caches
plot_caches: Arc<RwLock<Vec<DiskPlotCache>>>,
/// Next plot cache to use for storing pieces
next_plot_cache: Arc<AtomicUsize>,
handlers: Arc<Handlers>,
// We do not want to increase capacity unnecessarily on clone
worker_sender: Arc<mpsc::Sender<WorkerCommand>>,
@@ -783,6 +789,8 @@ impl FarmerCache {
let instance = Self {
peer_id,
piece_caches: Arc::clone(&caches),
plot_caches: Arc::default(),
next_plot_cache: Arc::new(AtomicUsize::new(0)),
handlers: Arc::clone(&handlers),
worker_sender: Arc::new(worker_sender),
};
@@ -802,33 +810,46 @@ impl FarmerCache {
let maybe_piece_fut = tokio::task::spawn_blocking({
let key = key.clone();
let piece_caches = Arc::clone(&self.piece_caches);
let plot_caches = Arc::clone(&self.plot_caches);
let worker_sender = Arc::clone(&self.worker_sender);

move || {
for (disk_farm_index, cache) in piece_caches.read().iter().enumerate() {
let Some(&offset) = cache.stored_pieces.get(&key) else {
continue;
};
match cache.backend.read_piece(offset) {
Ok(maybe_piece) => {
return maybe_piece;
}
Err(error) => {
error!(
%error,
%disk_farm_index,
?key,
%offset,
"Error while reading piece from cache, might be a disk corruption"
);
{
let piece_caches = piece_caches.read();
for (disk_farm_index, cache) in piece_caches.iter().enumerate() {
let Some(&offset) = cache.stored_pieces.get(&key) else {
continue;
};
match cache.backend.read_piece(offset) {
Ok(maybe_piece) => {
return maybe_piece;
}
Err(error) => {
error!(
%error,
%disk_farm_index,
?key,
%offset,
"Error while reading piece from cache, might be a disk corruption"
);

if let Err(error) =
worker_sender.blocking_send(WorkerCommand::ForgetKey { key })
{
trace!(%error, "Failed to send ForgetKey command to worker");
}

if let Err(error) =
worker_sender.blocking_send(WorkerCommand::ForgetKey { key })
{
trace!(%error, "Failed to send ForgetKey command to worker");
return None;
}
}
}
}

return None;
{
let plot_caches = plot_caches.read();
for cache in plot_caches.iter() {
if let Some(piece) = cache.read_piece(&key) {
return Some(piece);
}
}
}
@@ -846,11 +867,77 @@ impl FarmerCache {
}
}

/// Try to store a piece in additional downloaded pieces, if there is space for them
pub async fn maybe_store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) {
let key = RecordKey::from(piece_index.to_multihash());

let mut should_store = false;
for cache in self.plot_caches.read().iter() {
match cache.is_piece_maybe_stored(&key) {
MaybePieceStoredResult::No => {
// Try another one if there is any
}
MaybePieceStoredResult::Vacant => {
should_store = true;
break;
}
MaybePieceStoredResult::Yes => {
// Already stored, nothing else left to do
return;
}
}
}

if !should_store {
return;
}

let should_store_fut = tokio::task::spawn_blocking({
let plot_caches = Arc::clone(&self.plot_caches);
let next_plot_cache = Arc::clone(&self.next_plot_cache);
let piece = piece.clone();

move || {
let plot_caches = plot_caches.read();
let plot_caches_len = plot_caches.len();

// Store pieces in plots using round-robin distribution
for _ in 0..plot_caches_len {
let plot_cache_index =
next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;

match plot_caches[plot_cache_index].try_store_piece(piece_index, &piece) {
Ok(true) => {
return;
}
Ok(false) => {
continue;
}
Err(error) => {
error!(
%error,
%piece_index,
%plot_cache_index,
"Failed to store additional piece in cache"
);
continue;
}
}
}
}
});

if let Err(error) = AsyncJoinOnDrop::new(should_store_fut, true).await {
error!(%error, %piece_index, "Failed to store additional piece in cache");
}
}

/// Initialize replacement of backing caches, returns acknowledgement receiver that can be used
/// to identify when cache initialization has finished
pub async fn replace_backing_caches(
&self,
new_piece_caches: Vec<DiskPieceCache>,
new_plot_caches: Vec<DiskPlotCache>,
) -> oneshot::Receiver<()> {
let (sender, receiver) = oneshot::channel();
if let Err(error) = self
@@ -864,6 +951,8 @@ impl FarmerCache {
warn!(%error, "Failed to replace backing caches, worker exited");
}

*self.plot_caches.write() = new_plot_caches;

receiver
}

@@ -879,7 +968,24 @@ impl LocalRecordProvider for FarmerCache {
for piece_cache in self.piece_caches.read().iter() {
if piece_cache.stored_pieces.contains_key(key) {
// Note: We store our own provider records locally without local addresses
// to avoid redundant storage and outdated addresses. Instead these are
// to avoid redundant storage and outdated addresses. Instead, these are
// acquired on demand when returning a `ProviderRecord` for the local node.
return Some(ProviderRecord {
key: key.clone(),
provider: self.peer_id,
expires: None,
addresses: Vec::new(),
});
};
}
// It is okay to take a read lock here, write locks almost never happen
for plot_cache in self.plot_caches.read().iter() {
if matches!(
plot_cache.is_piece_maybe_stored(key),
MaybePieceStoredResult::Yes
) {
// Note: We store our own provider records locally without local addresses
// to avoid redundant storage and outdated addresses. Instead, these are
// acquired on demand when returning a `ProviderRecord` for the local node.
return Some(ProviderRecord {
key: key.clone(),
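The `maybe_store_additional_piece` hunk above distributes downloaded pieces across plot caches with a shared atomic counter. Below is a stripped-down, self-contained illustration of that round-robin index selection (the `fetch_add(1, Ordering::Relaxed) % plot_caches_len` pattern); the cache names are made up for the example.

use std::sync::atomic::{AtomicUsize, Ordering};

/// Pick the next cache index in round-robin order, mirroring
/// `next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len` above.
fn next_index(counter: &AtomicUsize, len: usize) -> usize {
    counter.fetch_add(1, Ordering::Relaxed) % len
}

fn main() {
    let counter = AtomicUsize::new(0);
    let caches = ["cache-0", "cache-1", "cache-2"];

    // Each store attempt moves on to the next cache; a full pass over `caches.len()`
    // indices corresponds to the `for _ in 0..plot_caches_len` loop in the diff.
    for _ in 0..5 {
        let index = next_index(&counter, caches.len());
        println!("would try to store in {}", caches[index]);
    }
}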
22 changes: 14 additions & 8 deletions crates/subspace-farmer/src/farmer_cache/tests.rs
@@ -192,10 +192,13 @@ async fn basic() {
tokio::spawn(farmer_cache_worker.run(piece_getter.clone()));

let initialized_fut = farmer_cache
.replace_backing_caches(vec![
DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
])
.replace_backing_caches(
vec![
DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
],
vec![],
)
.await;

// Wait for piece cache to be initialized
@@ -375,10 +378,13 @@ async fn basic() {

// Reopen with the same backing caches
let initialized_fut = farmer_cache
.replace_backing_caches(vec![
DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
])
.replace_backing_caches(
vec![
DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
],
vec![],
)
.await;
drop(farmer_cache);

15 changes: 15 additions & 0 deletions crates/subspace-farmer/src/single_disk_farm.rs
@@ -1,6 +1,7 @@
pub mod farming;
pub mod piece_cache;
pub mod piece_reader;
pub mod plot_cache;
mod plotting;

use crate::identity::{Identity, IdentityError};
@@ -13,6 +14,7 @@ use crate::single_disk_farm::farming::{
};
use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError};
use crate::single_disk_farm::piece_reader::PieceReader;
use crate::single_disk_farm::plot_cache::DiskPlotCache;
use crate::single_disk_farm::plotting::{
plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions,
};
@@ -566,6 +568,7 @@ pub struct SingleDiskFarm {
tasks: FuturesUnordered<BackgroundTask>,
handlers: Arc<Handlers>,
piece_cache: DiskPieceCache,
plot_cache: DiskPlotCache,
piece_reader: PieceReader,
/// Sender that will be used to signal to background threads that they should start
start_sender: Option<broadcast::Sender<()>>,
@@ -883,6 +886,12 @@ impl SingleDiskFarm {
plot_file.set_len(sector_size as u64 * u64::from(target_sector_count))?;

let piece_cache = DiskPieceCache::open(&directory, cache_capacity)?;
let plot_cache = DiskPlotCache::new(
&plot_file,
&sectors_metadata,
target_sector_count,
sector_size,
);

let (error_sender, error_receiver) = oneshot::channel();
let error_sender = Arc::new(Mutex::new(Some(error_sender)));
@@ -1191,6 +1200,7 @@ impl SingleDiskFarm {
tasks,
handlers,
piece_cache,
plot_cache,
piece_reader,
start_sender: Some(start_sender),
stop_sender: Some(stop_sender),
@@ -1337,6 +1347,11 @@ impl SingleDiskFarm {
self.piece_cache.clone()
}

/// Get plot cache instance
pub fn plot_cache(&self) -> DiskPlotCache {
self.plot_cache.clone()
}

/// Get piece reader to read plotted pieces later
pub fn piece_reader(&self) -> PieceReader {
self.piece_reader.clone()
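Tying this back to the first hunk in the commit: the new `plot_cache()` getter above is what the farmer command collects from every farm and hands to `FarmerCache::replace_backing_caches` together with the piece caches. A hedged, self-contained sketch of that wiring follows; all types here are stand-ins, only the method names and the two-vector `replace_backing_caches` shape come from the hunks in this commit, and the real method is async and returns an acknowledgement receiver.

// Stand-in types for the sketch only; the real ones live in subspace-farmer.
#[derive(Clone)]
struct DiskPieceCache;
#[derive(Clone)]
struct DiskPlotCache;

struct SingleDiskFarm {
    piece_cache: DiskPieceCache,
    plot_cache: DiskPlotCache,
}

impl SingleDiskFarm {
    fn piece_cache(&self) -> DiskPieceCache {
        self.piece_cache.clone()
    }
    fn plot_cache(&self) -> DiskPlotCache {
        self.plot_cache.clone()
    }
}

struct FarmerCache;

impl FarmerCache {
    // The real method is async and returns an acknowledgement receiver; elided here.
    fn replace_backing_caches(
        &self,
        piece_caches: Vec<DiskPieceCache>,
        plot_caches: Vec<DiskPlotCache>,
    ) {
        let _ = (piece_caches, plot_caches);
    }
}

fn main() {
    let single_disk_farms = vec![
        SingleDiskFarm { piece_cache: DiskPieceCache, plot_cache: DiskPlotCache },
        SingleDiskFarm { piece_cache: DiskPieceCache, plot_cache: DiskPlotCache },
    ];
    let farmer_cache = FarmerCache;

    // Mirrors the farm command hunk at the top of this commit: piece caches and the new
    // plot caches are collected from every farm and handed to the farmer cache together.
    farmer_cache.replace_backing_caches(
        single_disk_farms
            .iter()
            .map(|single_disk_farm| single_disk_farm.piece_cache())
            .collect(),
        single_disk_farms
            .iter()
            .map(|single_disk_farm| single_disk_farm.plot_cache())
            .collect(),
    );
}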
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
@@ -103,6 +103,7 @@ impl DiskPieceCache {
let mut element = vec![0; Self::element_size() as usize];
let mut early_exit = false;

// TODO: Parallelize or read in larger batches
(0..self.inner.num_elements).map(move |offset| {
if early_exit {
return (Offset(offset), None);
@@ -1,5 +1,4 @@
use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset};
use crate::single_disk_farm::DiskPieceCacheError;
use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError, Offset};
use rand::prelude::*;
use std::assert_matches::assert_matches;
use subspace_core_primitives::{Piece, PieceIndex};
(Diffs for the remaining changed files were not loaded in this view.)
