Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally disable thread pools in sync::Cache #165

Merged
merged 17 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ jobs:
# hashbrown >= v0.12 requires Rust 2021 edition.
# native-tls >= v0.2.9 requires more recent Rust version.
# async-global-executor >= 2.1 requires Rust 2021 edition.
# pull-down-cmark >= 0.9.2 requires Rust 2021 edition.
run: |
cargo update -p dashmap --precise 5.2.0
cargo update -p indexmap --precise 1.8.2
cargo update -p hashbrown --precise 0.11.2
cargo update -p native-tls --precise 0.2.8
cargo update -p async-global-executor --precise 2.0.4
cargo update -p pulldown-cmark --precise 0.9.1

- name: Show cargo tree
uses: actions-rs/cargo@v1
Expand All @@ -76,6 +78,18 @@ jobs:
command: test
args: --features sync

- name: Run tests (debug, sync feature, thread-pool test for sync::Cache)
uses: actions-rs/cargo@v1
with:
command: test
args: --lib --features sync sync::cache::tests::enabling_and_disabling_thread_pools -- --exact --ignored

- name: Run tests (debug, sync feature, thread-pool test for sync::SegmentCache)
uses: actions-rs/cargo@v1
with:
command: test
args: --lib --features sync sync::segment::tests::enabling_and_disabling_thread_pools -- --exact --ignored

- name: Run tests (release, sync feature)
uses: actions-rs/cargo@v1
with:
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/CIQuantaDisabled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ jobs:
# hashbrown >= v0.12 requires Rust 2021 edition.
# native-tls >= v0.2.9 requires more recent Rust version.
# async-global-executor >= 2.1 requires Rust 2021 edition.
# pull-down-cmark >= 0.9.2 requires Rust 2021 edition.
run: |
cargo update -p dashmap --precise 5.2.0
cargo update -p indexmap --precise 1.8.2
cargo update -p hashbrown --precise 0.11.2
cargo update -p native-tls --precise 0.2.8
cargo update -p async-global-executor --precise 2.0.4
cargo update -p pulldown-cmark --precise 0.9.1

- name: Run tests (debug, but no quanta feature)
uses: actions-rs/cargo@v1
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## Version 0.9.3

### Added

- Add a configuration option to the following caches to avoid to start the global
thread pools ([#165][gh-pull-0165]):
- `sync::Cache`
- `sync::SegmentedCache`

### Fixed

- Ensure that the following caches will drop the value of evicted entries immediately
Expand Down Expand Up @@ -463,6 +470,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25).

[gh-pull-0169]: https://github.com/moka-rs/moka/pull/169/
[gh-pull-0167]: https://github.com/moka-rs/moka/pull/167/
[gh-pull-0165]: https://github.com/moka-rs/moka/pull/165/
[gh-pull-0159]: https://github.com/moka-rs/moka/pull/159/
[gh-pull-0157]: https://github.com/moka-rs/moka/pull/157/
[gh-pull-0145]: https://github.com/moka-rs/moka/pull/145/
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ log = { version = "0.4", optional = true }

[dev-dependencies]
actix-rt = { version = "2.7", default-features = false }
anyhow = "1.0"
anyhow = "1.0.19"
async-std = { version = "1.11", features = ["attributes"] }
env_logger = "0.9"
getrandom = "0.2"
Expand Down
12 changes: 12 additions & 0 deletions src/common/builder_utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::time::Duration;

#[cfg(any(feature = "sync", feature = "future"))]
use super::concurrent::housekeeper;

const YEAR_SECONDS: u64 = 365 * 24 * 3600;

pub(crate) fn ensure_expirations_or_panic(
Expand All @@ -14,3 +17,12 @@ pub(crate) fn ensure_expirations_or_panic(
assert!(d <= max_duration, "time_to_idle is longer than 1000 years");
}
}

#[cfg(any(feature = "sync", feature = "future"))]
pub(crate) fn housekeeper_conf(thread_pool_enabled: bool) -> housekeeper::Configuration {
if thread_pool_enabled {
housekeeper::Configuration::new_thread_pool(true)
} else {
housekeeper::Configuration::new_blocking()
}
}
211 changes: 189 additions & 22 deletions src/common/concurrent/housekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ use super::{
unsafe_weak_pointer::UnsafeWeakPointer,
};

#[cfg(any(feature = "sync", feature = "future"))]
use super::atomic_time::AtomicInstant;

#[cfg(any(feature = "sync", feature = "future"))]
use crate::common::time::{CheckedTimeOps, Instant};

#[cfg(any(feature = "sync", feature = "future"))]
use super::constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT};

