Skip to content

Commit

Permalink
Merge pull request #315 from Kuadrant/cached_redis_tests
Browse files Browse the repository at this point in the history
Batcher tests
  • Loading branch information
alexsnaps authored May 7, 2024
2 parents 0caa044 + 3191caa commit 6e95dcc
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,136 @@ mod tests {
}
}

mod batcher {
use crate::storage::redis::counters_cache::tests::test_counter;
use crate::storage::redis::counters_cache::{Batcher, CachedCounterValue};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

#[tokio::test]
async fn consume_waits_when_empty() {
let duration = Duration::from_millis(100);
let batcher = Batcher::new(duration);
let start = SystemTime::now();
batcher
.consume(2, |items| {
assert!(items.is_empty());
assert!(SystemTime::now().duration_since(start).unwrap() >= duration);
async {}
})
.await;
}

#[tokio::test]
async fn consume_waits_when_batch_not_filled() {
let duration = Duration::from_millis(100);
let batcher = Arc::new(Batcher::new(duration));
let start = SystemTime::now();
{
let batcher = Arc::clone(&batcher);
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(40)).await;
let counter = test_counter(6, None);
let arc = Arc::new(CachedCounterValue::from_authority(
&counter,
0,
Duration::from_secs(1),
));
batcher.add(counter, arc);
});
}
batcher
.consume(2, |items| {
assert_eq!(items.len(), 1);
assert!(
SystemTime::now().duration_since(start).unwrap()
>= Duration::from_millis(100)
);
async {}
})
.await;
}

#[tokio::test]
async fn consume_waits_until_batch_is_filled() {
let duration = Duration::from_millis(100);
let batcher = Arc::new(Batcher::new(duration));
let start = SystemTime::now();
{
let batcher = Arc::clone(&batcher);
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(40)).await;
let counter = test_counter(6, None);
let arc = Arc::new(CachedCounterValue::from_authority(
&counter,
0,
Duration::from_secs(1),
));
batcher.add(counter, arc);
});
}
batcher
.consume(1, |items| {
assert_eq!(items.len(), 1);
let wait_period = SystemTime::now().duration_since(start).unwrap();
assert!(wait_period >= Duration::from_millis(40));
assert!(wait_period < Duration::from_millis(50));
async {}
})
.await;
}

#[tokio::test]
async fn consume_immediately_when_batch_is_filled() {
let duration = Duration::from_millis(100);
let batcher = Arc::new(Batcher::new(duration));
let start = SystemTime::now();
{
let counter = test_counter(6, None);
let arc = Arc::new(CachedCounterValue::from_authority(
&counter,
0,
Duration::from_secs(1),
));
batcher.add(counter, arc);
}
batcher
.consume(1, |items| {
assert_eq!(items.len(), 1);
assert!(
SystemTime::now().duration_since(start).unwrap() < Duration::from_millis(5)
);
async {}
})
.await;
}

#[tokio::test]
async fn consume_triggers_on_fast_flush() {
let duration = Duration::from_millis(100);
let batcher = Arc::new(Batcher::new(duration));
let start = SystemTime::now();
{
let batcher = Arc::clone(&batcher);
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(40)).await;
let counter = test_counter(6, None);
let arc = Arc::new(CachedCounterValue::load_from_authority_asap(&counter, 0));
batcher.add(counter, arc);
});
}
batcher
.consume(2, |items| {
assert_eq!(items.len(), 1);
let wait_period = SystemTime::now().duration_since(start).unwrap();
assert!(wait_period >= Duration::from_millis(40));
assert!(wait_period < Duration::from_millis(50));
async {}
})
.await;
}
}

#[test]
fn get_existing_counter() {
let counter = test_counter(10, None);
Expand Down

0 comments on commit 6e95dcc

Please sign in to comment.