Skip to content

Commit

Permalink
Entry Snapshot — Rename the cache.policy_snapshot() method to
Browse files Browse the repository at this point in the history
`cache.policy_ext().entry_snapshot()`.

Also remove the `estimated_frequency` field from the `EntryMetadata` struct.
  • Loading branch information
tatsuya6502 committed Jan 21, 2025
1 parent 5c0caaf commit acfdd8b
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 187 deletions.
7 changes: 5 additions & 2 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ pub(crate) mod deque;
pub mod entry;
pub(crate) mod error;
pub(crate) mod frequency_sketch;
pub(crate) mod time;
pub(crate) mod timer_wheel;

#[cfg(feature = "sync")]
pub(crate) mod policy_impl;

#[cfg(test)]
pub(crate) mod test_utils;
pub(crate) mod time;
pub(crate) mod timer_wheel;

use self::concurrent::constants::{
DEFAULT_EVICTION_BATCH_SIZE, DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS,
Expand Down
3 changes: 3 additions & 0 deletions src/common/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl<K> KeyHashDate<K> {
self.entry_info.last_accessed()
}

#[cfg(feature = "sync")]
pub(crate) fn expiration_time(&self) -> Option<Instant> {
self.entry_info.expiration_time()
}
Expand All @@ -89,8 +90,10 @@ impl<K> KeyHashDate<K> {
}
}

#[cfg(feature = "sync")]
pub(crate) type KeyHashDateNode<K> = DeqNode<KeyHashDate<K>>;

#[cfg(feature = "sync")]
pub(crate) type KeyHashDateNodePtr<K> = NonNull<KeyHashDateNode<K>>;

pub(crate) struct KvEntry<K, V> {
Expand Down
2 changes: 1 addition & 1 deletion src/common/concurrent/housekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::constants::LOG_SYNC_INTERVAL_MILLIS;
use super::constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT};
use crate::common::time::{AtomicInstant, Instant};
use crate::common::HousekeeperConfig;
use crate::entry::{EntrySnapshot, EntrySnapshotConfig};
use crate::policy::{EntrySnapshot, EntrySnapshotConfig};

use parking_lot::{Mutex, MutexGuard};
use std::marker::PhantomData;
Expand Down
2 changes: 2 additions & 0 deletions src/common/deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl<T> DeqNode<T> {
}
}

#[cfg(feature = "sync")]
pub(crate) fn prev_node_ptr(this: NonNull<Self>) -> Option<NonNull<DeqNode<T>>> {
unsafe { this.as_ref() }.prev
}
Expand Down Expand Up @@ -166,6 +167,7 @@ impl<T> Deque<T> {
self.tail.as_ref().map(|node| unsafe { node.as_ref() })
}

#[cfg(feature = "sync")]
pub(crate) fn peek_back_ptr(&self) -> Option<NonNull<DeqNode<T>>> {
self.tail.as_ref().copied()
}
Expand Down
108 changes: 14 additions & 94 deletions src/common/entry.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::{
fmt::Debug,
hash::{BuildHasher, Hash},
sync::Arc,
time::{Duration, Instant},
};
use std::{fmt::Debug, sync::Arc, time::Instant};

use crate::sync::Cache;
#[cfg(feature = "sync")]
use std::time::Duration;

#[cfg(feature = "sync")]
use super::concurrent::KeyHashDate;

