Skip to content

Commit

Permalink
Merge pull request #331 from Swatinem/opt-waiterguard
Browse files Browse the repository at this point in the history
Micro-optimize ValueInitializer
  • Loading branch information
tatsuya6502 authored Oct 3, 2023
2 parents b3074fc + 70f0132 commit 616cb6c
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 285 deletions.
272 changes: 159 additions & 113 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::{
#[cfg(feature = "unstable-debug-counters")]
use crate::common::concurrent::debug_counters::CacheDebugStats;

use async_lock::Mutex;
use async_trait::async_trait;
use std::{
borrow::Borrow,
Expand Down Expand Up @@ -1371,113 +1370,6 @@ where
self.invalidate_with_hash(key, hash, true).await
}

pub async fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64, need_value: bool) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
use futures_util::FutureExt;

self.base.retry_interrupted_ops().await;

// Lock the key for removal if blocking removal notification is enabled.
let mut kl = None;
let mut klg = None;
if self.base.is_removal_notifier_enabled() {
// To lock the key, we have to get Arc<K> for key (&Q).
//
// TODO: Enhance this if possible. This is rather hack now because
// it cannot prevent race conditions like this:
//
// 1. We miss the key because it does not exist. So we do not lock
// the key.
// 2. Somebody else (other thread) inserts the key.
// 3. We remove the entry for the key, but without the key lock!
//
if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
kl = self.base.maybe_key_lock(&arc_key);
klg = if let Some(lock) = &kl {
Some(lock.lock().await)
} else {
None
};
}
}

match self.base.remove_entry(key, hash) {
None => None,
Some(kv) => {
let now = self.base.current_time_from_expiration_clock();

let maybe_v = if need_value {
Some(kv.entry.value.clone())
} else {
None
};

let op = WriteOp::Remove(kv.clone());

// Async Cancellation Safety: To ensure the below future should be
// executed even if our caller async task is cancelled, we create a
// cancel guard for the future (and the op). If our caller is
// cancelled while we are awaiting for the future, the cancel guard
// will save the future and the op to the interrupted_op_ch channel,
// so that we can resume/retry later.
let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now);

if self.base.is_removal_notifier_enabled() {
let future = self
.base
.notify_invalidate(&kv.key, &kv.entry)
.boxed()
.shared();
cancel_guard.set_future_and_op(future.clone(), op.clone());
// Send notification to the eviction listener.
future.await;
cancel_guard.unset_future();
} else {
cancel_guard.set_op(op.clone());
}

// Drop the locks before scheduling write op to avoid a potential
// dead lock. (Scheduling write can do spin lock when the queue is
// full, and queue will be drained by the housekeeping thread that
// can lock the same key)
std::mem::drop(klg);
std::mem::drop(kl);

let should_block;
#[cfg(not(test))]
{
should_block = false;
}
#[cfg(test)]
{
should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
}

let lock = self.base.maintenance_task_lock();
let hk = self.base.housekeeper.as_ref();

BaseCache::<K, V, S>::schedule_write_op(
&self.base.inner,
&self.base.write_op_ch,
lock,
op,
now,
hk,
should_block,
)
.await
.expect("Failed to schedule write op for remove");
cancel_guard.clear();

crossbeam_epoch::pin().flush();
maybe_v
}
}
}

/// Discards all cached values.
///
/// This method returns immediately and a background thread will evict all the
Expand Down Expand Up @@ -1678,7 +1570,6 @@ where
None
};

let replace_if = Arc::new(Mutex::new(replace_if));
let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;

Expand Down Expand Up @@ -1806,13 +1697,12 @@ where
None
};

let ignore_if = Arc::new(Mutex::new(never_ignore()));
let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;

match self
.value_initializer
.try_init_or_read(&key, hash, type_id, self, ignore_if, init, post_init)
.try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
.await
{
InitResult::Initialized(v) => {
Expand Down Expand Up @@ -1889,13 +1779,12 @@ where
None
};

let ignore_if = Arc::new(Mutex::new(never_ignore()));
let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;

match self
.value_initializer
.try_init_or_read(&key, hash, type_id, self, ignore_if, init, post_init)
.try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
.await
{
InitResult::Initialized(v) => {
Expand Down Expand Up @@ -1945,6 +1834,113 @@ where
.expect("Failed to schedule write op for insert");
cancel_guard.clear();
}

async fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64, need_value: bool) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
use futures_util::FutureExt;

self.base.retry_interrupted_ops().await;

// Lock the key for removal if blocking removal notification is enabled.
let mut kl = None;
let mut klg = None;
if self.base.is_removal_notifier_enabled() {
// To lock the key, we have to get Arc<K> for key (&Q).
//
// TODO: Enhance this if possible. This is rather hack now because
// it cannot prevent race conditions like this:
//
// 1. We miss the key because it does not exist. So we do not lock
// the key.
// 2. Somebody else (other thread) inserts the key.
// 3. We remove the entry for the key, but without the key lock!
//
if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
kl = self.base.maybe_key_lock(&arc_key);
klg = if let Some(lock) = &kl {
Some(lock.lock().await)
} else {
None
};
}
}

