Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
revise names and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Dec 12, 2023
1 parent f3e08bb commit 3024499
Show file tree
Hide file tree
Showing 13 changed files with 52 additions and 53 deletions.
4 changes: 2 additions & 2 deletions capture-server/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
print_sink: false,
address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
redis_url: "redis://localhost:6379/".to_string(),
burst_limit: NonZeroU32::new(5).unwrap(),
per_second_limit: NonZeroU32::new(10).unwrap(),
overflow_burst_limit: NonZeroU32::new(5).unwrap(),
overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
overflow_forced_keys: None,
kafka: KafkaConfig {
kafka_producer_linger_ms: 0, // Send messages as soon as possible
Expand Down
12 changes: 6 additions & 6 deletions capture-server/tests/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn it_captures_a_batch() -> Result<()> {
}

#[tokio::test]
async fn it_is_limited_with_burst() -> Result<()> {
async fn it_overflows_events_on_burst() -> Result<()> {
setup_tracing();

let token = random_string("token", 16);
Expand All @@ -87,8 +87,8 @@ async fn it_is_limited_with_burst() -> Result<()> {

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(2).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();
config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);

Expand Down Expand Up @@ -125,7 +125,7 @@ async fn it_is_limited_with_burst() -> Result<()> {
}

#[tokio::test]
async fn it_does_not_partition_limit_different_ids() -> Result<()> {
async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
setup_tracing();

let token = random_string("token", 16);
Expand All @@ -136,8 +136,8 @@ async fn it_does_not_partition_limit_different_ids() -> Result<()> {

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(1).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();
config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);

Expand Down
4 changes: 2 additions & 2 deletions capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use time::OffsetDateTime;
use tracing::instrument;

use crate::event::{Compression, ProcessingContext};
use crate::limiters::billing_limits::QuotaResource;
use crate::limiters::billing::QuotaResource;
use crate::prometheus::report_dropped_events;
use crate::token::validate_token;
use crate::{
Expand Down Expand Up @@ -209,7 +209,7 @@ pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, CaptureEr

#[instrument(skip_all, fields(events = events.len()))]
pub async fn process_events<'a>(
sink: Arc<dyn sinks::EventSink + Send + Sync>,
sink: Arc<dyn sinks::Event + Send + Sync>,
events: &'a [RawEvent],
context: &'a ProcessingContext,
) -> Result<(), CaptureError> {
Expand Down
4 changes: 2 additions & 2 deletions capture/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ pub struct Config {
pub otel_url: Option<String>,

#[envconfig(default = "100")]
pub per_second_limit: NonZeroU32,
pub overflow_per_second_limit: NonZeroU32,

#[envconfig(default = "1000")]
pub burst_limit: NonZeroU32,
pub overflow_burst_limit: NonZeroU32,

pub overflow_forced_keys: Option<String>, // Coma-delimited keys

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ mod tests {
use time::Duration;

use crate::{
limiters::billing_limits::{BillingLimiter, QuotaResource},
limiters::billing::{BillingLimiter, QuotaResource},
redis::MockRedisClient,
};

Expand Down
4 changes: 2 additions & 2 deletions capture/src/limiters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub mod billing_limits;
pub mod partition_limits;
pub mod billing;
pub mod overflow;
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
/// When a customer is writing too often to the same key, we get hot partitions. This negatively
/// affects our write latency and cluster health. We try to provide ordering guarantees wherever
/// possible, but this does require that we map key -> partition.
/// The analytics ingestion pipeline provides ordering guarantees for events of the same
/// token and distinct_id. We currently achieve this through a locality constraint on the
/// Kafka partition (consistent partition hashing through a computed key).
///
/// If the write-rate reaches a certain amount, we need to be able to handle the hot partition
/// before it causes a negative impact. In this case, instead of passing the error to the customer
/// with a 429, we relax our ordering constraints and temporarily override the key, meaning the
/// customers data will be spread across all partitions.
/// Volume spikes to a given key can create lag on the destination partition and induce
/// ingestion lag. To protect the downstream systems, capture can relax this locality
/// constraint when bursts are detected. When that happens, the excess traffic will be
/// spread across all partitions and be processed by the overflow consumer, without
/// strict ordering guarantees.
use std::collections::HashSet;
use std::num::NonZeroU32;
use std::sync::Arc;
Expand All @@ -15,12 +16,12 @@ use metrics::gauge;

// See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
#[derive(Clone)]
pub struct PartitionLimiter {
pub struct OverflowLimiter {
limiter: Arc<RateLimiter<String, DefaultKeyedStateStore<String>, clock::DefaultClock>>,
forced_keys: HashSet<String>,
}

impl PartitionLimiter {
impl OverflowLimiter {
pub fn new(per_second: NonZeroU32, burst: NonZeroU32, forced_keys: Option<String>) -> Self {
let quota = Quota::per_second(per_second).allow_burst(burst);
let limiter = Arc::new(governor::RateLimiter::dashmap(quota));
Expand All @@ -30,7 +31,7 @@ impl PartitionLimiter {
Some(values) => values.split(',').map(String::from).collect(),
};

PartitionLimiter {
OverflowLimiter {
limiter,
forced_keys,
}
Expand All @@ -53,12 +54,12 @@ impl PartitionLimiter {

#[cfg(test)]
mod tests {
use crate::limiters::partition_limits::PartitionLimiter;
use crate::limiters::overflow::OverflowLimiter;
use std::num::NonZeroU32;

#[tokio::test]
async fn low_limits() {
let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(1).unwrap(),
NonZeroU32::new(1).unwrap(),
None,
Expand All @@ -71,7 +72,7 @@ mod tests {

#[tokio::test]
async fn bursting() {
let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(1).unwrap(),
NonZeroU32::new(3).unwrap(),
None,
Expand All @@ -91,7 +92,7 @@ mod tests {
let key_three = String::from("three");
let forced_keys = Some(String::from("one,three"));

let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(1).unwrap(),
NonZeroU32::new(1).unwrap(),
forced_keys,
Expand Down
8 changes: 3 additions & 5 deletions capture/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;

use crate::health::HealthRegistry;
use crate::{
capture, limiters::billing_limits::BillingLimiter, redis::Client, sinks, time::TimeSource,
};
use crate::{capture, limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource};

use crate::prometheus::{setup_metrics_recorder, track_metrics};

#[derive(Clone)]
pub struct State {
pub sink: Arc<dyn sinks::EventSink + Send + Sync>,
pub sink: Arc<dyn sinks::Event + Send + Sync>,
pub timesource: Arc<dyn TimeSource + Send + Sync>,
pub redis: Arc<dyn Client + Send + Sync>,
pub billing: BillingLimiter,
Expand All @@ -30,7 +28,7 @@ async fn index() -> &'static str {

pub fn router<
TZ: TimeSource + Send + Sync + 'static,
S: sinks::EventSink + Send + Sync + 'static,
S: sinks::Event + Send + Sync + 'static,
R: Client + Send + Sync + 'static,
>(
timesource: TZ,
Expand Down
10 changes: 5 additions & 5 deletions capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use time::Duration;

use crate::config::Config;
use crate::health::{ComponentStatus, HealthRegistry};
use crate::limiters::billing_limits::BillingLimiter;
use crate::limiters::partition_limits::PartitionLimiter;
use crate::limiters::billing::BillingLimiter;
use crate::limiters::overflow::OverflowLimiter;
use crate::redis::RedisClient;
use crate::router;
use crate::sinks::{kafka, print};
Expand Down Expand Up @@ -45,9 +45,9 @@ where
.register("rdkafka".to_string(), Duration::seconds(30))
.await;

let partition = PartitionLimiter::new(
config.per_second_limit,
config.burst_limit,
let partition = OverflowLimiter::new(
config.overflow_per_second_limit,
config.overflow_burst_limit,
config.overflow_forced_keys,
);
if config.export_prometheus {
Expand Down
16 changes: 8 additions & 8 deletions capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use crate::api::CaptureError;
use crate::config::KafkaConfig;
use crate::event::ProcessedEvent;
use crate::health::HealthHandle;
use crate::limiters::partition_limits::PartitionLimiter;
use crate::limiters::overflow::OverflowLimiter;
use crate::prometheus::report_dropped_events;
use crate::sinks::EventSink;
use crate::sinks::Event;

struct KafkaContext {
liveness: HealthHandle,
Expand Down Expand Up @@ -83,14 +83,14 @@ impl rdkafka::ClientContext for KafkaContext {
pub struct KafkaSink {
producer: FutureProducer<KafkaContext>,
topic: String,
partition: PartitionLimiter,
partition: OverflowLimiter,
}

impl KafkaSink {
pub fn new(
config: KafkaConfig,
liveness: HealthHandle,
partition: PartitionLimiter,
partition: OverflowLimiter,
) -> anyhow::Result<KafkaSink> {
info!("connecting to Kafka brokers at {}...", config.kafka_hosts);

Expand Down Expand Up @@ -204,7 +204,7 @@ impl KafkaSink {
}

#[async_trait]
impl EventSink for KafkaSink {
impl Event for KafkaSink {
#[instrument(skip_all)]
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
let limited = self.partition.is_limited(&event.key());
Expand Down Expand Up @@ -264,9 +264,9 @@ mod tests {
use crate::config;
use crate::event::ProcessedEvent;
use crate::health::HealthRegistry;
use crate::limiters::partition_limits::PartitionLimiter;
use crate::limiters::overflow::OverflowLimiter;
use crate::sinks::kafka::KafkaSink;
use crate::sinks::EventSink;
use crate::sinks::Event;
use crate::utils::uuid_v7;
use rand::distributions::Alphanumeric;
use rand::Rng;
Expand All @@ -281,7 +281,7 @@ mod tests {
let handle = registry
.register("one".to_string(), Duration::seconds(30))
.await;
let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(10).unwrap(),
NonZeroU32::new(10).unwrap(),
None,
Expand Down
2 changes: 1 addition & 1 deletion capture/src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod kafka;
pub mod print;

#[async_trait]
pub trait EventSink {
pub trait Event {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
}
4 changes: 2 additions & 2 deletions capture/src/sinks/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use tracing::log::info;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;
use crate::sinks::EventSink;
use crate::sinks::Event;

pub struct PrintSink {}

#[async_trait]
impl EventSink for PrintSink {
impl Event for PrintSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
info!("single event: {:?}", event);
counter!("capture_events_ingested_total", 1);
Expand Down
6 changes: 3 additions & 3 deletions capture/tests/django_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use base64::Engine;
use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode};
use capture::event::ProcessedEvent;
use capture::health::HealthRegistry;
use capture::limiters::billing_limits::BillingLimiter;
use capture::limiters::billing::BillingLimiter;
use capture::redis::MockRedisClient;
use capture::router::router;
use capture::sinks::EventSink;
use capture::sinks::Event;
use capture::time::TimeSource;
use serde::Deserialize;
use serde_json::{json, Value};
Expand Down Expand Up @@ -61,7 +61,7 @@ impl MemorySink {
}

#[async_trait]
impl EventSink for MemorySink {
impl Event for MemorySink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
self.events.lock().unwrap().push(event);
Ok(())
Expand Down

0 comments on commit 3024499

Please sign in to comment.