This repository has been archived by the owner on Feb 8, 2024. It is now read-only.
feat(overflow): add overflow_forced_keys envvar (#54)
xvello authored Nov 27, 2023
1 parent 7228307 · commit 7056e80
Showing 5 changed files with 60 additions and 11 deletions.
1 change: 1 addition & 0 deletions capture-server/tests/common.rs
@@ -30,6 +30,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     redis_url: "redis://localhost:6379/".to_string(),
     burst_limit: NonZeroU32::new(5).unwrap(),
     per_second_limit: NonZeroU32::new(10).unwrap(),
+    overflow_forced_keys: None,
     kafka: KafkaConfig {
         kafka_producer_linger_ms: 0, // Send messages as soon as possible
         kafka_producer_queue_mib: 10,
2 changes: 2 additions & 0 deletions capture/src/config.rs
@@ -19,6 +19,8 @@ pub struct Config {
     #[envconfig(default = "1000")]
     pub burst_limit: NonZeroU32,
 
+    pub overflow_forced_keys: Option<String>, // Comma-delimited keys
+
     #[envconfig(nested = true)]
     pub kafka: KafkaConfig,
 
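The new option is read by envconfig like the neighboring fields: assuming the crate's default name mapping, it is populated from the OVERFLOW_FORCED_KEYS environment variable, and an unset variable parses to None, leaving overflow forcing disabled. A minimal sketch of that behavior (the struct name and key values below are illustrative, not from this repository):

use envconfig::Envconfig;

#[derive(Envconfig)]
struct OverflowConfig {
    // Unset env var parses to None, so overflow forcing stays off by default.
    pub overflow_forced_keys: Option<String>, // Comma-delimited keys
}

fn main() {
    std::env::set_var("OVERFLOW_FORCED_KEYS", "token1:user1,token2:user2");
    let config = OverflowConfig::init_from_env().unwrap();
    assert_eq!(
        config.overflow_forced_keys.as_deref(),
        Some("token1:user1,token2:user2")
    );
}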
55 changes: 47 additions & 8 deletions capture/src/partition_limits.rs
@@ -6,26 +6,37 @@
 /// before it causes a negative impact. In this case, instead of passing the error to the customer
 /// with a 429, we relax our ordering constraints and temporarily override the key, meaning the
 /// customer's data will be spread across all partitions.
-use std::{num::NonZeroU32, sync::Arc};
+use std::collections::HashSet;
+use std::num::NonZeroU32;
+use std::sync::Arc;
 
 use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};
 
 // See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
 #[derive(Clone)]
 pub struct PartitionLimiter {
     limiter: Arc<RateLimiter<String, DefaultKeyedStateStore<String>, clock::DefaultClock>>,
+    forced_keys: HashSet<String>,
 }
 
 impl PartitionLimiter {
-    pub fn new(per_second: NonZeroU32, burst: NonZeroU32) -> Self {
+    pub fn new(per_second: NonZeroU32, burst: NonZeroU32, forced_keys: Option<String>) -> Self {
         let quota = Quota::per_second(per_second).allow_burst(burst);
         let limiter = Arc::new(governor::RateLimiter::dashmap(quota));
 
-        PartitionLimiter { limiter }
+        let forced_keys: HashSet<String> = match forced_keys {
+            None => HashSet::new(),
+            Some(values) => values.split(',').map(String::from).collect(),
+        };
+
+        PartitionLimiter {
+            limiter,
+            forced_keys,
+        }
     }
 
     pub fn is_limited(&self, key: &String) -> bool {
-        self.limiter.check_key(key).is_err()
+        self.forced_keys.contains(key) || self.limiter.check_key(key).is_err()
     }
 }
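Note that new() splits the raw value on commas without trimming whitespace, so keys must be supplied exactly as they will be matched. A standalone sketch of the same parsing, runnable outside the crate:

use std::collections::HashSet;

// Mirrors the match block in PartitionLimiter::new, for illustration only.
fn parse_forced_keys(raw: Option<String>) -> HashSet<String> {
    match raw {
        None => HashSet::new(),
        Some(values) => values.split(',').map(String::from).collect(),
    }
}

fn main() {
    let keys = parse_forced_keys(Some(String::from("one,three")));
    assert_eq!(keys.len(), 2);
    assert!(keys.contains("one") && keys.contains("three"));

    // No trimming: a space after the comma becomes part of the stored key.
    assert!(parse_forced_keys(Some(String::from("one, three"))).contains(" three"));
}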

@@ -36,8 +47,11 @@ mod tests {
 
     #[tokio::test]
     async fn low_limits() {
-        let limiter =
-            PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap());
+        let limiter = PartitionLimiter::new(
+            NonZeroU32::new(1).unwrap(),
+            NonZeroU32::new(1).unwrap(),
+            None,
+        );
         let token = String::from("test");
 
         assert!(!limiter.is_limited(&token));
@@ -46,13 +60,38 @@
 
     #[tokio::test]
     async fn bursting() {
-        let limiter =
-            PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(3).unwrap());
+        let limiter = PartitionLimiter::new(
+            NonZeroU32::new(1).unwrap(),
+            NonZeroU32::new(3).unwrap(),
+            None,
+        );
         let token = String::from("test");
 
         assert!(!limiter.is_limited(&token));
         assert!(!limiter.is_limited(&token));
         assert!(!limiter.is_limited(&token));
         assert!(limiter.is_limited(&token));
     }
+
+    #[tokio::test]
+    async fn forced_key() {
+        let key_one = String::from("one");
+        let key_two = String::from("two");
+        let key_three = String::from("three");
+        let forced_keys = Some(String::from("one,three"));
+
+        let limiter = PartitionLimiter::new(
+            NonZeroU32::new(1).unwrap(),
+            NonZeroU32::new(1).unwrap(),
+            forced_keys,
+        );
+
+        // One and three are limited from the start, two is not
+        assert!(limiter.is_limited(&key_one));
+        assert!(!limiter.is_limited(&key_two));
+        assert!(limiter.is_limited(&key_three));
+
+        // Two is limited on the second event
+        assert!(limiter.is_limited(&key_two));
+    }
 }
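For context on how this is consumed: is_limited() is what the Kafka sink consults when choosing a partition key, and the sink is untouched by this diff apart from test wiring. The following is therefore only a hedged sketch of the behavior the module comment describes; the token:distinct_id key format and the capture::partition_limits module path are assumptions for illustration.

use capture::partition_limits::PartitionLimiter;
use std::num::NonZeroU32;

// If the key is rate-limited or force-listed, drop the Kafka partition key so
// the events spread across all partitions instead of being rejected with a 429.
fn kafka_key(limiter: &PartitionLimiter, token: &str, distinct_id: &str) -> Option<String> {
    let candidate = format!("{}:{}", token, distinct_id);
    if limiter.is_limited(&candidate) {
        None // overflow: trade ordering for availability
    } else {
        Some(candidate)
    }
}

fn main() {
    let limiter = PartitionLimiter::new(
        NonZeroU32::new(10).unwrap(),
        NonZeroU32::new(10).unwrap(),
        Some(String::from("token1:user1")),
    );
    // The force-listed key overflows immediately; others keep their key.
    assert_eq!(kafka_key(&limiter, "token1", "user1"), None);
    assert_eq!(
        kafka_key(&limiter, "token2", "user2").as_deref(),
        Some("token2:user2")
    );
}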
6 changes: 5 additions & 1 deletion capture/src/server.rs
@@ -44,7 +44,11 @@ where
         .register("rdkafka".to_string(), Duration::seconds(30))
         .await;
 
-    let partition = PartitionLimiter::new(config.per_second_limit, config.burst_limit);
+    let partition = PartitionLimiter::new(
+        config.per_second_limit,
+        config.burst_limit,
+        config.overflow_forced_keys,
+    );
     let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
         .expect("failed to start Kafka sink");
 
7 changes: 5 additions & 2 deletions capture/src/sink.rs
@@ -301,8 +301,11 @@ mod tests {
         let handle = registry
             .register("one".to_string(), Duration::seconds(30))
             .await;
-        let limiter =
-            PartitionLimiter::new(NonZeroU32::new(10).unwrap(), NonZeroU32::new(10).unwrap());
+        let limiter = PartitionLimiter::new(
+            NonZeroU32::new(10).unwrap(),
+            NonZeroU32::new(10).unwrap(),
+            None,
+        );
         let cluster = MockCluster::new(1).expect("failed to create mock brokers");
         let config = config::KafkaConfig {
             kafka_producer_linger_ms: 0,