use parking_lot::Mutex;
use scheduled_thread_pool::JobHandle;
use std::{
Expand All @@ -18,6 +27,155 @@ use std::{
time::Duration,
};

pub(crate) trait InnerSync {
fn sync(&self, max_sync_repeats: usize) -> Option<SyncPace>;

#[cfg(any(feature = "sync", feature = "future"))]
fn now(&self) -> Instant;
}

#[derive(Clone, Debug)]
pub(crate) struct Configuration {
is_blocking: bool,
periodical_sync_enabled: bool,
}

impl Configuration {
#[cfg(any(feature = "sync", feature = "future"))]
pub(crate) fn new_blocking() -> Self {
Self {
is_blocking: true,
periodical_sync_enabled: false,
}
}

pub(crate) fn new_thread_pool(periodical_sync_enabled: bool) -> Self {
Self {
is_blocking: false,
periodical_sync_enabled,
}
}
}

pub(crate) enum Housekeeper<T> {
Blocking(BlockingHousekeeper),
ThreadPool(ThreadPoolHousekeeper<T>),
}

impl<T> Housekeeper<T>
where
T: InnerSync + 'static,
{
pub(crate) fn new(inner: Weak<T>, config: Configuration) -> Self {
if config.is_blocking {
Housekeeper::Blocking(BlockingHousekeeper::default())
} else {
Housekeeper::ThreadPool(ThreadPoolHousekeeper::new(
inner,
config.periodical_sync_enabled,
))
}
}

#[cfg(any(feature = "sync", feature = "future"))]
pub(crate) fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool {
match self {
Housekeeper::Blocking(h) => h.should_apply_reads(ch_len, now),
Housekeeper::ThreadPool(h) => h.should_apply_reads(ch_len, now),
}
}

#[cfg(any(feature = "sync", feature = "future"))]
pub(crate) fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool {
match self {
Housekeeper::Blocking(h) => h.should_apply_writes(ch_len, now),
Housekeeper::ThreadPool(h) => h.should_apply_writes(ch_len, now),
}
}

pub(crate) fn try_sync(&self, cache: &impl InnerSync) -> bool {
match self {
Housekeeper::Blocking(h) => h.try_sync(cache),
Housekeeper::ThreadPool(h) => h.try_schedule_sync(),
}
}

#[cfg(test)]
pub(crate) fn stop_periodical_sync_job(&self) {
match self {
Housekeeper::Blocking(_) => (),
Housekeeper::ThreadPool(h) => h.stop_periodical_sync_job(),
}
}
}

pub(crate) struct BlockingHousekeeper {
is_sync_running: AtomicBool,
#[cfg(any(feature = "sync", feature = "future"))]
sync_after: AtomicInstant,
}

impl Default for BlockingHousekeeper {
fn default() -> Self {
Self {
is_sync_running: Default::default(),
#[cfg(any(feature = "sync", feature = "future"))]
sync_after: AtomicInstant::new(Self::sync_after(Instant::now())),
}
}
}

impl BlockingHousekeeper {
#[cfg(any(feature = "sync", feature = "future"))]
fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool {
self.should_apply(ch_len, READ_LOG_FLUSH_POINT / 8, now)
}

#[cfg(any(feature = "sync", feature = "future"))]
fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool {
self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT / 8, now)
}

#[cfg(any(feature = "sync", feature = "future"))]
#[inline]
fn should_apply(&self, ch_len: usize, ch_flush_point: usize, now: Instant) -> bool {
ch_len >= ch_flush_point || self.sync_after.instant().unwrap() >= now
}

fn try_sync<T: InnerSync>(&self, cache: &T) -> bool {
// Try to flip the value of sync_scheduled from false to true.
match self.is_sync_running.compare_exchange(
false,
true,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
#[cfg(any(feature = "sync", feature = "future"))]
{
let now = cache.now();
self.sync_after.set_instant(Self::sync_after(now));
}

cache.sync(MAX_SYNC_REPEATS);

self.is_sync_running.store(false, Ordering::Release);
true
}
Err(_) => false,
}
}

#[cfg(any(feature = "sync", feature = "future"))]
fn sync_after(now: Instant) -> Instant {
let dur = Duration::from_millis(PERIODICAL_SYNC_INITIAL_DELAY_MILLIS);
let ts = now.checked_add(dur);
// Assuming that `now` is current wall clock time, this should never fail at
// least next millions of years.
ts.expect("Timestamp overflow")
}
}

