From 3c7479a51a59cb5a512c84a0a76b9447965f28e6 Mon Sep 17 00:00:00 2001 From: Ellie Huxtable Date: Tue, 12 Dec 2023 16:16:27 +0000 Subject: [PATCH 1/2] chore(ci): add short sha to docker ci image (#71) --- .github/workflows/docker.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 1b5f29b..77e731e 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -31,6 +31,7 @@ jobs: type=ref,event=branch type=semver,pattern={{version}} type=semver,pattern={{major}}.{{minor}} + type=sha - name: Set up Docker Buildx id: buildx From 454b96d75c144b4dd428fc7862565d5a447ec4d9 Mon Sep 17 00:00:00 2001 From: Ellie Huxtable Date: Wed, 13 Dec 2023 13:34:45 +0000 Subject: [PATCH 2/2] feat: clean up partition limiter state on schedule (#72) * feat: clean up partition limiter state on schedule * add randomness --- capture/src/partition_limits.rs | 18 ++++++++++++++++++ capture/src/server.rs | 11 +++++++++++ 2 files changed, 29 insertions(+) diff --git a/capture/src/partition_limits.rs b/capture/src/partition_limits.rs index cd0148f..7059b45 100644 --- a/capture/src/partition_limits.rs +++ b/capture/src/partition_limits.rs @@ -12,6 +12,7 @@ use std::sync::Arc; use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter}; use metrics::gauge; +use rand::Rng; // See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads #[derive(Clone)] @@ -49,6 +50,23 @@ impl PartitionLimiter { gauge!("partition_limits_key_count", self.limiter.len() as f64); } } + + /// Clean up the rate limiter state roughly once per minute (every 60-69 seconds), so that we + /// don't use more memory than necessary. + pub async fn clean_state(&self) { + // Give a small amount of randomness to the interval to ensure we don't have all replicas + // locking at the same time. The lock isn't going to be held for long, but this will reduce + // impact regardless. 
+ let interval_secs = rand::thread_rng().gen_range(60..70); + + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(interval_secs)); + loop { + interval.tick().await; + + self.limiter.retain_recent(); + self.limiter.shrink_to_fit(); + } + } } #[cfg(test)] diff --git a/capture/src/server.rs b/capture/src/server.rs index 3eca676..9b8d60c 100644 --- a/capture/src/server.rs +++ b/capture/src/server.rs @@ -55,6 +55,17 @@ where partition.report_metrics().await; }); } + + { + // Ensure that the rate limiter state does not grow unbounded + + let partition = partition.clone(); + + tokio::spawn(async move { + partition.clean_state().await; + }); + } + let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition) .expect("failed to start Kafka sink");