diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index 19a16630..94c83004 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -346,7 +346,7 @@ where #[inline] pub(crate) async fn apply_reads_writes_if_needed( - inner: &impl InnerSync, + inner: Arc, ch: &Sender>, now: Instant, housekeeper: Option<&HouseKeeperArc>, @@ -428,7 +428,8 @@ where op: ReadOp, now: Instant, ) -> Result<(), TrySendError>> { - self.apply_reads_if_needed(&self.inner, now).await; + self.apply_reads_if_needed(Arc::clone(&self.inner), now) + .await; let ch = &self.read_op_ch; match ch.try_send(op) { // Discard the ReadOp when the channel is full. @@ -589,7 +590,7 @@ where } #[inline] - async fn apply_reads_if_needed(&self, inner: &Inner, now: Instant) { + async fn apply_reads_if_needed(&self, inner: Arc>, now: Instant) { let len = self.read_op_ch.len(); if let Some(hk) = &self.housekeeper { @@ -1236,7 +1237,7 @@ where S: BuildHasher + Clone + Send + Sync + 'static, { async fn run_pending_tasks(&self, max_repeats: usize) { - self.run_pending_tasks(max_repeats).await; + self.do_run_pending_tasks(max_repeats).await; } fn now(&self) -> Instant { @@ -1250,7 +1251,7 @@ where V: Clone + Send + Sync + 'static, S: BuildHasher + Clone + Send + Sync + 'static, { - async fn run_pending_tasks(&self, max_repeats: usize) { + async fn do_run_pending_tasks(&self, max_repeats: usize) { if self.max_capacity == Some(0) { return; } @@ -2605,19 +2606,19 @@ mod tests { ($cache:ident, $key:ident, $hash:ident, $mock:ident, $duration_secs:expr) => { // Increment the time. $mock.increment(Duration::from_millis($duration_secs * 1000 - 1)); - $cache.inner.run_pending_tasks(1).await; + $cache.inner.do_run_pending_tasks(1).await; assert!($cache.contains_key_with_hash(&$key, $hash)); assert_eq!($cache.entry_count(), 1); // Increment the time by 1ms (3). The entry should be expired. $mock.increment(Duration::from_millis(1)); - $cache.inner.run_pending_tasks(1).await; + $cache.inner.do_run_pending_tasks(1).await; assert!(!$cache.contains_key_with_hash(&$key, $hash)); // Increment the time again to ensure the entry has been evicted from the // cache. $mock.increment(Duration::from_secs(1)); - $cache.inner.run_pending_tasks(1).await; + $cache.inner.do_run_pending_tasks(1).await; assert_eq!($cache.entry_count(), 0); }; } @@ -2899,7 +2900,7 @@ mod tests { insert(&cache, key, hash, value).await; // Run a sync to register the entry to the internal data structures including // the timer wheel. - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 1); @@ -2921,12 +2922,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert!(cache.contains_key_with_hash(&key, hash)); // Read the entry (2). @@ -2946,7 +2947,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_expiry!(cache, key, hash, mock, 3); @@ -2968,12 +2969,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert!(cache.contains_key_with_hash(&key, hash)); // Read the entry (2). @@ -2993,11 +2994,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; // Increment the time. mock.increment(Duration::from_secs(2)); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3012,7 +3013,7 @@ mod tests { Some(3), ); insert(&cache, key, hash, value).await; - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 3); @@ -3035,12 +3036,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3061,11 +3062,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; // Increment the time. mock.increment(Duration::from_secs(2)); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3080,7 +3081,7 @@ mod tests { None, ); insert(&cache, key, hash, value).await; - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 7); @@ -3102,12 +3103,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8)); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(5)); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3128,7 +3129,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_expiry!(cache, key, hash, mock, 7); @@ -3150,12 +3151,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8)); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(5)); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3176,11 +3177,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3201,7 +3202,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_expiry!(cache, key, hash, mock, 5); @@ -3222,12 +3223,12 @@ mod tests { *expectation.lock().unwrap() = ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(9)); insert(&cache, key, hash, value).await; - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3243,12 +3244,12 @@ mod tests { ); let updated_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3269,11 +3270,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3294,7 +3295,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1).await; + cache.inner.do_run_pending_tasks(1).await; assert_expiry!(cache, key, hash, mock, 4); } diff --git a/src/future/cache.rs b/src/future/cache.rs index c2eda527..afc1b7b4 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -1394,15 +1394,9 @@ where let op = WriteOp::Remove(kv); let now = self.base.current_time_from_expiration_clock(); let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op( - self.base.inner.as_ref(), - &self.base.write_op_ch, - op, - now, - hk, - ) - .await - .expect("Failed to remove"); + Self::schedule_write_op(&self.base.inner, &self.base.write_op_ch, op, now, hk) + .await + .expect("Failed to remove"); crossbeam_epoch::pin().flush(); maybe_v } @@ -1844,20 +1838,14 @@ where let (op, now) = self.base.do_insert_with_hash(key, hash, value).await; let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op( - self.base.inner.as_ref(), - &self.base.write_op_ch, - op, - now, - hk, - ) - .await - .expect("Failed to insert"); + Self::schedule_write_op(&self.base.inner, &self.base.write_op_ch, op, now, hk) + .await + .expect("Failed to insert"); } #[inline] async fn schedule_write_op( - inner: &impl InnerSync, + inner: &Arc, ch: &Sender>, op: WriteOp, now: Instant, @@ -1866,7 +1854,13 @@ where let mut op = op; let mut spin_count = 0u8; loop { - BaseCache::::apply_reads_writes_if_needed(inner, ch, now, housekeeper).await; + BaseCache::::apply_reads_writes_if_needed( + Arc::clone(inner), + ch, + now, + housekeeper, + ) + .await; match ch.try_send(op) { Ok(()) => return Ok(()), Err(TrySendError::Full(op1)) => { diff --git a/src/future/housekeeper.rs b/src/future/housekeeper.rs index 7fc2de7f..610414b0 100644 --- a/src/future/housekeeper.rs +++ b/src/future/housekeeper.rs @@ -9,12 +9,11 @@ use crate::common::{ time::{CheckedTimeOps, Instant}, }; -use std::{ - sync::atomic::{AtomicBool, Ordering}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; +use async_lock::Mutex; use async_trait::async_trait; +use futures_util::future::{BoxFuture, Shared}; #[async_trait] pub(crate) trait InnerSync { @@ -23,15 +22,16 @@ pub(crate) trait InnerSync { } pub(crate) struct Housekeeper { - is_sync_running: AtomicBool, - sync_after: AtomicInstant, + /// A shared `Future` of the maintenance task that is currently being resolved. + current_task: Mutex>>>, + run_after: AtomicInstant, } impl Default for Housekeeper { fn default() -> Self { Self { - is_sync_running: Default::default(), - sync_after: AtomicInstant::new(Self::sync_after(Instant::now())), + current_task: Default::default(), + run_after: AtomicInstant::new(Self::sync_after(Instant::now())), } } } @@ -47,27 +47,44 @@ impl Housekeeper { #[inline] fn should_apply(&self, ch_len: usize, ch_flush_point: usize, now: Instant) -> bool { - ch_len >= ch_flush_point || self.sync_after.instant().unwrap() >= now + ch_len >= ch_flush_point || self.run_after.instant().unwrap() >= now } - pub(crate) async fn try_sync(&self, cache: &T) -> bool { - // Try to flip the value of sync_scheduled from false to true. - match self.is_sync_running.compare_exchange( - false, - true, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - let now = cache.now(); - self.sync_after.set_instant(Self::sync_after(now)); + // TODO: Change the name to something that shows some kind of relationship with + // `run_pending_tasks`. + pub(crate) async fn try_sync(&self, cache: Arc) -> bool + where + T: InnerSync + Send + Sync + 'static, + { + use futures_util::FutureExt; - cache.run_pending_tasks(MAX_SYNC_REPEATS).await; + // TODO: This will skip to run pending tasks if lock cannot be acquired. + // Change this so that when `try_sync` is explicitly called, it will be + // blocked here until the lock is acquired. + if let Some(mut lock) = self.current_task.try_lock() { + let now = cache.now(); - self.is_sync_running.store(false, Ordering::Release); - true + if let Some(task) = &*lock { + // This task was being resolved, but did not complete. This means + // that the enclosing Future was canceled. Try to resolve it. + task.clone().await; + } else { + // Create a new maintenance task and try to resolve it. + let task = async move { cache.run_pending_tasks(MAX_SYNC_REPEATS).await } + .boxed() + .shared(); + *lock = Some(task.clone()); + task.await; } - Err(_) => false, + + // If we are here, it means that the maintenance task has been completed, + // so we can remove it from the lock. + *lock = None; + self.run_after.set_instant(Self::sync_after(now)); + + true + } else { + false } }