From 3024499f4caa93e085b09b89f3cfeb78f5312298 Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Tue, 12 Dec 2023 16:23:06 +0100
Subject: [PATCH] revise names and comments

---
 capture-server/tests/common.rs            |  4 +--
 capture-server/tests/events.rs            | 12 ++++----
 capture/src/capture.rs                    |  4 +--
 capture/src/config.rs                     |  4 +--
 .../{billing_limits.rs => billing.rs}     |  2 +-
 capture/src/limiters/mod.rs               |  4 +--
 .../{partition_limits.rs => overflow.rs}  | 29 ++++++++++---------
 capture/src/router.rs                     |  8 ++---
 capture/src/server.rs                     | 10 +++----
 capture/src/sinks/kafka.rs                | 16 +++++-----
 capture/src/sinks/mod.rs                  |  2 +-
 capture/src/sinks/print.rs                |  4 +--
 capture/tests/django_compat.rs            |  6 ++--
 13 files changed, 52 insertions(+), 53 deletions(-)
 rename capture/src/limiters/{billing_limits.rs => billing.rs} (98%)
 rename capture/src/limiters/{partition_limits.rs => overflow.rs} (76%)

diff --git a/capture-server/tests/common.rs b/capture-server/tests/common.rs
index ce31897..214ecc8 100644
--- a/capture-server/tests/common.rs
+++ b/capture-server/tests/common.rs
@@ -28,8 +28,8 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     print_sink: false,
     address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
     redis_url: "redis://localhost:6379/".to_string(),
