diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index d7af5467..4bdebd99 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -238,7 +238,7 @@ impl AsyncRedisStorage { pub(crate) async fn update_counters( &self, counters_and_deltas: HashMap, - ) -> Result, StorageErr> { + ) -> Result, StorageErr> { let mut con = self.conn_manager.clone(); let span = trace_span!("datastore"); @@ -256,27 +256,12 @@ impl AsyncRedisStorage { } } - let script_res: Vec> = script_invocation + let script_res: Vec> = script_invocation .invoke_async::<_, _>(&mut con) .instrument(span) .await?; - let counter_value_map: HashMap = script_res - .iter() - .filter_map(|counter_value| match counter_value { - Some((raw_counter_key, val)) => { - let counter = partial_counter_from_counter_key(raw_counter_key); - let seconds = counter.seconds(); - Some(( - counter, - AtomicExpiringValue::new(*val, now + Duration::from_secs(seconds)), - )) - } - None => None, - }) - .collect(); - - Ok(counter_value_map) + Ok(script_res.into_iter().flatten().collect()) } } diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 38e63223..c85622ee 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -39,7 +39,7 @@ use tracing::{error, warn}; // multiple times when it is not cached. pub struct CachedRedisStorage { - cached_counters: CountersCache, + cached_counters: Arc, batcher_counter_updates: Arc>>, async_redis_storage: AsyncRedisStorage, redis_conn_manager: ConnectionManager, @@ -226,6 +226,15 @@ impl CachedRedisStorage { ) .await?; + let cached_counters = CountersCacheBuilder::new() + .max_cached_counters(max_cached_counters) + .max_ttl_cached_counter(ttl_cached_counters) + .ttl_ratio_cached_counter(ttl_ratio_cached_counters) + .build(); + + let cacher = Arc::new(cached_counters); + let cacher_clone = cacher.clone(); + let partitioned = Arc::new(AtomicBool::new(false)); let async_redis_storage = AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone()); @@ -249,31 +258,30 @@ impl CachedRedisStorage { std::mem::take(&mut *batch) }; - let _updated_counters = storage + let updated_counters = storage .update_counters(counters) .await .or_else(|err| { if err.is_transient() { p.store(true, Ordering::Release); - Ok(HashMap::default()) + Ok(Vec::new()) } else { Err(err) } }) .expect("Unrecoverable Redis error!"); + + for (counter_key, value) in updated_counters { + let counter = partial_counter_from_counter_key(&counter_key); + cacher_clone.increase_by(&counter, value); + } } interval.tick().await; } }); - let cached_counters = CountersCacheBuilder::new() - .max_cached_counters(max_cached_counters) - .max_ttl_cached_counter(ttl_cached_counters) - .ttl_ratio_cached_counter(ttl_ratio_cached_counters) - .build(); - Ok(Self { - cached_counters, + cached_counters: cacher, batcher_counter_updates: batcher, redis_conn_manager, async_redis_storage,