Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make a single call to run_pending_tasks to evict as many entries as possible from the cache #417

Merged
merged 9 commits into from
Apr 16, 2024
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Moka Cache — Change Log

## Version 0.12.7

### Changed

- Ensure that a single call to `run_pending_tasks` evicts as many entries as
possible from the cache ([#417][gh-pull-0417]).


## Version 0.12.6

### Fixed
Expand Down Expand Up @@ -872,6 +880,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (Mar 25, 2021).
[gh-issue-0034]: https://github.com/moka-rs/moka/issues/34/
[gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/

[gh-pull-0417]: https://github.com/moka-rs/moka/pull/417/
[gh-pull-0390]: https://github.com/moka-rs/moka/pull/390/
[gh-pull-0384]: https://github.com/moka-rs/moka/pull/384/
[gh-pull-0382]: https://github.com/moka-rs/moka/pull/382/
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.12.6"
version = "0.12.7"
edition = "2021"
# Rust 1.65 was released on Nov 3, 2022.
rust-version = "1.65"
Expand Down
56 changes: 56 additions & 0 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

pub(crate) mod builder_utils;
pub(crate) mod concurrent;
pub(crate) mod deque;
Expand All @@ -10,6 +12,11 @@ pub(crate) mod timer_wheel;
#[cfg(test)]
pub(crate) mod test_utils;

use self::concurrent::constants::{
DEFAULT_EVICTION_BATCH_SIZE, DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS,
DEFAULT_MAX_LOG_SYNC_REPEATS,
};

// Note: `CacheRegion` cannot have more than four enum variants. This is because
// `crate::{sync,unsync}::DeqNodes` uses a `tagptr::TagNonNull<DeqNode<T>, 2>`
// pointer, where the 2-bit tag is `CacheRegion`.
Expand Down Expand Up @@ -56,6 +63,55 @@ impl PartialEq<usize> for CacheRegion {
}
}

#[derive(Clone, Debug)]
/// Tuning parameters handed to `Housekeeper::new`.
///
/// `Default::default()` fills every field from the corresponding `DEFAULT_*`
/// constant in `concurrent::constants`.
pub(crate) struct HousekeeperConfig {
/// The timeout duration for the `run_pending_tasks` method. This is a safe-guard
/// to prevent cache read/write operations (that may call `run_pending_tasks`
/// internally) from being blocked for a long time when the user wrote a slow
/// eviction listener closure.
///
/// Used only when the eviction listener closure is set for the cache instance.
///
/// Default: `DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS` (interpreted as
/// milliseconds).
pub(crate) maintenance_task_timeout: Duration,
/// The maximum repeat count for receiving operation logs from the read and write
/// log channels.
///
/// Default: `DEFAULT_MAX_LOG_SYNC_REPEATS`.
pub(crate) max_log_sync_repeats: u32,
/// The batch size of entries to be processed by each internal eviction method.
///
/// Default: `DEFAULT_EVICTION_BATCH_SIZE`.
pub(crate) eviction_batch_size: u32,
}

impl Default for HousekeeperConfig {
    /// Builds a configuration whose fields all come from the crate-wide
    /// `DEFAULT_*` constants.
    fn default() -> Self {
        let maintenance_task_timeout =
            Duration::from_millis(DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS);
        // The constant is declared as `usize` while the field is `u32`.
        let max_log_sync_repeats = DEFAULT_MAX_LOG_SYNC_REPEATS as u32;
        let eviction_batch_size = DEFAULT_EVICTION_BATCH_SIZE;

        Self {
            maintenance_task_timeout,
            max_log_sync_repeats,
            eviction_batch_size,
        }
    }
}

impl HousekeeperConfig {
    /// Test-only constructor. Each argument that is `None` falls back to the
    /// same `DEFAULT_*` constant that `Default::default()` uses for that field.
    #[cfg(test)]
    pub(crate) fn new(
        maintenance_task_timeout: Option<Duration>,
        max_log_sync_repeats: Option<u32>,
        eviction_batch_size: Option<u32>,
    ) -> Self {
        let fallback_timeout = Duration::from_millis(DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS);
        // The constant is declared as `usize` while the field is `u32`.
        let fallback_repeats = DEFAULT_MAX_LOG_SYNC_REPEATS as u32;
        let fallback_batch_size = DEFAULT_EVICTION_BATCH_SIZE;

        Self {
            maintenance_task_timeout: maintenance_task_timeout.unwrap_or(fallback_timeout),
            max_log_sync_repeats: max_log_sync_repeats.unwrap_or(fallback_repeats),
            eviction_batch_size: eviction_batch_size.unwrap_or(fallback_batch_size),
        }
    }
}

// Ensures the value fits in a range of `128u32..=u32::MAX`.
pub(crate) fn sketch_capacity(max_capacity: u64) -> u32 {
max_capacity.try_into().unwrap_or(u32::MAX).max(128)
Expand Down
24 changes: 18 additions & 6 deletions src/common/concurrent/constants.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
pub(crate) const MAX_SYNC_REPEATS: usize = 4;
pub(crate) const PERIODICAL_SYNC_INITIAL_DELAY_MILLIS: u64 = 300;
pub(crate) const DEFAULT_MAX_LOG_SYNC_REPEATS: usize = 4;
pub(crate) const LOG_SYNC_INTERVAL_MILLIS: u64 = 300;

pub(crate) const READ_LOG_FLUSH_POINT: usize = 512;
pub(crate) const READ_LOG_SIZE: usize = READ_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS + 2);
pub(crate) const READ_LOG_FLUSH_POINT: usize = 64;
pub(crate) const WRITE_LOG_FLUSH_POINT: usize = 64;

pub(crate) const WRITE_LOG_FLUSH_POINT: usize = 512;
pub(crate) const WRITE_LOG_SIZE: usize = WRITE_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS + 2);
// 384 elements: `READ_LOG_FLUSH_POINT` (64) * (`DEFAULT_MAX_LOG_SYNC_REPEATS` (4) + 2).
pub(crate) const READ_LOG_CH_SIZE: usize =
READ_LOG_FLUSH_POINT * (DEFAULT_MAX_LOG_SYNC_REPEATS + 2);

// 384 elements: `WRITE_LOG_FLUSH_POINT` (64) * (`DEFAULT_MAX_LOG_SYNC_REPEATS` (4) + 2).
pub(crate) const WRITE_LOG_CH_SIZE: usize =
WRITE_LOG_FLUSH_POINT * (DEFAULT_MAX_LOG_SYNC_REPEATS + 2);

// Default value of `HousekeeperConfig::eviction_batch_size`. It currently mirrors
// `WRITE_LOG_CH_SIZE`, which is small enough for the `as u32` cast to be lossless.
//
// TODO: Calculate the batch size based on the number of entries in the cache (or an
// estimated number of entries to evict)
pub(crate) const DEFAULT_EVICTION_BATCH_SIZE: u32 = WRITE_LOG_CH_SIZE as u32;

/// The default timeout duration for the `run_pending_tasks` method, in
/// milliseconds.
pub(crate) const DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS: u64 = 100;

#[cfg(feature = "sync")]
pub(crate) const WRITE_RETRY_INTERVAL_MICROS: u64 = 50;
76 changes: 66 additions & 10 deletions src/common/concurrent/housekeeper.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use super::constants::{MAX_SYNC_REPEATS, PERIODICAL_SYNC_INITIAL_DELAY_MILLIS};
use super::constants::LOG_SYNC_INTERVAL_MILLIS;

use super::{
atomic_time::AtomicInstant,
constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT},
};
use crate::common::time::{CheckedTimeOps, Instant};
use crate::common::HousekeeperConfig;

