From ce4178426411e55447a96bbd748d6bddc545c929 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 23 Feb 2024 21:27:33 +0200 Subject: [PATCH] Implement plot cache, intercept downloaded pieces and store in plot cache --- .../src/bin/subspace-farmer/commands/farm.rs | 4 + crates/subspace-farmer/src/farmer_cache.rs | 150 ++++++++-- .../subspace-farmer/src/farmer_cache/tests.rs | 22 +- .../subspace-farmer/src/single_disk_farm.rs | 15 + .../src/single_disk_farm/piece_cache.rs | 1 + .../src/single_disk_farm/piece_cache/tests.rs | 3 +- .../src/single_disk_farm/plot_cache.rs | 265 ++++++++++++++++++ .../src/single_disk_farm/plot_cache/tests.rs | 160 +++++++++++ .../src/utils/farmer_piece_getter.rs | 25 +- 9 files changed, 608 insertions(+), 37 deletions(-) create mode 100644 crates/subspace-farmer/src/single_disk_farm/plot_cache.rs create mode 100644 crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 220858e99f..8ee4364768 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -632,6 +632,10 @@ where .iter() .map(|single_disk_farm| single_disk_farm.piece_cache()) .collect(), + single_disk_farms + .iter() + .map(|single_disk_farm| single_disk_farm.plot_cache()) + .collect(), ) .await; drop(farmer_cache); diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index f700ac181f..42cee398d6 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -3,6 +3,7 @@ mod tests; use crate::node_client::NodeClient; use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset}; +use crate::single_disk_farm::plot_cache::{DiskPlotCache, MaybePieceStoredResult}; use crate::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop}; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::oneshot; @@ -11,6 +12,7 @@ use futures::{select, FutureExt, StreamExt}; use parking_lot::RwLock; use std::collections::{HashMap, VecDeque}; use std::num::NonZeroU16; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use std::{fmt, mem}; @@ -762,6 +764,10 @@ pub struct FarmerCache { peer_id: PeerId, /// Individual dedicated piece caches piece_caches: Arc>>, + /// Additional piece caches + plot_caches: Arc>>, + /// Next plot cache to use for storing pieces + next_plot_cache: Arc, handlers: Arc, // We do not want to increase capacity unnecessarily on clone worker_sender: Arc>, @@ -783,6 +789,8 @@ impl FarmerCache { let instance = Self { peer_id, piece_caches: Arc::clone(&caches), + plot_caches: Arc::default(), + next_plot_cache: Arc::new(AtomicUsize::new(0)), handlers: Arc::clone(&handlers), worker_sender: Arc::new(worker_sender), }; @@ -802,33 +810,46 @@ impl FarmerCache { let maybe_piece_fut = tokio::task::spawn_blocking({ let key = key.clone(); let piece_caches = Arc::clone(&self.piece_caches); + let plot_caches = Arc::clone(&self.plot_caches); let worker_sender = Arc::clone(&self.worker_sender); move || { - for (disk_farm_index, cache) in piece_caches.read().iter().enumerate() { - let Some(&offset) = cache.stored_pieces.get(&key) else { - continue; - }; - match cache.backend.read_piece(offset) { - Ok(maybe_piece) => { - return maybe_piece; - } - Err(error) => { - error!( - %error, - %disk_farm_index, - ?key, - 
%offset, - "Error while reading piece from cache, might be a disk corruption" - ); + { + let piece_caches = piece_caches.read(); + for (disk_farm_index, cache) in piece_caches.iter().enumerate() { + let Some(&offset) = cache.stored_pieces.get(&key) else { + continue; + }; + match cache.backend.read_piece(offset) { + Ok(maybe_piece) => { + return maybe_piece; + } + Err(error) => { + error!( + %error, + %disk_farm_index, + ?key, + %offset, + "Error while reading piece from cache, might be a disk corruption" + ); + + if let Err(error) = + worker_sender.blocking_send(WorkerCommand::ForgetKey { key }) + { + trace!(%error, "Failed to send ForgetKey command to worker"); + } - if let Err(error) = - worker_sender.blocking_send(WorkerCommand::ForgetKey { key }) - { - trace!(%error, "Failed to send ForgetKey command to worker"); + return None; } + } + } + } - return None; + { + let plot_caches = plot_caches.read(); + for cache in plot_caches.iter() { + if let Some(piece) = cache.read_piece(&key) { + return Some(piece); } } } @@ -846,11 +867,77 @@ impl FarmerCache { } } + /// Try to store a piece in additional downloaded pieces, if there is space for them + pub async fn maybe_store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) { + let key = RecordKey::from(piece_index.to_multihash()); + + let mut should_store = false; + for cache in self.plot_caches.read().iter() { + match cache.is_piece_maybe_stored(&key) { + MaybePieceStoredResult::No => { + // Try another one if there is any + } + MaybePieceStoredResult::Vacant => { + should_store = true; + break; + } + MaybePieceStoredResult::Yes => { + // Already stored, nothing else left to do + return; + } + } + } + + if !should_store { + return; + } + + let should_store_fut = tokio::task::spawn_blocking({ + let plot_caches = Arc::clone(&self.plot_caches); + let next_plot_cache = Arc::clone(&self.next_plot_cache); + let piece = piece.clone(); + + move || { + let plot_caches = plot_caches.read(); + let plot_caches_len = plot_caches.len(); + + // Store pieces in plots using round-robin distribution + for _ in 0..plot_caches_len { + let plot_cache_index = + next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len; + + match plot_caches[plot_cache_index].try_store_piece(piece_index, &piece) { + Ok(true) => { + return; + } + Ok(false) => { + continue; + } + Err(error) => { + error!( + %error, + %piece_index, + %plot_cache_index, + "Failed to store additional piece in cache" + ); + continue; + } + } + } + } + }); + + if let Err(error) = AsyncJoinOnDrop::new(should_store_fut, true).await { + error!(%error, %piece_index, "Failed to store additional piece in cache"); + } + } + /// Initialize replacement of backing caches, returns acknowledgement receiver that can be used /// to identify when cache initialization has finished pub async fn replace_backing_caches( &self, new_piece_caches: Vec, + new_plot_caches: Vec, ) -> oneshot::Receiver<()> { let (sender, receiver) = oneshot::channel(); if let Err(error) = self @@ -864,6 +951,8 @@ impl FarmerCache { warn!(%error, "Failed to replace backing caches, worker exited"); } + *self.plot_caches.write() = new_plot_caches; + receiver } @@ -879,7 +968,24 @@ impl LocalRecordProvider for FarmerCache { for piece_cache in self.piece_caches.read().iter() { if piece_cache.stored_pieces.contains_key(key) { // Note: We store our own provider records locally without local addresses - // to avoid redundant storage and outdated addresses. Instead these are + // to avoid redundant storage and outdated addresses. 
+                // to avoid redundant storage and outdated addresses.
Instead, these are + // acquired on demand when returning a `ProviderRecord` for the local node. + return Some(ProviderRecord { + key: key.clone(), + provider: self.peer_id, + expires: None, + addresses: Vec::new(), + }); + }; + } + // It is okay to take read lock here, writes locks almost never happen + for plot_cache in self.plot_caches.read().iter() { + if matches!( + plot_cache.is_piece_maybe_stored(key), + MaybePieceStoredResult::Yes + ) { + // Note: We store our own provider records locally without local addresses + // to avoid redundant storage and outdated addresses. Instead, these are // acquired on demand when returning a `ProviderRecord` for the local node. return Some(ProviderRecord { key: key.clone(), diff --git a/crates/subspace-farmer/src/farmer_cache/tests.rs b/crates/subspace-farmer/src/farmer_cache/tests.rs index 9c44f91dbf..94ea2db861 100644 --- a/crates/subspace-farmer/src/farmer_cache/tests.rs +++ b/crates/subspace-farmer/src/farmer_cache/tests.rs @@ -192,10 +192,13 @@ async fn basic() { tokio::spawn(farmer_cache_worker.run(piece_getter.clone())); let initialized_fut = farmer_cache - .replace_backing_caches(vec![ - DiskPieceCache::open(path1.as_ref(), 1).unwrap(), - DiskPieceCache::open(path2.as_ref(), 1).unwrap(), - ]) + .replace_backing_caches( + vec![ + DiskPieceCache::open(path1.as_ref(), 1).unwrap(), + DiskPieceCache::open(path2.as_ref(), 1).unwrap(), + ], + vec![], + ) .await; // Wait for piece cache to be initialized @@ -375,10 +378,13 @@ async fn basic() { // Reopen with the same backing caches let initialized_fut = farmer_cache - .replace_backing_caches(vec![ - DiskPieceCache::open(path1.as_ref(), 1).unwrap(), - DiskPieceCache::open(path2.as_ref(), 1).unwrap(), - ]) + .replace_backing_caches( + vec![ + DiskPieceCache::open(path1.as_ref(), 1).unwrap(), + DiskPieceCache::open(path2.as_ref(), 1).unwrap(), + ], + vec![], + ) .await; drop(farmer_cache); diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 9740d0a0a4..9dac067c45 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -1,6 +1,7 @@ pub mod farming; pub mod piece_cache; pub mod piece_reader; +pub mod plot_cache; mod plotting; use crate::identity::{Identity, IdentityError}; @@ -13,6 +14,7 @@ use crate::single_disk_farm::farming::{ }; use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError}; use crate::single_disk_farm::piece_reader::PieceReader; +use crate::single_disk_farm::plot_cache::DiskPlotCache; use crate::single_disk_farm::plotting::{ plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, }; @@ -566,6 +568,7 @@ pub struct SingleDiskFarm { tasks: FuturesUnordered, handlers: Arc, piece_cache: DiskPieceCache, + plot_cache: DiskPlotCache, piece_reader: PieceReader, /// Sender that will be used to signal to background threads that they should start start_sender: Option>, @@ -883,6 +886,12 @@ impl SingleDiskFarm { plot_file.set_len(sector_size as u64 * u64::from(target_sector_count))?; let piece_cache = DiskPieceCache::open(&directory, cache_capacity)?; + let plot_cache = DiskPlotCache::new( + &plot_file, + §ors_metadata, + target_sector_count, + sector_size, + ); let (error_sender, error_receiver) = oneshot::channel(); let error_sender = Arc::new(Mutex::new(Some(error_sender))); @@ -1191,6 +1200,7 @@ impl SingleDiskFarm { tasks, handlers, piece_cache, + plot_cache, piece_reader, start_sender: Some(start_sender), stop_sender: 
Some(stop_sender), @@ -1337,6 +1347,11 @@ impl SingleDiskFarm { self.piece_cache.clone() } + /// Get plot cache instance + pub fn plot_cache(&self) -> DiskPlotCache { + self.plot_cache.clone() + } + /// Get piece reader to read plotted pieces later pub fn piece_reader(&self) -> PieceReader { self.piece_reader.clone() diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs index a95d706d93..7cb9669455 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs @@ -103,6 +103,7 @@ impl DiskPieceCache { let mut element = vec![0; Self::element_size() as usize]; let mut early_exit = false; + // TODO: Parallelize or read in larger batches (0..self.inner.num_elements).map(move |offset| { if early_exit { return (Offset(offset), None); diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache/tests.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache/tests.rs index 2c63e438b9..b5e23de140 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_cache/tests.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache/tests.rs @@ -1,5 +1,4 @@ -use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset}; -use crate::single_disk_farm::DiskPieceCacheError; +use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError, Offset}; use rand::prelude::*; use std::assert_matches::assert_matches; use subspace_core_primitives::{Piece, PieceIndex}; diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs new file mode 100644 index 0000000000..a22221fa33 --- /dev/null +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs @@ -0,0 +1,265 @@ +#[cfg(test)] +mod tests; + +use async_lock::RwLock as AsyncRwLock; +use parking_lot::RwLock; +use std::collections::HashMap; +use std::fs::File; +use std::sync::{Arc, Weak}; +use std::{io, mem}; +use subspace_core_primitives::crypto::blake3_hash_list; +use subspace_core_primitives::{Blake3Hash, Piece, PieceIndex, SectorIndex}; +use subspace_farmer_components::file_ext::FileExt; +use subspace_farmer_components::sector::SectorMetadataChecksummed; +use subspace_networking::libp2p::kad::RecordKey; +use subspace_networking::utils::multihash::ToMultihash; +use thiserror::Error; +use tracing::{debug, info, warn}; + +/// Disk plot cache open error +#[derive(Debug, Error)] +pub enum DiskPlotCacheError { + /// I/O error occurred + #[error("I/O error: {0}")] + Io(#[from] io::Error), + /// Checksum mismatch + #[error("Checksum mismatch")] + ChecksumMismatch, +} + +#[derive(Debug)] +pub(crate) enum MaybePieceStoredResult { + /// Definitely not stored + No, + /// Maybe has vacant slot to store + Vacant, + /// Maybe still stored + Yes, +} + +#[derive(Debug)] +struct CachedPieces { + /// Map of piece index into offset + map: HashMap, + next_offset: Option, +} + +/// Additional piece cache that exploit part of the plot that does not contain sectors yet +#[derive(Debug, Clone)] +pub struct DiskPlotCache { + file: Weak, + sectors_metadata: Weak>>, + cached_pieces: Arc>, + sector_size: u64, +} + +impl DiskPlotCache { + pub(crate) fn new( + file: &Arc, + sectors_metadata: &Arc>>, + target_sector_count: SectorIndex, + sector_size: usize, + ) -> Self { + info!("Checking plot cache contents"); + let sector_size = sector_size as u64; + let cached_pieces = { + let sectors_metadata = sectors_metadata.read_blocking(); + 
let mut element = vec![0; Self::element_size() as usize]; + // Clippy complains about `RecordKey`, but it is not changing here, so it is fine + #[allow(clippy::mutable_key_type)] + let mut map = HashMap::new(); + + let file_size = sector_size * u64::from(target_sector_count); + let plotted_size = sector_size * sectors_metadata.len() as u64; + + // Step over all free potential offsets for pieces that could have been cached + let from_offset = (plotted_size / Self::element_size() as u64) as u32; + let to_offset = (file_size / Self::element_size() as u64) as u32; + let mut next_offset = None; + // TODO: Parallelize or read in larger batches + for offset in (from_offset..to_offset).rev() { + match Self::read_piece_internal(file, offset, &mut element) { + Ok(maybe_piece_index) => match maybe_piece_index { + Some(piece_index) => { + map.insert(RecordKey::from(piece_index.to_multihash()), offset); + } + None => { + next_offset.replace(offset); + break; + } + }, + Err(DiskPlotCacheError::ChecksumMismatch) => { + next_offset.replace(offset); + break; + } + Err(error) => { + warn!(%error, %offset, "Failed to read plot cache element"); + break; + } + } + } + + CachedPieces { map, next_offset } + }; + + debug!("Finished checking plot cache contents"); + + Self { + file: Arc::downgrade(file), + sectors_metadata: Arc::downgrade(sectors_metadata), + cached_pieces: Arc::new(RwLock::new(cached_pieces)), + sector_size, + } + } + + pub(crate) const fn element_size() -> u32 { + (PieceIndex::SIZE + Piece::SIZE + mem::size_of::()) as u32 + } + + /// Check if piece is potentially stored in this cache (not guaranteed to be because it might be + /// overridden with sector any time) + pub(crate) fn is_piece_maybe_stored(&self, key: &RecordKey) -> MaybePieceStoredResult { + let offset = { + let cached_pieces = self.cached_pieces.read(); + + let Some(offset) = cached_pieces.map.get(key).copied() else { + return if cached_pieces.next_offset.is_some() { + MaybePieceStoredResult::Vacant + } else { + MaybePieceStoredResult::No + }; + }; + + offset + }; + + let Some(sectors_metadata) = self.sectors_metadata.upgrade() else { + return MaybePieceStoredResult::No; + }; + + let element_offset = u64::from(offset) * u64::from(Self::element_size()); + let plotted_bytes = self.sector_size * sectors_metadata.read_blocking().len() as u64; + + // Make sure offset is after anything that is already plotted + if element_offset < plotted_bytes { + // Remove entry since it was overridden with a sector already + self.cached_pieces.write().map.remove(key); + MaybePieceStoredResult::No + } else { + MaybePieceStoredResult::Yes + } + } + + /// Store piece in cache if there is free space, otherwise `Ok(false)` is returned + pub(crate) fn try_store_piece( + &self, + piece_index: PieceIndex, + piece: &Piece, + ) -> Result { + let offset = { + let mut cached_pieces = self.cached_pieces.write(); + let Some(next_offset) = cached_pieces.next_offset else { + return Ok(false); + }; + + let offset = next_offset; + cached_pieces.next_offset = offset.checked_sub(1); + offset + }; + + let Some(sectors_metadata) = self.sectors_metadata.upgrade() else { + return Ok(false); + }; + + let element_offset = u64::from(offset) * u64::from(Self::element_size()); + let sectors_metadata = sectors_metadata.write_blocking(); + let plotted_bytes = self.sector_size * sectors_metadata.len() as u64; + + // Make sure offset is after anything that is already plotted + if element_offset < plotted_bytes { + // Just to be safe, avoid any overlap of write locks + 
drop(sectors_metadata); + // No space to store more pieces anymore + self.cached_pieces.write().next_offset.take(); + return Ok(false); + } + + let Some(file) = self.file.upgrade() else { + return Ok(false); + }; + + let piece_index_bytes = piece_index.to_bytes(); + file.write_all_at(&piece_index_bytes, element_offset)?; + file.write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?; + file.write_all_at( + &blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]), + element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64, + )?; + // Just to be safe, avoid any overlap of write locks + drop(sectors_metadata); + // Store newly written piece in the map + self.cached_pieces + .write() + .map + .insert(RecordKey::from(piece_index.to_multihash()), offset); + + Ok(true) + } + + /// Read piece from cache. + /// + /// Returns `None` if not cached. + pub(crate) fn read_piece(&self, key: &RecordKey) -> Option { + let offset = self.cached_pieces.read().map.get(key).copied()?; + + let file = self.file.upgrade()?; + + let mut element = vec![0; Self::element_size() as usize]; + match Self::read_piece_internal(&file, offset, &mut element) { + Ok(Some(_piece_index)) => { + let mut piece = Piece::default(); + piece.copy_from_slice(&element[PieceIndex::SIZE..][..Piece::SIZE]); + Some(piece) + } + _ => { + // Remove entry just in case it was overridden with a sector already + self.cached_pieces.write().map.remove(key); + None + } + } + } + + fn read_piece_internal( + file: &File, + offset: u32, + element: &mut [u8], + ) -> Result, DiskPlotCacheError> { + file.read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?; + + let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE); + let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE); + + // Verify checksum + let actual_checksum = blake3_hash_list(&[piece_index_bytes, piece_bytes]); + if actual_checksum != expected_checksum { + if element.iter().all(|&byte| byte == 0) { + return Ok(None); + } + + debug!( + actual_checksum = %hex::encode(actual_checksum), + expected_checksum = %hex::encode(expected_checksum), + "Hash doesn't match, corrupted or overridden piece in cache" + ); + + return Err(DiskPlotCacheError::ChecksumMismatch); + } + + let piece_index = PieceIndex::from_bytes( + piece_index_bytes + .try_into() + .expect("Statically known to have correct size; qed"), + ); + Ok(Some(piece_index)) + } +} diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs new file mode 100644 index 0000000000..305f44ac57 --- /dev/null +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs @@ -0,0 +1,160 @@ +use crate::single_disk_farm::plot_cache::{DiskPlotCache, MaybePieceStoredResult}; +use rand::prelude::*; +use std::assert_matches::assert_matches; +use std::num::NonZeroU64; +use std::sync::Arc; +use subspace_core_primitives::{HistorySize, Piece, PieceIndex, Record, SectorIndex}; +use subspace_farmer_components::file_ext::FileExt; +use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed}; +use subspace_networking::libp2p::kad::RecordKey; +use subspace_networking::utils::multihash::ToMultihash; +use tempfile::tempfile; + +const FAKE_SECTOR_SIZE: usize = 2 * 1024 * 1024; +const TARGET_SECTOR_COUNT: SectorIndex = 5; + +#[test] +fn basic() { + let dummy_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata { + sector_index: 0, + pieces_in_sector: 0, + 
s_bucket_sizes: Box::new([0u16; Record::NUM_S_BUCKETS]), + history_size: HistorySize::new(NonZeroU64::MIN), + }); + + let file = Arc::new(tempfile().unwrap()); + file.preallocate(FAKE_SECTOR_SIZE as u64 * u64::from(TARGET_SECTOR_COUNT)) + .unwrap(); + + let piece_index_0 = PieceIndex::from(0); + let piece_index_1 = PieceIndex::from(1); + let piece_index_2 = PieceIndex::from(2); + let piece_0 = { + let mut piece = Piece::default(); + thread_rng().fill(piece.as_mut()); + piece + }; + let piece_1 = { + let mut piece = Piece::default(); + thread_rng().fill(piece.as_mut()); + piece + }; + let piece_2 = { + let mut piece = Piece::default(); + thread_rng().fill(piece.as_mut()); + piece + }; + let record_key_0 = RecordKey::from(piece_index_0.to_multihash()); + let record_key_1 = RecordKey::from(piece_index_1.to_multihash()); + let record_key_2 = RecordKey::from(piece_index_2.to_multihash()); + + let sectors_metadata = Arc::default(); + + let disk_plot_cache = DiskPlotCache::new( + &file, + §ors_metadata, + TARGET_SECTOR_COUNT, + FAKE_SECTOR_SIZE, + ); + + // Initially empty + assert_matches!(disk_plot_cache.read_piece(&record_key_0), None); + assert_matches!( + disk_plot_cache.is_piece_maybe_stored(&record_key_0), + MaybePieceStoredResult::Vacant + ); + + // Can't store pieces when all sectors are plotted + sectors_metadata.write_blocking().resize( + usize::from(TARGET_SECTOR_COUNT), + dummy_sector_metadata.clone(), + ); + assert!(!disk_plot_cache + .try_store_piece(piece_index_0, &piece_0) + .unwrap()); + assert_matches!( + disk_plot_cache.is_piece_maybe_stored(&record_key_0), + MaybePieceStoredResult::No + ); + + // Clear plotted sectors and reopen + sectors_metadata.write_blocking().clear(); + let disk_plot_cache = DiskPlotCache::new( + &file, + §ors_metadata, + TARGET_SECTOR_COUNT, + FAKE_SECTOR_SIZE, + ); + + // Successfully stores piece if not all sectors are plotted + assert!(disk_plot_cache + .try_store_piece(piece_index_0, &piece_0) + .unwrap()); + assert_matches!( + disk_plot_cache.is_piece_maybe_stored(&record_key_0), + MaybePieceStoredResult::Yes + ); + assert!(disk_plot_cache.read_piece(&record_key_0).unwrap() == piece_0); + + // Store two more pieces and make sure they can be read + assert_matches!( + disk_plot_cache.is_piece_maybe_stored(&record_key_1), + MaybePieceStoredResult::Vacant + ); + assert!(disk_plot_cache + .try_store_piece(piece_index_1, &piece_1) + .unwrap()); + assert_matches!( + disk_plot_cache.is_piece_maybe_stored(&record_key_1), + MaybePieceStoredResult::Yes + ); + assert!(disk_plot_cache.read_piece(&record_key_1).unwrap() == piece_1); + + assert_matches!( + disk_plot_cache.is_piece_maybe_stored(&record_key_2), + MaybePieceStoredResult::Vacant + ); + assert!(disk_plot_cache + .try_store_piece(piece_index_2, &piece_2) + .unwrap()); + assert_matches!( + disk_plot_cache.is_piece_maybe_stored(&record_key_2), + MaybePieceStoredResult::Yes + ); + assert!(disk_plot_cache.read_piece(&record_key_2).unwrap() == piece_2); + + // Write almost all sectors even without updating metadata, this will result in internal piece + // read error due to checksum mismatch and eviction of the piece from cache + file.write_all_at( + &vec![0; usize::from(TARGET_SECTOR_COUNT - 1) * FAKE_SECTOR_SIZE], + 0, + ) + .unwrap(); + assert_matches!( + disk_plot_cache.is_piece_maybe_stored(&record_key_2), + MaybePieceStoredResult::Yes + ); + assert_matches!(disk_plot_cache.read_piece(&record_key_2), None); + assert_matches!( + disk_plot_cache.is_piece_maybe_stored(&record_key_2), + 
MaybePieceStoredResult::Vacant + ); + + // Updating metadata will immediately evict piece + assert_matches!( + disk_plot_cache.is_piece_maybe_stored(&record_key_1), + MaybePieceStoredResult::Yes + ); + sectors_metadata + .write_blocking() + .resize(usize::from(TARGET_SECTOR_COUNT - 1), dummy_sector_metadata); + assert_matches!( + disk_plot_cache.is_piece_maybe_stored(&record_key_1), + MaybePieceStoredResult::No + ); + + // Closing file will render cache unusable + assert!(disk_plot_cache.read_piece(&record_key_0).unwrap() == piece_0); + drop(file); + assert_matches!(disk_plot_cache.read_piece(&record_key_0), None); +} diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index 552e306504..be30fb7909 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -94,9 +94,13 @@ where .get_piece_from_dsn_cache(piece_index, Self::convert_retry_policy(retry_policy)) .await?; - if maybe_piece.is_some() { + if let Some(piece) = maybe_piece { trace!(%piece_index, "Got piece from DSN L2 cache successfully"); - return Ok(maybe_piece); + inner + .farmer_cache + .maybe_store_additional_piece(piece_index, &piece) + .await; + return Ok(Some(piece)); } // Try node's RPC before reaching to L1 (archival storage on DSN) @@ -104,6 +108,10 @@ where match inner.node_client.piece(piece_index).await { Ok(Some(piece)) => { trace!(%piece_index, "Got piece from node successfully"); + inner + .farmer_cache + .maybe_store_additional_piece(piece_index, &piece) + .await; return Ok(Some(piece)); } Ok(None) => { @@ -128,6 +136,10 @@ where if let Some(read_piece_fut) = maybe_read_piece_fut { if let Some(piece) = read_piece_fut.await { trace!(%piece_index, "Got piece from local plot successfully"); + inner + .farmer_cache + .maybe_store_additional_piece(piece_index, &piece) + .await; return Ok(Some(piece)); } } @@ -140,10 +152,13 @@ where .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS) .await; - if archival_storage_search_result.is_some() { + if let Some(piece) = archival_storage_search_result { trace!(%piece_index, "DSN L1 lookup succeeded"); - - return Ok(archival_storage_search_result); + inner + .farmer_cache + .maybe_store_additional_piece(piece_index, &piece) + .await; + return Ok(Some(piece)); } debug!(