Skip to content

Commit

Permalink
Remove the thread pool from sync caches
Browse files Browse the repository at this point in the history
- Remove `scheduled-thread-pool` crate.
- Remove `thread_pool` and `unsafe_weak_pointers` modules.
- Remove notification `DeliveryMode` as now `sync` caches only support the
  `Immediate` mode (like `future::Cache`).
- Remove `sync::ConcurrentCacheExt`.
- Rename `sync::ConcurrentCacheExt::sync` method to
  `sync::{Cache, SegmentedCache}::run_pending_tasks`.
  • Loading branch information
tatsuya6502 committed Sep 3, 2023
1 parent 3818083 commit 432262a
Show file tree
Hide file tree
Showing 21 changed files with 1,520 additions and 3,118 deletions.
6 changes: 0 additions & 6 deletions .cirrus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ linux_arm64_task:
# Run tests (release, sync feature)
- cargo test -j 1 --release --features sync -- --test-threads=$NUM_CPUS

# Run tests (release, sync feature, thread-pool test for sync::Cache)
- cargo test --release --lib --features sync sync::cache::tests::enabling_and_disabling_thread_pools -- --exact --ignored

# Run tests (release, sync feature, thread-pool test for sync::SegmentedCache)
- cargo test --release --lib --features sync sync::segment::tests::enabling_and_disabling_thread_pools -- --exact --ignored

# Run tests (sync feature, key lock test for notification)
- cargo test --release --lib --features sync sync::cache::tests::test_key_lock_used_by_immediate_removal_notifications -- --exact --ignored

Expand Down
12 changes: 0 additions & 12 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,6 @@ jobs:
env:
RUSTFLAGS: '--cfg rustver'

- name: Run tests (sync feature, thread-pool test for sync::Cache)
uses: actions-rs/cargo@v1
with:
command: test
args: --release --lib --features sync sync::cache::tests::enabling_and_disabling_thread_pools -- --exact --ignored

- name: Run tests (sync feature, thread-pool test for sync::SegmentedCache)
uses: actions-rs/cargo@v1
with:
command: test
args: --release --lib --features sync sync::segment::tests::enabling_and_disabling_thread_pools -- --exact --ignored

- name: Run tests (sync feature, key lock test for notification)
uses: actions-rs/cargo@v1
with:
Expand Down
12 changes: 4 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.12.0-beta.1"
version = "0.12.0-beta.2"
edition = "2018"
# Rust 1.65 was released on Nov 3, 2022.
rust-version = "1.65"
Expand All @@ -16,11 +16,10 @@ exclude = [".circleci", ".devcontainer", ".github", ".gitpod.yml", ".vscode"]
build = "build.rs"

[features]
default = ["sync", "atomic64", "quanta"]
default = ["atomic64", "quanta"]

# This feature is enabled by default. Disable it when you do not need
# `moka::sync::{Cache, SegmentedCache}`
sync = ["scheduled-thread-pool"]
# Enabled this feature to use `moka::sync::{Cache, SegmentedCache}`
sync = []

# Enable this feature to use `moka::future::Cache`.
future = ["async-lock", "async-trait", "futures-util"]
Expand Down Expand Up @@ -64,9 +63,6 @@ triomphe = { version = "0.1.3", default-features = false }
# Optional dependencies (enabled by default)
quanta = { version = "0.11.0", optional = true }

# Optional dependencies (sync)
scheduled-thread-pool = { version = "0.2.7", optional = true }

