diff --git a/src/common.rs b/src/common.rs index 8657693c..97bf8902 100644 --- a/src/common.rs +++ b/src/common.rs @@ -3,9 +3,10 @@ use std::time::Duration; pub(crate) mod builder_utils; pub(crate) mod concurrent; pub(crate) mod deque; -pub(crate) mod entry; +pub mod entry; pub(crate) mod error; pub(crate) mod frequency_sketch; +pub(crate) mod policy_impl; pub(crate) mod time; pub(crate) mod timer_wheel; diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index 0532d5a8..3093260a 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -80,11 +80,19 @@ impl KeyHashDate { self.entry_info.last_accessed() } + pub(crate) fn expiration_time(&self) -> Option { + self.entry_info.expiration_time() + } + pub(crate) fn is_dirty(&self) -> bool { self.entry_info.is_dirty() } } +pub(crate) type KeyHashDateNode = DeqNode>; + +pub(crate) type KeyHashDateNodePtr = NonNull>; + pub(crate) struct KvEntry { pub(crate) key: Arc, pub(crate) entry: MiniArc>, diff --git a/src/common/concurrent/housekeeper.rs b/src/common/concurrent/housekeeper.rs index c9daa441..d01b62f4 100644 --- a/src/common/concurrent/housekeeper.rs +++ b/src/common/concurrent/housekeeper.rs @@ -3,14 +3,16 @@ use super::constants::LOG_SYNC_INTERVAL_MILLIS; use super::constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT}; use crate::common::time::{AtomicInstant, Instant}; use crate::common::HousekeeperConfig; +use crate::policy::{EntrySnapshot, EntrySnapshotConfig}; use parking_lot::{Mutex, MutexGuard}; +use std::marker::PhantomData; use std::{ sync::atomic::{AtomicBool, Ordering}, time::Duration, }; -pub(crate) trait InnerSync { +pub(crate) trait InnerSync { /// Runs the pending tasks. Returns `true` if there are more entries to evict in /// next run. fn run_pending_tasks( @@ -18,12 +20,13 @@ pub(crate) trait InnerSync { timeout: Option, max_log_sync_repeats: u32, eviction_batch_size: u32, - ) -> bool; + snapshot_request: Option, + ) -> (bool, Option>); fn now(&self) -> Instant; } -pub(crate) struct Housekeeper { +pub(crate) struct Housekeeper { run_lock: Mutex<()>, run_after: AtomicInstant, /// A flag to indicate if the last call on `run_pending_tasks` method left some @@ -46,9 +49,10 @@ pub(crate) struct Housekeeper { /// Default: `EVICTION_BATCH_SIZE`. eviction_batch_size: u32, auto_run_enabled: AtomicBool, + key_ty: PhantomData, } -impl Housekeeper { +impl Housekeeper { pub(crate) fn new( is_eviction_listener_enabled: bool, config: HousekeeperConfig, @@ -71,6 +75,7 @@ impl Housekeeper { max_log_sync_repeats: config.max_log_sync_repeats, eviction_batch_size: config.eviction_batch_size, auto_run_enabled: AtomicBool::new(true), + key_ty: PhantomData, } } @@ -102,28 +107,39 @@ impl Housekeeper { && (ch_len >= ch_flush_point || now >= self.run_after.instant().unwrap()) } - pub(crate) fn run_pending_tasks(&self, cache: &T) { + pub(crate) fn run_pending_tasks>( + &self, + cache: &T, + snapshot_config: Option, + ) -> Option> { let lock = self.run_lock.lock(); - self.do_run_pending_tasks(cache, lock); + self.do_run_pending_tasks(cache, lock, snapshot_config) } - pub(crate) fn try_run_pending_tasks(&self, cache: &T) -> bool { + pub(crate) fn try_run_pending_tasks>(&self, cache: &T) -> bool { if let Some(lock) = self.run_lock.try_lock() { - self.do_run_pending_tasks(cache, lock); + self.do_run_pending_tasks(cache, lock, None); true } else { false } } - fn do_run_pending_tasks(&self, cache: &T, _lock: MutexGuard<'_, ()>) { + fn do_run_pending_tasks>( + &self, + cache: &T, + _lock: MutexGuard<'_, ()>, + snapshot_config: Option, + ) -> Option> { let now = cache.now(); self.run_after.set_instant(Self::sync_after(now)); let timeout = self.maintenance_task_timeout; let repeats = self.max_log_sync_repeats; let batch_size = self.eviction_batch_size; - let more_to_evict = cache.run_pending_tasks(timeout, repeats, batch_size); + let (more_to_evict, snap) = + cache.run_pending_tasks(timeout, repeats, batch_size, snapshot_config); self.set_more_entries_to_evict(more_to_evict); + snap } fn sync_after(now: Instant) -> Instant { @@ -133,7 +149,7 @@ impl Housekeeper { } #[cfg(test)] -impl Housekeeper { +impl Housekeeper { pub(crate) fn disable_auto_run(&self) { self.auto_run_enabled.store(false, Ordering::Relaxed); } diff --git a/src/common/deque.rs b/src/common/deque.rs index 629cf4bb..bd2691ac 100644 --- a/src/common/deque.rs +++ b/src/common/deque.rs @@ -52,6 +52,10 @@ impl DeqNode { } } + pub(crate) fn prev_node_ptr(this: NonNull) -> Option>> { + unsafe { this.as_ref() }.prev + } + pub(crate) fn next_node_ptr(this: NonNull) -> Option>> { unsafe { this.as_ref() }.next } @@ -162,6 +166,10 @@ impl Deque { self.tail.as_ref().map(|node| unsafe { node.as_ref() }) } + pub(crate) fn peek_back_ptr(&self) -> Option>> { + self.tail.as_ref().copied() + } + /// Adds the given node to the back of the list. pub(crate) fn push_back(&mut self, mut node: Box>) -> NonNull> { // This method takes care not to create mutable references to whole nodes, diff --git a/src/common/entry.rs b/src/common/entry.rs index a1fb820d..b87e4241 100644 --- a/src/common/entry.rs +++ b/src/common/entry.rs @@ -1,4 +1,10 @@ -use std::{fmt::Debug, sync::Arc}; +use std::{ + fmt::Debug, + sync::Arc, + time::{Duration, Instant}, +}; + +use super::concurrent::KeyHashDate; /// A snapshot of a single entry in the cache. /// @@ -90,3 +96,105 @@ impl Entry { self.is_old_value_replaced } } + +#[derive(Debug, Clone)] +pub struct EntryMetadata { + region: EntryRegion, + policy_weight: u32, + last_modified: Instant, + last_accessed: Instant, + expiration_time: Option, + snapshot_at: Instant, +} + +impl EntryMetadata { + pub fn new( + region: EntryRegion, + policy_weight: u32, + last_modified: Instant, + last_accessed: Instant, + expiration_time: Option, + snapshot_at: Instant, + ) -> Self { + Self { + region, + policy_weight, + last_modified, + last_accessed, + expiration_time, + snapshot_at, + } + } + + pub(crate) fn from_element( + region: EntryRegion, + element: &KeyHashDate, + clock: &super::time::Clock, + time_to_live: Option, + time_to_idle: Option, + snapshot_at: Instant, + ) -> Self { + // SAFETY: `last_accessed` and `last_modified` should be `Some` since we + // assume the element is not dirty. But we use `unwrap_or_default` to avoid + // panicking just in case they are `None`. + let last_modified = clock.to_std_instant(element.last_modified().unwrap_or_default()); + let last_accessed = clock.to_std_instant(element.last_accessed().unwrap_or_default()); + + // When per-entry expiration is used, the expiration time is set in the + // element, otherwise, we calculate the expiration time based on the + // `time_to_live` and `time_to_idle` settings. + let expiration_time = if element.expiration_time().is_some() { + element.expiration_time().map(|ts| clock.to_std_instant(ts)) + } else { + match (time_to_live, time_to_idle) { + (Some(ttl), Some(tti)) => { + let exp_by_ttl = last_modified + ttl; + let exp_by_tti = last_accessed + tti; + Some(exp_by_ttl.min(exp_by_tti)) + } + (Some(ttl), None) => Some(last_modified + ttl), + (None, Some(tti)) => Some(last_accessed + tti), + (None, None) => None, + } + }; + + Self { + region, + policy_weight: element.entry_info().policy_weight(), + last_modified, + last_accessed, + expiration_time, + snapshot_at, + } + } + + pub fn region(&self) -> EntryRegion { + self.region + } + + pub fn policy_weight(&self) -> u32 { + self.policy_weight + } + + pub fn last_modified(&self) -> Instant { + self.last_modified + } + + pub fn last_accessed(&self) -> Instant { + self.last_accessed + } + + pub fn expiration_time(&self) -> Option { + self.expiration_time + } + + pub fn snapshot_at(&self) -> Instant { + self.snapshot_at + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EntryRegion { + Window, + Main, +} diff --git a/src/common/policy_impl.rs b/src/common/policy_impl.rs new file mode 100644 index 00000000..a9f022a5 --- /dev/null +++ b/src/common/policy_impl.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use crate::{ + common::deque::DeqNode, + policy::{EntrySnapshot, EntrySnapshotConfig, ExpirationPolicy}, +}; + +use super::{ + concurrent::{deques::Deques, KeyHashDateNodePtr}, + entry::EntryMetadata, + time::Clock, +}; + +impl ExpirationPolicy { + pub(crate) fn capture_entry_snapshot( + &self, + sc: EntrySnapshotConfig, + deqs: &Deques, + clock: &Clock, + ) -> EntrySnapshot { + use crate::entry::EntryRegion; + + let coldest_entries = if sc.coldest == 0 { + Vec::new() + } else { + self.top_entries( + EntryRegion::Main, + sc.coldest, + deqs.probation.peek_front_ptr(), + DeqNode::next_node_ptr, + clock, + ) + }; + + let hottest_entries = if sc.hottest == 0 { + Vec::new() + } else { + self.top_entries( + EntryRegion::Main, + sc.hottest, + deqs.probation.peek_back_ptr(), + DeqNode::prev_node_ptr, + clock, + ) + }; + + EntrySnapshot::new(coldest_entries, hottest_entries) + } + + fn top_entries( + &self, + region: crate::entry::EntryRegion, + count: usize, + head: Option>, + next_fun: fn(KeyHashDateNodePtr) -> Option>, + clock: &Clock, + ) -> Vec<(Arc, EntryMetadata)> { + let mut entries = Vec::with_capacity(count); + + let mut next = head; + while entries.len() < count { + let Some(current) = next.take() else { + break; + }; + next = next_fun(current); + + let elem = &unsafe { current.as_ref() }.element; + if elem.is_dirty() { + continue; + } + + let snapshot_at = clock.to_std_instant(clock.now()); + let md = EntryMetadata::from_element( + region, + elem, + clock, + self.time_to_live(), + self.time_to_idle(), + snapshot_at, + ); + entries.push((Arc::clone(elem.key()), md)); + } + + entries + } +} diff --git a/src/common/time/instant.rs b/src/common/time/instant.rs index d542b21d..68c0c07c 100644 --- a/src/common/time/instant.rs +++ b/src/common/time/instant.rs @@ -4,7 +4,7 @@ pub(crate) const MAX_NANOS: u64 = u64::MAX - 1; /// `Instant` represents a point in time since the `Clock` was created. It has /// nanosecond precision. -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] pub(crate) struct Instant { elapsed_ns: u64, } diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index f52ceebf..e635de1e 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -27,7 +27,9 @@ use crate::{ }, future::CancelGuard, notification::{AsyncEvictionListener, RemovalCause}, - policy::{EvictionPolicy, EvictionPolicyConfig, ExpirationPolicy}, + policy::{ + EntrySnapshot, EntrySnapshotConfig, EvictionPolicy, EvictionPolicyConfig, ExpirationPolicy, + }, sync_base::iter::ScanningGet, Entry, Expiry, Policy, PredicateError, }; @@ -52,7 +54,7 @@ use std::{ time::{Duration, Instant as StdInstant}, }; -pub(crate) type HouseKeeperArc = Arc; +pub(crate) type HouseKeeperArc = Arc>; pub(crate) struct BaseCache { pub(crate) inner: Arc>, @@ -60,7 +62,7 @@ pub(crate) struct BaseCache { pub(crate) write_op_ch: Sender>, pub(crate) interrupted_op_ch_snd: Sender>, pub(crate) interrupted_op_ch_rcv: Receiver>, - pub(crate) housekeeper: Option, + pub(crate) housekeeper: Option>, } impl Clone for BaseCache { @@ -384,7 +386,7 @@ where inner: &Arc>, ch: &Sender>, now: Instant, - housekeeper: Option<&HouseKeeperArc>, + housekeeper: Option<&HouseKeeperArc>, ) { let w_len = ch.len(); @@ -622,7 +624,7 @@ where ch_ready_event: &event_listener::Event<()>, op: WriteOp, ts: Instant, - housekeeper: Option<&HouseKeeperArc>, + housekeeper: Option<&HouseKeeperArc>, // Used only for testing. _should_block: bool, ) -> Result<(), TrySendError>> { @@ -737,12 +739,12 @@ where } #[inline] - fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { + fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { hk.should_apply_reads(ch_len, now) } #[inline] - fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { + fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { hk.should_apply_writes(ch_len, now) } } @@ -1293,9 +1295,14 @@ where timeout: Option, max_log_sync_repeats: u32, eviction_batch_size: u32, - ) -> bool { + snapshot_config: Option, + ) -> (bool, Option>) { if self.max_capacity == Some(0) { - return false; + if snapshot_config.is_some() { + return (false, Some(EntrySnapshot::default())); + } else { + return (false, None); + } } // Acquire some locks. @@ -1436,10 +1443,15 @@ where crossbeam_epoch::pin().flush(); + let snapshot = snapshot_config.map(|req| { + self.expiration_policy + .capture_entry_snapshot(req, &deqs, &self.clock) + }); + // Ensure this lock is held until here. drop(deqs); - eviction_state.more_entries_to_evict + (eviction_state.more_entries_to_evict, snapshot) } } @@ -2837,19 +2849,19 @@ mod tests { ($cache:ident, $key:ident, $hash:ident, $mock:ident, $duration_secs:expr) => { // Increment the time. $mock.increment(Duration::from_millis($duration_secs * 1000 - 1)); - $cache.inner.do_run_pending_tasks(None, 1, 10).await; + $cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!($cache.contains_key_with_hash(&$key, $hash)); assert_eq!($cache.entry_count(), 1); // Increment the time by 1ms (3). The entry should be expired. $mock.increment(Duration::from_millis(1)); - $cache.inner.do_run_pending_tasks(None, 1, 10).await; + $cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(!$cache.contains_key_with_hash(&$key, $hash)); // Increment the time again to ensure the entry has been evicted from the // cache. $mock.increment(Duration::from_secs(1)); - $cache.inner.do_run_pending_tasks(None, 1, 10).await; + $cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_eq!($cache.entry_count(), 0); }; } @@ -3133,7 +3145,7 @@ mod tests { insert(&cache, key, hash, value).await; // Run a sync to register the entry to the internal data structures including // the timer wheel. - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 1); @@ -3155,12 +3167,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(cache.contains_key_with_hash(&key, hash)); // Read the entry (2). @@ -3180,7 +3192,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_expiry!(cache, key, hash, mock, 3); @@ -3202,12 +3214,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(cache.contains_key_with_hash(&key, hash)); // Read the entry (2). @@ -3227,11 +3239,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; // Increment the time. mock.increment(Duration::from_secs(2)); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3246,7 +3258,7 @@ mod tests { Some(3), ); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 3); @@ -3269,12 +3281,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3295,11 +3307,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; // Increment the time. mock.increment(Duration::from_secs(2)); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3314,7 +3326,7 @@ mod tests { None, ); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 7); @@ -3336,12 +3348,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8)); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(5)); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3362,7 +3374,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_expiry!(cache, key, hash, mock, 7); @@ -3384,12 +3396,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8)); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(5)); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3410,11 +3422,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3435,7 +3447,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_expiry!(cache, key, hash, mock, 5); @@ -3456,12 +3468,12 @@ mod tests { *expectation.lock().unwrap() = ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(9)); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3477,12 +3489,12 @@ mod tests { ); let updated_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3503,11 +3515,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3528,7 +3540,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(None, 1, 10).await; + cache.inner.do_run_pending_tasks(None, 1, 10, None).await; assert_expiry!(cache, key, hash, mock, 4); } diff --git a/src/future/cache.rs b/src/future/cache.rs index 313035fb..e7947838 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -10,7 +10,9 @@ use crate::{ common::{concurrent::Weigher, time::Clock, HousekeeperConfig}, notification::AsyncEvictionListener, ops::compute::{self, CompResult}, - policy::{EvictionPolicy, ExpirationPolicy}, + policy::{ + future::PolicyExt, EntrySnapshot, EntrySnapshotConfig, EvictionPolicy, ExpirationPolicy, + }, Entry, Policy, PredicateError, }; @@ -707,6 +709,10 @@ impl Cache { self.base.policy() } + pub fn policy_ext(&self) -> PolicyExt<'_, K, V, S> { + PolicyExt::new(self) + } + /// Returns an approximate number of entries in this cache. /// /// The value returned is _an estimate_; the actual count may differ if there are @@ -1496,7 +1502,21 @@ where pub async fn run_pending_tasks(&self) { if let Some(hk) = &self.base.housekeeper { self.base.retry_interrupted_ops().await; - hk.run_pending_tasks(Arc::clone(&self.base.inner)).await; + hk.run_pending_tasks(Arc::clone(&self.base.inner), None) + .await; + } + } + + pub(crate) async fn capture_entry_snapshot( + &self, + snapshot_config: EntrySnapshotConfig, + ) -> EntrySnapshot { + if let Some(hk) = &self.base.housekeeper { + hk.run_pending_tasks(Arc::clone(&self.base.inner), Some(snapshot_config)) + .await + .unwrap_or_default() + } else { + EntrySnapshot::default() } } } @@ -2286,6 +2306,16 @@ mod tests { assert!(!cache.contains_key(&"c")); assert!(cache.contains_key(&"d")); + dbg!( + cache + .policy_ext() + .entry_snapshot() + .with_coldest(5) + .with_hottest(5) + .capture() + .await + ); + cache.invalidate(&"b").await; expected.push((Arc::new("b"), "bob", RemovalCause::Explicit)); cache.run_pending_tasks().await; diff --git a/src/future/housekeeper.rs b/src/future/housekeeper.rs index 1ef86cd1..4263544a 100644 --- a/src/future/housekeeper.rs +++ b/src/future/housekeeper.rs @@ -1,9 +1,12 @@ -use crate::common::{ - concurrent::constants::{ - LOG_SYNC_INTERVAL_MILLIS, READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT, +use crate::{ + common::{ + concurrent::constants::{ + LOG_SYNC_INTERVAL_MILLIS, READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT, + }, + time::{AtomicInstant, Instant}, + HousekeeperConfig, }, - time::{AtomicInstant, Instant}, - HousekeeperConfig, + policy::{EntrySnapshot, EntrySnapshotConfig}, }; use std::{ @@ -23,9 +26,11 @@ use futures_util::future::{BoxFuture, Shared}; use super::base_cache::Inner; -pub(crate) struct Housekeeper { +type RunTasksResult = (bool, Option>); + +pub(crate) struct Housekeeper { /// A shared `Future` of the maintenance task that is currently being resolved. - current_task: Mutex>>>, + current_task: Mutex>>>>, run_after: AtomicInstant, /// A flag to indicate if the last call on `run_pending_tasks` method left some /// entries to evict. @@ -53,7 +58,7 @@ pub(crate) struct Housekeeper { pub(crate) complete_count: AtomicUsize, } -impl Housekeeper { +impl Housekeeper { pub(crate) fn new( is_eviction_listener_enabled: bool, config: HousekeeperConfig, @@ -111,14 +116,19 @@ impl Housekeeper { && (ch_len >= ch_flush_point || now >= self.run_after.instant().unwrap()) } - pub(crate) async fn run_pending_tasks(&self, cache: Arc>) + pub(crate) async fn run_pending_tasks( + &self, + cache: Arc>, + snapshot_config: Option, + ) -> Option> where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, S: BuildHasher + Clone + Send + Sync + 'static, { let mut current_task = self.current_task.lock().await; - self.do_run_pending_tasks(Arc::clone(&cache), &mut current_task) + let snap = self + .do_run_pending_tasks(Arc::clone(&cache), &mut current_task, snapshot_config) .await; drop(current_task); @@ -126,18 +136,20 @@ impl Housekeeper { // If there are any async tasks waiting in `BaseCache::schedule_write_op` // method for the write op channel, notify them. cache.write_op_ch_ready_event.notify(usize::MAX); + + snap } /// Tries to run the pending tasks if the lock is free. Returns `true` if there /// are more entries to evict in next run. - pub(crate) async fn try_run_pending_tasks(&self, cache: &Arc>) -> bool + pub(crate) async fn try_run_pending_tasks(&self, cache: &Arc>) -> bool where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, S: BuildHasher + Clone + Send + Sync + 'static, { if let Some(mut current_task) = self.current_task.try_lock() { - self.do_run_pending_tasks(Arc::clone(cache), &mut current_task) + self.do_run_pending_tasks(Arc::clone(cache), &mut current_task, None) .await; } else { return false; @@ -151,11 +163,13 @@ impl Housekeeper { true } - async fn do_run_pending_tasks( + async fn do_run_pending_tasks( &self, cache: Arc>, - current_task: &mut Option>>, - ) where + current_task: &mut Option>>>, + snapshot_config: Option, + ) -> Option> + where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, S: BuildHasher + Clone + Send + Sync + 'static, @@ -164,13 +178,14 @@ impl Housekeeper { let now = cache.current_time(); let more_to_evict; + let snap; // Async Cancellation Safety: Our maintenance task is cancellable as we save // it in the lock. If it is canceled, we will resume it in the next run. if let Some(task) = &*current_task { // This task was cancelled in the previous run due to the enclosing // Future was dropped. Resume the task now by awaiting. - more_to_evict = task.clone().await; + (more_to_evict, snap) = task.clone().await; } else { let timeout = self.maintenance_task_timeout; let repeats = self.max_log_sync_repeats; @@ -178,7 +193,7 @@ impl Housekeeper { // Create a new maintenance task and await it. let task = async move { cache - .do_run_pending_tasks(timeout, repeats, batch_size) + .do_run_pending_tasks(timeout, repeats, batch_size, snapshot_config) .await } .boxed() @@ -188,7 +203,7 @@ impl Housekeeper { #[cfg(test)] self.start_count.fetch_add(1, Ordering::AcqRel); - more_to_evict = task.await; + (more_to_evict, snap) = task.await; } // If we are here, it means that the maintenance task has been completed. @@ -199,6 +214,8 @@ impl Housekeeper { #[cfg(test)] self.complete_count.fetch_add(1, Ordering::AcqRel); + + snap } fn sync_after(now: Instant) -> Instant { @@ -208,7 +225,7 @@ impl Housekeeper { } #[cfg(test)] -impl Housekeeper { +impl Housekeeper { pub(crate) fn disable_auto_run(&self) { self.auto_run_enabled.store(false, Ordering::Relaxed); } diff --git a/src/lib.rs b/src/lib.rs index 51f74c5a..1348f8c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -273,7 +273,7 @@ pub use common::error::PredicateError; #[cfg(any(feature = "sync", feature = "future"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "sync", feature = "future"))))] -pub use common::entry::Entry; +pub use common::entry::{self, Entry}; #[cfg(any(feature = "sync", feature = "future"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "sync", feature = "future"))))] diff --git a/src/policy.rs b/src/policy.rs index b98a3464..6a2b31b6 100644 --- a/src/policy.rs +++ b/src/policy.rs @@ -4,6 +4,8 @@ use std::{ time::{Duration, Instant}, }; +use crate::entry::EntryMetadata; + #[derive(Clone, Debug)] /// The policy of a cache. pub struct Policy { @@ -403,3 +405,166 @@ pub(crate) mod test_utils { } } } + +#[derive(Debug)] +pub struct EntrySnapshot { + coldest: Vec<(Arc, EntryMetadata)>, + hottest: Vec<(Arc, EntryMetadata)>, +} + +impl Clone for EntrySnapshot { + fn clone(&self) -> Self { + Self { + coldest: self.coldest.clone(), + hottest: self.hottest.clone(), + } + } +} + +impl Default for EntrySnapshot { + fn default() -> Self { + Self { + coldest: Vec::default(), + hottest: Vec::default(), + } + } +} + +impl EntrySnapshot { + pub(crate) fn new( + coldest: Vec<(Arc, EntryMetadata)>, + hottest: Vec<(Arc, EntryMetadata)>, + ) -> Self { + Self { coldest, hottest } + } + + pub fn coldest(&self) -> &[(Arc, EntryMetadata)] { + &self.coldest + } + + pub fn hottest(&self) -> &[(Arc, EntryMetadata)] { + &self.hottest + } +} + +#[derive(Debug, Clone, Default)] +pub(crate) struct EntrySnapshotConfig { + pub(crate) coldest: usize, + pub(crate) hottest: usize, +} + +#[cfg(feature = "future")] +pub mod future { + use std::hash::{BuildHasher, Hash}; + + use crate::future::Cache; + + use super::{EntrySnapshot, EntrySnapshotConfig}; + + pub struct PolicyExt<'a, K, V, S> { + cache: &'a Cache, + } + + impl<'a, K, V, S> PolicyExt<'a, K, V, S> { + pub(crate) fn new(cache: &'a Cache) -> Self { + Self { cache } + } + + pub fn entry_snapshot(&self) -> EntrySnapshotRequest<'_, K, V, S> { + EntrySnapshotRequest::new(self.cache) + } + } + + pub struct EntrySnapshotRequest<'a, K, V, S> { + cache: &'a Cache, + config: EntrySnapshotConfig, + } + + impl<'a, K, V, S> EntrySnapshotRequest<'a, K, V, S> { + pub(crate) fn new(cache: &'a Cache) -> Self { + Self { + cache, + config: EntrySnapshotConfig::default(), + } + } + + pub fn with_coldest(self, count: usize) -> Self { + let mut req = self; + req.config.coldest = count; + req + } + + pub fn with_hottest(self, count: usize) -> Self { + let mut req = self; + req.config.hottest = count; + req + } + + pub async fn capture(self) -> EntrySnapshot + where + K: Hash + Send + Sync + Eq + 'static, + V: Clone + Send + Sync + 'static, + S: BuildHasher + Clone + Send + Sync + 'static, + { + self.cache.capture_entry_snapshot(self.config).await + } + } +} + +#[cfg(feature = "sync")] +pub mod sync { + use std::hash::{BuildHasher, Hash}; + + use crate::sync::Cache; + + use super::{EntrySnapshot, EntrySnapshotConfig}; + + pub struct PolicyExt<'a, K, V, S> { + cache: &'a Cache, + } + + impl<'a, K, V, S> PolicyExt<'a, K, V, S> { + pub(crate) fn new(cache: &'a Cache) -> Self { + Self { cache } + } + + pub fn entry_snapshot(&self) -> EntrySnapshotRequest<'_, K, V, S> { + EntrySnapshotRequest::new(self.cache) + } + } + + pub struct EntrySnapshotRequest<'a, K, V, S> { + cache: &'a Cache, + config: EntrySnapshotConfig, + } + + impl<'a, K, V, S> EntrySnapshotRequest<'a, K, V, S> { + pub(crate) fn new(cache: &'a Cache) -> Self { + Self { + cache, + config: EntrySnapshotConfig::default(), + } + } + + pub fn with_coldest(self, count: usize) -> Self { + let mut req = self; + req.config.coldest = count; + req + } + + pub fn with_hottest(self, count: usize) -> Self { + let mut req = self; + req.config.hottest = count; + req + } + + pub fn capture(self) -> EntrySnapshot + where + K: Hash + Send + Sync + Eq + 'static, + V: Clone + Send + Sync + 'static, + S: BuildHasher + Clone + Send + Sync + 'static, + { + self.cache.capture_entry_snapshot(self.config) + } + } +} diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 2cbf19ea..efecc477 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -12,7 +12,9 @@ use crate::{ }, notification::EvictionListener, ops::compute::{self, CompResult}, - policy::{EvictionPolicy, ExpirationPolicy}, + policy::{ + sync::PolicyExt, EntrySnapshot, EntrySnapshotConfig, EvictionPolicy, ExpirationPolicy, + }, sync::{Iter, PredicateId}, sync_base::{ base_cache::{BaseCache, HouseKeeperArc}, @@ -639,6 +641,10 @@ impl Cache { self.base.policy() } + pub fn policy_ext(&self) -> PolicyExt<'_, K, V, S> { + PolicyExt::new(self) + } + /// Returns an approximate number of entries in this cache. /// /// The value returned is _an estimate_; the actual count may differ if there are @@ -1767,9 +1773,20 @@ where /// Performs any pending maintenance operations needed by the cache. pub fn run_pending_tasks(&self) { if let Some(hk) = &self.base.housekeeper { - hk.run_pending_tasks(&*self.base.inner); + hk.run_pending_tasks(&*self.base.inner, None); } } + + pub(crate) fn capture_entry_snapshot( + &self, + snapshot_config: EntrySnapshotConfig, + ) -> EntrySnapshot { + self.base + .housekeeper + .as_ref() + .and_then(|hk| hk.run_pending_tasks(&*self.base.inner, Some(snapshot_config))) + .unwrap_or_default() + } } impl<'a, K, V, S> IntoIterator for &'a Cache @@ -1821,11 +1838,11 @@ where // TODO: Like future::Cache, move this method to BaseCache. #[inline] fn schedule_write_op( - inner: &impl InnerSync, + inner: &impl InnerSync, ch: &Sender>, op: WriteOp, now: Instant, - housekeeper: Option<&HouseKeeperArc>, + housekeeper: Option<&HouseKeeperArc>, ) -> Result<(), TrySendError>> { let mut op = op; @@ -1988,6 +2005,13 @@ mod tests { assert!(!cache.contains_key(&"c")); assert!(cache.contains_key(&"d")); + dbg!(cache + .policy_ext() + .entry_snapshot() + .with_coldest(5) + .with_hottest(5) + .capture()); + cache.invalidate(&"b"); expected.push((Arc::new("b"), "bob", RemovalCause::Explicit)); cache.run_pending_tasks(); diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index a42c20ce..508b8dca 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -26,7 +26,9 @@ use crate::{ CacheRegion, HousekeeperConfig, }, notification::{notifier::RemovalNotifier, EvictionListener, RemovalCause}, - policy::{EvictionPolicy, EvictionPolicyConfig, ExpirationPolicy}, + policy::{ + EntrySnapshot, EntrySnapshotConfig, EvictionPolicy, EvictionPolicyConfig, ExpirationPolicy, + }, Entry, Expiry, Policy, PredicateError, }; @@ -47,13 +49,13 @@ use std::{ time::{Duration, Instant as StdInstant}, }; -pub(crate) type HouseKeeperArc = Arc; +pub(crate) type HouseKeeperArc = Arc>; pub(crate) struct BaseCache { pub(crate) inner: Arc>, read_op_ch: Sender>, pub(crate) write_op_ch: Sender>, - pub(crate) housekeeper: Option, + pub(crate) housekeeper: Option>, } impl Clone for BaseCache { @@ -387,10 +389,10 @@ where #[inline] pub(crate) fn apply_reads_writes_if_needed( - inner: &impl InnerSync, + inner: &impl InnerSync, ch: &Sender>, now: Instant, - housekeeper: Option<&HouseKeeperArc>, + housekeeper: Option<&HouseKeeperArc>, ) { let w_len = ch.len(); @@ -607,12 +609,12 @@ where } #[inline] - fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { + fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { hk.should_apply_reads(ch_len, now) } #[inline] - fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { + fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { hk.should_apply_writes(ch_len, now) } } @@ -1137,7 +1139,7 @@ where } } -impl InnerSync for Inner +impl InnerSync for Inner where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, @@ -1148,8 +1150,14 @@ where timeout: Option, max_log_sync_repeats: u32, eviction_batch_size: u32, - ) -> bool { - self.do_run_pending_tasks(timeout, max_log_sync_repeats, eviction_batch_size) + snapshot_config: Option, + ) -> (bool, Option>) { + self.do_run_pending_tasks( + timeout, + max_log_sync_repeats, + eviction_batch_size, + snapshot_config, + ) } fn now(&self) -> Instant { @@ -1168,9 +1176,14 @@ where timeout: Option, max_log_sync_repeats: u32, eviction_batch_size: u32, - ) -> bool { + snapshot_config: Option, + ) -> (bool, Option>) { if self.max_capacity == Some(0) { - return false; + if snapshot_config.is_some() { + return (false, Some(EntrySnapshot::default())); + } else { + return (false, None); + } } // Acquire some locks. @@ -1296,10 +1309,15 @@ where crossbeam_epoch::pin().flush(); + let snapshot = snapshot_config.map(|req| { + self.expiration_policy + .capture_entry_snapshot(req, &deqs, &self.clock) + }); + // Ensure the deqs lock is held until here. drop(deqs); - eviction_state.more_entries_to_evict + (eviction_state.more_entries_to_evict, snapshot) } } @@ -2603,19 +2621,19 @@ mod tests { ($cache:ident, $key:ident, $hash:ident, $mock:ident, $duration_secs:expr) => { // Increment the time. $mock.increment(Duration::from_millis($duration_secs * 1000 - 1)); - $cache.inner.run_pending_tasks(None, 1, 10); + $cache.inner.run_pending_tasks(None, 1, 10, None); assert!($cache.contains_key_with_hash(&$key, $hash)); assert_eq!($cache.entry_count(), 1); // Increment the time by 1ms (3). The entry should be expired. $mock.increment(Duration::from_millis(1)); - $cache.inner.run_pending_tasks(None, 1, 10); + $cache.inner.run_pending_tasks(None, 1, 10, None); assert!(!$cache.contains_key_with_hash(&$key, $hash)); // Increment the time again to ensure the entry has been evicted from the // cache. $mock.increment(Duration::from_secs(1)); - $cache.inner.run_pending_tasks(None, 1, 10); + $cache.inner.run_pending_tasks(None, 1, 10, None); assert_eq!($cache.entry_count(), 0); }; } @@ -2900,7 +2918,7 @@ mod tests { insert(&cache, key, hash, value); // Run a sync to register the entry to the internal data structures including // the timer wheel. - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 1); @@ -2922,12 +2940,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert!(cache.contains_key_with_hash(&key, hash)); // Read the entry (2). @@ -2946,7 +2964,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_expiry!(cache, key, hash, mock, 3); @@ -2968,12 +2986,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert!(cache.contains_key_with_hash(&key, hash)); // Read the entry (2). @@ -2992,11 +3010,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); // Increment the time. mock.increment(Duration::from_secs(2)); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3011,7 +3029,7 @@ mod tests { Some(3), ); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 3); @@ -3034,12 +3052,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3059,11 +3077,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); // Increment the time. mock.increment(Duration::from_secs(2)); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3078,7 +3096,7 @@ mod tests { None, ); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 7); @@ -3100,12 +3118,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8)); let inserted_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(5)); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3125,7 +3143,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_expiry!(cache, key, hash, mock, 7); @@ -3147,12 +3165,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8)); let inserted_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(5)); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3172,11 +3190,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3196,7 +3214,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_expiry!(cache, key, hash, mock, 5); @@ -3217,12 +3235,12 @@ mod tests { *expectation.lock().unwrap() = ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(9)); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3238,12 +3256,12 @@ mod tests { ); let updated_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3263,11 +3281,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3287,7 +3305,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(None, 1, 10); + cache.inner.run_pending_tasks(None, 1, 10, None); assert_expiry!(cache, key, hash, mock, 4); }