Skip to content

Commit

Permalink
[wip] Updating cached counters
Browse files Browse the repository at this point in the history
  • Loading branch information
didierofrivia committed Apr 11, 2024
1 parent 0222db7 commit 7d64901
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 28 deletions.
21 changes: 3 additions & 18 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl AsyncRedisStorage {
pub(crate) async fn update_counters(
&self,
counters_and_deltas: HashMap<Counter, AtomicExpiringValue>,
) -> Result<HashMap<Counter, AtomicExpiringValue>, StorageErr> {
) -> Result<Vec<(String, i64)>, StorageErr> {
let mut con = self.conn_manager.clone();
let span = trace_span!("datastore");

Expand All @@ -256,27 +256,12 @@ impl AsyncRedisStorage {
}
}

let script_res: Vec<Option<(String, i64)>> = script_invocation
let script_res: Vec<Vec<(String, i64)>> = script_invocation
.invoke_async::<_, _>(&mut con)
.instrument(span)
.await?;

let counter_value_map: HashMap<Counter, AtomicExpiringValue> = script_res
.iter()
.filter_map(|counter_value| match counter_value {
Some((raw_counter_key, val)) => {
let counter = partial_counter_from_counter_key(raw_counter_key);
let seconds = counter.seconds();
Some((
counter,
AtomicExpiringValue::new(*val, now + Duration::from_secs(seconds)),
))
}
None => None,
})
.collect();

Ok(counter_value_map)
Ok(script_res.into_iter().flatten().collect())
}
}

Expand Down
28 changes: 18 additions & 10 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use tracing::{error, warn};
// multiple times when it is not cached.

pub struct CachedRedisStorage {
cached_counters: CountersCache,
cached_counters: Arc<CountersCache>,
batcher_counter_updates: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>>,
async_redis_storage: AsyncRedisStorage,
redis_conn_manager: ConnectionManager,
Expand Down Expand Up @@ -226,6 +226,15 @@ impl CachedRedisStorage {
)
.await?;

let cached_counters = CountersCacheBuilder::new()
.max_cached_counters(max_cached_counters)
.max_ttl_cached_counter(ttl_cached_counters)
.ttl_ratio_cached_counter(ttl_ratio_cached_counters)
.build();

let cacher = Arc::new(cached_counters);
let cacher_clone = cacher.clone();

let partitioned = Arc::new(AtomicBool::new(false));
let async_redis_storage =
AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());
Expand All @@ -249,31 +258,30 @@ impl CachedRedisStorage {
std::mem::take(&mut *batch)
};

let _updated_counters = storage
let updated_counters = storage
.update_counters(counters)
.await
.or_else(|err| {
if err.is_transient() {
p.store(true, Ordering::Release);
Ok(HashMap::default())
Ok(Vec::new())
} else {
Err(err)
}
})
.expect("Unrecoverable Redis error!");

for (counter_key, value) in updated_counters {
let counter = partial_counter_from_counter_key(&counter_key);
cacher_clone.increase_by(&counter, value);
}
}
interval.tick().await;
}
});

let cached_counters = CountersCacheBuilder::new()
.max_cached_counters(max_cached_counters)
.max_ttl_cached_counter(ttl_cached_counters)
.ttl_ratio_cached_counter(ttl_ratio_cached_counters)
.build();

Ok(Self {
cached_counters,
cached_counters: cacher,
batcher_counter_updates: batcher,
redis_conn_manager,
async_redis_storage,
Expand Down

0 comments on commit 7d64901

Please sign in to comment.