diff --git a/src/future/cache.rs b/src/future/cache.rs
index 4c56afab..fbb0b35d 100644
--- a/src/future/cache.rs
+++ b/src/future/cache.rs
@@ -12,7 +12,6 @@ use crate::{
 #[cfg(feature = "unstable-debug-counters")]
 use crate::common::concurrent::debug_counters::CacheDebugStats;
 
-use async_lock::Mutex;
 use async_trait::async_trait;
 use std::{
     borrow::Borrow,
@@ -1371,113 +1370,6 @@ where
         self.invalidate_with_hash(key, hash, true).await
     }
 
-    pub async fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64, need_value: bool) -> Option<V>
-    where
-        K: Borrow<Q>,
-        Q: Hash + Eq + ?Sized,
-    {
-        use futures_util::FutureExt;
-
-        self.base.retry_interrupted_ops().await;
-
-        // Lock the key for removal if blocking removal notification is enabled.
-        let mut kl = None;
-        let mut klg = None;
-        if self.base.is_removal_notifier_enabled() {
-            // To lock the key, we have to get Arc<K> for key (&Q).
-            //
-            // TODO: Enhance this if possible. This is rather hack now because
-            // it cannot prevent race conditions like this:
-            //
-            // 1. We miss the key because it does not exist. So we do not lock
-            //    the key.
-            // 2. Somebody else (other thread) inserts the key.
-            // 3. We remove the entry for the key, but without the key lock!
-            //
-            if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
-                kl = self.base.maybe_key_lock(&arc_key);
-                klg = if let Some(lock) = &kl {
-                    Some(lock.lock().await)
-                } else {
-                    None
-                };
-            }
-        }
-
-        match self.base.remove_entry(key, hash) {
-            None => None,
-            Some(kv) => {
-                let now = self.base.current_time_from_expiration_clock();
-
-                let maybe_v = if need_value {
-                    Some(kv.entry.value.clone())
-                } else {
-                    None
-                };
-
-                let op = WriteOp::Remove(kv.clone());
-
-                // Async Cancellation Safety: To ensure the below future should be
-                // executed even if our caller async task is cancelled, we create a
-                // cancel guard for the future (and the op). If our caller is
-                // cancelled while we are awaiting for the future, the cancel guard
-                // will save the future and the op to the interrupted_op_ch channel,
-                // so that we can resume/retry later.
-                let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now);
-
-                if self.base.is_removal_notifier_enabled() {
-                    let future = self
-                        .base
-                        .notify_invalidate(&kv.key, &kv.entry)
-                        .boxed()
-                        .shared();
-                    cancel_guard.set_future_and_op(future.clone(), op.clone());
-                    // Send notification to the eviction listener.
-                    future.await;
-                    cancel_guard.unset_future();
-                } else {
-                    cancel_guard.set_op(op.clone());
-                }
-
-                // Drop the locks before scheduling write op to avoid a potential
-                // dead lock. (Scheduling write can do spin lock when the queue is
-                // full, and queue will be drained by the housekeeping thread that
-                // can lock the same key)
-                std::mem::drop(klg);
-                std::mem::drop(kl);
-
-                let should_block;
-                #[cfg(not(test))]
-                {
-                    should_block = false;
-                }
-                #[cfg(test)]
-                {
-                    should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
-                }
-
-                let lock = self.base.maintenance_task_lock();
-                let hk = self.base.housekeeper.as_ref();
-
-                BaseCache::<K, V, S>::schedule_write_op(
-                    &self.base.inner,
-                    &self.base.write_op_ch,
-                    lock,
-                    op,
-                    now,
-                    hk,
-                    should_block,
-                )
-                .await
-                .expect("Failed to schedule write op for remove");
-                cancel_guard.clear();
-
-                crossbeam_epoch::pin().flush();
-                maybe_v
-            }
-        }
-    }
-
     /// Discards all cached values.
     ///
     /// This method returns immediately and a background thread will evict all the
@@ -1678,7 +1570,6 @@ where
             None
         };
 
-        let replace_if = Arc::new(Mutex::new(replace_if));
         let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
         let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;
 
@@ -1806,13 +1697,12 @@ where
             None
         };
 
-        let ignore_if = Arc::new(Mutex::new(never_ignore()));
         let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
         let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;
 
         match self
             .value_initializer
-            .try_init_or_read(&key, hash, type_id, self, ignore_if, init, post_init)
+            .try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
             .await
         {
             InitResult::Initialized(v) => {
@@ -1889,13 +1779,12 @@ where
             None
         };
 
-        let ignore_if = Arc::new(Mutex::new(never_ignore()));
         let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
         let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;
 
         match self
             .value_initializer
-            .try_init_or_read(&key, hash, type_id, self, ignore_if, init, post_init)
+            .try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
             .await
         {
             InitResult::Initialized(v) => {
@@ -1945,6 +1834,113 @@ where
         .expect("Failed to schedule write op for insert");
         cancel_guard.clear();
     }
+
+    async fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64, need_value: bool) -> Option<V>
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq + ?Sized,
+    {
+        use futures_util::FutureExt;
+
+        self.base.retry_interrupted_ops().await;
+
+        // Lock the key for removal if blocking removal notification is enabled.
+        let mut kl = None;
+        let mut klg = None;
+        if self.base.is_removal_notifier_enabled() {
+            // To lock the key, we have to get an Arc<K> for the key (&Q).
+            //
+            // TODO: Enhance this if possible. This is rather a hack for now
+            // because it cannot prevent race conditions like this:
+            //
+            // 1. We miss the key because it does not exist. So we do not lock
+            //    the key.
+            // 2. Somebody else (another thread) inserts the key.
+            // 3. We remove the entry for the key, but without the key lock!
+            //
+            if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
+                kl = self.base.maybe_key_lock(&arc_key);
+                klg = if let Some(lock) = &kl {
+                    Some(lock.lock().await)
+                } else {
+                    None
+                };
+            }
+        }
+
+        match self.base.remove_entry(key, hash) {
+            None => None,
+            Some(kv) => {
+                let now = self.base.current_time_from_expiration_clock();
+
+                let maybe_v = if need_value {
+                    Some(kv.entry.value.clone())
+                } else {
+                    None
+                };
+
+                let op = WriteOp::Remove(kv.clone());
+
+                // Async Cancellation Safety: To ensure the future below is
+                // executed even if our caller's async task is cancelled, we
+                // create a cancel guard for the future (and the op). If our
+                // caller is cancelled while awaiting the future, the cancel
+                // guard will save the future and the op to the
+                // interrupted_op_ch channel, so that we can resume/retry later.
+                let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now);
+
+                if self.base.is_removal_notifier_enabled() {
+                    let future = self
+                        .base
+                        .notify_invalidate(&kv.key, &kv.entry)
+                        .boxed()
+                        .shared();
+                    cancel_guard.set_future_and_op(future.clone(), op.clone());
+                    // Send notification to the eviction listener.
+                    future.await;
+                    cancel_guard.unset_future();
+                } else {
+                    cancel_guard.set_op(op.clone());
+                }
+
+                // Drop the locks before scheduling the write op to avoid a
+                // potential deadlock. (Scheduling a write can spin-lock when
+                // the queue is full, and the queue is drained by the
+                // housekeeping task, which can lock the same key.)
+                std::mem::drop(klg);
+                std::mem::drop(kl);
+
+                let should_block;
+                #[cfg(not(test))]
+                {
+                    should_block = false;
+                }
+                #[cfg(test)]
+                {
+                    should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
+                }
+
+                let lock = self.base.maintenance_task_lock();
+                let hk = self.base.housekeeper.as_ref();
+
+                BaseCache::<K, V, S>::schedule_write_op(
+                    &self.base.inner,
+                    &self.base.write_op_ch,
+                    lock,
+                    op,
+                    now,
+                    hk,
+                    should_block,
+                )
+                .await
+                .expect("Failed to schedule write op for remove");
+                cancel_guard.clear();
+
+                crossbeam_epoch::pin().flush();
+                maybe_v
+            }
+        }
+    }
 }
 
 #[async_trait]
@@ -2050,6 +2046,56 @@ mod tests {
     };
     use tokio::time::sleep;
 
+    #[test]
+    fn futures_are_send() {
+        let cache = Cache::new(0);
+
+        fn is_send(_: impl Send) {}
+
+        // pub fns
+        is_send(cache.get(&()));
+        is_send(cache.get_with((), async {}));
+        is_send(cache.get_with_by_ref(&(), async {}));
+        #[allow(deprecated)]
+        is_send(cache.get_with_if((), async {}, |_| false));
+        is_send(cache.insert((), ()));
+        is_send(cache.invalidate(&()));
+        is_send(cache.optionally_get_with((), async { None }));
+        is_send(cache.optionally_get_with_by_ref(&(), async { None }));
+        is_send(cache.remove(&()));
+        is_send(cache.run_pending_tasks());
+        is_send(cache.try_get_with((), async { Err(()) }));
+        is_send(cache.try_get_with_by_ref(&(), async { Err(()) }));
+
+        // entry fns
+        is_send(cache.entry(()).or_default());
+        is_send(cache.entry(()).or_insert(()));
+        is_send(cache.entry(()).or_insert_with(async {}));
+        is_send(cache.entry(()).or_insert_with_if(async {}, |_| false));
+        is_send(cache.entry(()).or_optionally_insert_with(async { None }));
+        is_send(cache.entry(()).or_try_insert_with(async { Err(()) }));
+
+        // entry_by_ref fns
+        is_send(cache.entry_by_ref(&()).or_default());
+        is_send(cache.entry_by_ref(&()).or_insert(()));
+        is_send(cache.entry_by_ref(&()).or_insert_with(async {}));
+        is_send(
+            cache
+                .entry_by_ref(&())
+                .or_insert_with_if(async {}, |_| false),
+        );
+        is_send(
+            cache
+                .entry_by_ref(&())
+                .or_optionally_insert_with(async { None }),
+        );
+        is_send(
+            cache
+                .entry_by_ref(&())
+                .or_try_insert_with(async { Err(()) }),
+        );
+    }
+
     #[tokio::test]
     async fn max_capacity_zero() {
         let mut cache = Cache::new(0);
diff --git a/src/future/value_initializer.rs b/src/future/value_initializer.rs
index b3c1fab6..751f4d68 100644
--- a/src/future/value_initializer.rs
+++ b/src/future/value_initializer.rs
@@ -1,4 +1,4 @@
-use async_lock::{Mutex, RwLock, RwLockWriteGuard};
+use async_lock::{RwLock, RwLockWriteGuard};
 use async_trait::async_trait;
 use futures_util::FutureExt;
 use std::{
@@ -57,10 +57,9 @@ where
     V: Clone,
     S: BuildHasher,
 {
-    is_waiter_value_set: bool,
-    w_key: (Arc<K>, TypeId),
+    w_key: Option<(Arc<K>, TypeId)>,
     w_hash: u64,
-    waiters: TrioArc<WaiterMap<K, V, S>>,
+    waiters: &'a WaiterMap<K, V, S>,
     write_lock: RwLockWriteGuard<'a, WaiterValue<V>>,
 }
@@ -73,21 +72,22 @@ where
     fn new(
         w_key: (Arc<K>, TypeId),
         w_hash: u64,
-        waiters: TrioArc<WaiterMap<K, V, S>>,
+        waiters: &'a WaiterMap<K, V, S>,
         write_lock: RwLockWriteGuard<'a, WaiterValue<V>>,
     ) -> Self {
         Self {
-            is_waiter_value_set: false,
-            w_key,
+            w_key: Some(w_key),
             w_hash,
             waiters,
             write_lock,
         }
     }
 
-    fn set_waiter_value(&mut self, v: WaiterValue<V>) {
+    fn set_waiter_value(mut self, v: WaiterValue<V>) {
         *self.write_lock = v;
-        self.is_waiter_value_set = true;
+        if let Some(w_key) = self.w_key.take() {
+            remove_waiter(self.waiters, w_key, self.w_hash);
+        }
     }
 }
 
@@ -98,13 +98,12 @@ where
     S: BuildHasher,
 {
     fn drop(&mut self) {
-        if !self.is_waiter_value_set {
+        if let Some(w_key) = self.w_key.take() {
             // Value is not set. This means the future containing `*get_with` method
             // has been aborted. Remove our waiter to prevent the issue described in
             // https://github.com/moka-rs/moka/issues/59
             *self.write_lock = WaiterValue::EnclosingFutureAborted;
-            remove_waiter(&self.waiters, self.w_key.clone(), self.w_hash);
-            self.is_waiter_value_set = true;
+            remove_waiter(self.waiters, w_key, self.w_hash);
         }
     }
 }
@@ -151,7 +150,7 @@ where
         c_hash: u64,
         type_id: TypeId,
         cache: &C,
-        ignore_if: Arc<Mutex<Option<I>>>,
+        mut ignore_if: Option<I>,
         // Future to initialize a new value.
         init: Pin<&mut impl Future<Output = O>>,
         // Function to convert a value O, returned from the init future, into
@@ -171,104 +170,88 @@ where
 
         let (w_key, w_hash) = waiter_key_hash(&self.waiters, c_key, type_id);
 
+        let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing));
+        // NOTE: We have to acquire a write lock before `try_insert_waiter`,
+        // so that any concurrent attempt will get our lock and wait on it.
+        let lock = waiter.write().await;
+
         loop {
-            let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing));
-            let lock = waiter.write().await;
-
-            match try_insert_waiter(&self.waiters, w_key.clone(), w_hash, &waiter) {
-                None => {
-                    // Our waiter was inserted.
-
-                    // Create a guard. This will ensure to remove our waiter when the
-                    // enclosing future has been aborted:
-                    // https://github.com/moka-rs/moka/issues/59
-                    let mut waiter_guard = WaiterGuard::new(
-                        w_key.clone(),
-                        w_hash,
-                        TrioArc::clone(&self.waiters),
-                        lock,
-                    );
-
-                    // Check if the value has already been inserted by other thread.
-                    if let Some(value) = cache
-                        .get_without_recording(c_key, c_hash, ignore_if.lock().await.as_mut())
-                        .await
-                    {
-                        // Yes. Set the waiter value, remove our waiter, and return
-                        // the existing value.
-                        waiter_guard.set_waiter_value(WaiterValue::Ready(Ok(value.clone())));
-                        remove_waiter(&self.waiters, w_key, w_hash);
-                        return InitResult::ReadExisting(value);
-                    }
-
-                    // The value still does note exist. Let's resolve the init
-                    // future. Catching panic is safe here as we do not try to
-                    // resolve the future again.
-                    match AssertUnwindSafe(init).catch_unwind().await {
-                        // Resolved.
-                        Ok(value) => {
-                            let (waiter_val, init_res) = match post_init(value) {
-                                Ok(value) => {
-                                    cache.insert(Arc::clone(c_key), c_hash, value.clone()).await;
-                                    (
-                                        WaiterValue::Ready(Ok(value.clone())),
-                                        InitResult::Initialized(value),
-                                    )
-                                }
-                                Err(e) => {
-                                    let err: ErrorObject = Arc::new(e);
-                                    (
-                                        WaiterValue::Ready(Err(Arc::clone(&err))),
-                                        InitResult::InitErr(err.downcast().unwrap()),
-                                    )
-                                }
-                            };
-                            waiter_guard.set_waiter_value(waiter_val);
-                            remove_waiter(&self.waiters, w_key, w_hash);
-                            return init_res;
-                        }
-                        // Panicked.
-                        Err(payload) => {
-                            waiter_guard.set_waiter_value(WaiterValue::InitFuturePanicked);
-                            // Remove the waiter so that others can retry.
-                            remove_waiter(&self.waiters, w_key, w_hash);
-                            resume_unwind(payload);
-                        }
-                    } // The lock will be unlocked here.
+            let Some(existing_waiter) =
+                try_insert_waiter(&self.waiters, w_key.clone(), w_hash, &waiter)
+            else {
+                break;
+            };
+
+            // Somebody else's waiter already exists, so wait for its result
+            // to become available.
+            let waiter_result = existing_waiter.read().await;
+            match &*waiter_result {
+                WaiterValue::Ready(Ok(value)) => return ReadExisting(value.clone()),
+                WaiterValue::Ready(Err(e)) => return InitErr(Arc::clone(e).downcast().unwrap()),
+                // Somebody else's init future has panicked.
+                WaiterValue::InitFuturePanicked => {
+                    retries += 1;
+                    panic_if_retry_exhausted_for_panicking(retries, MAX_RETRIES);
+                    // Retry from the beginning.
+                    continue;
+                }
+                // Somebody else (a future containing `get_with`/`try_get_with`)
+                // has been aborted.
+                WaiterValue::EnclosingFutureAborted => {
+                    retries += 1;
+                    panic_if_retry_exhausted_for_aborting(retries, MAX_RETRIES);
+                    // Retry from the beginning.
+                    continue;
+                }
+                // Unexpected state.
+                WaiterValue::Computing => panic!(
+                    "Got unexpected state `Computing` after resolving `init` future. \
+                    This might be a bug in Moka"
+                ),
+            }
+        }
+
+        // Our waiter was inserted.
+
+        // Create a guard. This ensures our waiter is removed when the
+        // enclosing future is aborted:
+        // https://github.com/moka-rs/moka/issues/59
+        let waiter_guard = WaiterGuard::new(w_key, w_hash, &self.waiters, lock);
+
+        // Check if the value has already been inserted by another thread.
+        if let Some(value) = cache
+            .get_without_recording(c_key, c_hash, ignore_if.as_mut())
+            .await
+        {
+            // Yes. Set the waiter value, remove our waiter, and return
+            // the existing value.
+            waiter_guard.set_waiter_value(WaiterValue::Ready(Ok(value.clone())));
+            return ReadExisting(value);
+        }
+
+        // The value still does not exist. Let's resolve the init
+        // future. Catching the panic is safe here, as we do not try to
+        // resolve the future again.
+        match AssertUnwindSafe(init).catch_unwind().await {
+            // Resolved.
+            Ok(value) => match post_init(value) {
+                Ok(value) => {
+                    cache.insert(Arc::clone(c_key), c_hash, value.clone()).await;
+                    waiter_guard.set_waiter_value(WaiterValue::Ready(Ok(value.clone())));
+                    Initialized(value)
                 }
-                Some(res) => {
-                    // Somebody else's waiter already exists. Drop our write lock and
-                    // wait for the read lock to become available.
-                    std::mem::drop(lock);
-                    match &*res.read().await {
-                        WaiterValue::Ready(Ok(value)) => return ReadExisting(value.clone()),
-                        WaiterValue::Ready(Err(e)) => {
-                            return InitErr(Arc::clone(e).downcast().unwrap())
-                        }
-                        // Somebody else's init future has been panicked.
-                        WaiterValue::InitFuturePanicked => {
-                            retries += 1;
-                            panic_if_retry_exhausted_for_panicking(retries, MAX_RETRIES);
-                            // Retry from the beginning.
-                            continue;
-                        }
-                        // Somebody else (a future containing `get_with`/`try_get_with`)
-                        // has been aborted.
-                        WaiterValue::EnclosingFutureAborted => {
-                            retries += 1;
-                            panic_if_retry_exhausted_for_aborting(retries, MAX_RETRIES);
-                            // Retry from the beginning.
-                            continue;
-                        }
-                        // Unexpected state.
-                        WaiterValue::Computing => panic!(
-                            "Got unexpected state `Computing` after resolving `init` future. \
-                            This might be a bug in Moka"
-                        ),
-                    }
+                Err(e) => {
+                    let err: ErrorObject = Arc::new(e);
+                    waiter_guard.set_waiter_value(WaiterValue::Ready(Err(Arc::clone(&err))));
+                    InitErr(err.downcast().unwrap())
                 }
+            },
+            // Panicked.
+            Err(payload) => {
+                waiter_guard.set_waiter_value(WaiterValue::InitFuturePanicked);
+                resume_unwind(payload);
             }
         }
+        // The lock will be unlocked here.
     }
 
     /// The `post_init` function for the `get_with` method of cache.
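The heart of the `future/value_initializer.rs` change is that `WaiterGuard` now owns the waiter key as an `Option` and removes the waiter itself: `set_waiter_value` consumes the guard on the success path, and `Drop` performs the same removal when the enclosing future is aborted, with `Option::take` guaranteeing the removal runs exactly once. Below is a minimal, self-contained sketch of that pattern; the `WaiterMap`, `WaiterGuard`, and `slot` names are hypothetical stand-ins, not Moka's actual API:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical stand-ins: `WaiterMap` tracks in-flight computations, and
// `slot` models the RwLock that the real guard publishes the result through.
type WaiterMap = Mutex<HashMap<String, ()>>;

struct WaiterGuard<'a> {
    key: Option<String>, // `Some` until the waiter has been removed.
    waiters: &'a WaiterMap,
    slot: &'a Mutex<Option<&'static str>>,
}

impl<'a> WaiterGuard<'a> {
    fn new(key: String, waiters: &'a WaiterMap, slot: &'a Mutex<Option<&'static str>>) -> Self {
        waiters.lock().unwrap().insert(key.clone(), ());
        Self { key: Some(key), waiters, slot }
    }

    // Success path: publish the result, then remove the waiter eagerly.
    // Taking `self` by value means the guard cannot be used again.
    fn set_value(mut self, v: &'static str) {
        *self.slot.lock().unwrap() = Some(v);
        if let Some(key) = self.key.take() {
            self.waiters.lock().unwrap().remove(&key);
        }
    }
}

impl Drop for WaiterGuard<'_> {
    fn drop(&mut self) {
        // Abort path: runs only if `set_value` was never called (e.g. the
        // enclosing future was dropped); removes the waiter so others can retry.
        if let Some(key) = self.key.take() {
            self.waiters.lock().unwrap().remove(&key);
        }
    }
}

fn main() {
    let (waiters, slot) = (WaiterMap::default(), Mutex::new(None));

    WaiterGuard::new("k1".into(), &waiters, &slot).set_value("ready");
    assert_eq!(*slot.lock().unwrap(), Some("ready"));
    assert!(waiters.lock().unwrap().is_empty());

    drop(WaiterGuard::new("k2".into(), &waiters, &slot)); // aborted
    assert!(waiters.lock().unwrap().is_empty());
}
```

Because `set_value` consumes the guard, the compiler rules out any further use of it, and the subsequent `Drop` becomes a no-op since the key has already been taken.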
diff --git a/src/sync/value_initializer.rs b/src/sync/value_initializer.rs
index 03d7329d..1bf32bcd 100644
--- a/src/sync/value_initializer.rs
+++ b/src/sync/value_initializer.rs
@@ -70,79 +70,80 @@ where
 
         let (w_key, w_hash) = self.waiter_key_hash(key, type_id);
 
-        loop {
-            let waiter = TrioArc::new(RwLock::new(None));
-            let mut lock = waiter.write();
+        let waiter = TrioArc::new(RwLock::new(None));
+        let mut lock = waiter.write();
 
-            match self.try_insert_waiter(w_key.clone(), w_hash, &waiter) {
+        loop {
+            let Some(existing_waiter) = self.try_insert_waiter(w_key.clone(), w_hash, &waiter)
+            else {
+                break;
+            };
+
+            // Somebody else's waiter already exists, so wait for its result
+            // to become available.
+            let waiter_result = existing_waiter.read();
+            match &*waiter_result {
+                Some(Ok(value)) => return ReadExisting(value.clone()),
+                Some(Err(e)) => return InitErr(Arc::clone(e).downcast().unwrap()),
+                // None means somebody else's init closure has panicked.
                 None => {
-                    // Our waiter was inserted.
-                    // Check if the value has already been inserted by other thread.
-                    if let Some(value) = get() {
-                        // Yes. Set the waiter value, remove our waiter, and return
-                        // the existing value.
-                        *lock = Some(Ok(value.clone()));
-                        self.remove_waiter(w_key, w_hash);
-                        return InitResult::ReadExisting(value);
+                    retries += 1;
+                    if retries < MAX_RETRIES {
+                        // Retry from the beginning.
+                        continue;
+                    } else {
+                        panic!(
+                            "Too many retries. Tried to read the return value from the `init` \
+                            closure but failed {} times. Maybe the `init` kept panicking?",
+                            retries
+                        );
                     }
-
-                    // The value still does note exist. Let's evaluate the init
-                    // closure. Catching panic is safe here as we do not try to
-                    // evaluate the closure again.
-                    match catch_unwind(AssertUnwindSafe(init)) {
-                        // Evaluated.
-                        Ok(value) => {
-                            let (waiter_val, init_res) = match post_init(value) {
-                                Ok(value) => {
-                                    insert(value.clone());
-                                    (Some(Ok(value.clone())), InitResult::Initialized(value))
-                                }
-                                Err(e) => {
-                                    let err: ErrorObject = Arc::new(e);
-                                    (
-                                        Some(Err(Arc::clone(&err))),
-                                        InitResult::InitErr(err.downcast().unwrap()),
-                                    )
-                                }
-                            };
-                            *lock = waiter_val;
-                            self.remove_waiter(w_key, w_hash);
-                            return init_res;
-                        }
-                        // Panicked.
-                        Err(payload) => {
-                            *lock = None;
-                            // Remove the waiter so that others can retry.
-                            self.remove_waiter(w_key, w_hash);
-                            resume_unwind(payload);
-                        }
-                    } // The write lock will be unlocked here.
                 }
-                Some(res) => {
-                    // Somebody else's waiter already exists. Drop our write lock and
-                    // wait for the read lock to become available.
-                    std::mem::drop(lock);
-                    match &*res.read() {
-                        Some(Ok(value)) => return ReadExisting(value.clone()),
-                        Some(Err(e)) => return InitErr(Arc::clone(e).downcast().unwrap()),
-                        // None means somebody else's init closure has been panicked.
-                        None => {
-                            retries += 1;
-                            if retries < MAX_RETRIES {
-                                // Retry from the beginning.
-                                continue;
-                            } else {
-                                panic!(
-                                    "Too many retries. Tried to read the return value from the `init` \
-                                    closure but failed {} times. Maybe the `init` kept panicking?",
-                                    retries
-                                );
-                            }
-                        }
-                    }
-                }
             }
         }
+
+        // Our waiter was inserted.
+
+        // Check if the value has already been inserted by another thread.
+        if let Some(value) = get() {
+            // Yes. Set the waiter value, remove our waiter, and return
+            // the existing value.
+            *lock = Some(Ok(value.clone()));
+            self.remove_waiter(w_key, w_hash);
+            return InitResult::ReadExisting(value);
+        }
+
+        // The value still does not exist. Let's evaluate the init
+        // closure. Catching the panic is safe here, as we do not try to
+        // evaluate the closure again.
+        match catch_unwind(AssertUnwindSafe(init)) {
+            // Evaluated.
+            Ok(value) => {
+                let (waiter_val, init_res) = match post_init(value) {
+                    Ok(value) => {
+                        insert(value.clone());
+                        (Some(Ok(value.clone())), InitResult::Initialized(value))
+                    }
+                    Err(e) => {
+                        let err: ErrorObject = Arc::new(e);
+                        (
+                            Some(Err(Arc::clone(&err))),
+                            InitResult::InitErr(err.downcast().unwrap()),
+                        )
+                    }
+                };
+                *lock = waiter_val;
+                self.remove_waiter(w_key, w_hash);
+                init_res
+            }
+            // Panicked.
+            Err(payload) => {
+                *lock = None;
+                // Remove the waiter so that others can retry.
+                self.remove_waiter(w_key, w_hash);
+                resume_unwind(payload);
+            }
+        }
+        // The write lock will be unlocked here.
    }
 
     /// The `post_init` function for the `get_with` method of cache.
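Both files now share the same insert-or-wait control flow: the `loop` handles only the case where somebody else's waiter already exists, and `break` means our own waiter was inserted, so the straight-line initialization code after the loop runs exactly once. The sketch below illustrates that flow with a hypothetical single-threaded stand-in (a plain `HashMap` where `None` plays the role of a panicked init); it is not Moka's API:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

const MAX_RETRIES: usize = 200;

fn init_or_read(
    waiters: &mut HashMap<String, Option<String>>,
    key: &str,
    init: impl FnOnce() -> String,
) -> String {
    let mut retries = 0;
    loop {
        match waiters.entry(key.to_owned()) {
            // Our waiter was inserted; leave the loop and run `init` below.
            Entry::Vacant(e) => {
                e.insert(None);
                break;
            }
            // Somebody else's waiter exists: read its result, or retry if
            // the other initializer panicked (modeled here by `None`).
            Entry::Occupied(e) => match e.get() {
                Some(v) => return v.clone(),
                None => {
                    retries += 1;
                    assert!(retries < MAX_RETRIES, "the `init` kept panicking?");
                    continue;
                }
            },
        }
    }

    // Straight-line path: we own the waiter, so initialize and publish once.
    let value = init();
    waiters.insert(key.to_owned(), Some(value.clone()));
    value
}

fn main() {
    let mut waiters = HashMap::new();
    let v = init_or_read(&mut waiters, "k", || "computed".to_owned());
    assert_eq!(v, "computed");
}
```

Hoisting the waiter creation and lock acquisition out of the loop is what makes this restructuring work: retries only re-run the "wait on somebody else" branch, never the initialization itself.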