use parking_lot::{Mutex, MutexGuard};
use std::{
Expand All @@ -13,34 +14,85 @@ use std::{
};

pub(crate) trait InnerSync {
fn run_pending_tasks(&self, max_sync_repeats: usize);
/// Runs the pending tasks. Returns `true` if there are more entries left to
/// evict in the next run.
///
/// - `timeout`: optional time limit for this run. `Housekeeper` passes `Some`
///   only when an eviction listener is set for the cache instance.
/// - `max_log_sync_repeats`: the maximum repeat count for receiving operation
///   logs from the read and write log channels.
/// - `eviction_batch_size`: the batch size of entries to be processed by each
///   internal eviction method.
fn run_pending_tasks(
&self,
timeout: Option<Duration>,
max_log_sync_repeats: u32,
eviction_batch_size: u32,
) -> bool;

fn now(&self) -> Instant;
}

/// Holds the state and settings used to decide when, and with which limits,
/// `InnerSync::run_pending_tasks` is invoked on a cache.
pub(crate) struct Housekeeper {
/// Lock whose guard is handed to `do_run_pending_tasks`.
/// NOTE(review): presumably serializes maintenance runs — confirm against the
/// callers that acquire it.
run_lock: Mutex<()>,
/// Set to `Self::sync_after(now)` at construction and again every time
/// `do_run_pending_tasks` runs.
run_after: AtomicInstant,
/// A flag to indicate if the last call on `run_pending_tasks` method left some
/// entries to evict.
///
/// Used only when the eviction listener closure is set for this cache instance
/// because, if not, `run_pending_tasks` will never leave entries to evict.
more_entries_to_evict: Option<AtomicBool>,
/// The timeout duration for the `run_pending_tasks` method. This is a safe-guard
/// to prevent cache read/write operations (that may call `run_pending_tasks`
/// internally) from being blocked for a long time when the user wrote a slow
/// eviction listener closure.
///
/// Used only when the eviction listener closure is set for this cache instance.
maintenance_task_timeout: Option<Duration>,
/// The maximum repeat count for receiving operation logs from the read and write
/// log channels. Default: `DEFAULT_MAX_LOG_SYNC_REPEATS`.
max_log_sync_repeats: u32,
/// The batch size of entries to be processed by each internal eviction method.
/// Default: `DEFAULT_EVICTION_BATCH_SIZE`.
eviction_batch_size: u32,
/// Initialized to `true` by `new`.
/// NOTE(review): presumably gates automatic maintenance runs — confirm against
/// the code that reads this flag.
auto_run_enabled: AtomicBool,
}

impl Default for Housekeeper {
fn default() -> Self {
impl Housekeeper {
/// Creates a housekeeper from `config`.
///
/// The "more entries to evict" flag and the maintenance task timeout are only
/// populated when `is_eviction_listener_enabled` is `true`; otherwise both are
/// `None`. The first `run_after` instant is `sync_after(Instant::now())`, and
/// `auto_run_enabled` starts out as `true`.
pub(crate) fn new(is_eviction_listener_enabled: bool, config: HousekeeperConfig) -> Self {
    // Listener-only state: absent unless an eviction listener is configured.
    let more_entries_to_evict = is_eviction_listener_enabled.then(|| AtomicBool::new(false));
    let maintenance_task_timeout =
        is_eviction_listener_enabled.then_some(config.maintenance_task_timeout);

    let first_run_at = Self::sync_after(Instant::now());

    Self {
        run_lock: Mutex::default(),
        run_after: AtomicInstant::new(first_run_at),
        more_entries_to_evict,
        maintenance_task_timeout,
        max_log_sync_repeats: config.max_log_sync_repeats,
        eviction_batch_size: config.eviction_batch_size,
        auto_run_enabled: AtomicBool::new(true),
    }
}
}

impl Housekeeper {
pub(crate) fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool {
self.should_apply(ch_len, READ_LOG_FLUSH_POINT / 8, now)
self.more_entries_to_evict() || self.should_apply(ch_len, READ_LOG_FLUSH_POINT, now)
}

pub(crate) fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool {
self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT / 8, now)
self.more_entries_to_evict() || self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT, now)
}

/// Reads the "more entries to evict" flag with `Acquire` ordering. Returns
/// `false` when the flag does not exist (no eviction listener is set).
#[inline]
fn more_entries_to_evict(&self) -> bool {
    match &self.more_entries_to_evict {
        Some(flag) => flag.load(Ordering::Acquire),
        None => false,
    }
}

/// Stores `v` into the "more entries to evict" flag with `Release` ordering.
/// Does nothing when the flag does not exist (no eviction listener is set).
fn set_more_entries_to_evict(&self, v: bool) {
    let Some(flag) = self.more_entries_to_evict.as_ref() else {
        return;
    };
    flag.store(v, Ordering::Release);
}

#[inline]
Expand All @@ -66,11 +118,15 @@ impl Housekeeper {
fn do_run_pending_tasks<T: InnerSync>(&self, cache: &T, _lock: MutexGuard<'_, ()>) {
let now = cache.now();
self.run_after.set_instant(Self::sync_after(now));
cache.run_pending_tasks(MAX_SYNC_REPEATS);
let timeout = self.maintenance_task_timeout;
let repeats = self.max_log_sync_repeats;
let batch_size = self.eviction_batch_size;
let more_to_evict = cache.run_pending_tasks(timeout, repeats, batch_size);
self.set_more_entries_to_evict(more_to_evict);
}

fn sync_after(now: Instant) -> Instant {
let dur = Duration::from_millis(PERIODICAL_SYNC_INITIAL_DELAY_MILLIS);
let dur = Duration::from_millis(LOG_SYNC_INTERVAL_MILLIS);
let ts = now.checked_add(dur);
// Assuming that `now` is the current wall clock time, this should never fail
// for at least the next few million years.
Expand Down
Loading