diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml
index 1b5f29b..77e731e 100644
--- a/.github/workflows/docker.yml
+++ b/.github/workflows/docker.yml
@@ -31,6 +31,7 @@ jobs:
             type=ref,event=branch
             type=semver,pattern={{version}}
             type=semver,pattern={{major}}.{{minor}}
+            type=sha
 
       - name: Set up Docker Buildx
         id: buildx
diff --git a/capture/src/limiters/overflow.rs b/capture/src/limiters/overflow.rs
index 76a012e..0e91a99 100644
--- a/capture/src/limiters/overflow.rs
+++ b/capture/src/limiters/overflow.rs
@@ -13,6 +13,7 @@ use std::sync::Arc;
 
 use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};
 use metrics::gauge;
+use rand::Rng;
 
 // See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
 #[derive(Clone)]
@@ -50,6 +51,23 @@ impl OverflowLimiter {
             gauge!("partition_limits_key_count", self.limiter.len() as f64);
         }
     }
+
+    /// Clean up the rate limiter state, once per minute. Ensure we don't use more memory than
+    /// necessary.
+    pub async fn clean_state(&self) {
+        // Give a small amount of randomness to the interval to ensure we don't have all replicas
+        // locking at the same time. The lock isn't going to be held for long, but this will reduce
+        // impact regardless.
+        let interval_secs = rand::thread_rng().gen_range(60..70);
+
+        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(interval_secs));
+        loop {
+            interval.tick().await;
+
+            self.limiter.retain_recent();
+            self.limiter.shrink_to_fit();
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/capture/src/server.rs b/capture/src/server.rs
index 2544ba2..22a1f3b 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -10,7 +10,8 @@ use crate::limiters::billing::BillingLimiter;
 use crate::limiters::overflow::OverflowLimiter;
 use crate::redis::RedisClient;
 use crate::router;
-use crate::sinks::{kafka, print};
+use crate::sinks::kafka::KafkaSink;
+use crate::sinks::print::PrintSink;
 
 pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
 where
@@ -35,7 +36,7 @@ where
         router::router(
             crate::time::SystemTime {},
             liveness,
-            print::PrintSink {},
+            PrintSink {},
             redis_client,
             billing,
             config.export_prometheus,
@@ -56,7 +57,14 @@ where
                 partition.report_metrics().await;
             });
         }
-        let sink = kafka::KafkaSink::new(config.kafka, sink_liveness, partition)
+        {
+            // Ensure that the rate limiter state does not grow unbounded
+            let partition = partition.clone();
+            tokio::spawn(async move {
+                partition.clean_state().await;
+            });
+        }
+        let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
             .expect("failed to start Kafka sink");
 
         router::router(
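
For reference, below is a minimal, self-contained sketch of the jittered cleanup loop this diff introduces. `FakeLimiter` is a hypothetical stand-in for governor's keyed `RateLimiter` (whose real `retain_recent()` and `shrink_to_fit()` methods the diff calls); tokio with the `full` feature and rand 0.8 are assumed.

```rust
// Cargo.toml (assumed): tokio = { version = "1", features = ["full"] }, rand = "0.8"
use rand::Rng;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for governor's keyed RateLimiter; it only counts
// how many times cleanup ran so the loop's behaviour is observable.
struct FakeLimiter {
    cleanups: AtomicUsize,
}

impl FakeLimiter {
    fn retain_recent(&self) {
        self.cleanups.fetch_add(1, Ordering::Relaxed);
    }
    fn shrink_to_fit(&self) {}
}

async fn clean_state(limiter: Arc<FakeLimiter>) {
    // Jitter the period (60-69s) so replicas started together drift apart
    // instead of all contending for the limiter's internal locks at once.
    let interval_secs = rand::thread_rng().gen_range(60..70);
    let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(interval_secs));
    loop {
        interval.tick().await;
        limiter.retain_recent();
        limiter.shrink_to_fit();
    }
}

#[tokio::main]
async fn main() {
    let limiter = Arc::new(FakeLimiter {
        cleanups: AtomicUsize::new(0),
    });
    // Spawned as a background task, as the diff does in server.rs.
    tokio::spawn(clean_state(limiter.clone()));

    // The first tick of a tokio interval fires immediately, so one cleanup
    // runs right away; subsequent ticks are 60-69 seconds apart.
    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    println!("cleanup ran {} time(s)", limiter.cleanups.load(Ordering::Relaxed));
}
```

Randomizing the period is a simple jitter technique: as the diff's own comment notes, the lock is held only briefly, but staggering the replicas reduces the impact of simultaneous cleanups regardless.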