Micro-optimize ValueInitializer #331

Merged
merged 5 commits on Oct 3, 2023
272 changes: 159 additions & 113 deletions src/future/cache.rs
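In short, the micro-optimization: ValueInitializer::try_init_or_read used to receive the optional replace_if/ignore_if predicate wrapped in Arc<Mutex<...>>, costing an allocation and an async-lock round trip on every get_with-style call. After this PR the closure is passed directly (note the removed Arc::new(Mutex::new(...)) lines in the hunks below). A minimal before/after sketch — the signatures and names here are illustrative, not moka's exact code:

use async_lock::Mutex;
use std::sync::Arc;

// Optional predicate deciding whether an existing value should be replaced.
type ReplaceIf = Option<Box<dyn FnMut(&u32) -> bool + Send>>;

// Before: every call allocated an Arc plus an async Mutex around the
// predicate, then locked it even though no other task could hold it.
async fn init_old(replace_if: Arc<Mutex<ReplaceIf>>, value: &u32) -> bool {
    let mut guard = replace_if.lock().await;
    guard.as_mut().map_or(false, |f| f(value))
}

// After: the predicate is passed by value; no allocation, no locking.
async fn init_new(mut replace_if: ReplaceIf, value: &u32) -> bool {
    replace_if.as_mut().map_or(false, |f| f(value))
}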
@@ -12,7 +12,6 @@ use crate::{
#[cfg(feature = "unstable-debug-counters")]
use crate::common::concurrent::debug_counters::CacheDebugStats;

use async_lock::Mutex;
use async_trait::async_trait;
use std::{
borrow::Borrow,
@@ -1371,113 +1370,6 @@ where
self.invalidate_with_hash(key, hash, true).await
}

pub async fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64, need_value: bool) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
use futures_util::FutureExt;

self.base.retry_interrupted_ops().await;

// Lock the key for removal if blocking removal notification is enabled.
let mut kl = None;
let mut klg = None;
if self.base.is_removal_notifier_enabled() {
// To lock the key, we have to get Arc<K> for key (&Q).
//
// TODO: Enhance this if possible. This is rather a hack for now
// because it cannot prevent race conditions like this:
//
// 1. We miss the key because it does not exist, so we do not lock
// the key.
// 2. Somebody else (another thread) inserts the key.
// 3. We remove the entry for the key, but without the key lock!
//
if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
kl = self.base.maybe_key_lock(&arc_key);
klg = if let Some(lock) = &kl {
Some(lock.lock().await)
} else {
None
};
}
}

match self.base.remove_entry(key, hash) {
None => None,
Some(kv) => {
let now = self.base.current_time_from_expiration_clock();

let maybe_v = if need_value {
Some(kv.entry.value.clone())
} else {
None
};

let op = WriteOp::Remove(kv.clone());

// Async Cancellation Safety: To ensure that the future below is
// executed even if our caller's async task is cancelled, we create a
// cancel guard for the future (and the op). If our caller is
// cancelled while we are awaiting the future, the cancel guard will
// save the future and the op to the interrupted_op_ch channel, so
// that we can resume/retry later.
let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now);

if self.base.is_removal_notifier_enabled() {
let future = self
.base
.notify_invalidate(&kv.key, &kv.entry)
.boxed()
.shared();
cancel_guard.set_future_and_op(future.clone(), op.clone());
// Send notification to the eviction listener.
future.await;
cancel_guard.unset_future();
} else {
cancel_guard.set_op(op.clone());
}

// Drop the locks before scheduling the write op to avoid a potential
// deadlock. (Scheduling the write can spin-lock when the queue is
// full, and the queue is drained by the housekeeping thread, which
// can lock the same key.)
std::mem::drop(klg);
std::mem::drop(kl);

let should_block;
#[cfg(not(test))]
{
should_block = false;
}
#[cfg(test)]
{
should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
}

let lock = self.base.maintenance_task_lock();
let hk = self.base.housekeeper.as_ref();

BaseCache::<K, V, S>::schedule_write_op(
&self.base.inner,
&self.base.write_op_ch,
lock,
op,
now,
hk,
should_block,
)
.await
.expect("Failed to schedule write op for remove");
cancel_guard.clear();

crossbeam_epoch::pin().flush();
maybe_v
}
}
}

/// Discards all cached values.
///
/// This method returns immediately and a background thread will evict all the
@@ -1678,7 +1570,6 @@ where
None
};

let replace_if = Arc::new(Mutex::new(replace_if));
let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;

@@ -1806,13 +1697,12 @@ where
None
};

let ignore_if = Arc::new(Mutex::new(never_ignore()));
let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;

match self
.value_initializer
.try_init_or_read(&key, hash, type_id, self, ignore_if, init, post_init)
.try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
.await
{
InitResult::Initialized(v) => {
@@ -1889,13 +1779,12 @@ where
None
};

let ignore_if = Arc::new(Mutex::new(never_ignore()));
let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;

match self
.value_initializer
.try_init_or_read(&key, hash, type_id, self, ignore_if, init, post_init)
.try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
.await
{
InitResult::Initialized(v) => {
@@ -1945,6 +1834,113 @@ where
.expect("Failed to schedule write op for insert");
cancel_guard.clear();
}

async fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64, need_value: bool) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
use futures_util::FutureExt;

self.base.retry_interrupted_ops().await;

// Lock the key for removal if blocking removal notification is enabled.
let mut kl = None;
let mut klg = None;
if self.base.is_removal_notifier_enabled() {
// To lock the key, we have to get Arc<K> for key (&Q).
//
// TODO: Enhance this if possible. This is rather a hack for now
// because it cannot prevent race conditions like this:
//
// 1. We miss the key because it does not exist, so we do not lock
// the key.
// 2. Somebody else (another thread) inserts the key.
// 3. We remove the entry for the key, but without the key lock!
//
if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
kl = self.base.maybe_key_lock(&arc_key);
klg = if let Some(lock) = &kl {
Some(lock.lock().await)
} else {
None
};
}
}

match self.base.remove_entry(key, hash) {
None => None,
Some(kv) => {
let now = self.base.current_time_from_expiration_clock();

let maybe_v = if need_value {
Some(kv.entry.value.clone())
} else {
None
};

let op = WriteOp::Remove(kv.clone());

// Async Cancellation Safety: To ensure that the future below is
// executed even if our caller's async task is cancelled, we create a
// cancel guard for the future (and the op). If our caller is
// cancelled while we are awaiting the future, the cancel guard will
// save the future and the op to the interrupted_op_ch channel, so
// that we can resume/retry later.
let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now);

if self.base.is_removal_notifier_enabled() {
let future = self
.base
.notify_invalidate(&kv.key, &kv.entry)
.boxed()
.shared();
cancel_guard.set_future_and_op(future.clone(), op.clone());
// Send notification to the eviction listener.
future.await;
cancel_guard.unset_future();
} else {
cancel_guard.set_op(op.clone());
}

// Drop the locks before scheduling the write op to avoid a potential
// deadlock. (Scheduling the write can spin-lock when the queue is
// full, and the queue is drained by the housekeeping thread, which
// can lock the same key.)
std::mem::drop(klg);
std::mem::drop(kl);

let should_block;
#[cfg(not(test))]
{
should_block = false;
}
#[cfg(test)]
{
should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
}

let lock = self.base.maintenance_task_lock();
let hk = self.base.housekeeper.as_ref();

BaseCache::<K, V, S>::schedule_write_op(
&self.base.inner,
&self.base.write_op_ch,
lock,
op,
now,
hk,
should_block,
)
.await
.expect("Failed to schedule write op for remove");
cancel_guard.clear();

crossbeam_epoch::pin().flush();
maybe_v
}
}
}
}
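The "Async Cancellation Safety" comment above describes an RAII pattern: the guard owns the pending op, and if the enclosing future is dropped mid-await, the guard's Drop hook forwards the op to a retry channel. A stripped-down, hypothetical sketch of that idea (CancelGuard, Op, and the channel below are illustrative stand-ins, not moka's actual types):

use std::sync::mpsc::Sender;

// Illustrative stand-in for moka's WriteOp.
enum Op {
    Remove(String),
}

// If the surrounding future is cancelled (dropped) before `clear` is
// called, Drop forwards the unfinished op to a retry channel so a
// later call can resume it.
struct CancelGuard<'a> {
    retry_ch: &'a Sender<Op>,
    op: Option<Op>,
}

impl<'a> CancelGuard<'a> {
    fn new(retry_ch: &'a Sender<Op>) -> Self {
        Self { retry_ch, op: None }
    }

    fn set_op(&mut self, op: Op) {
        self.op = Some(op);
    }

    // Call after the op has been fully applied, so Drop does nothing.
    fn clear(&mut self) {
        self.op = None;
    }
}

impl Drop for CancelGuard<'_> {
    fn drop(&mut self) {
        if let Some(op) = self.op.take() {
            // If the receiver is gone, the cache is shutting down and
            // the op can safely be discarded.
            let _ = self.retry_ch.send(op);
        }
    }
}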

#[async_trait]
@@ -2050,6 +2046,56 @@ mod tests {
};
use tokio::time::sleep;

#[test]
fn futures_are_send() {
let cache = Cache::new(0);

fn is_send(_: impl Send) {}

// pub fns
is_send(cache.get(&()));
is_send(cache.get_with((), async {}));
is_send(cache.get_with_by_ref(&(), async {}));
#[allow(deprecated)]
is_send(cache.get_with_if((), async {}, |_| false));
is_send(cache.insert((), ()));
is_send(cache.invalidate(&()));
is_send(cache.optionally_get_with((), async { None }));
is_send(cache.optionally_get_with_by_ref(&(), async { None }));
is_send(cache.remove(&()));
is_send(cache.run_pending_tasks());
is_send(cache.try_get_with((), async { Err(()) }));
is_send(cache.try_get_with_by_ref(&(), async { Err(()) }));

// entry fns
is_send(cache.entry(()).or_default());
is_send(cache.entry(()).or_insert(()));
is_send(cache.entry(()).or_insert_with(async {}));
is_send(cache.entry(()).or_insert_with_if(async {}, |_| false));
is_send(cache.entry(()).or_optionally_insert_with(async { None }));
is_send(cache.entry(()).or_try_insert_with(async { Err(()) }));

// entry_by_ref fns
is_send(cache.entry_by_ref(&()).or_default());
is_send(cache.entry_by_ref(&()).or_insert(()));
is_send(cache.entry_by_ref(&()).or_insert_with(async {}));
is_send(
cache
.entry_by_ref(&())
.or_insert_with_if(async {}, |_| false),
);
is_send(
cache
.entry_by_ref(&())
.or_optionally_insert_with(async { None }),
);
is_send(
cache
.entry_by_ref(&())
.or_try_insert_with(async { Err(()) }),
);
}
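The futures_are_send test above is a compile-time assertion: is_send only type-checks when every returned future is Send, which is what allows callers to hand cache operations to a multi-threaded executor. An illustrative usage, not part of this PR, of why that property matters:

// tokio::spawn requires the spawned future to be Send + 'static.
#[tokio::main]
async fn main() {
    let cache: moka::future::Cache<u32, String> = moka::future::Cache::new(100);
    cache.insert(1, "one".to_string()).await;

    let cache2 = cache.clone();
    let handle = tokio::spawn(async move { cache2.get(&1).await });
    assert_eq!(handle.await.unwrap(), Some("one".to_string()));
}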

#[tokio::test]
async fn max_capacity_zero() {
let mut cache = Cache::new(0);