-    burst_limit: NonZeroU32::new(5).unwrap(),
-    per_second_limit: NonZeroU32::new(10).unwrap(),
+    overflow_burst_limit: NonZeroU32::new(5).unwrap(),
+    overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
     overflow_forced_keys: None,
     kafka: KafkaConfig {
         kafka_producer_linger_ms: 0, // Send messages as soon as possible
diff --git a/capture-server/tests/events.rs b/capture-server/tests/events.rs
index b38ac5a..56fcdf7 100644
--- a/capture-server/tests/events.rs
+++ b/capture-server/tests/events.rs
@@ -77,7 +77,7 @@ async fn it_captures_a_batch() -> Result<()> {
 }
 
 #[tokio::test]
-async fn it_is_limited_with_burst() -> Result<()> {
+async fn it_overflows_events_on_burst() -> Result<()> {
     setup_tracing();
 
     let token = random_string("token", 16);
@@ -87,8 +87,8 @@ async fn it_is_limited_with_burst() -> Result<()> {
 
     let mut config = DEFAULT_CONFIG.clone();
     config.kafka.kafka_topic = topic.topic_name().to_string();
-    config.burst_limit = NonZeroU32::new(2).unwrap();
-    config.per_second_limit = NonZeroU32::new(1).unwrap();
+    config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
+    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
 
     let server = ServerHandle::for_config(config);
 
@@ -125,7 +125,7 @@ async fn it_is_limited_with_burst() -> Result<()> {
 }
 
 #[tokio::test]
-async fn it_does_not_partition_limit_different_ids() -> Result<()> {
+async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
     setup_tracing();
     let token = random_string("token", 16);
     let distinct_id = random_string("id", 16);
@@ -136,8 +136,8 @@ async fn it_does_not_partition_limit_different_ids() -> Result<()> {
 
     let mut config = DEFAULT_CONFIG.clone();
     config.kafka.kafka_topic = topic.topic_name().to_string();
-    config.burst_limit = NonZeroU32::new(1).unwrap();
-    config.per_second_limit = NonZeroU32::new(1).unwrap();
+    config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
+    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
 
     let server = ServerHandle::for_config(config);
 
diff --git a/capture/src/capture.rs b/capture/src/capture.rs
index ecfec8e..6a90378 100644
--- a/capture/src/capture.rs
+++ b/capture/src/capture.rs
@@ -16,7 +16,7 @@ use time::OffsetDateTime;
 use tracing::instrument;
 
 use crate::event::{Compression, ProcessingContext};
-use crate::limiters::billing_limits::QuotaResource;
+use crate::limiters::billing::QuotaResource;
 use crate::prometheus::report_dropped_events;
 use crate::token::validate_token;
 use crate::{
@@ -209,7 +209,7 @@ pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, CaptureError> {
 
 #[instrument(skip_all)]
 pub async fn process_events<'a>(
-    sink: Arc<dyn sinks::EventSink + Send + Sync>,
+    sink: Arc<dyn sinks::Event + Send + Sync>,
     events: &'a [RawEvent],
     context: &'a ProcessingContext,
 ) -> Result<(), CaptureError> {
diff --git a/capture/src/config.rs b/capture/src/config.rs
index 69a085d..0c6ab1c 100644
--- a/capture/src/config.rs
+++ b/capture/src/config.rs
@@ -14,10 +14,10 @@ pub struct Config {
     pub otel_url: Option<String>,
 
     #[envconfig(default = "100")]
-    pub per_second_limit: NonZeroU32,
+    pub overflow_per_second_limit: NonZeroU32,
 
     #[envconfig(default = "1000")]
-    pub burst_limit: NonZeroU32,
+    pub overflow_burst_limit: NonZeroU32,
 
     pub overflow_forced_keys: Option<String>, // Coma-delimited keys
 
diff --git a/capture/src/limiters/billing_limits.rs b/capture/src/limiters/billing.rs
similarity index 98%
rename from capture/src/limiters/billing_limits.rs
rename to capture/src/limiters/billing.rs
index b425474..b908519 100644
--- a/capture/src/limiters/billing_limits.rs
+++ b/capture/src/limiters/billing.rs
@@ -166,7 +166,7 @@ mod tests {
     use time::Duration;
 
     use crate::{
-        limiters::billing_limits::{BillingLimiter, QuotaResource},
+        limiters::billing::{BillingLimiter, QuotaResource},
         redis::MockRedisClient,
     };
 
diff --git a/capture/src/limiters/mod.rs b/capture/src/limiters/mod.rs
index 47f32d5..58b2dcc 100644
--- a/capture/src/limiters/mod.rs
+++ b/capture/src/limiters/mod.rs
@@ -1,2 +1,2 @@
-pub mod billing_limits;
-pub mod partition_limits;
+pub mod billing;
+pub mod overflow;
diff --git a/capture/src/limiters/partition_limits.rs b/capture/src/limiters/overflow.rs
similarity index 76%
rename from capture/src/limiters/partition_limits.rs
rename to capture/src/limiters/overflow.rs
index a236ddf..76a012e 100644
--- a/capture/src/limiters/partition_limits.rs
+++ b/capture/src/limiters/overflow.rs
@@ -1,11 +1,12 @@
-/// When a customer is writing too often to the same key, we get hot partitions. This negatively
-/// affects our write latency and cluster health. We try to provide ordering guarantees wherever
-/// possible, but this does require that we map key -> partition.
-///
-/// If the write-rate reaches a certain amount, we need to be able to handle the hot partition
-/// before it causes a negative impact. In this case, instead of passing the error to the customer
-/// with a 429, we relax our ordering constraints and temporarily override the key, meaning the
-/// customers data will be spread across all partitions.
+/// The analytics ingestion pipeline provides ordering guarantees for events of the same
+/// token and distinct_id. We currently achieve this through a locality constraint on the
+/// Kafka partition (consistent partition hashing through a computed key).
+///
+/// Volume spikes to a given key can create lag on the destination partition and induce
+/// ingestion lag. To protect the downstream systems, capture can relax this locality
+/// constraint when bursts are detected. When that happens, the excess traffic will be
+/// spread across all partitions and be processed by the overflow consumer, without
+/// strict ordering guarantees.
 use std::collections::HashSet;
 use std::num::NonZeroU32;
 use std::sync::Arc;
@@ -15,12 +16,12 @@ use metrics::gauge;
 
 // See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
 #[derive(Clone)]
-pub struct PartitionLimiter {
+pub struct OverflowLimiter {
     limiter: Arc<RateLimiter<String, DefaultKeyedStateStore<String>, clock::DefaultClock>>,
     forced_keys: HashSet<String>,
 }
 
-impl PartitionLimiter {
+impl OverflowLimiter {
     pub fn new(per_second: NonZeroU32, burst: NonZeroU32, forced_keys: Option<String>) -> Self {
         let quota = Quota::per_second(per_second).allow_burst(burst);
         let limiter = Arc::new(governor::RateLimiter::dashmap(quota));
@@ -30,7 +31,7 @@ impl PartitionLimiter {
             Some(values) => values.split(',').map(String::from).collect(),
         };
 
-        PartitionLimiter {
+        OverflowLimiter {
             limiter,
             forced_keys,
         }
@@ -53,12 +54,12 @@ impl PartitionLimiter {
 
 #[cfg(test)]
 mod tests {
-    use crate::limiters::partition_limits::PartitionLimiter;
+    use crate::limiters::overflow::OverflowLimiter;
     use std::num::NonZeroU32;
 
     #[tokio::test]
     async fn low_limits() {
-        let limiter = PartitionLimiter::new(
+        let limiter = OverflowLimiter::new(
             NonZeroU32::new(1).unwrap(),
             NonZeroU32::new(1).unwrap(),
             None,
@@ -71,7 +72,7 @@ mod tests {
 
     #[tokio::test]
     async fn bursting() {
-        let limiter = PartitionLimiter::new(
+        let limiter = OverflowLimiter::new(
             NonZeroU32::new(1).unwrap(),
             NonZeroU32::new(3).unwrap(),
             None,
@@ -91,7 +92,7 @@
         let key_three = String::from("three");
         let forced_keys = Some(String::from("one,three"));
 
-        let limiter = PartitionLimiter::new(
+        let limiter = OverflowLimiter::new(
             NonZeroU32::new(1).unwrap(),
             NonZeroU32::new(1).unwrap(),
             forced_keys,
diff --git a/capture/src/router.rs b/capture/src/router.rs
index ee282f5..d02e63f 100644
--- a/capture/src/router.rs
+++ b/capture/src/router.rs
@@ -10,15 +10,13 @@ use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
 use tower_http::trace::TraceLayer;
 
 use crate::health::HealthRegistry;
-use crate::{
-    capture, limiters::billing_limits::BillingLimiter, redis::Client, sinks, time::TimeSource,
-};
+use crate::{capture, limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource};
 
 use crate::prometheus::{setup_metrics_recorder, track_metrics};
 
 #[derive(Clone)]
 pub struct State {
-    pub sink: Arc<dyn sinks::EventSink + Send + Sync>,
+    pub sink: Arc<dyn sinks::Event + Send + Sync>,
     pub timesource: Arc<dyn TimeSource + Send + Sync>,
     pub redis: Arc<dyn Client + Send + Sync>,
     pub billing: BillingLimiter,
@@ -30,7 +28,7 @@ async fn index() -> &'static str {
 
 pub fn router<
     TZ: TimeSource + Send + Sync + 'static,
-    S: sinks::EventSink + Send + Sync + 'static,
+    S: sinks::Event + Send + Sync + 'static,
     R: Client + Send + Sync + 'static,
 >(
     timesource: TZ,
diff --git a/capture/src/server.rs b/capture/src/server.rs
index ce54fe2..2544ba2 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -6,8 +6,8 @@ use time::Duration;
 
 use crate::config::Config;
 use crate::health::{ComponentStatus, HealthRegistry};
-use crate::limiters::billing_limits::BillingLimiter;
-use crate::limiters::partition_limits::PartitionLimiter;
+use crate::limiters::billing::BillingLimiter;
+use crate::limiters::overflow::OverflowLimiter;
 use crate::redis::RedisClient;
 use crate::router;
 use crate::sinks::{kafka, print};
@@ -45,9 +45,9 @@ where
             .register("rdkafka".to_string(), Duration::seconds(30))
             .await;
 
-        let partition = PartitionLimiter::new(
-            config.per_second_limit,
-            config.burst_limit,
+        let partition = OverflowLimiter::new(
+            config.overflow_per_second_limit,
+            config.overflow_burst_limit,
             config.overflow_forced_keys,
         );
         if config.export_prometheus {
diff --git a/capture/src/sinks/kafka.rs b/capture/src/sinks/kafka.rs
index 2d36101..dc57c11 100644
--- a/capture/src/sinks/kafka.rs
+++ b/capture/src/sinks/kafka.rs
@@ -14,9 +14,9 @@ use crate::api::CaptureError;
 use crate::config::KafkaConfig;
 use crate::event::ProcessedEvent;
 use crate::health::HealthHandle;
-use crate::limiters::partition_limits::PartitionLimiter;
+use crate::limiters::overflow::OverflowLimiter;
 use crate::prometheus::report_dropped_events;
-use crate::sinks::EventSink;
+use crate::sinks::Event;
 
 struct KafkaContext {
     liveness: HealthHandle,
@@ -83,14 +83,14 @@ impl rdkafka::ClientContext for KafkaContext {
 pub struct KafkaSink {
     producer: FutureProducer<KafkaContext>,
     topic: String,
-    partition: PartitionLimiter,
+    partition: OverflowLimiter,
 }
 
 impl KafkaSink {
     pub fn new(
         config: KafkaConfig,
         liveness: HealthHandle,
-        partition: PartitionLimiter,
+        partition: OverflowLimiter,
     ) -> anyhow::Result<KafkaSink> {
         info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
 
@@ -204,7 +204,7 @@ impl KafkaSink {
 }
 
 #[async_trait]
-impl EventSink for KafkaSink {
+impl Event for KafkaSink {
     #[instrument(skip_all)]
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
         let limited = self.partition.is_limited(&event.key());
@@ -264,9 +264,9 @@ mod tests {
     use crate::config;
     use crate::event::ProcessedEvent;
     use crate::health::HealthRegistry;
-    use crate::limiters::partition_limits::PartitionLimiter;
+    use crate::limiters::overflow::OverflowLimiter;
     use crate::sinks::kafka::KafkaSink;
-    use crate::sinks::EventSink;
+    use crate::sinks::Event;
     use crate::utils::uuid_v7;
     use rand::distributions::Alphanumeric;
     use rand::Rng;
@@ -281,7 +281,7 @@
         let handle = registry
             .register("one".to_string(), Duration::seconds(30))
             .await;
-        let limiter = PartitionLimiter::new(
+        let limiter = OverflowLimiter::new(
             NonZeroU32::new(10).unwrap(),
             NonZeroU32::new(10).unwrap(),
             None,
diff --git a/capture/src/sinks/mod.rs b/capture/src/sinks/mod.rs
index 7183250..0747f0e 100644
--- a/capture/src/sinks/mod.rs
+++ b/capture/src/sinks/mod.rs
@@ -7,7 +7,7 @@ pub mod kafka;
 pub mod print;
 
 #[async_trait]
-pub trait EventSink {
+pub trait Event {
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
     async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
 }
diff --git a/capture/src/sinks/print.rs b/capture/src/sinks/print.rs
index 84add74..50bc1ad 100644
--- a/capture/src/sinks/print.rs
+++ b/capture/src/sinks/print.rs
@@ -4,12 +4,12 @@ use tracing::log::info;
 
 use crate::api::CaptureError;
 use crate::event::ProcessedEvent;
-use crate::sinks::EventSink;
+use crate::sinks::Event;
 
 pub struct PrintSink {}
 
 #[async_trait]
-impl EventSink for PrintSink {
+impl Event for PrintSink {
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
         info!("single event: {:?}", event);
         counter!("capture_events_ingested_total", 1);
diff --git a/capture/tests/django_compat.rs b/capture/tests/django_compat.rs
index b1c2df5..5d77899 100644
--- a/capture/tests/django_compat.rs
+++ b/capture/tests/django_compat.rs
@@ -7,10 +7,10 @@ use base64::Engine;
 use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode};
 use capture::event::ProcessedEvent;
 use capture::health::HealthRegistry;
-use capture::limiters::billing_limits::BillingLimiter;
+use capture::limiters::billing::BillingLimiter;
 use capture::redis::MockRedisClient;
 use capture::router::router;
-use capture::sinks::EventSink;
+use capture::sinks::Event;
 use capture::time::TimeSource;
 use serde::Deserialize;
 use serde_json::{json, Value};
@@ -61,7 +61,7 @@ impl MemorySink {
 }
 
 #[async_trait]
-impl EventSink for MemorySink {
+impl Event for MemorySink {
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
         self.events.lock().unwrap().push(event);
         Ok(())