diff --git a/src/common.rs b/src/common.rs index 9ad7a1a2..5e63e783 100644 --- a/src/common.rs +++ b/src/common.rs @@ -6,11 +6,14 @@ pub(crate) mod deque; pub mod entry; pub(crate) mod error; pub(crate) mod frequency_sketch; -pub(crate) mod time; -pub(crate) mod timer_wheel; + +#[cfg(feature = "sync")] +pub(crate) mod policy_impl; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod time; +pub(crate) mod timer_wheel; use self::concurrent::constants::{ DEFAULT_EVICTION_BATCH_SIZE, DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS, diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index 3093260a..f1a99cfe 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -80,6 +80,7 @@ impl KeyHashDate { self.entry_info.last_accessed() } + #[cfg(feature = "sync")] pub(crate) fn expiration_time(&self) -> Option { self.entry_info.expiration_time() } @@ -89,8 +90,10 @@ impl KeyHashDate { } } +#[cfg(feature = "sync")] pub(crate) type KeyHashDateNode = DeqNode>; +#[cfg(feature = "sync")] pub(crate) type KeyHashDateNodePtr = NonNull>; pub(crate) struct KvEntry { diff --git a/src/common/concurrent/housekeeper.rs b/src/common/concurrent/housekeeper.rs index 01ee6e00..d01b62f4 100644 --- a/src/common/concurrent/housekeeper.rs +++ b/src/common/concurrent/housekeeper.rs @@ -3,7 +3,7 @@ use super::constants::LOG_SYNC_INTERVAL_MILLIS; use super::constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT}; use crate::common::time::{AtomicInstant, Instant}; use crate::common::HousekeeperConfig; -use crate::entry::{EntrySnapshot, EntrySnapshotConfig}; +use crate::policy::{EntrySnapshot, EntrySnapshotConfig}; use parking_lot::{Mutex, MutexGuard}; use std::marker::PhantomData; diff --git a/src/common/deque.rs b/src/common/deque.rs index bd2691ac..4f600809 100644 --- a/src/common/deque.rs +++ b/src/common/deque.rs @@ -52,6 +52,7 @@ impl DeqNode { } } + #[cfg(feature = "sync")] pub(crate) fn prev_node_ptr(this: NonNull) -> Option>> { unsafe { this.as_ref() }.prev } @@ -166,6 +167,7 @@ impl Deque { self.tail.as_ref().map(|node| unsafe { node.as_ref() }) } + #[cfg(feature = "sync")] pub(crate) fn peek_back_ptr(&self) -> Option>> { self.tail.as_ref().copied() } diff --git a/src/common/entry.rs b/src/common/entry.rs index 1533a38f..b1cca6eb 100644 --- a/src/common/entry.rs +++ b/src/common/entry.rs @@ -1,12 +1,9 @@ -use std::{ - fmt::Debug, - hash::{BuildHasher, Hash}, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{fmt::Debug, sync::Arc, time::Instant}; -use crate::sync::Cache; +#[cfg(feature = "sync")] +use std::time::Duration; +#[cfg(feature = "sync")] use super::concurrent::KeyHashDate; /// A snapshot of a single entry in the cache. @@ -104,38 +101,39 @@ impl Entry { pub struct EntryMetadata { region: EntryRegion, policy_weight: u32, - estimated_frequency: u8, last_modified: Instant, last_accessed: Instant, expiration_time: Option, + snapshot_at: Instant, } impl EntryMetadata { pub fn new( region: EntryRegion, policy_weight: u32, - estimated_frequency: u8, last_modified: Instant, last_accessed: Instant, expiration_time: Option, + snapshot_at: Instant, ) -> Self { Self { region, policy_weight, - estimated_frequency, last_modified, last_accessed, expiration_time, + snapshot_at, } } + #[cfg(feature = "sync")] pub(crate) fn from_element( region: EntryRegion, - estimated_frequency: u8, element: &KeyHashDate, clock: &super::time::Clock, time_to_live: Option, time_to_idle: Option, + snapshot_at: Instant, ) -> Self { // SAFETY: `last_accessed` and `last_modified` should be `Some` since we // assume the element is not dirty. But we use `unwrap_or_default` to avoid @@ -164,10 +162,10 @@ impl EntryMetadata { Self { region, policy_weight: element.entry_info().policy_weight(), - estimated_frequency, last_modified, last_accessed, expiration_time, + snapshot_at, } } @@ -179,10 +177,6 @@ impl EntryMetadata { self.policy_weight } - pub fn estimated_frequency(&self) -> u8 { - self.estimated_frequency - } - pub fn last_modified(&self) -> Instant { self.last_modified } @@ -194,88 +188,14 @@ impl EntryMetadata { pub fn expiration_time(&self) -> Option { self.expiration_time } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum EntryRegion { - Window, - Main, -} - -#[derive(Debug, Clone)] -pub struct EntrySnapshot { - snapshot_at: Instant, - coldest: Vec<(Arc, EntryMetadata)>, - hottest: Vec<(Arc, EntryMetadata)>, -} - -impl EntrySnapshot { - pub(crate) fn new( - snapshot_at: Instant, - coldest: Vec<(Arc, EntryMetadata)>, - hottest: Vec<(Arc, EntryMetadata)>, - ) -> Self { - Self { - snapshot_at, - coldest, - hottest, - } - } pub fn snapshot_at(&self) -> Instant { self.snapshot_at } - - pub fn coldest(&self) -> &[(Arc, EntryMetadata)] { - &self.coldest - } - - pub fn hottest(&self) -> &[(Arc, EntryMetadata)] { - &self.hottest - } -} - -pub struct EntrySnapshotRequest { - cache: Cache, - config: EntrySnapshotConfig, -} - -impl EntrySnapshotRequest { - pub(crate) fn new_with_cache(cache: Cache) -> Self { - Self { - cache, - config: EntrySnapshotConfig::default(), - } - } - - pub fn with_coldest(self, count: usize) -> Self { - let mut req = self; - req.config.coldest = count; - req - } - - pub fn with_hottest(self, count: usize) -> Self { - let mut req = self; - req.config.hottest = count; - req - } - - // pub fn config(&self) -> &EntrySnapshotConfig { - // &self.config - // } - - pub fn capture(self) -> EntrySnapshot - where - K: Hash + Send + Sync + Eq + 'static, - V: Clone + Send + Sync + 'static, - S: BuildHasher + Clone + Send + Sync + 'static, - { - self.cache.capture_entry_snapshot(self.config) - } } -#[derive(Debug, Clone, Default)] -pub(crate) struct EntrySnapshotConfig { - pub(crate) coldest: usize, - pub(crate) hottest: usize, +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EntryRegion { + Window, + Main, } diff --git a/src/common/policy_impl.rs b/src/common/policy_impl.rs new file mode 100644 index 00000000..a9f022a5 --- /dev/null +++ b/src/common/policy_impl.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use crate::{ + common::deque::DeqNode, + policy::{EntrySnapshot, EntrySnapshotConfig, ExpirationPolicy}, +}; + +use super::{ + concurrent::{deques::Deques, KeyHashDateNodePtr}, + entry::EntryMetadata, + time::Clock, +}; + +impl ExpirationPolicy { + pub(crate) fn capture_entry_snapshot( + &self, + sc: EntrySnapshotConfig, + deqs: &Deques, + clock: &Clock, + ) -> EntrySnapshot { + use crate::entry::EntryRegion; + + let coldest_entries = if sc.coldest == 0 { + Vec::new() + } else { + self.top_entries( + EntryRegion::Main, + sc.coldest, + deqs.probation.peek_front_ptr(), + DeqNode::next_node_ptr, + clock, + ) + }; + + let hottest_entries = if sc.hottest == 0 { + Vec::new() + } else { + self.top_entries( + EntryRegion::Main, + sc.hottest, + deqs.probation.peek_back_ptr(), + DeqNode::prev_node_ptr, + clock, + ) + }; + + EntrySnapshot::new(coldest_entries, hottest_entries) + } + + fn top_entries( + &self, + region: crate::entry::EntryRegion, + count: usize, + head: Option>, + next_fun: fn(KeyHashDateNodePtr) -> Option>, + clock: &Clock, + ) -> Vec<(Arc, EntryMetadata)> { + let mut entries = Vec::with_capacity(count); + + let mut next = head; + while entries.len() < count { + let Some(current) = next.take() else { + break; + }; + next = next_fun(current); + + let elem = &unsafe { current.as_ref() }.element; + if elem.is_dirty() { + continue; + } + + let snapshot_at = clock.to_std_instant(clock.now()); + let md = EntryMetadata::from_element( + region, + elem, + clock, + self.time_to_live(), + self.time_to_idle(), + snapshot_at, + ); + entries.push((Arc::clone(elem.key()), md)); + } + + entries + } +} diff --git a/src/policy.rs b/src/policy.rs index b98a3464..4e12a3fc 100644 --- a/src/policy.rs +++ b/src/policy.rs @@ -4,6 +4,8 @@ use std::{ time::{Duration, Instant}, }; +use crate::entry::EntryMetadata; + #[derive(Clone, Debug)] /// The policy of a cache. pub struct Policy { @@ -403,3 +405,101 @@ pub(crate) mod test_utils { } } } + +#[derive(Debug, Clone)] +pub struct EntrySnapshot { + coldest: Vec<(Arc, EntryMetadata)>, + hottest: Vec<(Arc, EntryMetadata)>, +} + +impl Default for EntrySnapshot { + fn default() -> Self { + Self { + coldest: Vec::default(), + hottest: Vec::default(), + } + } +} + +impl EntrySnapshot { + #[cfg(feature = "sync")] + pub(crate) fn new( + coldest: Vec<(Arc, EntryMetadata)>, + hottest: Vec<(Arc, EntryMetadata)>, + ) -> Self { + Self { coldest, hottest } + } + + pub fn coldest(&self) -> &[(Arc, EntryMetadata)] { + &self.coldest + } + + pub fn hottest(&self) -> &[(Arc, EntryMetadata)] { + &self.hottest + } +} + +#[cfg(feature = "sync")] +#[derive(Debug, Clone, Default)] +pub(crate) struct EntrySnapshotConfig { + pub(crate) coldest: usize, + pub(crate) hottest: usize, +} + +#[cfg(feature = "sync")] +pub mod sync { + use std::hash::{BuildHasher, Hash}; + + use crate::sync::Cache; + + use super::{EntrySnapshot, EntrySnapshotConfig}; + + pub struct PolicyExt<'a, K, V, S> { + cache: &'a Cache, + } + + impl<'a, K, V, S> PolicyExt<'a, K, V, S> { + pub(crate) fn new(cache: &'a Cache) -> Self { + Self { cache } + } + + pub fn entry_snapshot(&self) -> EntrySnapshotRequest<'_, K, V, S> { + EntrySnapshotRequest::new(self.cache) + } + } + + pub struct EntrySnapshotRequest<'a, K, V, S> { + cache: &'a Cache, + config: EntrySnapshotConfig, + } + + impl<'a, K, V, S> EntrySnapshotRequest<'a, K, V, S> { + pub(crate) fn new(cache: &'a Cache) -> Self { + Self { + cache, + config: EntrySnapshotConfig::default(), + } + } + + pub fn with_coldest(self, count: usize) -> Self { + let mut req = self; + req.config.coldest = count; + req + } + + pub fn with_hottest(self, count: usize) -> Self { + let mut req = self; + req.config.hottest = count; + req + } + + pub fn capture(self) -> EntrySnapshot + where + K: Hash + Send + Sync + Eq + 'static, + V: Clone + Send + Sync + 'static, + S: BuildHasher + Clone + Send + Sync + 'static, + { + self.cache.capture_entry_snapshot(self.config) + } + } +} diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 3979c5ab..efecc477 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -10,10 +10,11 @@ use crate::{ time::{Clock, Instant}, HousekeeperConfig, }, - entry::{EntrySnapshot, EntrySnapshotConfig, EntrySnapshotRequest}, notification::EvictionListener, ops::compute::{self, CompResult}, - policy::{EvictionPolicy, ExpirationPolicy}, + policy::{ + sync::PolicyExt, EntrySnapshot, EntrySnapshotConfig, EvictionPolicy, ExpirationPolicy, + }, sync::{Iter, PredicateId}, sync_base::{ base_cache::{BaseCache, HouseKeeperArc}, @@ -640,8 +641,8 @@ impl Cache { self.base.policy() } - pub fn policy_snapshot(&self) -> EntrySnapshotRequest { - EntrySnapshotRequest::new_with_cache(self.clone()) + pub fn policy_ext(&self) -> PolicyExt<'_, K, V, S> { + PolicyExt::new(self) } /// Returns an approximate number of entries in this cache. @@ -1780,13 +1781,11 @@ where &self, snapshot_config: EntrySnapshotConfig, ) -> EntrySnapshot { - if let Some(hk) = &self.base.housekeeper { - if let Some(snap) = hk.run_pending_tasks(&*self.base.inner, Some(snapshot_config)) { - return snap; - } - } - let now = self.base.clock().to_std_instant(self.base.current_time()); - EntrySnapshot::new(now, Vec::default(), Vec::default()) + self.base + .housekeeper + .as_ref() + .and_then(|hk| hk.run_pending_tasks(&*self.base.inner, Some(snapshot_config))) + .unwrap_or_default() } } @@ -2007,7 +2006,8 @@ mod tests { assert!(cache.contains_key(&"d")); dbg!(cache - .policy_snapshot() + .policy_ext() + .entry_snapshot() .with_coldest(5) .with_hottest(5) .capture()); diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 9f7625bf..508b8dca 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -16,8 +16,8 @@ use crate::{ deques::Deques, entry_info::EntryInfo, housekeeper::{Housekeeper, InnerSync}, - AccessTime, KeyHash, KeyHashDate, KeyHashDateNodePtr, KvEntry, OldEntryInfo, ReadOp, - ValueEntry, Weigher, WriteOp, + AccessTime, KeyHash, KeyHashDate, KvEntry, OldEntryInfo, ReadOp, ValueEntry, Weigher, + WriteOp, }, deque::{DeqNode, Deque}, frequency_sketch::FrequencySketch, @@ -25,9 +25,10 @@ use crate::{ timer_wheel::{ReschedulingResult, TimerWheel}, CacheRegion, HousekeeperConfig, }, - entry::{EntryMetadata, EntrySnapshot, EntrySnapshotConfig}, notification::{notifier::RemovalNotifier, EvictionListener, RemovalCause}, - policy::{EvictionPolicy, EvictionPolicyConfig, ExpirationPolicy}, + policy::{ + EntrySnapshot, EntrySnapshotConfig, EvictionPolicy, EvictionPolicyConfig, ExpirationPolicy, + }, Entry, Expiry, Policy, PredicateError, }; @@ -100,10 +101,6 @@ impl BaseCache { self.inner.max_capacity == Some(0) } - pub(crate) fn clock(&self) -> &Clock { - &self.inner.clock - } - #[inline] pub(crate) fn is_removal_notifier_enabled(&self) -> bool { self.inner.is_removal_notifier_enabled() @@ -1183,9 +1180,7 @@ where ) -> (bool, Option>) { if self.max_capacity == Some(0) { if snapshot_config.is_some() { - let now = self.clock().to_std_instant(self.current_time()); - let snap = EntrySnapshot::new(now, Vec::default(), Vec::default()); - return (false, Some(snap)); + return (false, Some(EntrySnapshot::default())); } else { return (false, None); } @@ -1314,7 +1309,10 @@ where crossbeam_epoch::pin().flush(); - let snapshot = snapshot_config.map(|req| self.capture_snapshot(req, &deqs)); + let snapshot = snapshot_config.map(|req| { + self.expiration_policy + .capture_entry_snapshot(req, &deqs, &self.clock) + }); // Ensure the deqs lock is held until here. drop(deqs); @@ -2304,72 +2302,6 @@ where eviction_state.more_entries_to_evict = true; } } - - fn capture_snapshot(&self, sc: EntrySnapshotConfig, deqs: &Deques) -> EntrySnapshot { - use crate::entry::EntryRegion; - - let freq = &self.frequency_sketch.read(); - let now = self.clock().to_std_instant(self.current_time()); - - let coldest_entries = Self::top_entries( - EntryRegion::Main, - sc.coldest, - deqs.probation.peek_front_ptr(), - DeqNode::next_node_ptr, - freq, - self.clock(), - self.time_to_live(), - self.time_to_idle(), - ); - - let hottest_entries = Self::top_entries( - EntryRegion::Main, - sc.hottest, - deqs.probation.peek_back_ptr(), - DeqNode::prev_node_ptr, - freq, - self.clock(), - self.time_to_live(), - self.time_to_idle(), - ); - - EntrySnapshot::new(now, coldest_entries, hottest_entries) - } - - #[allow(clippy::too_many_arguments)] - fn top_entries( - region: crate::entry::EntryRegion, - count: usize, - head: Option>, - next_fun: fn(KeyHashDateNodePtr) -> Option>, - freq: &FrequencySketch, - clock: &Clock, - time_to_live: Option, - time_to_idle: Option, - ) -> Vec<(Arc, EntryMetadata)> { - let mut entries = Vec::with_capacity(count); - - let mut next = head; - while let Some(current) = next.take() { - next = next_fun(current); - let elem = &unsafe { current.as_ref() }.element; - - if elem.is_dirty() { - continue; - } - - let f = freq.frequency(elem.hash()); - let md = - EntryMetadata::from_element(region, f, elem, clock, time_to_live, time_to_idle); - entries.push((Arc::clone(elem.key()), md)); - - if entries.len() >= count { - break; - } - } - - entries - } } impl Inner