From f481300a466a8df76973927d7dd88c8a0ef818de Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sun, 5 Jan 2025 09:44:57 +0800 Subject: [PATCH 1/4] Increase the accuracy of time measurements by replacing most uses of `quanta::Instant` with `std::time::Instant` --- src/common/concurrent.rs | 7 - .../concurrent/atomic_time/atomic_time.rs | 61 ----- .../atomic_time/atomic_time_compat.rs | 40 ---- src/common/concurrent/entry_info.rs | 15 +- src/common/concurrent/housekeeper.rs | 24 +- src/common/time.rs | 53 +---- src/common/time/atomic_time.rs | 62 +++++ src/common/time/clock.rs | 162 ++++++++++++++ src/common/time/clock_compat.rs | 50 ----- src/common/time/clock_quanta.rs | 5 - src/common/time/instant.rs | 80 +++++++ src/common/timer_wheel.rs | 35 +-- src/future/base_cache.rs | 211 +++++------------- src/future/builder.rs | 11 +- src/future/cache.rs | 67 +++--- src/future/housekeeper.rs | 26 +-- src/sync/builder.rs | 14 +- src/sync/cache.rs | 58 +++-- src/sync/segment.rs | 39 +--- src/sync_base/base_cache.rs | 201 +++++------------ 20 files changed, 548 insertions(+), 673 deletions(-) delete mode 100644 src/common/concurrent/atomic_time/atomic_time.rs delete mode 100644 src/common/concurrent/atomic_time/atomic_time_compat.rs create mode 100644 src/common/time/atomic_time.rs create mode 100644 src/common/time/clock.rs delete mode 100644 src/common/time/clock_compat.rs delete mode 100644 src/common/time/clock_quanta.rs create mode 100644 src/common/time/instant.rs diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index 3611af21..0532d5a8 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -12,13 +12,6 @@ pub(crate) mod entry_info; #[cfg(feature = "sync")] pub(crate) mod housekeeper; -#[cfg_attr(feature = "quanta", path = "concurrent/atomic_time/atomic_time.rs")] -#[cfg_attr( - not(feature = "quanta"), - path = "concurrent/atomic_time/atomic_time_compat.rs" -)] -pub(crate) mod atomic_time; - #[cfg(feature = "unstable-debug-counters")] pub(crate) mod debug_counters; diff --git a/src/common/concurrent/atomic_time/atomic_time.rs b/src/common/concurrent/atomic_time/atomic_time.rs deleted file mode 100644 index c03ea249..00000000 --- a/src/common/concurrent/atomic_time/atomic_time.rs +++ /dev/null @@ -1,61 +0,0 @@ -use crate::common::time::{clock::Instant as ClockInstant, Instant}; - -use std::{any::TypeId, sync::atomic::Ordering}; - -use portable_atomic::AtomicU64; - -#[derive(Debug)] -pub(crate) struct AtomicInstant { - instant: AtomicU64, -} - -impl Default for AtomicInstant { - fn default() -> Self { - Self { - instant: AtomicU64::new(u64::MAX), - } - } -} - -// TODO: Need a safe way to convert between `quanta::Instant` and `u64`. -// quanta v0.10.0 no longer provides `quanta::Instant::as_u64` method. - -impl AtomicInstant { - pub(crate) fn new(timestamp: Instant) -> Self { - let ai = Self::default(); - ai.set_instant(timestamp); - ai - } - - pub(crate) fn clear(&self) { - self.instant.store(u64::MAX, Ordering::Release); - } - - pub(crate) fn is_set(&self) -> bool { - self.instant.load(Ordering::Acquire) != u64::MAX - } - - pub(crate) fn instant(&self) -> Option { - let ts = self.instant.load(Ordering::Acquire); - if ts == u64::MAX { - None - } else { - debug_assert_eq!( - TypeId::of::(), - TypeId::of::() - ); - Some(Instant::new(unsafe { - std::mem::transmute::(ts) - })) - } - } - - pub(crate) fn set_instant(&self, instant: Instant) { - debug_assert_eq!( - TypeId::of::(), - TypeId::of::() - ); - let ts = unsafe { std::mem::transmute::(instant.inner_clock()) }; - self.instant.store(ts, Ordering::Release); - } -} diff --git a/src/common/concurrent/atomic_time/atomic_time_compat.rs b/src/common/concurrent/atomic_time/atomic_time_compat.rs deleted file mode 100644 index d9e73ab3..00000000 --- a/src/common/concurrent/atomic_time/atomic_time_compat.rs +++ /dev/null @@ -1,40 +0,0 @@ -use super::Instant; - -use parking_lot::RwLock; - -#[derive(Debug)] -pub(crate) struct AtomicInstant { - instant: RwLock>, -} - -impl Default for AtomicInstant { - fn default() -> Self { - Self { - instant: RwLock::new(None), - } - } -} - -impl AtomicInstant { - pub(crate) fn new(timestamp: Instant) -> Self { - let ai = Self::default(); - ai.set_instant(timestamp); - ai - } - - pub(crate) fn clear(&self) { - *self.instant.write() = None; - } - - pub(crate) fn is_set(&self) -> bool { - self.instant.read().is_some() - } - - pub(crate) fn instant(&self) -> Option { - *self.instant.read() - } - - pub(crate) fn set_instant(&self, instant: Instant) { - *self.instant.write() = Some(instant); - } -} diff --git a/src/common/concurrent/entry_info.rs b/src/common/concurrent/entry_info.rs index 5fd25f3b..8cdbce84 100644 --- a/src/common/concurrent/entry_info.rs +++ b/src/common/concurrent/entry_info.rs @@ -1,7 +1,7 @@ use std::sync::atomic::{self, AtomicBool, AtomicU16, AtomicU32, Ordering}; use super::{AccessTime, KeyHash}; -use crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant}; +use crate::common::time::{AtomicInstant, Instant}; #[derive(Debug)] pub(crate) struct EntryInfo { @@ -191,7 +191,6 @@ mod test { // e.g. "1.64" let ver = option_env!("RUSTC_SEMVER").expect("RUSTC_SEMVER env var was not set at compile time"); - let is_quanta_enabled = cfg!(feature = "quanta"); let arch = if cfg!(target_os = "linux") { if cfg!(target_pointer_width = "64") { Linux64 @@ -214,14 +213,10 @@ mod test { panic!("Unsupported target architecture"); }; - let expected_sizes = match (arch, is_quanta_enabled) { - (Linux64 | Linux32Arm | Linux32Mips, true) => vec![("1.51", 56)], - (Linux32X86, true) => vec![("1.51", 48)], - (MacOS64, true) => vec![("1.62", 56)], - (Linux64, false) => vec![("1.66", 104), ("1.60", 128)], - (Linux32X86, false) => unimplemented!(), - (Linux32Arm | Linux32Mips, false) => vec![("1.66", 104), ("1.62", 128), ("1.60", 80)], - (MacOS64, false) => vec![("1.62", 104)], + let expected_sizes = match arch { + Linux64 | Linux32Arm | Linux32Mips => vec![("1.51", 56)], + Linux32X86 => vec![("1.51", 48)], + MacOS64 => vec![("1.62", 56)], }; let mut expected = None; diff --git a/src/common/concurrent/housekeeper.rs b/src/common/concurrent/housekeeper.rs index 11df87f4..c9daa441 100644 --- a/src/common/concurrent/housekeeper.rs +++ b/src/common/concurrent/housekeeper.rs @@ -1,10 +1,7 @@ use super::constants::LOG_SYNC_INTERVAL_MILLIS; -use super::{ - atomic_time::AtomicInstant, - constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT}, -}; -use crate::common::time::{CheckedTimeOps, Instant}; +use super::constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT}; +use crate::common::time::{AtomicInstant, Instant}; use crate::common::HousekeeperConfig; use parking_lot::{Mutex, MutexGuard}; @@ -52,7 +49,11 @@ pub(crate) struct Housekeeper { } impl Housekeeper { - pub(crate) fn new(is_eviction_listener_enabled: bool, config: HousekeeperConfig) -> Self { + pub(crate) fn new( + is_eviction_listener_enabled: bool, + config: HousekeeperConfig, + now: Instant, + ) -> Self { let (more_entries_to_evict, maintenance_task_timeout) = if is_eviction_listener_enabled { ( Some(AtomicBool::new(false)), @@ -64,7 +65,7 @@ impl Housekeeper { Self { run_lock: Mutex::default(), - run_after: AtomicInstant::new(Self::sync_after(Instant::now())), + run_after: AtomicInstant::new(Self::sync_after(now)), more_entries_to_evict, maintenance_task_timeout, max_log_sync_repeats: config.max_log_sync_repeats, @@ -127,10 +128,7 @@ impl Housekeeper { fn sync_after(now: Instant) -> Instant { let dur = Duration::from_millis(LOG_SYNC_INTERVAL_MILLIS); - let ts = now.checked_add(dur); - // Assuming that `now` is current wall clock time, this should never fail at - // least next millions of years. - ts.expect("Timestamp overflow") + now.saturating_add(dur) } } @@ -139,8 +137,4 @@ impl Housekeeper { pub(crate) fn disable_auto_run(&self) { self.auto_run_enabled.store(false, Ordering::Relaxed); } - - pub(crate) fn reset_run_after(&self, now: Instant) { - self.run_after.set_instant(Self::sync_after(now)); - } } diff --git a/src/common/time.rs b/src/common/time.rs index 41a4c14d..55b05e9f 100644 --- a/src/common/time.rs +++ b/src/common/time.rs @@ -1,53 +1,10 @@ -use std::time::Duration; - -#[cfg_attr(feature = "quanta", path = "time/clock_quanta.rs")] -#[cfg_attr(not(feature = "quanta"), path = "time/clock_compat.rs")] -pub(crate) mod clock; +mod atomic_time; +mod clock; +mod instant; +pub(crate) use atomic_time::AtomicInstant; pub(crate) use clock::Clock; +pub(crate) use instant::Instant; #[cfg(test)] pub(crate) use clock::Mock; - -/// a wrapper type over Instant to force checked additions and prevent -/// unintentional overflow. The type preserve the Copy semantics for the wrapped -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)] -pub(crate) struct Instant(clock::Instant); - -pub(crate) trait CheckedTimeOps { - fn checked_add(&self, duration: Duration) -> Option - where - Self: Sized; - - fn checked_duration_since(&self, earlier: Self) -> Option - where - Self: Sized; -} - -impl Instant { - pub(crate) fn new(instant: clock::Instant) -> Instant { - Instant(instant) - } - - pub(crate) fn now() -> Instant { - Instant(clock::Instant::now()) - } - - #[cfg(feature = "quanta")] - pub(crate) fn inner_clock(self) -> clock::Instant { - self.0 - } -} - -impl CheckedTimeOps for Instant { - fn checked_add(&self, duration: Duration) -> Option { - self.0.checked_add(duration).map(Instant) - } - - fn checked_duration_since(&self, earlier: Self) -> Option - where - Self: Sized, - { - self.0.checked_duration_since(earlier.0) - } -} diff --git a/src/common/time/atomic_time.rs b/src/common/time/atomic_time.rs new file mode 100644 index 00000000..707bacf6 --- /dev/null +++ b/src/common/time/atomic_time.rs @@ -0,0 +1,62 @@ +use crate::common::time::Instant; + +use portable_atomic::AtomicU64; +use std::sync::atomic::Ordering; + +/// `AtomicInstant` is a wrapper around `AtomicU64` that provides thread-safe access +/// to an `Instant`. +/// +/// `u64::MAX` is used to represent an unset `Instant`. +#[derive(Debug)] +pub(crate) struct AtomicInstant { + instant: AtomicU64, +} + +impl Default for AtomicInstant { + /// Creates a new `AtomicInstant` with an unset `Instant`. + fn default() -> Self { + Self { + instant: AtomicU64::new(u64::MAX), + } + } +} + +impl AtomicInstant { + /// Creates a new `AtomicInstant` with the given `Instant`. + pub(crate) fn new(instant: Instant) -> Self { + // Ensure the `Instant` is not `u64::MAX`, which means unset. + debug_assert!(instant.as_nanos() != u64::MAX); + + Self { + instant: AtomicU64::new(instant.as_nanos()), + } + } + + /// Clears the `Instant`. + pub(crate) fn clear(&self) { + self.instant.store(u64::MAX, Ordering::Release); + } + + /// Returns `true` if the `Instant` is set. + pub(crate) fn is_set(&self) -> bool { + self.instant.load(Ordering::Acquire) != u64::MAX + } + + /// Returns the `Instant` if it is set, otherwise `None`. + pub(crate) fn instant(&self) -> Option { + let ts = self.instant.load(Ordering::Acquire); + if ts == u64::MAX { + None + } else { + Some(Instant::from_nanos(ts)) + } + } + + /// Sets the `Instant`. + pub(crate) fn set_instant(&self, instant: Instant) { + // Ensure the `Instant` is not `u64::MAX`, which means unset. + debug_assert!(instant.as_nanos() != u64::MAX); + + self.instant.store(instant.as_nanos(), Ordering::Release); + } +} diff --git a/src/common/time/clock.rs b/src/common/time/clock.rs new file mode 100644 index 00000000..19735959 --- /dev/null +++ b/src/common/time/clock.rs @@ -0,0 +1,162 @@ +use std::time::{Duration, Instant as StdInstant}; + +#[cfg(test)] +use std::sync::Arc; + +#[cfg(test)] +use parking_lot::RwLock; + +// This is `moka`'s `Instant` struct. +use super::Instant; + +#[derive(Default, Clone)] +pub(crate) struct Clock { + ty: ClockType, +} + +#[derive(Clone)] +enum ClockType { + /// A clock that uses `std::time::Instant` as the source of time. + Standard { origin: StdInstant }, + #[cfg(feature = "quanta")] + /// A clock that uses both `std::time::Instant` and `quanta::Instant` as the + /// sources of time. + Hybrid { + std_origin: StdInstant, + quanta_origin: quanta::Instant, + }, + #[cfg(test)] + /// A clock that uses a mocked source of time. + Mocked { mock: Arc }, +} + +impl Default for ClockType { + /// Create a new `ClockType` with the current time as the origin. + /// + /// If the `quanta` feature is enabled, `Hybrid` will be used. Otherwise, + /// `Standard` will be used. + fn default() -> Self { + #[cfg(feature = "quanta")] + { + // Try to get the both origins at close to the same time. Assuming that + // `quanta::Instant::now` has lower latency than `StdInstant::now`. + let quanta_origin = quanta::Instant::now(); + let std_origin = StdInstant::now(); + return ClockType::Hybrid { + std_origin, + quanta_origin, + }; + } + + #[allow(unreachable_code)] + ClockType::Standard { + origin: StdInstant::now(), + } + } +} + +impl Clock { + #[cfg(test)] + /// Creates a new `Clock` with a mocked source of time. + pub(crate) fn mock() -> (Clock, Arc) { + let mock = Arc::new(Mock::default()); + let clock = Clock { + ty: ClockType::Mocked { + mock: Arc::clone(&mock), + }, + }; + (clock, mock) + } + + /// Returns the current time using a reliable source of time. + /// + /// When the the type is `Standard` or `Hybrid`, the time is based on + /// `std::time::Instant`. When the type is `Mocked`, the time is based on the + /// mocked source of time. + pub(crate) fn now(&self) -> Instant { + match &self.ty { + ClockType::Standard { origin } => { + Instant::from_duration_since_clock_start(origin.elapsed()) + } + #[cfg(feature = "quanta")] + ClockType::Hybrid { std_origin, .. } => { + Instant::from_duration_since_clock_start(std_origin.elapsed()) + } + #[cfg(test)] + ClockType::Mocked { mock } => Instant::from_duration_since_clock_start(mock.elapsed()), + } + } + + /// Returns the current time _maybe_ using a fast but less reliable source of + /// time. The time may drift from the time returned by `now`, or not be + /// monotonically increasing. + /// + /// This is useful for performance critical code that does not require the same + /// level of precision as `now`. (e.g. measuring the time between two events for + /// metrics) + /// + /// When the type is `Standard` or `Mocked`, `now` is internally called. So there + /// is no performance benefit. + /// + /// When the type is `Hybrid`, the time is based on `quanta::Instant`, which can + /// be faster than `std::time::Instant`, depending on the CPU architecture. + pub(crate) fn fast_now(&self) -> Instant { + match &self.ty { + #[cfg(feature = "quanta")] + ClockType::Hybrid { quanta_origin, .. } => { + Instant::from_duration_since_clock_start(quanta_origin.elapsed()) + } + ClockType::Standard { .. } => self.now(), + #[cfg(test)] + ClockType::Mocked { .. } => self.now(), + } + } + + /// Converts an `Instant` to a `std::time::Instant`. + pub(crate) fn to_std_instant(&self, instant: Instant) -> StdInstant { + match &self.ty { + ClockType::Standard { origin } => { + let duration = Duration::from_nanos(instant.as_nanos()); + *origin + duration + } + #[cfg(feature = "quanta")] + ClockType::Hybrid { std_origin, .. } => { + let duration = Duration::from_nanos(instant.as_nanos()); + *std_origin + duration + } + #[cfg(test)] + ClockType::Mocked { mock } => { + let duration = Duration::from_nanos(instant.as_nanos()); + mock.origin + duration + } + } + } +} + +#[cfg(test)] +pub(crate) struct Mock { + origin: StdInstant, + now: RwLock, +} + +#[cfg(test)] +impl Default for Mock { + fn default() -> Self { + let origin = StdInstant::now(); + Self { + origin, + now: RwLock::new(origin), + } + } +} + +#[cfg(test)] +impl Mock { + pub(crate) fn increment(&self, amount: Duration) { + *self.now.write() += amount; + } + + pub(crate) fn elapsed(&self) -> Duration { + self.now.read().duration_since(self.origin) + } +} diff --git a/src/common/time/clock_compat.rs b/src/common/time/clock_compat.rs deleted file mode 100644 index 99d6ce0f..00000000 --- a/src/common/time/clock_compat.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::{sync::Arc, time::Instant as StdInstant}; - -#[cfg(test)] -use std::time::Duration; - -use parking_lot::RwLock; - -pub(crate) type Instant = StdInstant; - -pub(crate) struct Clock { - mock: Option>, -} - -impl Clock { - #[cfg(test)] - pub(crate) fn mock() -> (Clock, Arc) { - let mock = Arc::new(Mock::default()); - let clock = Clock { - mock: Some(Arc::clone(&mock)), - }; - (clock, mock) - } - - pub(crate) fn now(&self) -> Instant { - if let Some(mock) = &self.mock { - *mock.now.read() - } else { - StdInstant::now() - } - } -} - -pub(crate) struct Mock { - now: RwLock, -} - -impl Default for Mock { - fn default() -> Self { - Self { - now: RwLock::new(StdInstant::now()), - } - } -} - -#[cfg(test)] -impl Mock { - pub(crate) fn increment(&self, amount: Duration) { - *self.now.write() += amount; - } -} diff --git a/src/common/time/clock_quanta.rs b/src/common/time/clock_quanta.rs deleted file mode 100644 index 5b64d303..00000000 --- a/src/common/time/clock_quanta.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub(crate) type Clock = quanta::Clock; -pub(crate) type Instant = quanta::Instant; - -#[cfg(test)] -pub(crate) type Mock = quanta::Mock; diff --git a/src/common/time/instant.rs b/src/common/time/instant.rs new file mode 100644 index 00000000..d542b21d --- /dev/null +++ b/src/common/time/instant.rs @@ -0,0 +1,80 @@ +use std::time::Duration; + +pub(crate) const MAX_NANOS: u64 = u64::MAX - 1; + +/// `Instant` represents a point in time since the `Clock` was created. It has +/// nanosecond precision. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct Instant { + elapsed_ns: u64, +} + +impl Instant { + pub(crate) fn from_nanos(nanos: u64) -> Instant { + debug_assert!(nanos <= MAX_NANOS); + Instant { elapsed_ns: nanos } + } + + pub(crate) fn from_duration_since_clock_start(duration: Duration) -> Instant { + Instant::from_nanos(Self::duration_to_saturating_nanoseconds(duration)) + } + + pub(crate) fn as_nanos(&self) -> u64 { + self.elapsed_ns + } + + /// Converts a `std::time::Duration` to nanoseconds, saturating to + /// `MAX_NANOSECONDS` (`u64::MAX - 1`) if the duration is too large. + /// (`Duration::as_nanos` returns `u128`) + /// + /// Note that `u64::MAX - 1` is used here instead of `u64::MAX` because + /// `u64::MAX` is used by `moka`'s `AtomicTime` to indicate the time is unset. + pub(crate) fn duration_to_saturating_nanoseconds(duration: Duration) -> u64 { + u64::try_from(duration.as_nanos()) + .map(|n| n.min(MAX_NANOS)) + .unwrap_or(MAX_NANOS) + } + + pub(crate) fn saturating_add(&self, duration: Duration) -> Instant { + let dur_ms = Self::duration_to_saturating_nanoseconds(duration); + Instant::from_nanos(self.elapsed_ns.saturating_add(dur_ms).min(MAX_NANOS)) + } + + pub(crate) fn saturating_duration_since(&self, earlier: Self) -> Duration + where + Self: Sized, + { + Duration::from_nanos(self.elapsed_ns.saturating_sub(earlier.elapsed_ns)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_saturating_add() { + let instant = Instant::from_nanos(100_000); + let duration = Duration::from_nanos(50_000); + let result = instant.saturating_add(duration); + assert_eq!(result, Instant::from_nanos(150_000)); + + let instant = Instant::from_nanos(u64::MAX - 10_000); + let duration = Duration::from_nanos(12_000); + let result = instant.saturating_add(duration); + assert_eq!(result, Instant::from_nanos(u64::MAX - 1)); + } + + #[test] + fn test_saturating_duration_since() { + let instant = Instant::from_nanos(100_000); + let earlier = Instant::from_nanos(60_000); + let result = instant.saturating_duration_since(earlier); + assert_eq!(result, Duration::from_nanos(40_000)); + + let instant = Instant::from_nanos(60_000); + let earlier = Instant::from_nanos(100_000); + let result = instant.saturating_duration_since(earlier); + assert_eq!(result, Duration::ZERO); + } +} diff --git a/src/common/timer_wheel.rs b/src/common/timer_wheel.rs index 98da268a..958b9991 100644 --- a/src/common/timer_wheel.rs +++ b/src/common/timer_wheel.rs @@ -16,7 +16,7 @@ use std::{ptr::NonNull, time::Duration}; use super::{ concurrent::{arc::MiniArc, entry_info::EntryInfo, DeqNodes}, deque::{DeqNode, Deque}, - time::{CheckedTimeOps, Instant}, + time::Instant, }; use parking_lot::Mutex; @@ -175,12 +175,6 @@ impl TimerWheel { } } - #[cfg(test)] - pub(crate) fn set_origin(&mut self, time: Instant) { - self.origin = time; - self.current = time; - } - pub(crate) fn is_enabled(&self) -> bool { !self.wheels.is_empty() } @@ -354,12 +348,10 @@ impl TimerWheel { // Returns nano-seconds between the given `time` and the time when this timer // wheel was advanced. If the `time` is earlier than other, returns zero. fn duration_nanos_since_last_advanced(&self, time: Instant) -> u64 { - time.checked_duration_since(self.current) - // If `time` is earlier than `self.current`, use zero. This could happen - // when a user provided `Expiry` method returned zero or a very short - // duration. - .unwrap_or_default() // Assuming `Duration::default()` returns `ZERO`. - .as_nanos() as u64 + // If `time` is earlier than `self.current`, use zero. This could happen + // when a user provided `Expiry` method returned zero or a very short + // duration. + time.saturating_duration_since(self.current).as_nanos() as u64 } // Returns nano-seconds between the given `time` and `self.origin`, the time when @@ -371,12 +363,11 @@ impl TimerWheel { // fn time_nanos(&self, time: Instant) -> u64 { let nanos_u128 = time - .checked_duration_since(self.origin) // If `time` is earlier than `self.origin`, use zero. This would never // happen in practice as there should be some delay between the timer // wheel was created and the first timer event is scheduled. But we will // do this just in case. - .unwrap_or_default() // Assuming `Duration::default()` returns `ZERO`. + .saturating_duration_since(self.origin) .as_nanos(); // Convert an `u128` into an `u64`. If the value is too large, use `u64::MAX` @@ -560,18 +551,18 @@ mod tests { use super::{TimerEvent, TimerWheel, SPANS}; use crate::common::{ concurrent::{arc::MiniArc, entry_info::EntryInfo, KeyHash}, - time::{CheckedTimeOps, Clock, Instant, Mock}, + time::{Clock, Instant, Mock}, }; #[test] fn test_bucket_indices() { fn bi(timer: &TimerWheel<()>, now: Instant, dur: Duration) -> (usize, usize) { - let t = now.checked_add(dur).unwrap(); + let t = now.saturating_add(dur); timer.bucket_indices(t) } let (clock, mock) = Clock::mock(); - let now = now(&clock); + let now = clock.now(); let mut timer = TimerWheel::<()>::new(now); timer.enable(); @@ -652,7 +643,7 @@ mod tests { let key_hash = KeyHash::new(Arc::new(key), hash); let policy_weight = 0; let entry_info = MiniArc::new(EntryInfo::new(key_hash, now, policy_weight)); - entry_info.set_expiration_time(Some(now.checked_add(ttl).unwrap())); + entry_info.set_expiration_time(Some(now.saturating_add(ttl))); let deq_nodes = Default::default(); let timer_node = timer.schedule(entry_info, MiniArc::clone(&deq_nodes)); deq_nodes.lock().set_timer_node(timer_node); @@ -794,13 +785,9 @@ mod tests { // Utility functions // - fn now(clock: &Clock) -> Instant { - Instant::new(clock.now()) - } - fn advance_clock(clock: &Clock, mock: &Arc, duration: Duration) -> Instant { mock.increment(duration); - now(clock) + clock.now() } /// Convert nano-seconds to duration. diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index 35ce2f1d..8154505a 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -11,7 +11,6 @@ use crate::{ self, concurrent::{ arc::MiniArc, - atomic_time::AtomicInstant, constants::{ READ_LOG_CH_SIZE, READ_LOG_FLUSH_POINT, WRITE_LOG_CH_SIZE, WRITE_LOG_FLUSH_POINT, }, @@ -22,7 +21,7 @@ use crate::{ }, deque::{DeqNode, Deque}, frequency_sketch::FrequencySketch, - time::{CheckedTimeOps, Clock, Instant}, + time::{AtomicInstant, Clock, Instant}, timer_wheel::{ReschedulingResult, TimerWheel}, CacheRegion, HousekeeperConfig, }, @@ -40,7 +39,6 @@ use async_lock::{Mutex, MutexGuard, RwLock}; use crossbeam_channel::{Receiver, Sender, TrySendError}; use crossbeam_utils::atomic::AtomicCell; use futures_util::future::BoxFuture; -use parking_lot::RwLock as SyncRwLock; use smallvec::SmallVec; use std::{ borrow::Borrow, @@ -115,8 +113,8 @@ impl BaseCache { } #[inline] - pub(crate) fn current_time_from_expiration_clock(&self) -> Instant { - self.inner.current_time_from_expiration_clock() + pub(crate) fn current_time(&self) -> Instant { + self.inner.current_time() } #[inline] @@ -171,6 +169,7 @@ where expiration_policy: ExpirationPolicy, housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, + clock: Clock, ) -> Self { let (r_size, w_size) = if max_capacity == Some(0) { (0, 0) @@ -178,6 +177,7 @@ where (READ_LOG_CH_SIZE, WRITE_LOG_CH_SIZE) }; let is_eviction_listener_enabled = eviction_listener.is_some(); + let fast_now = clock.fast_now(); let (r_snd, r_rcv) = crossbeam_channel::bounded(r_size); let (w_snd, w_rcv) = crossbeam_channel::bounded(w_size); @@ -195,6 +195,7 @@ where w_rcv, expiration_policy, invalidator_enabled, + clock, )); Self { @@ -206,6 +207,7 @@ where housekeeper: Some(Arc::new(Housekeeper::new( is_eviction_listener_enabled, housekeeper_config, + fast_now, ))), } } @@ -229,7 +231,7 @@ where .get_key_value_and(key, hash, |k, entry| { let i = &self.inner; let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); !is_expired_by_per_entry_ttl(entry.entry_info(), now) && !is_expired_entry_wo(ttl, va, entry, now) @@ -260,7 +262,7 @@ where self.retry_interrupted_ops().await; } - let mut now = self.current_time_from_expiration_clock(); + let mut now = self.current_time(); let maybe_kv_and_op = self .inner @@ -297,7 +299,7 @@ where // Convert `last_modified` from `moka::common::time::Instant` to // `std::time::Instant`. - let lm = self.inner.clocks().to_std_instant(lm); + let lm = self.inner.clock().to_std_instant(lm); // Call the user supplied `expire_after_read` method. // @@ -325,7 +327,7 @@ where self.inner.expiration_policy.time_to_live(), self.inner.expiration_policy.time_to_idle(), now, - self.inner.clocks(), + self.inner.clock(), ); } @@ -398,7 +400,7 @@ where } pub(crate) fn invalidate_all(&self) { - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); self.inner.set_valid_after(now); } @@ -406,7 +408,7 @@ where &self, predicate: PredicateFun, ) -> Result { - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); self.inner.register_invalidation_predicate(predicate, now) } } @@ -429,7 +431,7 @@ where self.inner.get_key_value_and_then(key, hash, |k, entry| { let i = &self.inner; let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); if is_expired_by_per_entry_ttl(entry.entry_info(), now) || is_expired_entry_wo(ttl, va, entry, now) @@ -497,7 +499,7 @@ where None }; - let ts = self.current_time_from_expiration_clock(); + let ts = self.current_time(); // TODO: Instead using Arc to check if the actual operation was // insert or update, check the return value of insert_with_or_modify. If it @@ -560,7 +562,7 @@ where if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = (&self.inner.expiration_policy.expiry(), &ins_op) { - Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clocks()); + Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clock()); } (ins_op, ts) } @@ -585,7 +587,7 @@ where self.inner.expiration_policy.time_to_live(), self.inner.expiration_policy.time_to_idle(), ts, - self.inner.clocks(), + self.inner.clock(), ); } @@ -788,11 +790,11 @@ impl BaseCache { key: &K, value_entry: &ValueEntry, ts: Instant, - clocks: &Clocks, + clock: &Clock, ) { let duration = - expiry.expire_after_create(key, &value_entry.value, clocks.to_std_instant(ts)); - let expiration_time = duration.map(|duration| ts.checked_add(duration).expect("Overflow")); + expiry.expire_after_create(key, &value_entry.value, clock.to_std_instant(ts)); + let expiration_time = duration.map(|duration| ts.saturating_add(duration)); value_entry .entry_info() .set_expiration_time(expiration_time); @@ -805,29 +807,28 @@ impl BaseCache { ttl: Option, tti: Option, ts: Instant, - clocks: &Clocks, + clock: &Clock, ) -> bool { - let current_time = clocks.to_std_instant(ts); + let current_time = clock.to_std_instant(ts); let ei = &value_entry.entry_info(); let exp_time = IntoIterator::into_iter([ ei.expiration_time(), - ttl.and_then(|dur| ei.last_modified().and_then(|ts| ts.checked_add(dur))), - tti.and_then(|dur| ei.last_accessed().and_then(|ts| ts.checked_add(dur))), + ttl.and_then(|dur| ei.last_modified().map(|ts| ts.saturating_add(dur))), + tti.and_then(|dur| ei.last_accessed().map(|ts| ts.saturating_add(dur))), ]) .flatten() .min(); let current_duration = exp_time.and_then(|time| { - let std_time = clocks.to_std_instant(time); + let std_time = clock.to_std_instant(time); std_time.checked_duration_since(current_time) }); let duration = expiry(key, &value_entry.value, current_time, current_duration); if duration != current_duration { - let expiration_time = - duration.map(|duration| ts.checked_add(duration).expect("Overflow")); + let expiration_time = duration.map(|duration| ts.saturating_add(duration)); value_entry .entry_info() .set_expiration_time(expiration_time); @@ -862,13 +863,13 @@ where } } - pub(crate) async fn set_expiration_clock(&self, clock: Option) { - self.inner.set_expiration_clock(clock).await; - if let Some(hk) = &self.housekeeper { - let now = self.current_time_from_expiration_clock(); - hk.reset_run_after(now); - } - } + // pub(crate) async fn set_expiration_clock(&self, clock: Option) { + // self.inner.set_expiration_clock(clock).await; + // if let Some(hk) = &self.housekeeper { + // let now = self.current_time_from_expiration_clock(); + // hk.reset_run_after(now); + // } + // } pub(crate) fn key_locks_map_is_empty(&self) -> bool { self.inner.key_locks_map_is_empty() @@ -993,62 +994,6 @@ enum AdmissionResult { type CacheStore = crate::cht::SegmentedHashMap, MiniArc>, S>; -struct Clocks { - // Lock for this Clocks instance. Used when the `expiration_clock` is set. - _lock: Mutex<()>, - has_expiration_clock: AtomicBool, - expiration_clock: SyncRwLock>, - /// The time (`moka::common::time`) when this timer wheel was created. - origin: Instant, - /// The time (`StdInstant`) when this timer wheel was created. - origin_std: StdInstant, - /// Mutable version of `origin` and `origin_std`. Used when the - /// `expiration_clock` is set. - mutable_origin: SyncRwLock>, -} - -impl Clocks { - fn new(time: Instant, std_time: StdInstant) -> Self { - Self { - _lock: Mutex::default(), - has_expiration_clock: AtomicBool::default(), - expiration_clock: Default::default(), - origin: time, - origin_std: std_time, - mutable_origin: Default::default(), - } - } - - fn to_std_instant(&self, time: Instant) -> StdInstant { - let (origin, origin_std) = if self.has_expiration_clock.load(Ordering::Relaxed) { - self.mutable_origin - .read() - .expect("mutable_origin is not set") - } else { - (self.origin, self.origin_std) - }; - - // `checked_duration_since` should always succeed here because the `origin` - // is set when this `Cache` is created, and the `time` is either the last - // modified or last accessed time of a cached entry. So `time` should always - // be greater than or equal to `origin`. - // - // However, this is not always true when `quanta::Instant` is used as the - // time source? https://github.com/moka-rs/moka/issues/472 - // - // (Or do we set zero Instant to last modified/accessed time somewhere?) - // - // As a workaround, let's use zero duration when `checked_duration_since` - // fails. - origin_std + (time.checked_duration_since(origin).unwrap_or_default()) - } - - #[cfg(test)] - fn set_origin(&self, time: Instant, std_time: StdInstant) { - *self.mutable_origin.write() = Some((time, std_time)); - } -} - pub(crate) struct Inner { name: Option, max_capacity: Option, @@ -1070,7 +1015,7 @@ pub(crate) struct Inner { removal_notifier: Option>>, key_locks: Option>, invalidator: Option>, - clocks: Clocks, + clock: Clock, } impl Drop for Inner { @@ -1139,23 +1084,12 @@ impl Inner { } #[inline] - pub(crate) fn current_time_from_expiration_clock(&self) -> Instant { - if self.clocks.has_expiration_clock.load(Ordering::Relaxed) { - Instant::new( - self.clocks - .expiration_clock - .read() - .as_ref() - .expect("Cannot get the expiration clock") - .now(), - ) - } else { - Instant::now() - } + pub(crate) fn current_time(&self) -> Instant { + self.clock.now() } - fn clocks(&self) -> &Clocks { - &self.clocks + fn clock(&self) -> &Clock { + &self.clock } fn num_cht_segments(&self) -> usize { @@ -1220,6 +1154,7 @@ where write_op_ch: Receiver>, expiration_policy: ExpirationPolicy, invalidator_enabled: bool, + clock: Clock, ) -> Self { // TODO: Calculate the number of segments based on the max capacity and // the number of CPUs. @@ -1237,11 +1172,7 @@ where build_hasher.clone(), ); - // Assume that getting `moka::common::Instant::now` has lower latency than - // `StdInstant::now`. - let now_std = StdInstant::now(); - let now = Instant::now(); - let clocks = Clocks::new(now, now_std); + let now = clock.now(); let timer_wheel = Mutex::new(TimerWheel::new(now)); let (removal_notifier, key_locks) = if let Some(listener) = eviction_listener { @@ -1278,7 +1209,7 @@ where removal_notifier, key_locks, invalidator, - clocks, + clock, } } @@ -1388,7 +1319,7 @@ where let mut timer_wheel = self.timer_wheel.lock().await; let started_at = if timeout.is_some() { - Some(self.current_time_from_expiration_clock()) + Some(self.current_time()) } else { None }; @@ -1506,10 +1437,7 @@ where // Break the loop if the eviction listener is set and timeout has been // reached. if let (Some(to), Some(started)) = (timeout, started_at) { - let elapsed = self - .current_time_from_expiration_clock() - .checked_duration_since(started) - .expect("Arithmetic overflow occurred on calculating the elapse time"); + let elapsed = self.current_time().saturating_duration_since(started); if elapsed >= to { break; } @@ -2075,7 +2003,7 @@ where { use crate::common::timer_wheel::TimerEvent; - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); // NOTE: When necessary, the iterator returned from advance() will unset the // timer node pointer in the `ValueEntry`, so we do not have to do it here. @@ -2156,7 +2084,7 @@ where { use CacheRegion::{MainProbation as Probation, MainProtected as Protected, Window}; - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); if self.is_write_order_queue_enabled() { self.remove_expired_wo(deqs, timer_wheel, batch_size, now, state) @@ -2423,7 +2351,7 @@ where ) where V: Clone, { - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); // If the write order queue is empty, we are done and can remove the predicates // that have been registered by now. @@ -2607,7 +2535,7 @@ where ) -> BoxFuture<'static, ()> { use futures_util::future::FutureExt; - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); let exp = &self.expiration_policy; let mut cause = RemovalCause::Replaced; @@ -2646,7 +2574,7 @@ where ) -> BoxFuture<'static, ()> { use futures_util::future::FutureExt; - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); let exp = &self.expiration_policy; let mut cause = RemovalCause::Explicit; @@ -2691,28 +2619,6 @@ where } } - async fn set_expiration_clock(&self, clock: Option) { - // Acquire the lock for the clocks to prevent other threads from - // updating the expiration clock while we are setting it. - let _clocks_lock = self.clocks._lock.lock(); - - if let Some(clock) = clock { - let std_now = StdInstant::now(); - let now = Instant::new(clock.now()); - *(self.clocks.expiration_clock.write()) = Some(clock); - self.clocks - .has_expiration_clock - .store(true, Ordering::SeqCst); - self.clocks.set_origin(now, std_now); - self.timer_wheel.lock().await.set_origin(now); - } else { - self.clocks - .has_expiration_clock - .store(false, Ordering::SeqCst); - *(self.clocks.expiration_clock.write()) = None; - } - } - fn key_locks_map_is_empty(&self) -> bool { self.key_locks .as_ref() @@ -2814,8 +2720,8 @@ fn is_expired_by_tti( now: Instant, ) -> bool { if let Some(tti) = time_to_idle { - let checked_add = entry_last_accessed.checked_add(*tti).expect("tti overflow"); - checked_add <= now + let expiration = entry_last_accessed.saturating_add(*tti); + expiration <= now } else { false } @@ -2828,8 +2734,8 @@ fn is_expired_by_ttl( now: Instant, ) -> bool { if let Some(ttl) = time_to_live { - let checked_add = entry_last_modified.checked_add(*ttl).expect("ttl overflow"); - checked_add <= now + let expiration = entry_last_modified.saturating_add(*ttl); + expiration <= now } else { false } @@ -2838,7 +2744,7 @@ fn is_expired_by_ttl( #[cfg(test)] mod tests { use crate::{ - common::HousekeeperConfig, + common::{time::Clock, HousekeeperConfig}, policy::{EvictionPolicy, ExpirationPolicy}, }; @@ -2864,6 +2770,7 @@ mod tests { ExpirationPolicy::default(), HousekeeperConfig::default(), false, + Clock::default(), ); cache.inner.enable_frequency_sketch_for_testing().await; assert_eq!( @@ -2920,10 +2827,7 @@ mod tests { type Value = char; fn current_time(cache: &BaseCache) -> StdInstant { - cache - .inner - .clocks() - .to_std_instant(cache.current_time_from_expiration_clock()) + cache.inner.clock().to_std_instant(cache.current_time()) } async fn insert(cache: &BaseCache, key: Key, hash: u64, value: Value) { @@ -3201,6 +3105,7 @@ mod tests { Some(Arc::new(MyExpiry { expectation: Arc::clone(&expectation), })); + let (clock, mock) = Clock::mock(); let mut cache = BaseCache::::new( None, @@ -3217,12 +3122,10 @@ mod tests { ), HousekeeperConfig::default(), false, + clock, ); cache.reconfigure_for_testing().await; - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)).await; - // Make the cache exterior immutable. let cache = cache; diff --git a/src/future/builder.rs b/src/future/builder.rs index 6a99c428..9ed212ba 100644 --- a/src/future/builder.rs +++ b/src/future/builder.rs @@ -1,6 +1,6 @@ use super::{Cache, FutureExt}; use crate::{ - common::{builder_utils, concurrent::Weigher, HousekeeperConfig}, + common::{builder_utils, concurrent::Weigher, time::Clock, HousekeeperConfig}, notification::{AsyncEvictionListener, ListenerFuture, RemovalCause}, policy::{EvictionPolicy, ExpirationPolicy}, Expiry, @@ -65,6 +65,7 @@ pub struct CacheBuilder { expiration_policy: ExpirationPolicy, housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, + clock: Clock, cache_type: PhantomData, } @@ -84,6 +85,7 @@ where expiration_policy: ExpirationPolicy::default(), housekeeper_config: HousekeeperConfig::default(), invalidator_enabled: false, + clock: Clock::default(), cache_type: PhantomData, } } @@ -125,6 +127,7 @@ where self.expiration_policy, self.housekeeper_config, self.invalidator_enabled, + self.clock, ) } @@ -223,6 +226,7 @@ where self.expiration_policy, self.housekeeper_config, self.invalidator_enabled, + self.clock, ) } } @@ -406,6 +410,11 @@ impl CacheBuilder { } } + #[cfg(test)] + pub(crate) fn clock(self, clock: Clock) -> Self { + Self { clock, ..self } + } + /// Enables support for [`Cache::invalidate_entries_if`][cache-invalidate-if] /// method. /// diff --git a/src/future/cache.rs b/src/future/cache.rs index 2c191d26..77474a99 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -5,7 +5,7 @@ use super::{ WriteOp, }; use crate::{ - common::{concurrent::Weigher, HousekeeperConfig}, + common::{concurrent::Weigher, time::Clock, HousekeeperConfig}, notification::AsyncEvictionListener, ops::compute::{self, CompResult}, policy::{EvictionPolicy, ExpirationPolicy}, @@ -793,6 +793,7 @@ where ExpirationPolicy::default(), HousekeeperConfig::default(), false, + Clock::default(), ) } @@ -824,6 +825,7 @@ where expiration_policy: ExpirationPolicy, housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, + clock: Clock, ) -> Self { Self { base: BaseCache::new( @@ -837,6 +839,7 @@ where expiration_policy, housekeeper_config, invalidator_enabled, + clock, ), value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)), @@ -1970,7 +1973,7 @@ where match self.base.remove_entry(key, hash) { None => None, Some(kv) => { - let now = self.base.current_time_from_expiration_clock(); + let now = self.base.current_time(); let maybe_v = if need_value { Some(kv.entry.value.clone()) @@ -2076,9 +2079,9 @@ where self.base.reconfigure_for_testing().await; } - async fn set_expiration_clock(&self, clock: Option) { - self.base.set_expiration_clock(clock).await; - } + // async fn set_expiration_clock(&self, clock: Option) { + // self.base.set_expiration_clock(clock).await; + // } fn key_locks_map_is_empty(&self) -> bool { self.base.key_locks_map_is_empty() @@ -2688,17 +2691,17 @@ mod tests { .boxed() }; + let (clock, mock) = Clock::mock(); + // Create a cache with the eviction listener. let mut cache = Cache::builder() .max_capacity(100) .support_invalidation_closures() .async_eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing().await; - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)).await; - // Make the cache exterior immutable. let cache = cache; @@ -2792,17 +2795,17 @@ mod tests { .boxed() }; + let (clock, mock) = Clock::mock(); + // Create a cache with the eviction listener. let mut cache = Cache::builder() .max_capacity(100) .time_to_live(Duration::from_secs(10)) .async_eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing().await; - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)).await; - // Make the cache exterior immutable. let cache = cache; @@ -2880,17 +2883,17 @@ mod tests { .boxed() }; + let (clock, mock) = Clock::mock(); + // Create a cache with the eviction listener. let mut cache = Cache::builder() .max_capacity(100) .time_to_idle(Duration::from_secs(10)) .async_eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing().await; - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)).await; - // Make the cache exterior immutable. let cache = cache; @@ -2952,15 +2955,13 @@ mod tests { // https://github.com/moka-rs/moka/issues/359 #[tokio::test] async fn ensure_access_time_is_updated_immediately_after_read() { + let (clock, mock) = Clock::mock(); let mut cache = Cache::builder() .max_capacity(10) .time_to_idle(Duration::from_secs(5)) + .clock(clock) .build(); cache.reconfigure_for_testing().await; - - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)).await; - // Make the cache exterior immutable. let cache = cache; @@ -3029,17 +3030,17 @@ mod tests { let expiry_counters = Arc::new(ExpiryCallCounters::default()); let expiry = MyExpiry::new(Arc::clone(&expiry_counters)); + let (clock, mock) = Clock::mock(); + // Create a cache with the expiry and eviction listener. let mut cache = Cache::builder() .max_capacity(100) .expire_after(expiry) .async_eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing().await; - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)).await; - // Make the cache exterior immutable. let cache = cache; @@ -3150,17 +3151,17 @@ mod tests { let expiry_counters = Arc::new(ExpiryCallCounters::default()); let expiry = MyExpiry::new(Arc::clone(&expiry_counters)); + let (clock, mock) = Clock::mock(); + // Create a cache with the expiry and eviction listener. let mut cache = Cache::builder() .max_capacity(100) .expire_after(expiry) .async_eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing().await; - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)).await; - // Make the cache exterior immutable. let cache = cache; @@ -3307,12 +3308,12 @@ mod tests { // https://github.com/moka-rs/moka/issues/345 #[tokio::test] async fn test_race_between_updating_entry_and_processing_its_write_ops() { + let (clock, mock) = Clock::mock(); let cache = Cache::builder() .max_capacity(2) .time_to_idle(Duration::from_secs(1)) + .clock(clock) .build(); - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)).await; cache.insert("a", "alice").await; cache.insert("b", "bob").await; @@ -5022,17 +5023,17 @@ mod tests { .boxed() }; + let (clock, mock) = Clock::mock(); + // Create a cache with the eviction listener and also TTL and TTI. let mut cache = Cache::builder() .async_eviction_listener(listener) .time_to_live(Duration::from_secs(7)) .time_to_idle(Duration::from_secs(5)) + .clock(clock) .build(); cache.reconfigure_for_testing().await; - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)).await; - // Make the cache exterior immutable. let cache = cache; @@ -5201,9 +5202,9 @@ mod tests { .eviction_policy(EvictionPolicy::lru()) .eviction_listener(listener) .housekeeper_config(hk_conf) + .clock(clock) .build(); cache.reconfigure_for_testing().await; - cache.set_expiration_clock(Some(clock)).await; // Make the cache exterior immutable. let cache = cache; @@ -5345,16 +5346,16 @@ mod tests { } }; + let (clock, mock) = Clock::mock(); + let mut cache: Cache = Cache::builder() .time_to_live(Duration::from_millis(10)) .async_eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing().await; - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)).await; - // Make the cache exterior immutable. let cache = cache; diff --git a/src/future/housekeeper.rs b/src/future/housekeeper.rs index 7149019a..1ef86cd1 100644 --- a/src/future/housekeeper.rs +++ b/src/future/housekeeper.rs @@ -1,9 +1,8 @@ use crate::common::{ - concurrent::{ - atomic_time::AtomicInstant, - constants::{LOG_SYNC_INTERVAL_MILLIS, READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT}, + concurrent::constants::{ + LOG_SYNC_INTERVAL_MILLIS, READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT, }, - time::{CheckedTimeOps, Instant}, + time::{AtomicInstant, Instant}, HousekeeperConfig, }; @@ -55,7 +54,11 @@ pub(crate) struct Housekeeper { } impl Housekeeper { - pub(crate) fn new(is_eviction_listener_enabled: bool, config: HousekeeperConfig) -> Self { + pub(crate) fn new( + is_eviction_listener_enabled: bool, + config: HousekeeperConfig, + now: Instant, + ) -> Self { let (more_entries_to_evict, maintenance_task_timeout) = if is_eviction_listener_enabled { ( Some(AtomicBool::new(false)), @@ -67,7 +70,7 @@ impl Housekeeper { Self { current_task: Mutex::default(), - run_after: AtomicInstant::new(Self::sync_after(Instant::now())), + run_after: AtomicInstant::new(Self::sync_after(now)), more_entries_to_evict, maintenance_task_timeout, max_log_sync_repeats: config.max_log_sync_repeats, @@ -159,7 +162,7 @@ impl Housekeeper { { use futures_util::FutureExt; - let now = cache.current_time_from_expiration_clock(); + let now = cache.current_time(); let more_to_evict; // Async Cancellation Safety: Our maintenance task is cancellable as we save // it in the lock. If it is canceled, we will resume it in the next run. @@ -200,10 +203,7 @@ impl Housekeeper { fn sync_after(now: Instant) -> Instant { let dur = Duration::from_millis(LOG_SYNC_INTERVAL_MILLIS); - let ts = now.checked_add(dur); - // Assuming that `now` is current wall clock time, this should never fail at - // least next millions of years. - ts.expect("Timestamp overflow") + now.saturating_add(dur) } } @@ -212,8 +212,4 @@ impl Housekeeper { pub(crate) fn disable_auto_run(&self) { self.auto_run_enabled.store(false, Ordering::Relaxed); } - - pub(crate) fn reset_run_after(&self, now: Instant) { - self.run_after.set_instant(Self::sync_after(now)); - } } diff --git a/src/sync/builder.rs b/src/sync/builder.rs index 4fec063e..6f12b631 100644 --- a/src/sync/builder.rs +++ b/src/sync/builder.rs @@ -1,6 +1,6 @@ use super::{Cache, SegmentedCache}; use crate::{ - common::{builder_utils, concurrent::Weigher, HousekeeperConfig}, + common::{builder_utils, concurrent::Weigher, time::Clock, HousekeeperConfig}, notification::{EvictionListener, RemovalCause}, policy::{EvictionPolicy, ExpirationPolicy}, Expiry, @@ -58,6 +58,7 @@ pub struct CacheBuilder { expiration_policy: ExpirationPolicy, housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, + clock: Clock, cache_type: PhantomData, } @@ -78,6 +79,7 @@ where expiration_policy: ExpirationPolicy::default(), housekeeper_config: HousekeeperConfig::default(), invalidator_enabled: false, + clock: Clock::default(), cache_type: PhantomData, } } @@ -119,6 +121,7 @@ where expiration_policy: self.expiration_policy, housekeeper_config: self.housekeeper_config, invalidator_enabled: self.invalidator_enabled, + clock: self.clock, cache_type: PhantomData, } } @@ -148,6 +151,7 @@ where self.expiration_policy, self.housekeeper_config, self.invalidator_enabled, + self.clock, ) } @@ -236,6 +240,7 @@ where self.expiration_policy, self.housekeeper_config, self.invalidator_enabled, + self.clock, ) } } @@ -271,6 +276,7 @@ where self.expiration_policy, self.housekeeper_config, self.invalidator_enabled, + self.clock, ) } @@ -361,6 +367,7 @@ where self.expiration_policy, self.housekeeper_config, self.invalidator_enabled, + self.clock, ) } } @@ -491,6 +498,11 @@ impl CacheBuilder { } } + #[cfg(test)] + pub(crate) fn clock(self, clock: Clock) -> Self { + Self { clock, ..self } + } + /// Enables support for [`Cache::invalidate_entries_if`][cache-invalidate-if] /// method. /// diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 6c846f06..1f6f8be2 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -7,7 +7,7 @@ use crate::{ concurrent::{ constants::WRITE_RETRY_INTERVAL_MICROS, housekeeper::InnerSync, Weigher, WriteOp, }, - time::Instant, + time::{Clock, Instant}, HousekeeperConfig, }, notification::EvictionListener, @@ -711,6 +711,7 @@ where ExpirationPolicy::default(), HousekeeperConfig::default(), false, + Clock::default(), ) } @@ -742,6 +743,7 @@ where expiration_policy: ExpirationPolicy, housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, + clock: Clock, ) -> Self { Self { base: BaseCache::new( @@ -755,6 +757,7 @@ where expiration_policy, housekeeper_config, invalidator_enabled, + clock, ), value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)), } @@ -1616,7 +1619,7 @@ where match self.base.remove_entry(key, hash) { None => None, Some(kv) => { - let now = self.base.current_time_from_expiration_clock(); + let now = self.base.current_time(); let info = kv.entry.entry_info(); let entry_gen = info.incr_entry_gen(); @@ -1886,10 +1889,6 @@ where self.base.reconfigure_for_testing(); } - pub(crate) fn set_expiration_clock(&self, clock: Option) { - self.base.set_expiration_clock(clock); - } - pub(crate) fn key_locks_map_is_empty(&self) -> bool { self.base.key_locks_map_is_empty() } @@ -2332,17 +2331,17 @@ mod tests { let a1 = Arc::clone(&actual); let listener = move |k, v, cause| a1.lock().push((k, v, cause)); + let (clock, mock) = Clock::mock(); + // Create a cache with the eviction listener. let mut cache = Cache::builder() .max_capacity(100) .support_invalidation_closures() .eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing(); - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)); - // Make the cache exterior immutable. let cache = cache; @@ -2430,17 +2429,17 @@ mod tests { let a1 = Arc::clone(&actual); let listener = move |k, v, cause| a1.lock().push((k, v, cause)); + let (clock, mock) = Clock::mock(); + // Create a cache with the eviction listener. let mut cache = Cache::builder() .max_capacity(100) .time_to_live(Duration::from_secs(10)) .eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing(); - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)); - // Make the cache exterior immutable. let cache = cache; @@ -2512,17 +2511,17 @@ mod tests { let a1 = Arc::clone(&actual); let listener = move |k, v, cause| a1.lock().push((k, v, cause)); + let (clock, mock) = Clock::mock(); + // Create a cache with the eviction listener. let mut cache = Cache::builder() .max_capacity(100) .time_to_idle(Duration::from_secs(10)) .eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing(); - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)); - // Make the cache exterior immutable. let cache = cache; @@ -2584,15 +2583,14 @@ mod tests { // https://github.com/moka-rs/moka/issues/359 #[test] fn ensure_access_time_is_updated_immediately_after_read() { + let (clock, mock) = Clock::mock(); let mut cache = Cache::builder() .max_capacity(10) .time_to_idle(Duration::from_secs(5)) + .clock(clock) .build(); cache.reconfigure_for_testing(); - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)); - // Make the cache exterior immutable. let cache = cache; @@ -2655,17 +2653,17 @@ mod tests { let a1 = Arc::clone(&actual); let listener = move |k, v, cause| a1.lock().push((k, v, cause)); + let (clock, mock) = Clock::mock(); + // Create a cache with the eviction listener. let mut cache = Cache::builder() .max_capacity(100) .expire_after(expiry) .eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing(); - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)); - // Make the cache exterior immutable. let cache = cache; @@ -2770,17 +2768,17 @@ mod tests { let a1 = Arc::clone(&actual); let listener = move |k, v, cause| a1.lock().push((k, v, cause)); + let (clock, mock) = Clock::mock(); + // Create a cache with the eviction listener. let mut cache = Cache::builder() .max_capacity(100) .expire_after(expiry) .eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing(); - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)); - // Make the cache exterior immutable. let cache = cache; @@ -2927,12 +2925,12 @@ mod tests { // https://github.com/moka-rs/moka/issues/345 #[test] fn test_race_between_updating_entry_and_processing_its_write_ops() { + let (clock, mock) = Clock::mock(); let cache = Cache::builder() .max_capacity(2) .time_to_idle(Duration::from_secs(1)) + .clock(clock) .build(); - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)); cache.insert("a", "alice"); cache.insert("b", "bob"); @@ -4550,17 +4548,17 @@ mod tests { let a1 = Arc::clone(&actual); let listener = move |k, v, cause| a1.lock().push((k, v, cause)); + let (clock, mock) = Clock::mock(); + // Create a cache with the eviction listener and also TTL and TTI. let mut cache = Cache::builder() .eviction_listener(listener) .time_to_live(Duration::from_secs(7)) .time_to_idle(Duration::from_secs(5)) + .clock(clock) .build(); cache.reconfigure_for_testing(); - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)); - // Make the cache exterior immutable. let cache = cache; @@ -4883,9 +4881,9 @@ mod tests { .eviction_policy(EvictionPolicy::lru()) .eviction_listener(listener) .housekeeper_config(hk_conf) + .clock(clock) .build(); cache.reconfigure_for_testing(); - cache.set_expiration_clock(Some(clock)); // Make the cache exterior immutable. let cache = cache; diff --git a/src/sync/segment.rs b/src/sync/segment.rs index c1d1cf17..13fa5e7f 100644 --- a/src/sync/segment.rs +++ b/src/sync/segment.rs @@ -1,5 +1,6 @@ use super::{cache::Cache, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector}; use crate::common::concurrent::Weigher; +use crate::common::time::Clock; use crate::{ common::HousekeeperConfig, notification::EvictionListener, @@ -108,6 +109,7 @@ where ExpirationPolicy::default(), HousekeeperConfig::default(), false, + Clock::default(), ) } @@ -215,6 +217,7 @@ where expiration_policy: ExpirationPolicy, housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, + clock: Clock, ) -> Self { Self { inner: Arc::new(Inner::new( @@ -229,6 +232,7 @@ where expiration_policy, housekeeper_config, invalidator_enabled, + clock, )), } } @@ -688,18 +692,6 @@ where } } - fn create_mock_expiration_clock(&self) -> MockExpirationClock { - let mut exp_clock = MockExpirationClock::default(); - - for segment in self.inner.segments.iter() { - let (clock, mock) = crate::common::time::Clock::mock(); - segment.set_expiration_clock(Some(clock)); - exp_clock.mocks.push(mock); - } - - exp_clock - } - fn key_locks_map_is_empty(&self) -> bool { self.inner .segments @@ -708,22 +700,6 @@ where } } -// For unit tests. -#[cfg(test)] -#[derive(Default)] -struct MockExpirationClock { - mocks: Vec>, -} - -#[cfg(test)] -impl MockExpirationClock { - fn increment(&mut self, duration: std::time::Duration) { - for mock in &mut self.mocks { - mock.increment(duration); - } - } -} - struct Inner { desired_capacity: Option, segments: Box<[Cache]>, @@ -753,6 +729,7 @@ where expiration_policy: ExpirationPolicy, housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, + clock: Clock, ) -> Self { assert!(num_segments > 0); @@ -777,6 +754,7 @@ where expiration_policy.clone(), housekeeper_config.clone(), invalidator_enabled, + clock.clone(), ) }) .collect::>(); @@ -1181,16 +1159,17 @@ mod tests { a1.lock().insert(k, (v, cause)); }; + let (clock, mock) = crate::common::time::Clock::mock(); + // Create a cache with the eviction listener. let mut cache = SegmentedCache::builder(SEGMENTS) .max_capacity(100) .support_invalidation_closures() .eviction_listener(listener) + .clock(clock) .build(); cache.reconfigure_for_testing(); - let mut mock = cache.create_mock_expiration_clock(); - // Make the cache exterior immutable. let cache = cache; diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 334c1262..3ba2abf6 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -10,7 +10,6 @@ use crate::{ self, concurrent::{ arc::MiniArc, - atomic_time::AtomicInstant, constants::{ READ_LOG_CH_SIZE, READ_LOG_FLUSH_POINT, WRITE_LOG_CH_SIZE, WRITE_LOG_FLUSH_POINT, }, @@ -22,7 +21,7 @@ use crate::{ }, deque::{DeqNode, Deque}, frequency_sketch::FrequencySketch, - time::{CheckedTimeOps, Clock, Instant}, + time::{AtomicInstant, Clock, Instant}, timer_wheel::{ReschedulingResult, TimerWheel}, CacheRegion, HousekeeperConfig, }, @@ -105,8 +104,8 @@ impl BaseCache { } #[inline] - pub(crate) fn current_time_from_expiration_clock(&self) -> Instant { - self.inner.current_time_from_expiration_clock() + pub(crate) fn current_time(&self) -> Instant { + self.inner.current_time() } pub(crate) fn notify_invalidate(&self, key: &Arc, entry: &MiniArc>) @@ -147,6 +146,7 @@ where expiration_policy: ExpirationPolicy, housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, + clock: Clock, ) -> Self { let (r_size, w_size) = if max_capacity == Some(0) { (0, 0) @@ -154,6 +154,7 @@ where (READ_LOG_CH_SIZE, WRITE_LOG_CH_SIZE) }; let is_eviction_listener_enabled = eviction_listener.is_some(); + let fast_now = clock.fast_now(); let (r_snd, r_rcv) = crossbeam_channel::bounded(r_size); let (w_snd, w_rcv) = crossbeam_channel::bounded(w_size); @@ -170,6 +171,7 @@ where w_rcv, expiration_policy, invalidator_enabled, + clock, )); Self { @@ -179,6 +181,7 @@ where housekeeper: Some(Arc::new(Housekeeper::new( is_eviction_listener_enabled, housekeeper_config, + fast_now, ))), } } @@ -202,7 +205,7 @@ where .get_key_value_and(key, hash, |k, entry| { let i = &self.inner; let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); !is_expired_by_per_entry_ttl(entry.entry_info(), now) && !is_expired_entry_wo(ttl, va, entry, now) @@ -281,7 +284,7 @@ where return None; } - let mut now = self.current_time_from_expiration_clock(); + let mut now = self.current_time(); let maybe_entry = self .inner @@ -324,7 +327,7 @@ where // Convert `last_modified` from `moka::common::time::Instant` to // `std::time::Instant`. - let lm = self.inner.clocks().to_std_instant(lm); + let lm = self.inner.clock().to_std_instant(lm); // Call the user supplied `expire_after_read` method. // @@ -352,7 +355,7 @@ where self.inner.expiration_policy.time_to_live(), self.inner.expiration_policy.time_to_idle(), now, - self.inner.clocks(), + self.inner.clock(), ); } @@ -406,7 +409,7 @@ where } pub(crate) fn invalidate_all(&self) { - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); self.inner.set_valid_after(now); } @@ -414,7 +417,7 @@ where &self, predicate: PredicateFun, ) -> Result { - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); self.inner.register_invalidation_predicate(predicate, now) } } @@ -437,7 +440,7 @@ where self.inner.get_key_value_and_then(key, hash, |k, entry| { let i = &self.inner; let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); if is_expired_by_per_entry_ttl(entry.entry_info(), now) || is_expired_entry_wo(ttl, va, entry, now) @@ -499,7 +502,7 @@ where let kl = self.maybe_key_lock(&key); let _klg = &kl.as_ref().map(|kl| kl.lock()); - let ts = self.current_time_from_expiration_clock(); + let ts = self.current_time(); // TODO: Instead using Arc to check if the actual operation was // insert or update, check the return value of insert_with_or_modify. If it @@ -561,7 +564,7 @@ where if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = (&self.inner.expiration_policy.expiry(), &ins_op) { - Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clocks()); + Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clock()); } (ins_op, ts) } @@ -583,7 +586,7 @@ where self.inner.expiration_policy.time_to_live(), self.inner.expiration_policy.time_to_idle(), ts, - self.inner.clocks(), + self.inner.clock(), ); } @@ -660,11 +663,11 @@ impl BaseCache { key: &K, value_entry: &ValueEntry, ts: Instant, - clocks: &Clocks, + clock: &Clock, ) { let duration = - expiry.expire_after_create(key, &value_entry.value, clocks.to_std_instant(ts)); - let expiration_time = duration.map(|duration| ts.checked_add(duration).expect("Overflow")); + expiry.expire_after_create(key, &value_entry.value, clock.to_std_instant(ts)); + let expiration_time = duration.map(|duration| ts.saturating_add(duration)); value_entry .entry_info() .set_expiration_time(expiration_time); @@ -677,29 +680,28 @@ impl BaseCache { ttl: Option, tti: Option, ts: Instant, - clocks: &Clocks, + clock: &Clock, ) -> bool { - let current_time = clocks.to_std_instant(ts); + let current_time = clock.to_std_instant(ts); let ei = &value_entry.entry_info(); let exp_time = IntoIterator::into_iter([ ei.expiration_time(), - ttl.and_then(|dur| ei.last_modified().and_then(|ts| ts.checked_add(dur))), - tti.and_then(|dur| ei.last_accessed().and_then(|ts| ts.checked_add(dur))), + ttl.and_then(|dur| ei.last_modified().map(|ts| ts.saturating_add(dur))), + tti.and_then(|dur| ei.last_accessed().map(|ts| ts.saturating_add(dur))), ]) .flatten() .min(); let current_duration = exp_time.and_then(|time| { - let std_time = clocks.to_std_instant(time); + let std_time = clock.to_std_instant(time); std_time.checked_duration_since(current_time) }); let duration = expiry(key, &value_entry.value, current_time, current_duration); if duration != current_duration { - let expiration_time = - duration.map(|duration| ts.checked_add(duration).expect("Overflow")); + let expiration_time = duration.map(|duration| ts.saturating_add(duration)); value_entry .entry_info() .set_expiration_time(expiration_time); @@ -734,14 +736,6 @@ where } } - pub(crate) fn set_expiration_clock(&self, clock: Option) { - self.inner.set_expiration_clock(clock); - if let Some(hk) = &self.housekeeper { - let now = self.current_time_from_expiration_clock(); - hk.reset_run_after(now); - } - } - pub(crate) fn key_locks_map_is_empty(&self) -> bool { self.inner.key_locks_map_is_empty() } @@ -865,59 +859,6 @@ enum AdmissionResult { type CacheStore = crate::cht::SegmentedHashMap, MiniArc>, S>; -struct Clocks { - has_expiration_clock: AtomicBool, - expiration_clock: RwLock>, - /// The time (`moka::common::time`) when this timer wheel was created. - origin: Instant, - /// The time (`StdInstant`) when this timer wheel was created. - origin_std: StdInstant, - /// Mutable version of `origin` and `origin_std`. Used when the - /// `expiration_clock` is set. - mutable_origin: RwLock>, -} - -impl Clocks { - fn new(time: Instant, std_time: StdInstant) -> Self { - Self { - has_expiration_clock: AtomicBool::default(), - expiration_clock: Default::default(), - origin: time, - origin_std: std_time, - mutable_origin: Default::default(), - } - } - - fn to_std_instant(&self, time: Instant) -> StdInstant { - let (origin, origin_std) = if self.has_expiration_clock.load(Ordering::Relaxed) { - self.mutable_origin - .read() - .expect("mutable_origin is not set") - } else { - (self.origin, self.origin_std) - }; - - // `checked_duration_since` should always succeed here because the `origin` - // is set when this `Cache` is created, and the `time` is either the last - // modified or last accessed time of a cached entry. So `time` should always - // be greater than or equal to `origin`. - // - // However, this is not always true when `quanta::Instant` is used as the - // time source? https://github.com/moka-rs/moka/issues/472 - // - // (Or do we set zero Instant to last modified/accessed time somewhere?) - // - // As a workaround, let's use zero duration when `checked_duration_since` - // fails. - origin_std + (time.checked_duration_since(origin).unwrap_or_default()) - } - - #[cfg(test)] - fn set_origin(&self, time: Instant, std_time: StdInstant) { - *self.mutable_origin.write() = Some((time, std_time)); - } -} - pub(crate) struct Inner { name: Option, max_capacity: Option, @@ -938,7 +879,7 @@ pub(crate) struct Inner { removal_notifier: Option>, key_locks: Option>, invalidator: Option>, - clocks: Clocks, + clock: Clock, } impl Drop for Inner { @@ -994,23 +935,12 @@ impl Inner { } #[inline] - fn current_time_from_expiration_clock(&self) -> Instant { - if self.clocks.has_expiration_clock.load(Ordering::Relaxed) { - Instant::new( - self.clocks - .expiration_clock - .read() - .as_ref() - .expect("Cannot get the expiration clock") - .now(), - ) - } else { - Instant::now() - } + fn current_time(&self) -> Instant { + self.clock.now() } - fn clocks(&self) -> &Clocks { - &self.clocks + fn clock(&self) -> &Clock { + &self.clock } fn num_cht_segments(&self) -> usize { @@ -1075,6 +1005,7 @@ where write_op_ch: Receiver>, expiration_policy: ExpirationPolicy, invalidator_enabled: bool, + clock: Clock, ) -> Self { // TODO: Calculate the number of segments based on the max capacity and the // number of CPUs. @@ -1092,11 +1023,7 @@ where build_hasher.clone(), ); - // Assume that getting `moka::common::Instant::now` has lower latency than - // `StdInstant::now`. - let now_std = StdInstant::now(); - let now = Instant::now(); - let clocks = Clocks::new(now, now_std); + let now = clock.now(); let timer_wheel = Mutex::new(TimerWheel::new(now)); let (removal_notifier, key_locks) = if let Some(listener) = eviction_listener { @@ -1133,7 +1060,7 @@ where removal_notifier, key_locks, invalidator, - clocks, + clock, } } @@ -1237,7 +1164,7 @@ where } fn now(&self) -> Instant { - self.current_time_from_expiration_clock() + self.current_time() } } @@ -1262,7 +1189,7 @@ where let mut timer_wheel = self.timer_wheel.lock(); let started_at = if timeout.is_some() { - Some(self.current_time_from_expiration_clock()) + Some(self.current_time()) } else { None }; @@ -1365,10 +1292,7 @@ where // Break the loop if the eviction listener is set and timeout has been // reached. if let (Some(to), Some(started)) = (timeout, started_at) { - let elapsed = self - .current_time_from_expiration_clock() - .checked_duration_since(started) - .expect("Arithmetic overflow occurred on calculating the elapse time"); + let elapsed = self.current_time().saturating_duration_since(started); if elapsed >= to { break; } @@ -1909,7 +1833,7 @@ where { use crate::common::timer_wheel::TimerEvent; - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); // NOTES: // @@ -1981,7 +1905,7 @@ where { use CacheRegion::{MainProbation as Probation, MainProtected as Protected, Window}; - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); if self.is_write_order_queue_enabled() { self.remove_expired_wo(deqs, timer_wheel, batch_size, now, state); @@ -2224,7 +2148,7 @@ where ) where V: Clone, { - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); // If the write order queue is empty, we are done and can remove the predicates // that have been registered by now. @@ -2397,7 +2321,7 @@ where last_accessed: Option, last_modified: Option, ) { - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); let exp = &self.expiration_policy; let mut cause = RemovalCause::Replaced; @@ -2421,7 +2345,7 @@ where #[inline] fn notify_invalidate(&self, key: &Arc, entry: &MiniArc>) { - let now = self.current_time_from_expiration_clock(); + let now = self.current_time(); let exp = &self.expiration_policy; let mut cause = RemovalCause::Explicit; @@ -2459,25 +2383,6 @@ where } } - fn set_expiration_clock(&self, clock: Option) { - let mut exp_clock = self.clocks.expiration_clock.write(); - if let Some(clock) = clock { - let std_now = StdInstant::now(); - let now = Instant::new(clock.now()); - *exp_clock = Some(clock); - self.clocks - .has_expiration_clock - .store(true, Ordering::SeqCst); - self.clocks.set_origin(now, std_now); - self.timer_wheel.lock().set_origin(now); - } else { - self.clocks - .has_expiration_clock - .store(false, Ordering::SeqCst); - *exp_clock = None; - } - } - fn key_locks_map_is_empty(&self) -> bool { self.key_locks .as_ref() @@ -2579,8 +2484,8 @@ fn is_expired_by_tti( now: Instant, ) -> bool { if let Some(tti) = time_to_idle { - let checked_add = entry_last_accessed.checked_add(*tti).expect("tti overflow"); - checked_add <= now + let expiration = entry_last_accessed.saturating_add(*tti); + expiration <= now } else { false } @@ -2593,8 +2498,8 @@ fn is_expired_by_ttl( now: Instant, ) -> bool { if let Some(ttl) = time_to_live { - let checked_add = entry_last_modified.checked_add(*ttl).expect("tti overflow"); - checked_add <= now + let expiration = entry_last_modified.saturating_add(*ttl); + expiration <= now } else { false } @@ -2603,7 +2508,7 @@ fn is_expired_by_ttl( #[cfg(test)] mod tests { use crate::{ - common::HousekeeperConfig, + common::{time::Clock, HousekeeperConfig}, policy::{EvictionPolicy, ExpirationPolicy}, }; @@ -2629,6 +2534,7 @@ mod tests { ExpirationPolicy::default(), HousekeeperConfig::default(), false, + Clock::default(), ); cache.inner.enable_frequency_sketch_for_testing(); assert_eq!( @@ -2686,10 +2592,7 @@ mod tests { type Value = char; fn current_time(cache: &BaseCache) -> StdInstant { - cache - .inner - .clocks() - .to_std_instant(cache.current_time_from_expiration_clock()) + cache.inner.clock().to_std_instant(cache.current_time()) } fn insert(cache: &BaseCache, key: Key, hash: u64, value: Value) { @@ -2964,6 +2867,8 @@ mod tests { expectation: Arc::clone(&expectation), })); + let (clock, mock) = Clock::mock(); + let mut cache = BaseCache::::new( None, None, @@ -2979,12 +2884,10 @@ mod tests { ), HousekeeperConfig::default(), false, + clock, ); cache.reconfigure_for_testing(); - let (clock, mock) = Clock::mock(); - cache.set_expiration_clock(Some(clock)); - // Make the cache exterior immutable. let cache = cache; From 668bdc01e1e143c2f111e99826bf1dd862dc920f Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sun, 5 Jan 2025 15:08:30 +0800 Subject: [PATCH 2/4] Remove commented out codes --- src/future/base_cache.rs | 8 -------- src/future/cache.rs | 4 ---- 2 files changed, 12 deletions(-) diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index 8154505a..cbec1bb1 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -863,14 +863,6 @@ where } } - // pub(crate) async fn set_expiration_clock(&self, clock: Option) { - // self.inner.set_expiration_clock(clock).await; - // if let Some(hk) = &self.housekeeper { - // let now = self.current_time_from_expiration_clock(); - // hk.reset_run_after(now); - // } - // } - pub(crate) fn key_locks_map_is_empty(&self) -> bool { self.inner.key_locks_map_is_empty() } diff --git a/src/future/cache.rs b/src/future/cache.rs index 77474a99..32325e93 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -2079,10 +2079,6 @@ where self.base.reconfigure_for_testing().await; } - // async fn set_expiration_clock(&self, clock: Option) { - // self.base.set_expiration_clock(clock).await; - // } - fn key_locks_map_is_empty(&self) -> bool { self.base.key_locks_map_is_empty() } From d22f4f3bf71c664728507d8c5764f128fa96bb0b Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sun, 5 Jan 2025 16:00:45 +0800 Subject: [PATCH 3/4] Update the change log --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1457e261..9ce88066 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### Changed +- Replaced most uses of `quanta::Instant` with `std::time::Instant` to increase the + accuracy of time measurements. ([#481][gh-pull-0481]) - Switched to `AtomicU64` of `portable-atomic` crate for the platforms where `AtomicU64` is not available in `std`. ([#480][gh-pull-0480]) - `moka`'s `atomic64` feature no longer has any effect on the build as @@ -948,6 +950,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (Mar 25, 2021). [gh-issue-0034]: https://github.com/moka-rs/moka/issues/34/ [gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/ +[gh-pull-0481]: https://github.com/moka-rs/moka/pull/481/ [gh-pull-0480]: https://github.com/moka-rs/moka/pull/480/ [gh-pull-0474]: https://github.com/moka-rs/moka/pull/474/ [gh-pull-0466]: https://github.com/moka-rs/moka/pull/466/ From 5d03e029324e0baad4344c17ff7ed78614c64a02 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sun, 5 Jan 2025 22:42:54 +0800 Subject: [PATCH 4/4] Minor tweaks in the clock module --- src/common/time/clock.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/common/time/clock.rs b/src/common/time/clock.rs index 19735959..2032f91a 100644 --- a/src/common/time/clock.rs +++ b/src/common/time/clock.rs @@ -38,13 +38,9 @@ impl Default for ClockType { fn default() -> Self { #[cfg(feature = "quanta")] { - // Try to get the both origins at close to the same time. Assuming that - // `quanta::Instant::now` has lower latency than `StdInstant::now`. - let quanta_origin = quanta::Instant::now(); - let std_origin = StdInstant::now(); return ClockType::Hybrid { - std_origin, - quanta_origin, + std_origin: StdInstant::now(), + quanta_origin: quanta::Instant::now(), }; } @@ -113,6 +109,9 @@ impl Clock { } /// Converts an `Instant` to a `std::time::Instant`. + /// + /// **IMPORTANT**: The caller must ensure that the `Instant` was created by this + /// `Clock`, otherwise the resulting `std::time::Instant` will be incorrect. pub(crate) fn to_std_instant(&self, instant: Instant) -> StdInstant { match &self.ty { ClockType::Standard { origin } => {