Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/main' into xvello/submods
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Dec 13, 2023
2 parents 3024499 + 454b96d commit 3f4682d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
1 change: 1 addition & 0 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
type=ref,event=branch
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=sha
- name: Set up Docker Buildx
id: buildx
Expand Down
18 changes: 18 additions & 0 deletions capture/src/limiters/overflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::sync::Arc;

use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};
use metrics::gauge;
use rand::Rng;

// See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
#[derive(Clone)]
Expand Down Expand Up @@ -50,6 +51,23 @@ impl OverflowLimiter {
gauge!("partition_limits_key_count", self.limiter.len() as f64);
}
}

/// Clean up the rate limiter state, once per minute. Ensure we don't use more memory than
/// necessary.
pub async fn clean_state(&self) {
// Give a small amount of randomness to the interval to ensure we don't have all replicas
// locking at the same time. The lock isn't going to be held for long, but this will reduce
// impact regardless.
let interval_secs = rand::thread_rng().gen_range(60..70);

let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(interval_secs));
loop {
interval.tick().await;

self.limiter.retain_recent();
self.limiter.shrink_to_fit();
}
}
}

#[cfg(test)]
Expand Down
14 changes: 11 additions & 3 deletions capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use crate::limiters::billing::BillingLimiter;
use crate::limiters::overflow::OverflowLimiter;
use crate::redis::RedisClient;
use crate::router;
use crate::sinks::{kafka, print};
use crate::sinks::kafka::KafkaSink;
use crate::sinks::print::PrintSink;

pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
where
Expand All @@ -35,7 +36,7 @@ where
router::router(
crate::time::SystemTime {},
liveness,
print::PrintSink {},
PrintSink {},
redis_client,
billing,
config.export_prometheus,
Expand All @@ -56,7 +57,14 @@ where
partition.report_metrics().await;
});
}
let sink = kafka::KafkaSink::new(config.kafka, sink_liveness, partition)
{
// Ensure that the rate limiter state does not grow unbounded
let partition = partition.clone();
tokio::spawn(async move {
partition.clean_state().await;
});
}
let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");

router::router(
Expand Down

0 comments on commit 3f4682d

Please sign in to comment.