Tiny refactoring
nazar-pc committed Feb 23, 2024
1 parent d66ec76 commit b3526bc
Showing 2 changed files with 23 additions and 26 deletions.
40 changes: 20 additions & 20 deletions crates/subspace-farmer/src/farmer_cache.rs
@@ -51,7 +51,7 @@ struct DiskPieceCacheState {
 #[derive(Debug)]
 enum WorkerCommand {
     ReplaceBackingCaches {
-        new_caches: Vec<DiskPieceCache>,
+        new_piece_caches: Vec<DiskPieceCache>,
         acknowledgement: oneshot::Sender<()>,
     },
     ForgetKey {
@@ -102,11 +102,11 @@ where
             .expect("Always set during worker instantiation");

         if let Some(WorkerCommand::ReplaceBackingCaches {
-            new_caches,
+            new_piece_caches,
             acknowledgement,
         }) = worker_receiver.recv().await
         {
-            self.initialize(&piece_getter, &mut worker_state, new_caches)
+            self.initialize(&piece_getter, &mut worker_state, new_piece_caches)
                 .await;
             // Doesn't matter if receiver is still waiting for acknowledgement
             let _ = acknowledgement.send(());
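
The `let _ = acknowledgement.send(())` in the hunk above deliberately ignores the send result: a oneshot send only fails when the receiving side has already been dropped, which is harmless here. A minimal sketch of the same pattern, assuming the `futures` oneshot channel (the actual import is not visible in this diff):

```rust
use futures::channel::oneshot;

// Sketch only: mirrors the acknowledgement pattern above with made-up names.
fn acknowledge(acknowledgement: oneshot::Sender<()>) {
    // `send` consumes the sender and returns Err only if the receiver was
    // dropped; either way there is nothing further to do.
    let _ = acknowledgement.send(());
}

fn main() {
    let (sender, receiver) = oneshot::channel::<()>();
    drop(receiver); // simulate a caller that stopped waiting for the ack
    acknowledge(sender); // still fine: the failed send is ignored
}
```
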
@@ -163,10 +163,10 @@ where
     {
         match command {
             WorkerCommand::ReplaceBackingCaches {
-                new_caches,
+                new_piece_caches,
                 acknowledgement,
             } => {
                 self.initialize(piece_getter, worker_state, new_caches)
-                self.initialize(piece_getter, worker_state, new_caches)
+                self.initialize(piece_getter, worker_state, new_piece_caches)
                     .await;
                 // Doesn't matter if receiver is still waiting for acknowledgement
                 let _ = acknowledgement.send(());
@@ -215,31 +215,31 @@ where
         &self,
         piece_getter: &PG,
         worker_state: &mut CacheWorkerState,
-        new_caches: Vec<DiskPieceCache>,
+        new_piece_caches: Vec<DiskPieceCache>,
     ) where
         PG: PieceGetter,
     {
         info!("Initializing piece cache");
         // Pull old cache state since it will be replaced with a new one and reuse its allocations
         let cache_state = mem::take(&mut *self.caches.write());
-        let mut stored_pieces = Vec::with_capacity(new_caches.len());
-        let mut free_offsets = Vec::with_capacity(new_caches.len());
+        let mut stored_pieces = Vec::with_capacity(new_piece_caches.len());
+        let mut free_offsets = Vec::with_capacity(new_piece_caches.len());
         for mut state in cache_state {
             state.stored_pieces.clear();
             stored_pieces.push(state.stored_pieces);
             state.free_offsets.clear();
             free_offsets.push(state.free_offsets);
         }
-        stored_pieces.resize(new_caches.len(), HashMap::default());
-        free_offsets.resize(new_caches.len(), VecDeque::default());
+        stored_pieces.resize(new_piece_caches.len(), HashMap::default());
+        free_offsets.resize(new_piece_caches.len(), VecDeque::default());

         debug!("Collecting pieces that were in the cache before");

         // Build cache state of all backends
         let maybe_caches_futures = stored_pieces
             .into_iter()
             .zip(free_offsets)
-            .zip(new_caches)
+            .zip(new_piece_caches)
             .enumerate()
             .map(
                 |(index, ((mut stored_pieces, mut free_offsets), new_cache))| {
@@ -760,8 +760,8 @@ where
 #[derive(Debug, Clone)]
 pub struct FarmerCache {
     peer_id: PeerId,
-    /// Individual disk caches where pieces are stored
-    caches: Arc<RwLock<Vec<DiskPieceCacheState>>>,
+    /// Individual dedicated piece caches
+    piece_caches: Arc<RwLock<Vec<DiskPieceCacheState>>>,
     handlers: Arc<Handlers>,
     // We do not want to increase capacity unnecessarily on clone
     worker_sender: Arc<mpsc::Sender<WorkerCommand>>,
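
The `worker_sender` comment in the hunk above refers to a property of bounded `futures` mpsc channels, where the effective capacity is `buffer + number of senders`, so every `Sender` clone adds a guaranteed slot. Sharing a single sender behind an `Arc` avoids that. A small sketch of the idea, assuming `futures::channel::mpsc` is the channel in use (an assumption; the import is not part of this diff):

```rust
use std::sync::Arc;

use futures::channel::mpsc;

struct WorkerCommand; // stand-in for the real command enum

// Deriving Clone on a struct that holds Arc<Sender<_>> clones the Arc rather
// than the Sender, so the underlying channel keeps a single sender and its
// capacity does not grow with each clone.
#[derive(Clone)]
struct CacheHandle {
    worker_sender: Arc<mpsc::Sender<WorkerCommand>>,
}

fn main() {
    let (sender, _receiver) = mpsc::channel::<WorkerCommand>(8);
    let handle = CacheHandle {
        worker_sender: Arc::new(sender),
    };
    let _clone = handle.clone(); // no new Sender, no extra capacity
}
```
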
@@ -782,7 +782,7 @@ impl FarmerCache {

         let instance = Self {
             peer_id,
-            caches: Arc::clone(&caches),
+            piece_caches: Arc::clone(&caches),
             handlers: Arc::clone(&handlers),
             worker_sender: Arc::new(worker_sender),
         };
@@ -801,11 +801,11 @@ impl FarmerCache {
     pub async fn get_piece(&self, key: RecordKey) -> Option<Piece> {
         let maybe_piece_fut = tokio::task::spawn_blocking({
             let key = key.clone();
-            let caches = Arc::clone(&self.caches);
+            let piece_caches = Arc::clone(&self.piece_caches);
             let worker_sender = Arc::clone(&self.worker_sender);

             move || {
-                for (disk_farm_index, cache) in caches.read().iter().enumerate() {
+                for (disk_farm_index, cache) in piece_caches.read().iter().enumerate() {
                     let Some(&offset) = cache.stored_pieces.get(&key) else {
                         continue;
                     };
@@ -850,13 +850,13 @@ impl FarmerCache {
     /// to identify when cache initialization has finished
     pub async fn replace_backing_caches(
         &self,
-        new_caches: Vec<DiskPieceCache>,
+        new_piece_caches: Vec<DiskPieceCache>,
     ) -> oneshot::Receiver<()> {
         let (sender, receiver) = oneshot::channel();
         if let Err(error) = self
             .worker_sender
             .send(WorkerCommand::ReplaceBackingCaches {
-                new_caches,
+                new_piece_caches,
                 acknowledgement: sender,
             })
             .await
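
`replace_backing_caches` hands the acknowledgement receiver back to the caller instead of awaiting it, so the caller chooses whether to block until cache initialization completes. A hypothetical caller might look like this (names and setup are illustrative, not taken from the crate):

```rust
// Hypothetical usage sketch; `farmer_cache` and `disk_caches` are assumed to
// already exist and are not constructed here.
async fn swap_caches(farmer_cache: &FarmerCache, disk_caches: Vec<DiskPieceCache>) {
    let initialized = farmer_cache.replace_backing_caches(disk_caches).await;
    // Awaiting the receiver waits for the worker to finish re-initializing;
    // an Err only means the worker dropped the acknowledgement sender.
    let _ = initialized.await;
}
```
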
@@ -876,8 +876,8 @@
 impl LocalRecordProvider for FarmerCache {
     fn record(&self, key: &RecordKey) -> Option<ProviderRecord> {
         // It is okay to take read lock here, writes locks are very infrequent and very short
-        for cache in self.caches.read().iter() {
-            if cache.stored_pieces.contains_key(key) {
+        for piece_cache in self.piece_caches.read().iter() {
+            if piece_cache.stored_pieces.contains_key(key) {
                 // Note: We store our own provider records locally without local addresses
                 // to avoid redundant storage and outdated addresses. Instead these are
                 // acquired on demand when returning a `ProviderRecord` for the local node.
9 changes: 3 additions & 6 deletions crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
@@ -56,12 +56,9 @@ pub struct DiskPieceCache {
 }

 impl DiskPieceCache {
-    pub(super) const FILE_NAME: &'static str = "piece_cache.bin";
+    pub(crate) const FILE_NAME: &'static str = "piece_cache.bin";

-    pub(in super::super) fn open(
-        directory: &Path,
-        capacity: u32,
-    ) -> Result<Self, DiskPieceCacheError> {
+    pub(crate) fn open(directory: &Path, capacity: u32) -> Result<Self, DiskPieceCacheError> {
         if capacity == 0 {
             return Err(DiskPieceCacheError::ZeroCapacity);
         }
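
The second file only widens visibility: `pub(super)` and `pub(in super::super)` items become `pub(crate)`, presumably so that modules elsewhere in the crate (such as `farmer_cache`) can reach them directly. A minimal sketch of what the three modifiers mean, with made-up module names:

```rust
#![allow(dead_code)]

mod farm {
    pub mod single_disk_farm {
        pub mod piece_cache {
            pub(super) fn parent_only() {}          // visible inside single_disk_farm
            pub(in super::super) fn up_to_farm() {} // visible inside farm and below
            pub(crate) fn whole_crate() {}          // visible anywhere in this crate
        }
    }
}

fn main() {
    // Only the pub(crate) item is reachable from the crate root.
    farm::single_disk_farm::piece_cache::whole_crate();
}
```
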
@@ -91,7 +88,7 @@ impl DiskPieceCache {
         })
     }

-    pub(super) const fn element_size() -> u32 {
+    pub(crate) const fn element_size() -> u32 {
         (PieceIndex::SIZE + Piece::SIZE + mem::size_of::<Blake3Hash>()) as u32
     }
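
`element_size()` describes one fixed-size cache slot: a piece index, the piece bytes, and a BLAKE3 hash. A sketch of the same arithmetic with placeholder sizes (the real `PieceIndex::SIZE` and `Piece::SIZE` constants come from Subspace's core primitives and are not shown in this diff):

```rust
// Placeholder sizes for illustration only; the real values may differ.
const PIECE_INDEX_SIZE: u32 = 8; // assuming a u64 piece index
const PIECE_SIZE: u32 = 1 << 20; // placeholder piece length, not the real one
const BLAKE3_HASH_SIZE: u32 = 32; // a BLAKE3 digest is 32 bytes

// Mirrors the shape of `DiskPieceCache::element_size()` above.
const fn element_size() -> u32 {
    PIECE_INDEX_SIZE + PIECE_SIZE + BLAKE3_HASH_SIZE
}

fn main() {
    println!("each cache element occupies {} bytes", element_size());
}
```
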

