From d277e381b1dde52d49f0e38bfdd8daf138104c02 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Mon, 15 Feb 2021 23:53:18 +0800 Subject: [PATCH 1/6] Implement unsync::Cache Also move some existing types from common module to sync module. --- .vscode/settings.json | 1 + CHANGELOG.md | 7 + src/common.rs | 272 ------------- src/future/cache.rs | 2 +- src/lib.rs | 1 + src/sync.rs | 274 +++++++++++++ src/{common => sync}/base_cache.rs | 185 ++++----- src/sync/cache.rs | 5 +- src/{common => sync}/deques.rs | 8 +- src/{common => sync}/housekeeper.rs | 10 +- src/unsync.rs | 179 +++++++++ src/unsync/cache.rs | 585 ++++++++++++++++++++++++++++ src/unsync/deques.rs | 149 +++++++ 13 files changed, 1301 insertions(+), 377 deletions(-) rename src/{common => sync}/base_cache.rs (97%) rename src/{common => sync}/deques.rs (97%) rename src/{common => sync}/housekeeper.rs (99%) create mode 100644 src/unsync.rs create mode 100644 src/unsync/cache.rs create mode 100644 src/unsync/deques.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index 57330662..60c697f7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,6 +10,7 @@ "Moka", "Ristretto", "Tatsuya", + "unsync", "actix", "ahash", "benmanes", diff --git a/CHANGELOG.md b/CHANGELOG.md index 271b573f..79cbb28b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Moka — Release Notes +## Unreleased + +### Features + +- Introduce an unsync cache. 
+ + ## Version 0.2.0 ### Features diff --git a/src/common.rs b/src/common.rs index e8452403..e7956937 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,285 +1,13 @@ -use parking_lot::Mutex; use quanta::Instant; -use std::{ - ptr::NonNull, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, -}; -pub(crate) mod base_cache; pub(crate) mod deque; -pub(crate) mod deques; pub(crate) mod frequency_sketch; -pub(crate) mod housekeeper; pub(crate) mod thread_pool; pub(crate) mod unsafe_weak_pointer; -use self::deque::DeqNode; - -pub(crate) struct KeyHash { - pub(crate) key: Arc, - pub(crate) hash: u64, -} - -impl KeyHash { - pub(crate) fn new(key: Arc, hash: u64) -> Self { - Self { key, hash } - } -} - -pub(crate) struct KeyDate { - pub(crate) key: Arc, - pub(crate) timestamp: Option>, -} - -impl KeyDate { - pub(crate) fn new(key: Arc, timestamp: Option>) -> Self { - Self { key, timestamp } - } -} - -pub(crate) struct KeyHashDate { - pub(crate) key: Arc, - pub(crate) hash: u64, - pub(crate) timestamp: Option>, -} - -impl KeyHashDate { - pub(crate) fn new(kh: KeyHash, timestamp: Option>) -> Self { - Self { - key: kh.key, - hash: kh.hash, - timestamp, - } - } -} - -// DeqNode for an access order queue. -type KeyDeqNodeAo = NonNull>>; - -// DeqNode for the write order queue. -type KeyDeqNodeWo = NonNull>>; - -struct DeqNodes { - access_order_q_node: Option>, - write_order_q_node: Option>, -} - -#[cfg(feature = "future")] -// Multi-threaded async runtimes require ValueEntry to be Send, but it will -// not be without this `unsafe impl`. This is because DeqNodes have NonNull -// pointers. 
-unsafe impl Send for DeqNodes {} - -pub(crate) struct ValueEntry { - pub(crate) value: V, - last_accessed: Option>, - last_modified: Option>, - nodes: Mutex>, -} - -impl ValueEntry { - pub(crate) fn new( - value: V, - last_accessed: Option, - last_modified: Option, - access_order_q_node: Option>, - write_order_q_node: Option>, - ) -> Self { - Self { - value, - last_accessed: last_accessed.map(|ts| Arc::new(AtomicU64::new(ts.as_u64()))), - last_modified: last_modified.map(|ts| Arc::new(AtomicU64::new(ts.as_u64()))), - nodes: Mutex::new(DeqNodes { - access_order_q_node, - write_order_q_node, - }), - } - } - - pub(crate) fn new_with(value: V, other: &Self) -> Self { - let nodes = { - let other_nodes = other.nodes.lock(); - DeqNodes { - access_order_q_node: other_nodes.access_order_q_node, - write_order_q_node: other_nodes.write_order_q_node, - } - }; - Self { - value, - last_accessed: other.last_accessed.clone(), - last_modified: other.last_modified.clone(), - nodes: Mutex::new(nodes), - } - } - - pub(crate) fn raw_last_accessed(&self) -> Option> { - self.last_accessed.clone() - } - - pub(crate) fn raw_last_modified(&self) -> Option> { - self.last_modified.clone() - } - - pub(crate) fn access_order_q_node(&self) -> Option> { - self.nodes.lock().access_order_q_node - } - - pub(crate) fn set_access_order_q_node(&self, node: Option>) { - self.nodes.lock().access_order_q_node = node; - } - - pub(crate) fn take_access_order_q_node(&self) -> Option> { - self.nodes.lock().access_order_q_node.take() - } - - pub(crate) fn write_order_q_node(&self) -> Option> { - self.nodes.lock().write_order_q_node - } - - pub(crate) fn set_write_order_q_node(&self, node: Option>) { - self.nodes.lock().write_order_q_node = node; - } - - pub(crate) fn take_write_order_q_node(&self) -> Option> { - self.nodes.lock().write_order_q_node.take() - } - - pub(crate) fn unset_q_nodes(&self) { - let mut nodes = self.nodes.lock(); - nodes.access_order_q_node = None; - nodes.write_order_q_node = None; - } 
-} - pub(crate) trait AccessTime { fn last_accessed(&self) -> Option; fn set_last_accessed(&mut self, timestamp: Instant); fn last_modified(&self) -> Option; fn set_last_modified(&mut self, timestamp: Instant); } - -impl AccessTime for Arc> { - #[inline] - fn last_accessed(&self) -> Option { - self.last_accessed - .as_ref() - .map(|ts| ts.load(Ordering::Relaxed)) - .and_then(|ts| { - if ts == u64::MAX { - None - } else { - Some(unsafe { std::mem::transmute(ts) }) - } - }) - } - - #[inline] - fn set_last_accessed(&mut self, timestamp: Instant) { - if let Some(ts) = &self.last_accessed { - ts.store(timestamp.as_u64(), Ordering::Relaxed); - } - } - - #[inline] - fn last_modified(&self) -> Option { - self.last_modified - .as_ref() - .map(|ts| ts.load(Ordering::Relaxed)) - .and_then(|ts| { - if ts == u64::MAX { - None - } else { - Some(unsafe { std::mem::transmute(ts) }) - } - }) - } - - #[inline] - fn set_last_modified(&mut self, timestamp: Instant) { - if let Some(ts) = &self.last_modified { - ts.store(timestamp.as_u64(), Ordering::Relaxed); - } - } -} - -impl AccessTime for DeqNode> { - #[inline] - fn last_accessed(&self) -> Option { - None - } - - #[inline] - fn set_last_accessed(&mut self, _timestamp: Instant) { - // do nothing - } - - #[inline] - fn last_modified(&self) -> Option { - self.element - .timestamp - .as_ref() - .map(|ts| ts.load(Ordering::Relaxed)) - .and_then(|ts| { - if ts == u64::MAX { - None - } else { - Some(unsafe { std::mem::transmute(ts) }) - } - }) - } - - #[inline] - fn set_last_modified(&mut self, timestamp: Instant) { - if let Some(ts) = self.element.timestamp.as_ref() { - ts.store(timestamp.as_u64(), Ordering::Relaxed); - } - } -} - -impl AccessTime for DeqNode> { - #[inline] - fn last_accessed(&self) -> Option { - self.element - .timestamp - .as_ref() - .map(|ts| ts.load(Ordering::Relaxed)) - .and_then(|ts| { - if ts == u64::MAX { - None - } else { - Some(unsafe { std::mem::transmute(ts) }) - } - }) - } - - #[inline] - fn 
set_last_accessed(&mut self, timestamp: Instant) { - if let Some(ts) = self.element.timestamp.as_ref() { - ts.store(timestamp.as_u64(), Ordering::Relaxed); - } - } - - #[inline] - fn last_modified(&self) -> Option { - None - } - - #[inline] - fn set_last_modified(&mut self, _timestamp: Instant) { - // do nothing - } -} - -pub(crate) enum ReadOp { - Hit(u64, Arc>, Option), - Miss(u64), -} - -pub(crate) enum WriteOp { - Insert(KeyHash, Arc>), - Update(Arc>), - Remove(Arc>), -} diff --git a/src/future/cache.rs b/src/future/cache.rs index dd20c8a6..045a2253 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -1,5 +1,5 @@ use super::ConcurrentCacheExt; -use crate::common::{ +use crate::sync::{ base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS}, housekeeper::InnerSync, WriteOp, diff --git a/src/lib.rs b/src/lib.rs index 56f5ecc9..becb36f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -141,5 +141,6 @@ pub mod future; pub mod sync; +pub mod unsync; pub(crate) mod common; diff --git a/src/sync.rs b/src/sync.rs index f3064af0..b8d8d1b5 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,7 +1,22 @@ //! Provides thread-safe, synchronous (blocking) cache implementations. +use crate::common::{deque::DeqNode, AccessTime}; + +use parking_lot::Mutex; +use quanta::Instant; +use std::{ + ptr::NonNull, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; + +pub(crate) mod base_cache; mod builder; pub(crate) mod cache; +pub(crate) mod deques; +pub(crate) mod housekeeper; mod segment; pub use builder::CacheBuilder; @@ -13,3 +28,262 @@ pub trait ConcurrentCacheExt { /// Performs any pending maintenance operations needed by the cache. 
fn sync(&self); } + +pub(crate) struct KeyHash { + pub(crate) key: Arc, + pub(crate) hash: u64, +} + +impl KeyHash { + pub(crate) fn new(key: Arc, hash: u64) -> Self { + Self { key, hash } + } +} + +pub(crate) struct KeyDate { + pub(crate) key: Arc, + pub(crate) timestamp: Option>, +} + +impl KeyDate { + pub(crate) fn new(key: Arc, timestamp: Option>) -> Self { + Self { key, timestamp } + } +} + +pub(crate) struct KeyHashDate { + pub(crate) key: Arc, + pub(crate) hash: u64, + pub(crate) timestamp: Option>, +} + +impl KeyHashDate { + pub(crate) fn new(kh: KeyHash, timestamp: Option>) -> Self { + Self { + key: kh.key, + hash: kh.hash, + timestamp, + } + } +} + +// DeqNode for an access order queue. +type KeyDeqNodeAo = NonNull>>; + +// DeqNode for the write order queue. +type KeyDeqNodeWo = NonNull>>; + +struct DeqNodes { + access_order_q_node: Option>, + write_order_q_node: Option>, +} + +#[cfg(feature = "future")] +// Multi-threaded async runtimes require ValueEntry to be Send, but it will +// not be without this `unsafe impl`. This is because DeqNodes have NonNull +// pointers. 
+unsafe impl Send for DeqNodes {} + +pub(crate) struct ValueEntry { + pub(crate) value: V, + last_accessed: Option>, + last_modified: Option>, + nodes: Mutex>, +} + +impl ValueEntry { + pub(crate) fn new( + value: V, + last_accessed: Option, + last_modified: Option, + access_order_q_node: Option>, + write_order_q_node: Option>, + ) -> Self { + Self { + value, + last_accessed: last_accessed.map(|ts| Arc::new(AtomicU64::new(ts.as_u64()))), + last_modified: last_modified.map(|ts| Arc::new(AtomicU64::new(ts.as_u64()))), + nodes: Mutex::new(DeqNodes { + access_order_q_node, + write_order_q_node, + }), + } + } + + pub(crate) fn new_with(value: V, other: &Self) -> Self { + let nodes = { + let other_nodes = other.nodes.lock(); + DeqNodes { + access_order_q_node: other_nodes.access_order_q_node, + write_order_q_node: other_nodes.write_order_q_node, + } + }; + Self { + value, + last_accessed: other.last_accessed.clone(), + last_modified: other.last_modified.clone(), + nodes: Mutex::new(nodes), + } + } + + pub(crate) fn raw_last_accessed(&self) -> Option> { + self.last_accessed.clone() + } + + pub(crate) fn raw_last_modified(&self) -> Option> { + self.last_modified.clone() + } + + pub(crate) fn access_order_q_node(&self) -> Option> { + self.nodes.lock().access_order_q_node + } + + pub(crate) fn set_access_order_q_node(&self, node: Option>) { + self.nodes.lock().access_order_q_node = node; + } + + pub(crate) fn take_access_order_q_node(&self) -> Option> { + self.nodes.lock().access_order_q_node.take() + } + + pub(crate) fn write_order_q_node(&self) -> Option> { + self.nodes.lock().write_order_q_node + } + + pub(crate) fn set_write_order_q_node(&self, node: Option>) { + self.nodes.lock().write_order_q_node = node; + } + + pub(crate) fn take_write_order_q_node(&self) -> Option> { + self.nodes.lock().write_order_q_node.take() + } + + pub(crate) fn unset_q_nodes(&self) { + let mut nodes = self.nodes.lock(); + nodes.access_order_q_node = None; + nodes.write_order_q_node = None; + } 
+} + +impl AccessTime for Arc> { + #[inline] + fn last_accessed(&self) -> Option { + self.last_accessed + .as_ref() + .map(|ts| ts.load(Ordering::Relaxed)) + .and_then(|ts| { + if ts == u64::MAX { + None + } else { + Some(unsafe { std::mem::transmute(ts) }) + } + }) + } + + #[inline] + fn set_last_accessed(&mut self, timestamp: Instant) { + if let Some(ts) = &self.last_accessed { + ts.store(timestamp.as_u64(), Ordering::Relaxed); + } + } + + #[inline] + fn last_modified(&self) -> Option { + self.last_modified + .as_ref() + .map(|ts| ts.load(Ordering::Relaxed)) + .and_then(|ts| { + if ts == u64::MAX { + None + } else { + Some(unsafe { std::mem::transmute(ts) }) + } + }) + } + + #[inline] + fn set_last_modified(&mut self, timestamp: Instant) { + if let Some(ts) = &self.last_modified { + ts.store(timestamp.as_u64(), Ordering::Relaxed); + } + } +} + +impl AccessTime for DeqNode> { + #[inline] + fn last_accessed(&self) -> Option { + None + } + + #[inline] + fn set_last_accessed(&mut self, _timestamp: Instant) { + // do nothing + } + + #[inline] + fn last_modified(&self) -> Option { + self.element + .timestamp + .as_ref() + .map(|ts| ts.load(Ordering::Relaxed)) + .and_then(|ts| { + if ts == u64::MAX { + None + } else { + Some(unsafe { std::mem::transmute(ts) }) + } + }) + } + + #[inline] + fn set_last_modified(&mut self, timestamp: Instant) { + if let Some(ts) = self.element.timestamp.as_ref() { + ts.store(timestamp.as_u64(), Ordering::Relaxed); + } + } +} + +impl AccessTime for DeqNode> { + #[inline] + fn last_accessed(&self) -> Option { + self.element + .timestamp + .as_ref() + .map(|ts| ts.load(Ordering::Relaxed)) + .and_then(|ts| { + if ts == u64::MAX { + None + } else { + Some(unsafe { std::mem::transmute(ts) }) + } + }) + } + + #[inline] + fn set_last_accessed(&mut self, timestamp: Instant) { + if let Some(ts) = self.element.timestamp.as_ref() { + ts.store(timestamp.as_u64(), Ordering::Relaxed); + } + } + + #[inline] + fn last_modified(&self) -> Option { + None + } 
+ + #[inline] + fn set_last_modified(&mut self, _timestamp: Instant) { + // do nothing + } +} + +pub(crate) enum ReadOp { + Hit(u64, Arc>, Option), + Miss(u64), +} + +pub(crate) enum WriteOp { + Insert(KeyHash, Arc>), + Update(Arc>), + Remove(Arc>), +} diff --git a/src/common/base_cache.rs b/src/sync/base_cache.rs similarity index 97% rename from src/common/base_cache.rs rename to src/sync/base_cache.rs index 1e148aa5..2a1525f3 100644 --- a/src/common/base_cache.rs +++ b/src/sync/base_cache.rs @@ -1,9 +1,12 @@ use super::{ - deque::{CacheRegion, DeqNode, Deque}, deques::Deques, - frequency_sketch::FrequencySketch, housekeeper::{Housekeeper, InnerSync, SyncPace}, - AccessTime, KeyDate, KeyHash, KeyHashDate, ReadOp, ValueEntry, WriteOp, + KeyDate, KeyHash, KeyHashDate, ReadOp, ValueEntry, WriteOp, +}; +use crate::common::{ + deque::{CacheRegion, DeqNode, Deque}, + frequency_sketch::FrequencySketch, + AccessTime, }; use crossbeam_channel::{Receiver, Sender, TrySendError}; @@ -513,92 +516,6 @@ where K: Hash + Eq, S: BuildHasher + Clone, { - #[inline] - fn handle_insert( - &self, - kh: KeyHash, - entry: Arc>, - timestamp: Option, - deqs: &mut Deques, - freq: &FrequencySketch, - ) { - let last_accessed = entry.raw_last_accessed().map(|ts| { - ts.store(timestamp.unwrap().as_u64(), Ordering::Relaxed); - ts - }); - let last_modified = entry.raw_last_modified().map(|ts| { - ts.store(timestamp.unwrap().as_u64(), Ordering::Relaxed); - ts - }); - - if self.cache.len() <= self.max_capacity { - // Add the candidate to the deque. - let key = Arc::clone(&kh.key); - deqs.push_back_ao( - CacheRegion::MainProbation, - KeyHashDate::new(kh, last_accessed), - &entry, - ); - if self.time_to_live.is_some() { - deqs.push_back_wo(KeyDate::new(key, last_modified), &entry); - } - } else { - let victim = self.find_cache_victim(deqs, freq); - if self.admit(kh.hash, victim, freq) { - // Remove the victim from the cache and deque. - // - // TODO: Check if the selected victim was actually removed. 
If not, - // maybe we should find another victim. This can happen because it - // could have been already removed from the cache but the removal - // from the deque is still on the write operations queue and is not - // yet executed. - if let Some(vic_entry) = self.cache.remove(&victim.element.key) { - deqs.unlink_ao(Arc::clone(&vic_entry)); - Deques::unlink_wo(&mut deqs.write_order, vic_entry); - } else { - let victim = NonNull::from(victim); - deqs.unlink_node_ao(victim); - } - // Add the candidate to the deque. - let key = Arc::clone(&kh.key); - deqs.push_back_ao( - CacheRegion::MainProbation, - KeyHashDate::new(kh, last_accessed), - &entry, - ); - if self.time_to_live.is_some() { - deqs.push_back_wo(KeyDate::new(key, last_modified), &entry); - } - } else { - // Remove the candidate from the cache. - self.cache.remove(&kh.key); - } - } - } - - #[inline] - fn find_cache_victim<'a>( - &self, - deqs: &'a mut Deques, - _freq: &FrequencySketch, - ) -> &'a DeqNode> { - // TODO: Check its frequency. If it is not very low, maybe we should - // check frequencies of next few others and pick from them. - deqs.probation.peek_front().expect("No victim found") - } - - #[inline] - fn admit( - &self, - candidate_hash: u64, - victim: &DeqNode>, - freq: &FrequencySketch, - ) -> bool { - // TODO: Implement some randomness to mitigate hash DoS attack. - // See Caffeine's implementation. 
- freq.frequency(candidate_hash) > freq.frequency(victim.element.hash) - } - fn do_sync(&self, mut deqs: MutexGuard<'_, Deques>, max_repeats: usize) -> Option { let mut calls = 0; let mut should_sync = true; @@ -676,14 +593,100 @@ where deqs.move_to_back_wo(entry) } Ok(Remove(entry)) => { - deqs.unlink_ao(Arc::clone(&entry)); - Deques::unlink_wo(&mut deqs.write_order, entry); + Self::handle_remove(deqs, entry); } Err(_) => break, }; } } + fn handle_insert( + &self, + kh: KeyHash, + entry: Arc>, + timestamp: Option, + deqs: &mut Deques, + freq: &FrequencySketch, + ) { + let last_accessed = entry.raw_last_accessed().map(|ts| { + ts.store(timestamp.unwrap().as_u64(), Ordering::Relaxed); + ts + }); + let last_modified = entry.raw_last_modified().map(|ts| { + ts.store(timestamp.unwrap().as_u64(), Ordering::Relaxed); + ts + }); + + if self.cache.len() <= self.max_capacity { + // Add the candidate to the deque. + let key = Arc::clone(&kh.key); + deqs.push_back_ao( + CacheRegion::MainProbation, + KeyHashDate::new(kh, last_accessed), + &entry, + ); + if self.time_to_live.is_some() { + deqs.push_back_wo(KeyDate::new(key, last_modified), &entry); + } + } else { + let victim = Self::find_cache_victim(deqs, freq); + if Self::admit(kh.hash, victim, freq) { + // Remove the victim from the cache and deque. + // + // TODO: Check if the selected victim was actually removed. If not, + // maybe we should find another victim. This can happen because it + // could have been already removed from the cache but the removal + // from the deque is still on the write operations queue and is not + // yet executed. + if let Some(vic_entry) = self.cache.remove(&victim.element.key) { + Self::handle_remove(deqs, vic_entry); + } else { + let victim = NonNull::from(victim); + deqs.unlink_node_ao(victim); + } + // Add the candidate to the deque. 
+ let key = Arc::clone(&kh.key); + deqs.push_back_ao( + CacheRegion::MainProbation, + KeyHashDate::new(kh, last_accessed), + &entry, + ); + if self.time_to_live.is_some() { + deqs.push_back_wo(KeyDate::new(key, last_modified), &entry); + } + } else { + // Remove the candidate from the cache. + self.cache.remove(&kh.key); + } + } + } + + #[inline] + fn find_cache_victim<'a>( + deqs: &'a mut Deques, + _freq: &FrequencySketch, + ) -> &'a DeqNode> { + // TODO: Check its frequency. If it is not very low, maybe we should + // check frequencies of next few others and pick from them. + deqs.probation.peek_front().expect("No victim found") + } + + #[inline] + fn admit( + candidate_hash: u64, + victim: &DeqNode>, + freq: &FrequencySketch, + ) -> bool { + // TODO: Implement some randomness to mitigate hash DoS attack. + // See Caffeine's implementation. + freq.frequency(candidate_hash) > freq.frequency(victim.element.hash) + } + + fn handle_remove(deqs: &mut Deques, entry: Arc>) { + deqs.unlink_ao(Arc::clone(&entry)); + Deques::unlink_wo(&mut deqs.write_order, entry); + } + fn evict(&self, deqs: &mut Deques, batch_size: usize) { debug_assert!(self.has_expiry()); diff --git a/src/sync/cache.rs b/src/sync/cache.rs index b8c05726..5fa0f2c0 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -1,8 +1,7 @@ -use super::ConcurrentCacheExt; -use crate::common::{ +use super::{ base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS}, housekeeper::InnerSync, - WriteOp, + ConcurrentCacheExt, WriteOp, }; use crossbeam_channel::{Sender, TrySendError}; diff --git a/src/common/deques.rs b/src/sync/deques.rs similarity index 97% rename from src/common/deques.rs rename to src/sync/deques.rs index 8faf69f1..50106f7a 100644 --- a/src/common/deques.rs +++ b/src/sync/deques.rs @@ -1,7 +1,5 @@ -use super::{ - deque::{CacheRegion, DeqNode, Deque}, - KeyDate, KeyHashDate, ValueEntry, -}; +use super::{KeyDate, KeyHashDate, ValueEntry}; +use 
crate::common::deque::{CacheRegion, DeqNode, Deque}; use std::{ptr::NonNull, sync::Arc}; @@ -99,7 +97,7 @@ impl Deques { pub(crate) fn unlink_wo(deq: &mut Deque>, entry: Arc>) { if let Some(node) = entry.take_write_order_q_node() { - Deques::unlink_node_wo(deq, node); + Self::unlink_node_wo(deq, node); } } diff --git a/src/common/housekeeper.rs b/src/sync/housekeeper.rs similarity index 99% rename from src/common/housekeeper.rs rename to src/sync/housekeeper.rs index c86744c4..c4113222 100644 --- a/src/common/housekeeper.rs +++ b/src/sync/housekeeper.rs @@ -1,11 +1,11 @@ -use super::{ - thread_pool::{ThreadPool, ThreadPoolRegistry}, - unsafe_weak_pointer::UnsafeWeakPointer, -}; -use crate::common::base_cache::{ +use super::base_cache::{ MAX_SYNC_REPEATS, PERIODICAL_SYNC_FAST_PACE_NANOS, PERIODICAL_SYNC_INITIAL_DELAY_MILLIS, PERIODICAL_SYNC_NORMAL_PACE_MILLIS, }; +use crate::common::{ + thread_pool::{ThreadPool, ThreadPoolRegistry}, + unsafe_weak_pointer::UnsafeWeakPointer, +}; use parking_lot::Mutex; use scheduled_thread_pool::JobHandle; diff --git a/src/unsync.rs b/src/unsync.rs new file mode 100644 index 00000000..23cf68b9 --- /dev/null +++ b/src/unsync.rs @@ -0,0 +1,179 @@ +pub(crate) mod cache; +mod deques; + +use std::{cell::RefCell, ptr::NonNull, rc::Rc}; + +pub use cache::Cache; +use quanta::Instant; + +use crate::common::{deque::DeqNode, AccessTime}; + +pub(crate) struct KeyDate { + pub(crate) key: Rc, + pub(crate) timestamp: Option, +} + +impl KeyDate { + pub(crate) fn new(key: Rc, timestamp: Option) -> Self { + Self { key, timestamp } + } +} + +pub(crate) struct KeyHashDate { + pub(crate) key: Rc, + pub(crate) hash: u64, + pub(crate) timestamp: Option, +} + +impl KeyHashDate { + pub(crate) fn new(key: Rc, hash: u64, timestamp: Option) -> Self { + Self { + key, + hash, + timestamp, + } + } +} + +// DeqNode for an access order queue. +type KeyDeqNodeAo = NonNull>>; + +// DeqNode for the write order queue. 
+type KeyDeqNodeWo = NonNull>>; + +struct DeqNodes { + access_order_q_node: Option>, + write_order_q_node: Option>, +} + +pub(crate) struct ValueEntry { + pub(crate) value: V, + deq_nodes: RefCell>, +} + +impl ValueEntry { + pub(crate) fn new( + value: V, + access_order_q_node: Option>, + write_order_q_node: Option>, + ) -> Self { + Self { + value, + deq_nodes: RefCell::new(DeqNodes { + access_order_q_node, + write_order_q_node, + }), + } + } + + // #[inline] + // pub(crate) fn set_deq_nodes(&self, access_order_q_node: Option>, write_order_q_node: Option>) { + // let mut deq_nodes = self.deq_nodes.borrow_mut(); + // deq_nodes.access_order_q_node = access_order_q_node; + // deq_nodes.write_order_q_node = write_order_q_node; + // } + + #[inline] + pub(crate) fn replace_deq_nodes_with(&self, other: Self) { + let mut self_deq_nodes = self.deq_nodes.borrow_mut(); + let mut other_deq_nodes = other.deq_nodes.borrow_mut(); + self_deq_nodes.access_order_q_node = other_deq_nodes.access_order_q_node.take(); + self_deq_nodes.write_order_q_node = other_deq_nodes.write_order_q_node.take(); + } + + #[inline] + pub(crate) fn access_order_q_node(&self) -> Option> { + self.deq_nodes.borrow().access_order_q_node + } + + #[inline] + pub(crate) fn set_access_order_q_node(&mut self, node: Option>) { + self.deq_nodes.borrow_mut().access_order_q_node = node; + } + + #[inline] + pub(crate) fn take_access_order_q_node(&mut self) -> Option> { + self.deq_nodes.borrow_mut().access_order_q_node.take() + } + + #[inline] + pub(crate) fn write_order_q_node(&self) -> Option> { + self.deq_nodes.borrow().write_order_q_node + } + + #[inline] + pub(crate) fn set_write_order_q_node(&mut self, node: Option>) { + self.deq_nodes.borrow_mut().write_order_q_node = node; + } + + #[inline] + pub(crate) fn take_write_order_q_node(&mut self) -> Option> { + self.deq_nodes.borrow_mut().write_order_q_node.take() + } + + // #[inline] + // fn last_accessed(&self) -> Option { + // todo!() + // } + + #[inline] + fn 
set_last_accessed(&self, timestamp: Instant) { + if let Some(mut node) = self.deq_nodes.borrow_mut().access_order_q_node { + unsafe { node.as_mut() }.set_last_accessed(timestamp); + } + } + + // #[inline] + // fn last_modified(&self) -> Option { + // todo!() + // } + + #[inline] + fn set_last_modified(&self, _timestamp: Instant) { + todo!() + } +} + +impl AccessTime for DeqNode> { + #[inline] + fn last_accessed(&self) -> Option { + None + } + + #[inline] + fn set_last_accessed(&mut self, _timestamp: Instant) { + // do nothing + } + + #[inline] + fn last_modified(&self) -> Option { + self.element.timestamp + } + + #[inline] + fn set_last_modified(&mut self, timestamp: Instant) { + self.element.timestamp = Some(timestamp); + } +} + +impl AccessTime for DeqNode> { + #[inline] + fn last_accessed(&self) -> Option { + self.element.timestamp + } + + #[inline] + fn set_last_accessed(&mut self, timestamp: Instant) { + self.element.timestamp = Some(timestamp); + } + + #[inline] + fn last_modified(&self) -> Option { + None + } + + #[inline] + fn set_last_modified(&mut self, _timestamp: Instant) { + // do nothing + } +} diff --git a/src/unsync/cache.rs b/src/unsync/cache.rs new file mode 100644 index 00000000..0d64d492 --- /dev/null +++ b/src/unsync/cache.rs @@ -0,0 +1,585 @@ +use super::{deques::Deques, KeyDate, KeyHashDate, ValueEntry}; +use crate::common::{ + deque::{CacheRegion, DeqNode, Deque}, + frequency_sketch::FrequencySketch, + AccessTime, +}; + +use quanta::{Clock, Instant}; +use std::{ + borrow::Borrow, + collections::{hash_map::RandomState, HashMap}, + hash::{BuildHasher, Hash, Hasher}, + ptr::NonNull, + rc::Rc, + time::Duration, +}; + +const EVICTION_BATCH_SIZE: usize = 100; + +type CacheStore = std::collections::HashMap, ValueEntry, S>; + +pub struct Cache { + max_capacity: usize, + cache: CacheStore, + build_hasher: S, + deques: Deques, + frequency_sketch: FrequencySketch, + time_to_live: Option, + time_to_idle: Option, + expiration_clock: Option, +} + +impl 
Cache +where + K: Hash + Eq, + V: Clone, +{ + pub fn new(max_capacity: usize) -> Self { + let build_hasher = RandomState::default(); + Self::with_everything(max_capacity, None, build_hasher, None, None) + } +} + +impl Cache +where + K: Hash + Eq, + V: Clone, + S: BuildHasher + Clone, +{ + pub(crate) fn with_everything( + max_capacity: usize, + initial_capacity: Option, + build_hasher: S, + time_to_live: Option, + time_to_idle: Option, + ) -> Self { + let cache = HashMap::with_capacity_and_hasher( + initial_capacity.unwrap_or_default(), + build_hasher.clone(), + ); + let skt_capacity = usize::max(max_capacity * 32, 100); + let frequency_sketch = FrequencySketch::with_capacity(skt_capacity); + Self { + max_capacity, + cache, + build_hasher, + deques: Deques::default(), + frequency_sketch, + time_to_live, + time_to_idle, + expiration_clock: None, + } + } + + pub fn get(&mut self, key: &Q) -> Option<&V> + where + Rc: Borrow, + Q: Hash + Eq + ?Sized, + { + let hash = self.hash(key); + let has_expiry = self.has_expiry(); + + let now; + if has_expiry { + let now1 = self.current_time_from_expiration_clock(); + self.evict(now1, EVICTION_BATCH_SIZE); + now = Some(now1); + } else { + now = None; + }; + + let (entry, sketch, deqs) = ( + self.cache.get(key), + &mut self.frequency_sketch, + &mut self.deques, + ); + + match (entry, has_expiry) { + // Value not found. + (None, _) => { + Self::record_read(sketch, deqs, hash, None, None); + None + } + // Value found, no expiry. + (Some(entry), false) => { + Self::record_read(sketch, deqs, hash, Some(entry), None); + Some(&entry.value) + } + // Value found, need to check if expired. + (Some(entry), true) => { + // if self.is_expired_entry_wo( + // unsafe { entry.access_order_q_node().borrow().as_ref() }, + // now, + // ) || self.is_expired_entry_ao( + // unsafe { entry.write_order_q_node().borrow().as_ref() }, + // now, + // ) { + // // Expired entry. Record this access as a cache miss rather than a hit. 
+ // self.record_read(hash, None, None); + // None + // } else { + // Valid entry. + Self::record_read(sketch, deqs, hash, Some(entry), now); + Some(&entry.value) + // } + } + } + } + + pub fn insert(&mut self, key: K, value: V) { + let mut timestamp = None; + + if self.has_expiry() { + let now = self.current_time_from_expiration_clock(); + self.evict(now, EVICTION_BATCH_SIZE); + timestamp = Some(now); + } + + let key = Rc::new(key); + let entry = ValueEntry::new(value, None, None); + + if let Some(old_entry) = self.cache.insert(Rc::clone(&key), entry) { + self.handle_update(key, timestamp, old_entry); + } else { + // Insert + let hash = self.hash(&key); + self.handle_insert(key, hash, timestamp); + } + } + + pub fn invalidate(&mut self, key: &Q) + where + Rc: Borrow, + Q: Hash + Eq + ?Sized, + { + if self.has_expiry() { + let now = self.current_time_from_expiration_clock(); + self.evict(now, EVICTION_BATCH_SIZE); + } + + if let Some(mut entry) = self.cache.remove(key) { + self.deques.unlink_ao(&mut entry); + Deques::unlink_wo(&mut self.deques.write_order, &mut entry) + } + } + + pub fn max_capacity(&self) -> usize { + self.max_capacity + } + + pub fn time_to_live(&self) -> Option { + self.time_to_live + } + + /// Returns the `time_to_idle` of this cache. 
+ pub fn time_to_idle(&self) -> Option { + self.time_to_idle + } +} + +// private methods +impl Cache +where + K: Hash + Eq, + V: Clone, + S: BuildHasher + Clone, +{ + #[inline] + fn hash(&self, key: &Q) -> u64 + where + Rc: Borrow, + Q: Hash + Eq + ?Sized, + { + let mut hasher = self.build_hasher.build_hasher(); + key.hash(&mut hasher); + hasher.finish() + } + + #[inline] + fn has_expiry(&self) -> bool { + self.time_to_live.is_some() || self.time_to_idle.is_some() + } + + #[inline] + fn current_time_from_expiration_clock(&self) -> Instant { + if let Some(clock) = &self.expiration_clock { + clock.now() + } else { + Instant::now() + } + } + + #[inline] + fn is_expired_entry_ao( + time_to_idle: &Option, + entry: &impl AccessTime, + now: Instant, + ) -> bool { + if let (Some(ts), Some(tti)) = (entry.last_accessed(), time_to_idle) { + if ts + *tti <= now { + return true; + } + } + false + } + + #[inline] + fn is_expired_entry_wo(&self, entry: &impl AccessTime, now: Instant) -> bool { + debug_assert!(self.has_expiry()); + if let (Some(ts), Some(ttl)) = (entry.last_modified(), self.time_to_live) { + if ts + ttl <= now { + return true; + } + } + false + } + + fn record_read( + frequency_sketch: &mut FrequencySketch, + deques: &mut Deques, + hash: u64, + entry: Option<&ValueEntry>, + timestamp: Option, + ) { + frequency_sketch.increment(hash); + if let Some(entry) = entry { + if let Some(ts) = timestamp { + entry.set_last_accessed(ts); + } + deques.move_to_back_ao(entry) + } + } + + #[inline] + fn handle_insert(&mut self, key: Rc, hash: u64, _timestamp: Option) { + let last_accessed = None; + // let last_accessed = entry.raw_last_accessed().map(|ts| { + // ts.store(timestamp.unwrap().as_u64(), Ordering::Relaxed); + // ts + // }); + let last_modified = None; + // let last_modified = entry.raw_last_modified().map(|ts| { + // ts.store(timestamp.unwrap().as_u64(), Ordering::Relaxed); + // ts + // }); + + let has_free_space = self.cache.len() <= self.max_capacity; + let (cache, 
deqs, freq) = (&mut self.cache, &mut self.deques, &self.frequency_sketch); + + if has_free_space { + // Add the candidate to the deque. + let key = Rc::clone(&key); + let mut entry = cache.get_mut(&key).unwrap(); + deqs.push_back_ao( + CacheRegion::MainProbation, + KeyHashDate::new(key, hash, last_accessed), + &mut entry, + ); + // if self.time_to_live.is_some() { + // deqs.push_back_wo(KeyDate::new(key, last_modified), &entry); + // } + } else { + let victim = Self::find_cache_victim(deqs, freq); + if Self::admit(hash, victim, freq) { + // Remove the victim from the cache and deque. + // + // TODO: Check if the selected victim was actually removed. If not, + // maybe we should find another victim. This can happen because it + // could have been already removed from the cache but the removal + // from the deque is still on the write operations queue and is not + // yet executed. + if let Some(mut vic_entry) = cache.remove(&victim.element.key) { + deqs.unlink_ao(&mut vic_entry); + Deques::unlink_wo(&mut deqs.write_order, &mut vic_entry); + } else { + let victim = NonNull::from(victim); + deqs.unlink_node_ao(victim); + } + // Add the candidate to the deque. + let mut entry = cache.get_mut(&key).unwrap(); + + let key = Rc::clone(&key); + deqs.push_back_ao( + CacheRegion::MainProbation, + KeyHashDate::new(Rc::clone(&key), hash, last_accessed), + &mut entry, + ); + if self.time_to_live.is_some() { + deqs.push_back_wo(KeyDate::new(key, last_modified), &mut entry); + } + } else { + // Remove the candidate from the cache. + cache.remove(&key); + } + } + } + + #[inline] + fn find_cache_victim<'a>( + deqs: &'a mut Deques, + _freq: &FrequencySketch, + ) -> &'a DeqNode> { + // TODO: Check its frequency. If it is not very low, maybe we should + // check frequencies of next few others and pick from them. 
+ deqs.probation.peek_front().expect("No victim found") + } + + #[inline] + fn admit( + candidate_hash: u64, + victim: &DeqNode>, + freq: &FrequencySketch, + ) -> bool { + // TODO: Implement some randomness to mitigate hash DoS attack. + // See Caffeine's implementation. + freq.frequency(candidate_hash) > freq.frequency(victim.element.hash) + } + + fn handle_update( + &mut self, + key: Rc, + timestamp: Option, + old_entry: ValueEntry, + ) { + let entry = self.cache.get_mut(&key).unwrap(); + entry.replace_deq_nodes_with(old_entry); + if let Some(ts) = timestamp { + entry.set_last_accessed(ts); + entry.set_last_modified(ts); + } + let deqs = &mut self.deques; + deqs.move_to_back_ao(&entry); + deqs.move_to_back_wo(&entry) + } + + fn evict(&mut self, now: Instant, batch_size: usize) { + if self.time_to_live.is_some() { + self.remove_expired_wo(batch_size, now); + } + + if self.time_to_idle.is_some() { + let deqs = &mut self.deques; + let (window, probation, protected, wo, cache, time_to_idle) = ( + &mut deqs.window, + &mut deqs.probation, + &mut deqs.protected, + &mut deqs.write_order, + &mut self.cache, + &self.time_to_idle, + ); + + let mut rm_expired_ao = |name, deq| { + Self::remove_expired_ao(name, deq, wo, cache, time_to_idle, batch_size, now) + }; + + rm_expired_ao("window", window); + rm_expired_ao("probation", probation); + rm_expired_ao("protected", protected); + } + } + + #[inline] + fn remove_expired_ao( + deq_name: &str, + deq: &mut Deque>, + write_order_deq: &mut Deque>, + cache: &mut CacheStore, + time_to_idle: &Option, + batch_size: usize, + now: Instant, + ) { + for _ in 0..batch_size { + let key = deq + .peek_front() + .and_then(|node| { + if Self::is_expired_entry_ao(time_to_idle, &*node, now) { + Some(Some(Rc::clone(&node.element.key))) + } else { + None + } + }) + .unwrap_or(None); + + if key.is_none() { + break; + } + + if let Some(mut entry) = cache.remove(&key.unwrap()) { + Deques::unlink_ao_from_deque(deq_name, deq, &mut entry); + 
Deques::unlink_wo(write_order_deq, &mut entry); + } else { + deq.pop_front(); + } + } + } + + #[inline] + fn remove_expired_wo(&mut self, batch_size: usize, now: Instant) { + for _ in 0..batch_size { + let key = self + .deques + .write_order + .peek_front() + .and_then(|node| { + if self.is_expired_entry_wo(&*node, now) { + Some(Some(Rc::clone(&node.element.key))) + } else { + None + } + }) + .unwrap_or(None); + + if key.is_none() { + break; + } + + if let Some(mut entry) = self.cache.remove(&key.unwrap()) { + self.deques.unlink_ao(&mut entry); + Deques::unlink_wo(&mut self.deques.write_order, &mut entry); + } else { + self.deques.write_order.pop_front(); + } + } + } +} + +// For unit tests. +// #[cfg(test)] +// impl Cache +// where +// K: Hash + Eq, +// S: BuildHasher + Clone, +// { +// fn set_expiration_clock(&self, clock: Option) { +// todo!() +// } +// } + +// To see the debug prints, run test as `cargo test -- --nocapture` +#[cfg(test)] +mod tests { + use super::Cache; + // use crate::unsync::CacheBuilder; + + // use quanta::Clock; + // use std::time::Duration; + + #[test] + fn basic_single_thread() { + let mut cache = Cache::new(3); + + cache.insert("a", "alice"); + cache.insert("b", "bob"); + assert_eq!(cache.get(&"a"), Some(&"alice")); + assert_eq!(cache.get(&"b"), Some(&"bob")); + // counts: a -> 1, b -> 1 + + cache.insert("c", "cindy"); + assert_eq!(cache.get(&"c"), Some(&"cindy")); + // counts: a -> 1, b -> 1, c -> 1 + + assert_eq!(cache.get(&"a"), Some(&"alice")); + assert_eq!(cache.get(&"b"), Some(&"bob")); + // counts: a -> 2, b -> 2, c -> 1 + + // "d" should not be admitted because its frequency is too low. + cache.insert("d", "david"); // count: d -> 0 + assert_eq!(cache.get(&"d"), None); // d -> 1 + + cache.insert("d", "david"); + assert_eq!(cache.get(&"d"), None); // d -> 2 + + // "d" should be admitted and "c" should be evicted + // because d's frequency is higher than c's. 
+ cache.insert("d", "dennis"); + assert_eq!(cache.get(&"a"), Some(&"alice")); + assert_eq!(cache.get(&"b"), Some(&"bob")); + assert_eq!(cache.get(&"c"), None); + assert_eq!(cache.get(&"d"), Some(&"dennis")); + + cache.invalidate(&"b"); + assert_eq!(cache.get(&"b"), None); + } + + // #[test] + // fn time_to_live() { + // let mut cache = CacheBuilder::new(100) + // .time_to_live(Duration::from_secs(10)) + // .build(); + + // cache.reconfigure_for_testing(); + + // let (clock, mock) = Clock::mock(); + // cache.set_expiration_clock(Some(clock)); + + // cache.insert("a", "alice"); + + // mock.increment(Duration::from_secs(5)); // 5 secs from the start. + + // cache.get(&"a"); + + // mock.increment(Duration::from_secs(5)); // 10 secs. + + // assert_eq!(cache.get(&"a"), None); + // assert!(cache.base.is_empty()); + + // cache.insert("b", "bob"); + + // assert_eq!(cache.base.len(), 1); + + // mock.increment(Duration::from_secs(5)); // 15 secs. + + // assert_eq!(cache.get(&"b"), Some("bob")); + // assert_eq!(cache.base.len(), 1); + + // cache.insert("b", "bill"); + + // mock.increment(Duration::from_secs(5)); // 20 secs + + // assert_eq!(cache.get(&"b"), Some("bill")); + // assert_eq!(cache.cache.len(), 1); + + // mock.increment(Duration::from_secs(5)); // 25 secs + + // assert_eq!(cache.get(&"a"), None); + // assert_eq!(cache.get(&"b"), None); + // assert!(cache.base.is_empty()); + // } + + // #[test] + // fn time_to_idle() { + // let mut cache = CacheBuilder::new(100) + // .time_to_idle(Duration::from_secs(10)) + // .build(); + + // cache.reconfigure_for_testing(); + + // let (clock, mock) = Clock::mock(); + // cache.set_expiration_clock(Some(clock)); + + // cache.insert("a", "alice"); + + // mock.increment(Duration::from_secs(5)); // 5 secs from the start. + + // assert_eq!(cache.get(&"a"), Some("alice")); + + // mock.increment(Duration::from_secs(5)); // 10 secs. 
+ + // cache.insert("b", "bob"); + + // assert_eq!(cache.base.len(), 2); + + // mock.increment(Duration::from_secs(5)); // 15 secs. + + // assert_eq!(cache.get(&"a"), None); + // assert_eq!(cache.get(&"b"), Some("bob")); + // assert_eq!(cache.base.len(), 1); + + // mock.increment(Duration::from_secs(10)); // 25 secs + + // assert_eq!(cache.get(&"a"), None); + // assert_eq!(cache.get(&"b"), None); + // assert!(cache.base.is_empty()); + // } +} diff --git a/src/unsync/deques.rs b/src/unsync/deques.rs new file mode 100644 index 00000000..640bf7d1 --- /dev/null +++ b/src/unsync/deques.rs @@ -0,0 +1,149 @@ +use super::{KeyDate, KeyHashDate, ValueEntry}; +use crate::common::deque::{CacheRegion, DeqNode, Deque}; + +use std::ptr::NonNull; + +pub(crate) struct Deques { + pub(crate) window: Deque>, // Not used yet. + pub(crate) probation: Deque>, + pub(crate) protected: Deque>, // Not used yet. + pub(crate) write_order: Deque>, +} + +#[cfg(feature = "future")] +// Multi-threaded async runtimes require base_cache::Inner to be Send, but it will +// not be without this `unsafe impl`. This is because DeqNodes have NonNull +// pointers. 
+unsafe impl Send for Deques {} + +impl Default for Deques { + fn default() -> Self { + Self { + window: Deque::new(CacheRegion::Window), + probation: Deque::new(CacheRegion::MainProbation), + protected: Deque::new(CacheRegion::MainProtected), + write_order: Deque::new(CacheRegion::WriteOrder), + } + } +} + +impl Deques { + pub(crate) fn push_back_ao( + &mut self, + region: CacheRegion, + kh: KeyHashDate, + entry: &mut ValueEntry, + ) { + use CacheRegion::*; + let node = Box::new(DeqNode::new(region, kh)); + let node = match node.as_ref().region { + Window => self.window.push_back(node), + MainProbation => self.probation.push_back(node), + MainProtected => self.protected.push_back(node), + WriteOrder => unreachable!(), + }; + entry.set_access_order_q_node(Some(node)); + } + + pub(crate) fn push_back_wo(&mut self, kh: KeyDate, entry: &mut ValueEntry) { + let node = Box::new(DeqNode::new(CacheRegion::WriteOrder, kh)); + let node = self.write_order.push_back(node); + entry.set_write_order_q_node(Some(node)); + } + + pub(crate) fn move_to_back_ao(&mut self, entry: &ValueEntry) { + use CacheRegion::*; + let node = entry.access_order_q_node().unwrap(); + let p = unsafe { node.as_ref() }; + match &p.region { + Window if self.window.contains(p) => unsafe { self.window.move_to_back(node) }, + MainProbation if self.probation.contains(p) => unsafe { + self.probation.move_to_back(node) + }, + MainProtected if self.protected.contains(p) => unsafe { + self.protected.move_to_back(node) + }, + _ => {} + } + } + + pub(crate) fn move_to_back_wo(&mut self, entry: &ValueEntry) { + use CacheRegion::*; + let node = entry.write_order_q_node().unwrap(); + let p = unsafe { node.as_ref() }; + debug_assert_eq!(&p.region, &WriteOrder); + if self.write_order.contains(p) { + unsafe { self.write_order.move_to_back(node) }; + } + } + + pub(crate) fn unlink_ao(&mut self, entry: &mut ValueEntry) { + if let Some(node) = entry.take_access_order_q_node() { + self.unlink_node_ao(node); + } + } + + 
pub(crate) fn unlink_ao_from_deque( + deq_name: &str, + deq: &mut Deque>, + entry: &mut ValueEntry, + ) { + if let Some(node) = entry.take_access_order_q_node() { + unsafe { Self::unlink_node_ao_from_deque(deq_name, deq, node) }; + } + } + + pub(crate) fn unlink_wo(deq: &mut Deque>, entry: &mut ValueEntry) { + if let Some(node) = entry.take_write_order_q_node() { + Self::unlink_node_wo(deq, node); + } + } + + pub(crate) fn unlink_node_ao(&mut self, node: NonNull>>) { + use CacheRegion::*; + unsafe { + match node.as_ref().region { + Window => Self::unlink_node_ao_from_deque("window", &mut self.window, node), + MainProbation => { + Self::unlink_node_ao_from_deque("probation", &mut self.probation, node) + } + MainProtected => { + Self::unlink_node_ao_from_deque("protected", &mut self.protected, node) + } + _ => unreachable!(), + } + } + } + + unsafe fn unlink_node_ao_from_deque( + deq_name: &str, + deq: &mut Deque>, + node: NonNull>>, + ) { + if deq.contains(node.as_ref()) { + deq.unlink(node); + } else { + panic!( + "unlink_node - node is not a member of {} deque. {:?}", + deq_name, + node.as_ref() + ) + } + } + + pub(crate) fn unlink_node_wo(deq: &mut Deque>, node: NonNull>>) { + use CacheRegion::*; + unsafe { + let p = node.as_ref(); + debug_assert_eq!(&p.region, &WriteOrder); + if deq.contains(p) { + deq.unlink(node); + } else { + panic!( + "unlink_node - node is not a member of write_order deque. 
{:?}", + p + ) + } + } + } +} From 851d130b909c8ef8e439ef3ccdadbf4c87b1174b Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Tue, 16 Feb 2021 00:08:31 +0800 Subject: [PATCH 2/6] Skip rustfmt and clippy for specific Rust versions --- .github/workflows/CI.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index fd8afeff..8f778ae1 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -54,12 +54,14 @@ jobs: - name: Run Rustfmt uses: actions-rs/cargo@v1 + if: ${{ matrix.rust == 'stable' }} with: command: fmt args: --all -- --check - name: Run Clippy uses: actions-rs/clippy-check@v1 + if: ${{ matrix.rust == 'stable' || matrix.rust == 'beta' }} with: token: ${{ secrets.GITHUB_TOKEN }} args: --features future -- -D warnings From 7aaa9b2c6ec5bfa058bf58f4361d4398487ce1b4 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Tue, 16 Feb 2021 00:40:28 +0800 Subject: [PATCH 3/6] Tweak unit/integration tests for multi threads/async tasks cases - Test blocking_insert() and blocking_invalidate(). - Remove calls for reconfigure_for_testing() and sync(). --- src/future/cache.rs | 30 ++++++++++++++++-------------- src/sync/cache.rs | 10 +--------- tests/runtime_actix_rt1.rs | 16 ++++++++++++---- tests/runtime_actix_rt2.rs | 16 ++++++++++++---- tests/runtime_async_std.rs | 16 ++++++++++++---- tests/runtime_tokio.rs | 16 ++++++++++++---- 6 files changed, 65 insertions(+), 39 deletions(-) diff --git a/src/future/cache.rs b/src/future/cache.rs index 045a2253..50005fa0 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -548,28 +548,30 @@ mod tests { #[tokio::test] async fn basic_multi_async_tasks() { let num_threads = 4; - - let mut cache = Cache::new(100); - cache.reconfigure_for_testing(); - - // Make the cache exterior immutable. 
- let cache: Cache = cache; + let cache = Cache::new(100); let tasks = (0..num_threads) .map(|id| { let cache = cache.clone(); - tokio::spawn(async move { - cache.insert(10, format!("{}-100", id)).await; - cache.get(&10); - cache.sync(); - cache.insert(20, format!("{}-200", id)).await; - cache.invalidate(&10).await; - }) + if id == 0 { + tokio::spawn(async move { + cache.blocking_insert(10, format!("{}-100", id)); + cache.get(&10); + cache.blocking_insert(20, format!("{}-200", id)); + cache.blocking_invalidate(&10); + }) + } else { + tokio::spawn(async move { + cache.insert(10, format!("{}-100", id)).await; + cache.get(&10); + cache.insert(20, format!("{}-200", id)).await; + cache.invalidate(&10).await; + }) + } }) .collect::>(); let _ = futures::future::join_all(tasks).await; - cache.sync(); assert!(cache.get(&10).is_none()); assert!(cache.get(&20).is_some()); diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 5fa0f2c0..ea167078 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -424,12 +424,7 @@ mod tests { #[test] fn basic_multi_threads() { let num_threads = 4; - - let mut cache = Cache::new(100); - cache.reconfigure_for_testing(); - - // Make the cache exterior immutable. - let cache = cache; + let cache = Cache::new(100); let handles = (0..num_threads) .map(|id| { @@ -437,7 +432,6 @@ mod tests { std::thread::spawn(move || { cache.insert(10, format!("{}-100", id)); cache.get(&10); - cache.sync(); cache.insert(20, format!("{}-200", id)); cache.invalidate(&10); }) @@ -446,8 +440,6 @@ mod tests { handles.into_iter().for_each(|h| h.join().expect("Failed")); - cache.sync(); - assert!(cache.get(&10).is_none()); assert!(cache.get(&20).is_some()); } diff --git a/tests/runtime_actix_rt1.rs b/tests/runtime_actix_rt1.rs index 254ac933..0ba15253 100644 --- a/tests/runtime_actix_rt1.rs +++ b/tests/runtime_actix_rt1.rs @@ -31,16 +31,24 @@ fn main() { arbiter.spawn(async move { // Insert 64 entries. 
(NUM_KEYS_PER_TASK = 64) for key in start..end { - // insert() is an async method, so await it - my_cache.insert(key, value(key)).await; + if key % 8 == 0 { + my_cache.blocking_insert(key, value(key)); + } else { + // insert() is an async method, so await it + my_cache.insert(key, value(key)).await; + } // get() returns Option, a clone of the stored value. assert_eq!(my_cache.get(&key), Some(value(key))); } // Invalidate every 4 element of the inserted entries. for key in (start..end).step_by(4) { - // invalidate() is an async method, so await it - my_cache.invalidate(&key).await; + if key % 8 == 0 { + my_cache.blocking_invalidate(&key).await; + } else { + // invalidate() is an async method, so await it + my_cache.invalidate(&key).await; + } } }) }) diff --git a/tests/runtime_actix_rt2.rs b/tests/runtime_actix_rt2.rs index a9594e37..b93a8ccb 100644 --- a/tests/runtime_actix_rt2.rs +++ b/tests/runtime_actix_rt2.rs @@ -31,16 +31,24 @@ fn main() { arbiter.spawn(async move { // Insert 64 entries. (NUM_KEYS_PER_TASK = 64) for key in start..end { - // insert() is an async method, so await it - my_cache.insert(key, value(key)).await; + if key % 8 == 0 { + my_cache.blocking_insert(key, value(key)); + } else { + // insert() is an async method, so await it + my_cache.insert(key, value(key)).await; + } // get() returns Option, a clone of the stored value. assert_eq!(my_cache.get(&key), Some(value(key))); } // Invalidate every 4 element of the inserted entries. 
for key in (start..end).step_by(4) { - // invalidate() is an async method, so await it - my_cache.invalidate(&key).await; + if key % 8 == 0 { + my_cache.blocking_invalidate(&key).await; + } else { + // invalidate() is an async method, so await it + my_cache.invalidate(&key).await; + } } }) }) diff --git a/tests/runtime_async_std.rs b/tests/runtime_async_std.rs index 58584e8f..2b1ea860 100644 --- a/tests/runtime_async_std.rs +++ b/tests/runtime_async_std.rs @@ -26,16 +26,24 @@ async fn main() { async_std::task::spawn(async move { // Insert 64 entries. (NUM_KEYS_PER_TASK = 64) for key in start..end { - // insert() is an async method, so await it - my_cache.insert(key, value(key)).await; + if key % 8 == 0 { + my_cache.blocking_insert(key, value(key)); + } else { + // insert() is an async method, so await it + my_cache.insert(key, value(key)).await; + } // get() returns Option, a clone of the stored value. assert_eq!(my_cache.get(&key), Some(value(key))); } // Invalidate every 4 element of the inserted entries. for key in (start..end).step_by(4) { - // invalidate() is an async method, so await it - my_cache.invalidate(&key).await; + if key % 8 == 0 { + my_cache.blocking_invalidate(&key).await; + } else { + // invalidate() is an async method, so await it + my_cache.invalidate(&key).await; + } } }) }) diff --git a/tests/runtime_tokio.rs b/tests/runtime_tokio.rs index f82eaf82..09cc5c03 100644 --- a/tests/runtime_tokio.rs +++ b/tests/runtime_tokio.rs @@ -26,16 +26,24 @@ async fn main() { tokio::spawn(async move { // Insert 64 entries. (NUM_KEYS_PER_TASK = 64) for key in start..end { - // insert() is an async method, so await it - my_cache.insert(key, value(key)).await; + if key % 8 == 0 { + my_cache.blocking_insert(key, value(key)); + } else { + // insert() is an async method, so await it + my_cache.insert(key, value(key)).await; + } // get() returns Option, a clone of the stored value. 
assert_eq!(my_cache.get(&key), Some(value(key))); } // Invalidate every 4 element of the inserted entries. for key in (start..end).step_by(4) { - // invalidate() is an async method, so await it - my_cache.invalidate(&key).await; + if key % 8 == 0 { + my_cache.blocking_invalidate(&key).await; + } else { + // invalidate() is an async method, so await it + my_cache.invalidate(&key).await; + } } }) }) From 3f8e444e8b8b7423d0340a8a9332b1eabd52dd7c Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Tue, 16 Feb 2021 08:48:27 +0800 Subject: [PATCH 4/6] Add "Rust Cache" action to a CI (GitHub Actions) --- .github/workflows/CI.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 8f778ae1..65a3ca0d 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -35,6 +35,8 @@ jobs: override: true components: rustfmt, clippy + - uses: Swatinem/rust-cache@v1 + - name: Build (no features) uses: actions-rs/cargo@v1 with: From 6ee47df77895438bdba33933447255da874bf99f Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Tue, 16 Feb 2021 20:01:48 +0800 Subject: [PATCH 5/6] Implement unsync::Cache - Add support for time-to-idle and time-to-live to the unsync cache. 
--- src/sync.rs | 10 +- src/sync/base_cache.rs | 12 +- src/unsync.rs | 81 ++++++------- src/unsync/builder.rs | 159 +++++++++++++++++++++++++ src/unsync/cache.rs | 262 +++++++++++++++++++++-------------------- src/unsync/deques.rs | 6 - 6 files changed, 336 insertions(+), 194 deletions(-) create mode 100644 src/unsync/builder.rs diff --git a/src/sync.rs b/src/sync.rs index b8d8d1b5..1326be73 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -96,16 +96,14 @@ impl ValueEntry { value: V, last_accessed: Option, last_modified: Option, - access_order_q_node: Option>, - write_order_q_node: Option>, ) -> Self { Self { value, last_accessed: last_accessed.map(|ts| Arc::new(AtomicU64::new(ts.as_u64()))), last_modified: last_modified.map(|ts| Arc::new(AtomicU64::new(ts.as_u64()))), nodes: Mutex::new(DeqNodes { - access_order_q_node, - write_order_q_node, + access_order_q_node: None, + write_order_q_node: None, }), } } @@ -217,7 +215,7 @@ impl AccessTime for DeqNode> { #[inline] fn set_last_accessed(&mut self, _timestamp: Instant) { - // do nothing + unreachable!(); } #[inline] @@ -273,7 +271,7 @@ impl AccessTime for DeqNode> { #[inline] fn set_last_modified(&mut self, _timestamp: Instant) { - // do nothing + unreachable!(); } } diff --git a/src/sync/base_cache.rs b/src/sync/base_cache.rs index 2a1525f3..4ef7a704 100644 --- a/src/sync/base_cache.rs +++ b/src/sync/base_cache.rs @@ -255,13 +255,7 @@ where last_modified = Some(ts); } } - let entry = Arc::new(ValueEntry::new( - value.clone(), - last_accessed, - last_modified, - None, - None, - )); + let entry = Arc::new(ValueEntry::new(value.clone(), last_accessed, last_modified)); let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed); op1 = Some(( cnt, @@ -732,7 +726,7 @@ where None } }) - .unwrap_or(None); + .unwrap_or_default(); if key.is_none() { break; @@ -760,7 +754,7 @@ where None } }) - .unwrap_or(None); + .unwrap_or_default(); if key.is_none() { break; diff --git a/src/unsync.rs b/src/unsync.rs index 23cf68b9..34701d1d 100644 --- 
a/src/unsync.rs +++ b/src/unsync.rs @@ -1,8 +1,10 @@ +pub(crate) mod builder; pub(crate) mod cache; mod deques; -use std::{cell::RefCell, ptr::NonNull, rc::Rc}; +use std::{ptr::NonNull, rc::Rc}; +pub use builder::CacheBuilder; pub use cache::Cache; use quanta::Instant; @@ -48,89 +50,82 @@ struct DeqNodes { pub(crate) struct ValueEntry { pub(crate) value: V, - deq_nodes: RefCell>, + deq_nodes: DeqNodes, } impl ValueEntry { - pub(crate) fn new( - value: V, - access_order_q_node: Option>, - write_order_q_node: Option>, - ) -> Self { + pub(crate) fn new(value: V) -> Self { Self { value, - deq_nodes: RefCell::new(DeqNodes { - access_order_q_node, - write_order_q_node, - }), + deq_nodes: DeqNodes { + access_order_q_node: None, + write_order_q_node: None, + }, } } - // #[inline] - // pub(crate) fn set_deq_nodes(&self, access_order_q_node: Option>, write_order_q_node: Option>) { - // let mut deq_nodes = self.deq_nodes.borrow_mut(); - // deq_nodes.access_order_q_node = access_order_q_node; - // deq_nodes.write_order_q_node = write_order_q_node; - // } - #[inline] - pub(crate) fn replace_deq_nodes_with(&self, other: Self) { - let mut self_deq_nodes = self.deq_nodes.borrow_mut(); - let mut other_deq_nodes = other.deq_nodes.borrow_mut(); - self_deq_nodes.access_order_q_node = other_deq_nodes.access_order_q_node.take(); - self_deq_nodes.write_order_q_node = other_deq_nodes.write_order_q_node.take(); + pub(crate) fn replace_deq_nodes_with(&mut self, mut other: Self) { + self.deq_nodes.access_order_q_node = other.deq_nodes.access_order_q_node.take(); + self.deq_nodes.write_order_q_node = other.deq_nodes.write_order_q_node.take(); } #[inline] pub(crate) fn access_order_q_node(&self) -> Option> { - self.deq_nodes.borrow().access_order_q_node + self.deq_nodes.access_order_q_node } #[inline] pub(crate) fn set_access_order_q_node(&mut self, node: Option>) { - self.deq_nodes.borrow_mut().access_order_q_node = node; + self.deq_nodes.access_order_q_node = node; } #[inline] pub(crate) fn 
take_access_order_q_node(&mut self) -> Option> { - self.deq_nodes.borrow_mut().access_order_q_node.take() + self.deq_nodes.access_order_q_node.take() } #[inline] pub(crate) fn write_order_q_node(&self) -> Option> { - self.deq_nodes.borrow().write_order_q_node + self.deq_nodes.write_order_q_node } #[inline] pub(crate) fn set_write_order_q_node(&mut self, node: Option>) { - self.deq_nodes.borrow_mut().write_order_q_node = node; + self.deq_nodes.write_order_q_node = node; } #[inline] pub(crate) fn take_write_order_q_node(&mut self) -> Option> { - self.deq_nodes.borrow_mut().write_order_q_node.take() + self.deq_nodes.write_order_q_node.take() } +} - // #[inline] - // fn last_accessed(&self) -> Option { - // todo!() - // } +impl AccessTime for ValueEntry { + #[inline] + fn last_accessed(&self) -> Option { + self.access_order_q_node() + .and_then(|node| unsafe { node.as_ref() }.element.timestamp) + } #[inline] - fn set_last_accessed(&self, timestamp: Instant) { - if let Some(mut node) = self.deq_nodes.borrow_mut().access_order_q_node { + fn set_last_accessed(&mut self, timestamp: Instant) { + if let Some(mut node) = self.deq_nodes.access_order_q_node { unsafe { node.as_mut() }.set_last_accessed(timestamp); } } - // #[inline] - // fn last_modified(&self) -> Option { - // todo!() - // } + #[inline] + fn last_modified(&self) -> Option { + self.write_order_q_node() + .and_then(|node| unsafe { node.as_ref() }.element.timestamp) + } #[inline] - fn set_last_modified(&self, _timestamp: Instant) { - todo!() + fn set_last_modified(&mut self, timestamp: Instant) { + if let Some(mut node) = self.deq_nodes.write_order_q_node { + unsafe { node.as_mut() }.set_last_modified(timestamp); + } } } @@ -142,7 +137,7 @@ impl AccessTime for DeqNode> { #[inline] fn set_last_accessed(&mut self, _timestamp: Instant) { - // do nothing + unreachable!(); } #[inline] @@ -174,6 +169,6 @@ impl AccessTime for DeqNode> { #[inline] fn set_last_modified(&mut self, _timestamp: Instant) { - // do nothing + 
unreachable!(); } } diff --git a/src/unsync/builder.rs b/src/unsync/builder.rs new file mode 100644 index 00000000..85692755 --- /dev/null +++ b/src/unsync/builder.rs @@ -0,0 +1,159 @@ +use super::Cache; + +use std::{ + collections::hash_map::RandomState, + hash::{BuildHasher, Hash}, + marker::PhantomData, + time::Duration, +}; + +/// Builds a [`Cache`][cache-struct] with various configuration knobs. +/// +/// [cache-struct]: ./struct.Cache.html +/// +/// # Examples +/// +/// ```rust +/// use moka::future::CacheBuilder; +/// +/// use std::time::Duration; +/// +/// let cache = CacheBuilder::new(10_000) // Max 10,000 elements +/// // Time to live (TTL): 30 minutes +/// .time_to_live(Duration::from_secs(30 * 60)) +/// // Time to idle (TTI): 5 minutes +/// .time_to_idle(Duration::from_secs( 5 * 60)) +/// // Create the cache. +/// .build(); +/// +/// // This entry will expire after 5 minutes (TTI) if there is no get(). +/// cache.insert(0, "zero"); +/// +/// // This get() will extend the entry life for another 5 minutes. +/// cache.get(&0); +/// +/// // Even though we keep calling get(), the entry will expire +/// // after 30 minutes (TTL) from the insert(). +/// ``` +/// +pub struct CacheBuilder { + max_capacity: usize, + initial_capacity: Option, + time_to_live: Option, + time_to_idle: Option, + cache_type: PhantomData, +} + +impl CacheBuilder> +where + K: Eq + Hash, + V: Clone, +{ + /// Construct a new `CacheBuilder` that will be used to build a `Cache` or + /// `SegmentedCache` holding up to `max_capacity` entries. + pub fn new(max_capacity: usize) -> Self { + Self { + max_capacity, + initial_capacity: None, + time_to_live: None, + time_to_idle: None, + cache_type: PhantomData::default(), + } + } + + /// Builds a `Cache`. + /// + /// If you want to build a `SegmentedCache`, call `segments` method before + /// calling this method. 
+ pub fn build(self) -> Cache { + let build_hasher = RandomState::default(); + Cache::with_everything( + self.max_capacity, + self.initial_capacity, + build_hasher, + self.time_to_live, + self.time_to_idle, + ) + } + + /// Builds a `Cache`, with the given `hasher`. + /// + /// If you want to build a `SegmentedCache`, call `segments` method before + /// calling this method. + pub fn build_with_hasher(self, hasher: S) -> Cache + where + S: BuildHasher + Clone, + { + Cache::with_everything( + self.max_capacity, + self.initial_capacity, + hasher, + self.time_to_live, + self.time_to_idle, + ) + } +} + +impl CacheBuilder { + /// Sets the initial capacity of the cache. + pub fn initial_capacity(self, capacity: usize) -> Self { + Self { + initial_capacity: Some(capacity), + ..self + } + } + + /// Sets the time to live of the cache. + /// + /// A cached entry will be expired after the specified duration past from + /// `insert`. + pub fn time_to_live(self, duration: Duration) -> Self { + Self { + time_to_live: Some(duration), + ..self + } + } + + /// Sets the time to idle of the cache. + /// + /// A cached entry will be expired after the specified duration past from `get` + /// or `insert`. 
+ pub fn time_to_idle(self, duration: Duration) -> Self { + Self { + time_to_idle: Some(duration), + ..self + } + } +} + +#[cfg(test)] +mod tests { + use super::CacheBuilder; + + use std::time::Duration; + + #[tokio::test] + async fn build_cache() { + // Cache + let mut cache = CacheBuilder::new(100).build(); + + assert_eq!(cache.max_capacity(), 100); + assert_eq!(cache.time_to_live(), None); + assert_eq!(cache.time_to_idle(), None); + + cache.insert('a', "Alice"); + assert_eq!(cache.get(&'a'), Some(&"Alice")); + + let mut cache = CacheBuilder::new(100) + .time_to_live(Duration::from_secs(45 * 60)) + .time_to_idle(Duration::from_secs(15 * 60)) + .build(); + + assert_eq!(cache.max_capacity(), 100); + assert_eq!(cache.time_to_live(), Some(Duration::from_secs(45 * 60))); + assert_eq!(cache.time_to_idle(), Some(Duration::from_secs(15 * 60))); + + cache.insert('a', "Alice"); + assert_eq!(cache.get(&'a'), Some(&"Alice")); + } +} diff --git a/src/unsync/cache.rs b/src/unsync/cache.rs index 0d64d492..b155f9b8 100644 --- a/src/unsync/cache.rs +++ b/src/unsync/cache.rs @@ -15,8 +15,6 @@ use std::{ time::Duration, }; -const EVICTION_BATCH_SIZE: usize = 100; - type CacheStore = std::collections::HashMap, ValueEntry, S>; pub struct Cache { @@ -41,6 +39,9 @@ where } } +// +// public +// impl Cache where K: Hash + Eq, @@ -79,18 +80,18 @@ where { let hash = self.hash(key); let has_expiry = self.has_expiry(); - - let now; - if has_expiry { - let now1 = self.current_time_from_expiration_clock(); - self.evict(now1, EVICTION_BATCH_SIZE); - now = Some(now1); + let timestamp = if has_expiry { + Some(self.current_time_from_expiration_clock()) } else { - now = None; + None }; + if let Some(ts) = timestamp { + self.evict(ts); + } + let (entry, sketch, deqs) = ( - self.cache.get(key), + self.cache.get_mut(key), &mut self.frequency_sketch, &mut self.deques, ); @@ -108,36 +109,34 @@ where } // Value found, need to check if expired. 
(Some(entry), true) => { - // if self.is_expired_entry_wo( - // unsafe { entry.access_order_q_node().borrow().as_ref() }, - // now, - // ) || self.is_expired_entry_ao( - // unsafe { entry.write_order_q_node().borrow().as_ref() }, - // now, - // ) { - // // Expired entry. Record this access as a cache miss rather than a hit. - // self.record_read(hash, None, None); - // None - // } else { - // Valid entry. - Self::record_read(sketch, deqs, hash, Some(entry), now); - Some(&entry.value) - // } + if Self::is_expired_entry_wo(&self.time_to_live, entry, timestamp.unwrap()) + || Self::is_expired_entry_ao(&self.time_to_idle, entry, timestamp.unwrap()) + { + // Expired entry. Record this access as a cache miss rather than a hit. + Self::record_read(sketch, deqs, hash, None, None); + None + } else { + // Valid entry. + Self::record_read(sketch, deqs, hash, Some(entry), timestamp); + Some(&entry.value) + } } } } pub fn insert(&mut self, key: K, value: V) { - let mut timestamp = None; + let timestamp = if self.has_expiry() { + Some(self.current_time_from_expiration_clock()) + } else { + None + }; - if self.has_expiry() { - let now = self.current_time_from_expiration_clock(); - self.evict(now, EVICTION_BATCH_SIZE); - timestamp = Some(now); + if let Some(ts) = timestamp { + self.evict(ts); } let key = Rc::new(key); - let entry = ValueEntry::new(value, None, None); + let entry = ValueEntry::new(value); if let Some(old_entry) = self.cache.insert(Rc::clone(&key), entry) { self.handle_update(key, timestamp, old_entry); @@ -154,8 +153,8 @@ where Q: Hash + Eq + ?Sized, { if self.has_expiry() { - let now = self.current_time_from_expiration_clock(); - self.evict(now, EVICTION_BATCH_SIZE); + let ts = self.current_time_from_expiration_clock(); + self.evict(ts); } if let Some(mut entry) = self.cache.remove(key) { @@ -178,7 +177,9 @@ where } } -// private methods +// +// private +// impl Cache where K: Hash + Eq, @@ -225,10 +226,13 @@ where } #[inline] - fn is_expired_entry_wo(&self, entry: 
&impl AccessTime, now: Instant) -> bool { - debug_assert!(self.has_expiry()); - if let (Some(ts), Some(ttl)) = (entry.last_modified(), self.time_to_live) { - if ts + ttl <= now { + fn is_expired_entry_wo( + time_to_live: &Option, + entry: &impl AccessTime, + now: Instant, + ) -> bool { + if let (Some(ts), Some(ttl)) = (entry.last_modified(), time_to_live) { + if ts + *ttl <= now { return true; } } @@ -239,7 +243,7 @@ where frequency_sketch: &mut FrequencySketch, deques: &mut Deques, hash: u64, - entry: Option<&ValueEntry>, + entry: Option<&mut ValueEntry>, timestamp: Option, ) { frequency_sketch.increment(hash); @@ -252,18 +256,7 @@ where } #[inline] - fn handle_insert(&mut self, key: Rc, hash: u64, _timestamp: Option) { - let last_accessed = None; - // let last_accessed = entry.raw_last_accessed().map(|ts| { - // ts.store(timestamp.unwrap().as_u64(), Ordering::Relaxed); - // ts - // }); - let last_modified = None; - // let last_modified = entry.raw_last_modified().map(|ts| { - // ts.store(timestamp.unwrap().as_u64(), Ordering::Relaxed); - // ts - // }); - + fn handle_insert(&mut self, key: Rc, hash: u64, timestamp: Option) { let has_free_space = self.cache.len() <= self.max_capacity; let (cache, deqs, freq) = (&mut self.cache, &mut self.deques, &self.frequency_sketch); @@ -273,12 +266,12 @@ where let mut entry = cache.get_mut(&key).unwrap(); deqs.push_back_ao( CacheRegion::MainProbation, - KeyHashDate::new(key, hash, last_accessed), + KeyHashDate::new(Rc::clone(&key), hash, timestamp), &mut entry, ); - // if self.time_to_live.is_some() { - // deqs.push_back_wo(KeyDate::new(key, last_modified), &entry); - // } + if self.time_to_live.is_some() { + deqs.push_back_wo(KeyDate::new(key, timestamp), &mut entry); + } } else { let victim = Self::find_cache_victim(deqs, freq); if Self::admit(hash, victim, freq) { @@ -302,11 +295,11 @@ where let key = Rc::clone(&key); deqs.push_back_ao( CacheRegion::MainProbation, - KeyHashDate::new(Rc::clone(&key), hash, last_accessed), + 
KeyHashDate::new(Rc::clone(&key), hash, timestamp), &mut entry, ); if self.time_to_live.is_some() { - deqs.push_back_wo(KeyDate::new(key, last_modified), &mut entry); + deqs.push_back_wo(KeyDate::new(key, timestamp), &mut entry); } } else { // Remove the candidate from the cache. @@ -353,9 +346,11 @@ where deqs.move_to_back_wo(&entry) } - fn evict(&mut self, now: Instant, batch_size: usize) { + fn evict(&mut self, now: Instant) { + const EVICTION_BATCH_SIZE: usize = 100; + if self.time_to_live.is_some() { - self.remove_expired_wo(batch_size, now); + self.remove_expired_wo(EVICTION_BATCH_SIZE, now); } if self.time_to_idle.is_some() { @@ -370,7 +365,15 @@ where ); let mut rm_expired_ao = |name, deq| { - Self::remove_expired_ao(name, deq, wo, cache, time_to_idle, batch_size, now) + Self::remove_expired_ao( + name, + deq, + wo, + cache, + time_to_idle, + EVICTION_BATCH_SIZE, + now, + ) }; rm_expired_ao("window", window); @@ -399,7 +402,7 @@ where None } }) - .unwrap_or(None); + .unwrap_or_default(); if key.is_none() { break; @@ -416,19 +419,20 @@ where #[inline] fn remove_expired_wo(&mut self, batch_size: usize, now: Instant) { + let time_to_live = &self.time_to_live; for _ in 0..batch_size { let key = self .deques .write_order .peek_front() .and_then(|node| { - if self.is_expired_entry_wo(&*node, now) { + if Self::is_expired_entry_wo(time_to_live, &*node, now) { Some(Some(Rc::clone(&node.element.key))) } else { None } }) - .unwrap_or(None); + .unwrap_or_default(); if key.is_none() { break; @@ -444,26 +448,28 @@ where } } -// For unit tests. 
-// #[cfg(test)] -// impl Cache -// where -// K: Hash + Eq, -// S: BuildHasher + Clone, -// { -// fn set_expiration_clock(&self, clock: Option) { -// todo!() -// } -// } +// +// for testing +// +#[cfg(test)] +impl Cache +where + K: Hash + Eq, + S: BuildHasher + Clone, +{ + fn set_expiration_clock(&mut self, clock: Option) { + self.expiration_clock = clock; + } +} // To see the debug prints, run test as `cargo test -- --nocapture` #[cfg(test)] mod tests { use super::Cache; - // use crate::unsync::CacheBuilder; + use crate::unsync::CacheBuilder; - // use quanta::Clock; - // use std::time::Duration; + use quanta::Clock; + use std::time::Duration; #[test] fn basic_single_thread() { @@ -502,84 +508,80 @@ mod tests { assert_eq!(cache.get(&"b"), None); } - // #[test] - // fn time_to_live() { - // let mut cache = CacheBuilder::new(100) - // .time_to_live(Duration::from_secs(10)) - // .build(); - - // cache.reconfigure_for_testing(); - - // let (clock, mock) = Clock::mock(); - // cache.set_expiration_clock(Some(clock)); + #[test] + fn time_to_live() { + let mut cache = CacheBuilder::new(100) + .time_to_live(Duration::from_secs(10)) + .build(); - // cache.insert("a", "alice"); + let (clock, mock) = Clock::mock(); + cache.set_expiration_clock(Some(clock)); - // mock.increment(Duration::from_secs(5)); // 5 secs from the start. + cache.insert("a", "alice"); - // cache.get(&"a"); + mock.increment(Duration::from_secs(5)); // 5 secs from the start. - // mock.increment(Duration::from_secs(5)); // 10 secs. + cache.get(&"a"); - // assert_eq!(cache.get(&"a"), None); - // assert!(cache.base.is_empty()); + mock.increment(Duration::from_secs(5)); // 10 secs. - // cache.insert("b", "bob"); + assert_eq!(cache.get(&"a"), None); + assert!(cache.cache.is_empty()); - // assert_eq!(cache.base.len(), 1); + cache.insert("b", "bob"); - // mock.increment(Duration::from_secs(5)); // 15 secs. 
+ assert_eq!(cache.cache.len(), 1); - // assert_eq!(cache.get(&"b"), Some("bob")); - // assert_eq!(cache.base.len(), 1); + mock.increment(Duration::from_secs(5)); // 15 secs. - // cache.insert("b", "bill"); + assert_eq!(cache.get(&"b"), Some(&"bob")); + assert_eq!(cache.cache.len(), 1); - // mock.increment(Duration::from_secs(5)); // 20 secs + cache.insert("b", "bill"); - // assert_eq!(cache.get(&"b"), Some("bill")); - // assert_eq!(cache.cache.len(), 1); + mock.increment(Duration::from_secs(5)); // 20 secs - // mock.increment(Duration::from_secs(5)); // 25 secs + assert_eq!(cache.get(&"b"), Some(&"bill")); + assert_eq!(cache.cache.len(), 1); - // assert_eq!(cache.get(&"a"), None); - // assert_eq!(cache.get(&"b"), None); - // assert!(cache.base.is_empty()); - // } + mock.increment(Duration::from_secs(5)); // 25 secs - // #[test] - // fn time_to_idle() { - // let mut cache = CacheBuilder::new(100) - // .time_to_idle(Duration::from_secs(10)) - // .build(); + assert_eq!(cache.get(&"a"), None); + assert_eq!(cache.get(&"b"), None); + assert!(cache.cache.is_empty()); + } - // cache.reconfigure_for_testing(); + #[test] + fn time_to_idle() { + let mut cache = CacheBuilder::new(100) + .time_to_idle(Duration::from_secs(10)) + .build(); - // let (clock, mock) = Clock::mock(); - // cache.set_expiration_clock(Some(clock)); + let (clock, mock) = Clock::mock(); + cache.set_expiration_clock(Some(clock)); - // cache.insert("a", "alice"); + cache.insert("a", "alice"); - // mock.increment(Duration::from_secs(5)); // 5 secs from the start. + mock.increment(Duration::from_secs(5)); // 5 secs from the start. - // assert_eq!(cache.get(&"a"), Some("alice")); + assert_eq!(cache.get(&"a"), Some(&"alice")); - // mock.increment(Duration::from_secs(5)); // 10 secs. + mock.increment(Duration::from_secs(5)); // 10 secs. 
- // cache.insert("b", "bob"); + cache.insert("b", "bob"); - // assert_eq!(cache.base.len(), 2); + assert_eq!(cache.cache.len(), 2); - // mock.increment(Duration::from_secs(5)); // 15 secs. + mock.increment(Duration::from_secs(5)); // 15 secs. - // assert_eq!(cache.get(&"a"), None); - // assert_eq!(cache.get(&"b"), Some("bob")); - // assert_eq!(cache.base.len(), 1); + assert_eq!(cache.get(&"a"), None); + assert_eq!(cache.get(&"b"), Some(&"bob")); + assert_eq!(cache.cache.len(), 1); - // mock.increment(Duration::from_secs(10)); // 25 secs + mock.increment(Duration::from_secs(10)); // 25 secs - // assert_eq!(cache.get(&"a"), None); - // assert_eq!(cache.get(&"b"), None); - // assert!(cache.base.is_empty()); - // } + assert_eq!(cache.get(&"a"), None); + assert_eq!(cache.get(&"b"), None); + assert!(cache.cache.is_empty()); + } } diff --git a/src/unsync/deques.rs b/src/unsync/deques.rs index 640bf7d1..26cdfd76 100644 --- a/src/unsync/deques.rs +++ b/src/unsync/deques.rs @@ -10,12 +10,6 @@ pub(crate) struct Deques { pub(crate) write_order: Deque>, } -#[cfg(feature = "future")] -// Multi-threaded async runtimes require base_cache::Inner to be Send, but it will -// not be without this `unsafe impl`. This is because DeqNodes have NonNull -// pointers. 
-unsafe impl Send for Deques {} - impl Default for Deques { fn default() -> Self { Self { From 39f782cf86ccb1ba7252164e3dfa36c2a1a6253c Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Tue, 16 Feb 2021 20:10:15 +0800 Subject: [PATCH 6/6] Fix a wrong import in a doc test in unsync::CacheBuilder --- src/unsync/builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/unsync/builder.rs b/src/unsync/builder.rs index 85692755..89edd0f2 100644 --- a/src/unsync/builder.rs +++ b/src/unsync/builder.rs @@ -14,11 +14,11 @@ use std::{ /// # Examples /// /// ```rust -/// use moka::future::CacheBuilder; +/// use moka::unsync::CacheBuilder; /// /// use std::time::Duration; /// -/// let cache = CacheBuilder::new(10_000) // Max 10,000 elements +/// let mut cache = CacheBuilder::new(10_000) // Max 10,000 elements /// // Time to live (TTL): 30 minutes /// .time_to_live(Duration::from_secs(30 * 60)) /// // Time to idle (TTI): 5 minutes