#[derive(PartialEq, Eq)]
pub(crate) enum SyncPace {
Normal,
Expand All @@ -34,11 +192,7 @@ impl SyncPace {
}
}

pub(crate) trait InnerSync {
fn sync(&self, max_sync_repeats: usize) -> Option<SyncPace>;
}

pub(crate) struct Housekeeper<T> {
pub(crate) struct ThreadPoolHousekeeper<T> {
inner: Arc<Mutex<UnsafeWeakPointer<T>>>,
thread_pool: Arc<ThreadPool>,
is_shutting_down: Arc<AtomicBool>,
Expand All @@ -48,7 +202,7 @@ pub(crate) struct Housekeeper<T> {
_marker: PhantomData<T>,
}

impl<T> Drop for Housekeeper<T> {
impl<T> Drop for ThreadPoolHousekeeper<T> {
fn drop(&mut self) {
// Disallow to create and/or run sync jobs by now.
self.is_shutting_down.store(true, Ordering::Release);
Expand Down Expand Up @@ -78,30 +232,34 @@ impl<T> Drop for Housekeeper<T> {
}

// functions/methods used by Cache
impl<T: InnerSync> Housekeeper<T>
impl<T> ThreadPoolHousekeeper<T>
where
T: 'static,
T: InnerSync + 'static,
{
pub(crate) fn new(inner: Weak<T>) -> Self {
fn new(inner: Weak<T>, periodical_sync_enable: bool) -> Self {
use super::thread_pool::PoolName;

let thread_pool = ThreadPoolRegistry::acquire_pool(PoolName::Housekeeper);
let inner_ptr = Arc::new(Mutex::new(UnsafeWeakPointer::from_weak_arc(inner)));
let is_shutting_down = Arc::new(AtomicBool::new(false));
let periodical_sync_running = Arc::new(Mutex::new(()));

let sync_job = Self::start_periodical_sync_job(
&thread_pool,
Arc::clone(&inner_ptr),
Arc::clone(&is_shutting_down),
Arc::clone(&periodical_sync_running),
);
let maybe_sync_job = if periodical_sync_enable {
Some(Self::start_periodical_sync_job(
&thread_pool,
Arc::clone(&inner_ptr),
Arc::clone(&is_shutting_down),
Arc::clone(&periodical_sync_running),
))
} else {
None
};

Self {
inner: inner_ptr,
thread_pool,
is_shutting_down,
periodical_sync_job: Mutex::new(Some(sync_job)),
periodical_sync_job: Mutex::new(maybe_sync_job),
periodical_sync_running,
on_demand_sync_scheduled: Arc::new(AtomicBool::new(false)),
_marker: PhantomData::default(),
Expand Down Expand Up @@ -139,9 +297,17 @@ where
.execute_with_dynamic_delay(initial_delay, housekeeper_closure)
}

pub(crate) fn try_schedule_sync(&self) -> bool {
// TODO: Check if these `Orderings` are correct.
#[cfg(any(feature = "sync", feature = "future"))]
fn should_apply_reads(&self, ch_len: usize, _now: Instant) -> bool {
ch_len >= READ_LOG_FLUSH_POINT
}

#[cfg(any(feature = "sync", feature = "future"))]
fn should_apply_writes(&self, ch_len: usize, _now: Instant) -> bool {
ch_len >= WRITE_LOG_FLUSH_POINT
}

fn try_schedule_sync(&self) -> bool {
// If shutting down, do not schedule the task.
if self.is_shutting_down.load(Ordering::Acquire) {
return false;
Expand Down Expand Up @@ -169,13 +335,14 @@ where
}

#[cfg(test)]
pub(crate) fn periodical_sync_job(&self) -> &Mutex<Option<JobHandle>> {
&self.periodical_sync_job
pub(crate) fn stop_periodical_sync_job(&self) {
if let Some(j) = self.periodical_sync_job.lock().take() {
j.cancel();
}
}
}

// private functions/methods
impl<T: InnerSync> Housekeeper<T> {
impl<T: InnerSync> ThreadPoolHousekeeper<T> {
fn call_sync(unsafe_weak_ptr: &Arc<Mutex<UnsafeWeakPointer<T>>>) -> Option<SyncPace> {
let lock = unsafe_weak_ptr.lock();
// Restore the Weak pointer to Inner<K, V, S>.
Expand Down
Loading