match self.base.remove_entry(key, hash) {
None => None,
Some(kv) => {
let now = self.base.current_time_from_expiration_clock();

let maybe_v = if need_value {
Some(kv.entry.value.clone())
} else {
None
};

let op = WriteOp::Remove(kv.clone());

// Async Cancellation Safety: To ensure the below future should be
// executed even if our caller async task is cancelled, we create a
// cancel guard for the future (and the op). If our caller is
// cancelled while we are awaiting for the future, the cancel guard
// will save the future and the op to the interrupted_op_ch channel,
// so that we can resume/retry later.
let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now);

if self.base.is_removal_notifier_enabled() {
let future = self
.base
.notify_invalidate(&kv.key, &kv.entry)
.boxed()
.shared();
cancel_guard.set_future_and_op(future.clone(), op.clone());
// Send notification to the eviction listener.
future.await;
cancel_guard.unset_future();
} else {
cancel_guard.set_op(op.clone());
}

// Drop the locks before scheduling write op to avoid a potential
// dead lock. (Scheduling write can do spin lock when the queue is
// full, and queue will be drained by the housekeeping thread that
// can lock the same key)
std::mem::drop(klg);
std::mem::drop(kl);

let should_block;
#[cfg(not(test))]
{
should_block = false;
}
#[cfg(test)]
{
should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
}

let lock = self.base.maintenance_task_lock();
let hk = self.base.housekeeper.as_ref();

BaseCache::<K, V, S>::schedule_write_op(
&self.base.inner,
&self.base.write_op_ch,
lock,
op,
now,
hk,
should_block,
)
.await
.expect("Failed to schedule write op for remove");
cancel_guard.clear();

crossbeam_epoch::pin().flush();
maybe_v
}
}
}
}

#[async_trait]
Expand Down Expand Up @@ -2050,6 +2046,56 @@ mod tests {
};
use tokio::time::sleep;

#[test]
fn futures_are_send() {
let cache = Cache::new(0);

fn is_send(_: impl Send) {}

// pub fns
is_send(cache.get(&()));
is_send(cache.get_with((), async {}));
is_send(cache.get_with_by_ref(&(), async {}));
#[allow(deprecated)]
is_send(cache.get_with_if((), async {}, |_| false));
is_send(cache.insert((), ()));
is_send(cache.invalidate(&()));
is_send(cache.optionally_get_with((), async { None }));
is_send(cache.optionally_get_with_by_ref(&(), async { None }));
is_send(cache.remove(&()));
is_send(cache.run_pending_tasks());
is_send(cache.try_get_with((), async { Err(()) }));
is_send(cache.try_get_with_by_ref(&(), async { Err(()) }));

// entry fns
is_send(cache.entry(()).or_default());
is_send(cache.entry(()).or_insert(()));
is_send(cache.entry(()).or_insert_with(async {}));
is_send(cache.entry(()).or_insert_with_if(async {}, |_| false));
is_send(cache.entry(()).or_optionally_insert_with(async { None }));
is_send(cache.entry(()).or_try_insert_with(async { Err(()) }));

// entry_by_ref fns
is_send(cache.entry_by_ref(&()).or_default());
is_send(cache.entry_by_ref(&()).or_insert(()));
is_send(cache.entry_by_ref(&()).or_insert_with(async {}));
is_send(
cache
.entry_by_ref(&())
.or_insert_with_if(async {}, |_| false),
);
is_send(
cache
.entry_by_ref(&())
.or_optionally_insert_with(async { None }),
);
is_send(
cache
.entry_by_ref(&())
.or_try_insert_with(async { Err(()) }),
);
}

#[tokio::test]
async fn max_capacity_zero() {
let mut cache = Cache::new(0);
Expand Down
Loading

0 comments on commit 616cb6c

Please sign in to comment.