/// A snapshot of a single entry in the cache.
Expand Down Expand Up @@ -104,38 +101,39 @@ impl<K, V> Entry<K, V> {
pub struct EntryMetadata {
region: EntryRegion,
policy_weight: u32,
estimated_frequency: u8,
last_modified: Instant,
last_accessed: Instant,
expiration_time: Option<Instant>,
snapshot_at: Instant,
}

impl EntryMetadata {
pub fn new(
region: EntryRegion,
policy_weight: u32,
estimated_frequency: u8,
last_modified: Instant,
last_accessed: Instant,
expiration_time: Option<Instant>,
snapshot_at: Instant,
) -> Self {
Self {
region,
policy_weight,
estimated_frequency,
last_modified,
last_accessed,
expiration_time,
snapshot_at,
}
}

#[cfg(feature = "sync")]
pub(crate) fn from_element<K>(
region: EntryRegion,
estimated_frequency: u8,
element: &KeyHashDate<K>,
clock: &super::time::Clock,
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
snapshot_at: Instant,
) -> Self {
// SAFETY: `last_accessed` and `last_modified` should be `Some` since we
// assume the element is not dirty. But we use `unwrap_or_default` to avoid
Expand Down Expand Up @@ -164,10 +162,10 @@ impl EntryMetadata {
Self {
region,
policy_weight: element.entry_info().policy_weight(),
estimated_frequency,
last_modified,
last_accessed,
expiration_time,
snapshot_at,
}
}

Expand All @@ -179,10 +177,6 @@ impl EntryMetadata {
self.policy_weight
}

pub fn estimated_frequency(&self) -> u8 {
self.estimated_frequency
}

pub fn last_modified(&self) -> Instant {
self.last_modified
}
Expand All @@ -194,88 +188,14 @@ impl EntryMetadata {
pub fn expiration_time(&self) -> Option<Instant> {
self.expiration_time
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntryRegion {
Window,
Main,
}

#[derive(Debug, Clone)]
pub struct EntrySnapshot<K> {
snapshot_at: Instant,
coldest: Vec<(Arc<K>, EntryMetadata)>,
hottest: Vec<(Arc<K>, EntryMetadata)>,
}

impl<K> EntrySnapshot<K> {
pub(crate) fn new(
snapshot_at: Instant,
coldest: Vec<(Arc<K>, EntryMetadata)>,
hottest: Vec<(Arc<K>, EntryMetadata)>,
) -> Self {
Self {
snapshot_at,
coldest,
hottest,
}
}

pub fn snapshot_at(&self) -> Instant {
self.snapshot_at
}

pub fn coldest(&self) -> &[(Arc<K>, EntryMetadata)] {
&self.coldest
}

pub fn hottest(&self) -> &[(Arc<K>, EntryMetadata)] {
&self.hottest
}
}

pub struct EntrySnapshotRequest<K, V, S> {
cache: Cache<K, V, S>,
config: EntrySnapshotConfig,
}

impl<K, V, S> EntrySnapshotRequest<K, V, S> {
pub(crate) fn new_with_cache(cache: Cache<K, V, S>) -> Self {
Self {
cache,
config: EntrySnapshotConfig::default(),
}
}

pub fn with_coldest(self, count: usize) -> Self {
let mut req = self;
req.config.coldest = count;
req
}

pub fn with_hottest(self, count: usize) -> Self {
let mut req = self;
req.config.hottest = count;
req
}

// pub fn config(&self) -> &EntrySnapshotConfig {
// &self.config
// }

pub fn capture(self) -> EntrySnapshot<K>
where
K: Hash + Send + Sync + Eq + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
self.cache.capture_entry_snapshot(self.config)
}
}

#[derive(Debug, Clone, Default)]
pub(crate) struct EntrySnapshotConfig {
pub(crate) coldest: usize,
pub(crate) hottest: usize,
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntryRegion {
Window,
Main,
}
86 changes: 86 additions & 0 deletions src/common/policy_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::sync::Arc;

use crate::{
common::deque::DeqNode,
policy::{EntrySnapshot, EntrySnapshotConfig, ExpirationPolicy},
};

use super::{
concurrent::{deques::Deques, KeyHashDateNodePtr},
entry::EntryMetadata,
time::Clock,
};

impl<K, V> ExpirationPolicy<K, V> {
pub(crate) fn capture_entry_snapshot(
&self,
sc: EntrySnapshotConfig,
deqs: &Deques<K>,
clock: &Clock,
) -> EntrySnapshot<K> {
use crate::entry::EntryRegion;

let coldest_entries = if sc.coldest == 0 {
Vec::new()
} else {
self.top_entries(
EntryRegion::Main,
sc.coldest,
deqs.probation.peek_front_ptr(),
DeqNode::next_node_ptr,
clock,
)
};

let hottest_entries = if sc.hottest == 0 {
Vec::new()
} else {
self.top_entries(
EntryRegion::Main,
sc.hottest,
deqs.probation.peek_back_ptr(),
DeqNode::prev_node_ptr,
clock,
)
};

EntrySnapshot::new(coldest_entries, hottest_entries)
}

fn top_entries(
&self,
region: crate::entry::EntryRegion,
count: usize,
head: Option<KeyHashDateNodePtr<K>>,
next_fun: fn(KeyHashDateNodePtr<K>) -> Option<KeyHashDateNodePtr<K>>,
clock: &Clock,
) -> Vec<(Arc<K>, EntryMetadata)> {
let mut entries = Vec::with_capacity(count);

let mut next = head;
while entries.len() < count {
let Some(current) = next.take() else {
break;
};
next = next_fun(current);

let elem = &unsafe { current.as_ref() }.element;
if elem.is_dirty() {
continue;
}

let snapshot_at = clock.to_std_instant(clock.now());
let md = EntryMetadata::from_element(
region,
elem,
clock,
self.time_to_live(),
self.time_to_idle(),
snapshot_at,
);
entries.push((Arc::clone(elem.key()), md));
}

entries
}
}
Loading

0 comments on commit acfdd8b

Please sign in to comment.