Skip to content

Commit

Permalink
Remove the thread pool from future::Cache
Browse files Browse the repository at this point in the history
Clean up dependencies and feature flags.
  • Loading branch information
tatsuya6502 committed Aug 5, 2023
1 parent 0d65e4b commit d637ecf
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 35 deletions.
45 changes: 15 additions & 30 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ default = ["sync", "atomic64", "quanta"]

# This feature is enabled by default. Disable it when you do not need
# `moka::sync::{Cache, SegmentedCache}`
sync = ["_core"]
sync = ["scheduled-thread-pool"]

# Enable this feature to use `moka::future::Cache`.
future = ["_core", "async-lock", "async-trait", "futures-util"]
future = ["async-lock", "async-trait", "futures-util"]

# Enable this feature to activate optional logging from caches.
# Currently cache will emit log only when it encounters a panic in user provided
Expand All @@ -46,41 +46,26 @@ js = ["uuid/js"]
# performance impacts and is intended for debugging purpose.
unstable-debug-counters = ["future"]

# A feature used internally.
_core = [
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"once_cell",
"parking_lot",
"scheduled-thread-pool",
"smallvec",
"tagptr",
"thiserror",
"triomphe",
"uuid",
]

[dependencies]

# The "_core" dependencies used by "sync" and "future" features.
crossbeam-channel = { version = "0.5.5", optional = true }
crossbeam-utils = { version = "0.8", optional = true }
once_cell = { version = "1.7", optional = true }
parking_lot = { version = "0.12", optional = true }
scheduled-thread-pool = { version = "0.2.7", optional = true }
smallvec = { version = "1.8", optional = true }
tagptr = { version = "0.2", optional = true }
crossbeam-channel = { version = "0.5.5" }
crossbeam-epoch = { version = "0.9.9" }
crossbeam-utils = { version = "0.8" }
once_cell = { version = "1.7" }
parking_lot = { version = "0.12" }
smallvec = { version = "1.8" }
tagptr = { version = "0.2" }
thiserror = { version = "1.0" }
uuid = { version = "1.1", features = ["v4"] }

# Opt-out serde and stable_deref_trait features
# https://github.com/Manishearth/triomphe/pull/5
triomphe = { version = "0.1.3", default-features = false, optional = true }
triomphe = { version = "0.1.3", default-features = false }

# Optional dependencies (enabled by default)
crossbeam-epoch = { version = "0.9.9", optional = true }
quanta = { version = "0.11.0", optional = true }
thiserror = { version = "1.0", optional = true }
uuid = { version = "1.1", features = ["v4"], optional = true }

# Optional dependencies (sync)
scheduled-thread-pool = { version = "0.2.7", optional = true }

# Optional dependencies (future)
async-io = { version = "1.4", optional = true }
Expand Down
2 changes: 2 additions & 0 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl From<usize> for CacheRegion {
}
}