# Optional dependencies (future)
async-lock = { version = "2.4", optional = true }
async-trait = { version = "0.1.58", optional = true }
Expand Down
4 changes: 1 addition & 3 deletions examples/eviction_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ fn main() {
// called. If you want to remove all entries immediately, call sync() method
// repeatedly like the loop below.
cache.invalidate_all();
// This trait provides sync() method.
use moka::sync::ConcurrentCacheExt;
loop {
// Synchronization is limited to at most 500 entries for each call.
cache.sync();
cache.run_pending_tasks();
// Check if all is done. Calling entry_count() requires calling sync()
// first!
if cache.entry_count() == 0 {
Expand Down
4 changes: 1 addition & 3 deletions src/cht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,10 @@
//! [Junction]: https://github.com/preshing/junction
//! [a tech talk]: https://youtu.be/HJ-719EGIts

pub(crate) mod iter;
pub(crate) mod map;
pub(crate) mod segment;

#[cfg(feature = "future")]
pub(crate) mod iter;

#[cfg(test)]
#[macro_use]
pub(crate) mod test_util;
Expand Down
7 changes: 0 additions & 7 deletions src/cht/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use crate::cht::map::{
DefaultHashBuilder,
};

#[cfg(feature = "future")]
use super::iter::{Iter, ScanningGet};

use std::{
Expand Down Expand Up @@ -206,7 +205,6 @@ impl<K, V, S> HashMap<K, V, S> {
///
/// This method on its own is safe, but other threads can add or remove
/// elements at any time.
#[cfg(any(test, feature = "future"))]
pub(crate) fn len(&self) -> usize {
self.len.load(Ordering::Relaxed)
}
Expand All @@ -217,7 +215,6 @@ impl<K, V, S> HashMap<K, V, S> {
///
/// This method on its own is safe, but other threads can add or remove
/// elements at any time.
#[cfg(any(test, feature = "future"))]
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
}
Expand Down Expand Up @@ -252,7 +249,6 @@ impl<K, V, S> HashMap<K, V, S> {
}

impl<K: Hash + Eq, V, S: BuildHasher> HashMap<K, V, S> {
#[cfg(feature = "future")]
#[inline]
pub(crate) fn contains_key(&self, hash: u64, eq: impl FnMut(&K) -> bool) -> bool {
self.get_key_value_and_then(hash, eq, |_, _| Some(()))
Expand Down Expand Up @@ -299,7 +295,6 @@ impl<K: Hash + Eq, V, S: BuildHasher> HashMap<K, V, S> {
///
/// If the map did have this key present, both the key and value are
/// updated.
#[cfg(any(test, feature = "future"))]
#[inline]
pub fn insert_entry_and<T>(
&self,
Expand Down Expand Up @@ -494,7 +489,6 @@ impl<K: Hash + Eq, V, S: BuildHasher> HashMap<K, V, S> {
Some(bucket_array_ref.keys(with_key))
}

#[cfg(feature = "future")]
pub(crate) fn iter(&self) -> Iter<'_, K, V>
where
K: Clone,
Expand All @@ -513,7 +507,6 @@ impl<K: Hash + Eq, V, S: BuildHasher> HashMap<K, V, S> {
}
}

#[cfg(feature = "future")]
impl<K, V, S> ScanningGet<K, V> for HashMap<K, V, S>
where
K: Hash + Eq + Clone,
Expand Down
2 changes: 1 addition & 1 deletion src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub(crate) fn sketch_capacity(max_capacity: u64) -> u32 {
max_capacity.try_into().unwrap_or(u32::MAX).max(128)
}

#[cfg(any(test, feature = "sync"))]
#[cfg(test)]
pub(crate) fn available_parallelism() -> usize {
use std::{num::NonZeroUsize, thread::available_parallelism};
available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
Expand Down
12 changes: 0 additions & 12 deletions src/common/builder_utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use std::time::Duration;

#[cfg(feature = "sync")]
use super::concurrent::housekeeper;

const YEAR_SECONDS: u64 = 365 * 24 * 3600;

pub(crate) fn ensure_expirations_or_panic(
Expand All @@ -17,12 +14,3 @@ pub(crate) fn ensure_expirations_or_panic(
assert!(d <= max_duration, "time_to_idle is longer than 1000 years");
}
}

#[cfg(feature = "sync")]
pub(crate) fn housekeeper_conf(thread_pool_enabled: bool) -> housekeeper::Configuration {
if thread_pool_enabled {
housekeeper::Configuration::new_thread_pool(true)
} else {
housekeeper::Configuration::new_blocking()
}
}
6 changes: 0 additions & 6 deletions src/common/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ pub(crate) mod entry_info;
#[cfg(feature = "sync")]
pub(crate) mod housekeeper;

#[cfg(feature = "sync")]
pub(crate) mod thread_pool;

#[cfg(feature = "sync")]
pub(crate) mod unsafe_weak_pointer;

// target_has_atomic is more convenient but yet unstable (Rust 1.55)
// https://github.com/rust-lang/rust/issues/32976
// #[cfg_attr(target_has_atomic = "64", path = "common/time_atomic64.rs")]
Expand Down
9 changes: 0 additions & 9 deletions src/common/concurrent/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,5 @@ pub(crate) const READ_LOG_SIZE: usize = READ_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS
pub(crate) const WRITE_LOG_FLUSH_POINT: usize = 512;
pub(crate) const WRITE_LOG_SIZE: usize = WRITE_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS + 2);

#[cfg(feature = "sync")]
pub(crate) const WRITE_LOG_LOW_WATER_MARK: usize = WRITE_LOG_FLUSH_POINT / 2;

#[cfg(feature = "sync")]
pub(crate) const WRITE_RETRY_INTERVAL_MICROS: u64 = 50;

#[cfg(feature = "sync")]
pub(crate) const PERIODICAL_SYNC_NORMAL_PACE_MILLIS: u64 = 300;

#[cfg(feature = "sync")]
pub(crate) const PERIODICAL_SYNC_FAST_PACE_NANOS: u64 = 500;
Loading

0 comments on commit 432262a

Please sign in to comment.