#[cfg(feature = "future")]
impl CacheRegion {
pub(crate) fn name(&self) -> &'static str {
match self {
Expand Down Expand Up @@ -63,6 +64,7 @@ pub(crate) fn sketch_capacity(max_capacity: u64) -> u32 {
max_capacity.try_into().unwrap_or(u32::MAX).max(128)
}

#[cfg(feature = "sync")]
pub(crate) fn available_parallelism() -> usize {
use std::{num::NonZeroUsize, thread::available_parallelism};
available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
Expand Down
2 changes: 2 additions & 0 deletions src/common/builder_utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::Duration;

#[cfg(feature = "sync")]
use super::concurrent::housekeeper;

const YEAR_SECONDS: u64 = 365 * 24 * 3600;
Expand All @@ -17,6 +18,7 @@ pub(crate) fn ensure_expirations_or_panic(
}
}

#[cfg(feature = "sync")]
pub(crate) fn housekeeper_conf(thread_pool_enabled: bool) -> housekeeper::Configuration {
if thread_pool_enabled {
housekeeper::Configuration::new_thread_pool(true)
Expand Down
6 changes: 6 additions & 0 deletions src/common/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@ use triomphe::Arc as TrioArc;
pub(crate) mod constants;
pub(crate) mod deques;
pub(crate) mod entry_info;

#[cfg(feature = "sync")]
pub(crate) mod housekeeper;

#[cfg(feature = "sync")]
pub(crate) mod thread_pool;

#[cfg(feature = "sync")]
pub(crate) mod unsafe_weak_pointer;

// target_has_atomic is more convenient but yet unstable (Rust 1.55)
Expand Down
14 changes: 10 additions & 4 deletions src/common/concurrent/constants.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
pub(crate) const MAX_SYNC_REPEATS: usize = 4;
pub(crate) const PERIODICAL_SYNC_INITIAL_DELAY_MILLIS: u64 = 500;
pub(crate) const PERIODICAL_SYNC_NORMAL_PACE_MILLIS: u64 = 300;
pub(crate) const PERIODICAL_SYNC_FAST_PACE_NANOS: u64 = 500;

pub(crate) const READ_LOG_FLUSH_POINT: usize = 512;
pub(crate) const READ_LOG_SIZE: usize = READ_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS + 2);

pub(crate) const WRITE_LOG_FLUSH_POINT: usize = 512;
pub(crate) const WRITE_LOG_LOW_WATER_MARK: usize = WRITE_LOG_FLUSH_POINT / 2;
// pub(crate) const WRITE_LOG_HIGH_WATER_MARK: usize = WRITE_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS - 1);
pub(crate) const WRITE_LOG_SIZE: usize = WRITE_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS + 2);

#[cfg(feature = "sync")]
pub(crate) const WRITE_LOG_LOW_WATER_MARK: usize = WRITE_LOG_FLUSH_POINT / 2;

#[cfg(feature = "sync")]
pub(crate) const WRITE_RETRY_INTERVAL_MICROS: u64 = 50;

#[cfg(feature = "sync")]
pub(crate) const PERIODICAL_SYNC_NORMAL_PACE_MILLIS: u64 = 300;

#[cfg(feature = "sync")]
pub(crate) const PERIODICAL_SYNC_FAST_PACE_NANOS: u64 = 500;
1 change: 1 addition & 0 deletions src/common/concurrent/deques.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl<K> Default for Deques<K> {
}

impl<K> Deques<K> {
#[cfg(feature = "future")]
pub(crate) fn select_mut(
&mut self,
selector: CacheRegion,
Expand Down
9 changes: 9 additions & 0 deletions src/notification.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
//! Common data types for notifications.

#[cfg(feature = "sync")]
pub(crate) mod notifier;

use std::{future::Future, pin::Pin, sync::Arc};

pub type ListenerFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

#[cfg(feature = "sync")]
pub(crate) type EvictionListener<K, V> =
Arc<dyn Fn(Arc<K>, V, RemovalCause) + Send + Sync + 'static>;

#[cfg(feature = "sync")]
pub(crate) type EvictionListenerRef<'a, K, V> =
&'a Arc<dyn Fn(Arc<K>, V, RemovalCause) + Send + Sync + 'static>;

Expand All @@ -26,11 +29,13 @@ pub(crate) type AsyncEvictionListener<K, V> =
/// Currently only setting the [`DeliveryMode`][delivery-mode] is supported.
///
/// [delivery-mode]: ./enum.DeliveryMode.html
#[cfg(feature = "sync")]
#[derive(Clone, Debug, Default)]
pub struct Configuration {
mode: DeliveryMode,
}

#[cfg(feature = "sync")]
impl Configuration {
pub fn builder() -> ConfigurationBuilder {
ConfigurationBuilder::default()
Expand All @@ -47,11 +52,13 @@ impl Configuration {
///
/// [conf]: ./struct.Configuration.html
/// [delivery-mode]: ./enum.DeliveryMode.html
#[cfg(feature = "sync")]
#[derive(Default)]
pub struct ConfigurationBuilder {
mode: DeliveryMode,
}

#[cfg(feature = "sync")]
impl ConfigurationBuilder {
pub fn build(self) -> Configuration {
Configuration { mode: self.mode }
Expand All @@ -68,6 +75,7 @@ impl ConfigurationBuilder {
/// For more details, see [the document][delivery-mode-doc] of `sync::Cache`.
///
/// [delivery-mode-doc]: ../sync/struct.Cache.html#delivery-modes-for-eviction-listener
#[cfg(feature = "sync")]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DeliveryMode {
/// With this mode, a notification should be delivered to the listener
Expand All @@ -93,6 +101,7 @@ pub enum DeliveryMode {
Queued,
}

#[cfg(feature = "sync")]
impl Default for DeliveryMode {
fn default() -> Self {
Self::Immediate
Expand Down
10 changes: 9 additions & 1 deletion src/sync_base.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
pub(crate) mod iter;

#[cfg(feature = "sync")]
pub(crate) mod base_cache;

#[cfg(feature = "sync")]
mod invalidator;
pub(crate) mod iter;

#[cfg(feature = "sync")]
mod key_lock;

/// The type of the unique ID to identify a predicate used by
Expand All @@ -9,6 +15,8 @@ mod key_lock;
/// A `PredicateId` is a `String` of UUID (version 4).
///
/// [invalidate-if]: ./struct.Cache.html#method.invalidate_entries_if
#[cfg(feature = "sync")]
pub type PredicateId = String;

#[cfg(feature = "sync")]
pub(crate) type PredicateIdStr<'a> = &'a str;

0 comments on commit d637ecf

Please sign in to comment.