diff --git a/capture/src/sink.rs b/capture/src/sink.rs
index 13397dc..af83e20 100644
--- a/capture/src/sink.rs
+++ b/capture/src/sink.rs
@@ -192,7 +192,7 @@ impl KafkaSink {
         }) {
             Ok(ack) => Ok(ack),
             Err((e, _)) => match e.rdkafka_error_code() {
-                Some(RDKafkaErrorCode::InvalidMessageSize) => {
+                Some(RDKafkaErrorCode::MessageSizeTooLarge) => {
                     report_dropped_events("kafka_message_size", 1);
                     Err(CaptureError::EventTooBig)
                 }
@@ -297,6 +297,8 @@ mod tests {
     use crate::partition_limits::PartitionLimiter;
     use crate::sink::{EventSink, KafkaSink};
     use crate::utils::uuid_v7;
+    use rand::distributions::Alphanumeric;
+    use rand::Rng;
     use rdkafka::mocking::MockCluster;
     use rdkafka::producer::DefaultProducerContext;
     use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr};
@@ -358,6 +360,27 @@ mod tests {
         .await
         .expect("failed to send initial event batch");
 
+        // Producer should reject a 2MB message, twice the default `message.max.bytes`
+        let big_data = rand::thread_rng()
+            .sample_iter(Alphanumeric)
+            .take(2_000_000)
+            .map(char::from)
+            .collect();
+        let big_event: ProcessedEvent = ProcessedEvent {
+            uuid: uuid_v7(),
+            distinct_id: "id1".to_string(),
+            ip: "".to_string(),
+            data: big_data,
+            now: "".to_string(),
+            sent_at: None,
+            token: "token1".to_string(),
+        };
+        match sink.send(big_event).await {
+            Err(CaptureError::EventTooBig) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+
         // Simulate unretriable errors
         cluster.clear_request_errors(RDKafkaApiKey::Produce);
         let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE; 1];
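
Note on the size assumption in the new test: librdkafka enforces `message.max.bytes` on the client, so an oversized record fails at produce time with `MessageSizeTooLarge` before any broker round trip; that is presumably why this case needs no error injection on the mock cluster, unlike the broker-side errors simulated just below it. Below is a minimal sketch of making that limit explicit in producer config, assuming a plain rdkafka `FutureProducer` rather than `KafkaSink`'s actual constructor (the `build_producer` helper is hypothetical):

```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

// Hypothetical helper: pins the 1MB limit that the test's "twice the
// default `message.max.bytes`" comment relies on, rather than depending
// on librdkafka's default value (1000000 bytes).
fn build_producer(bootstrap_servers: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers)
        // Payloads above this size are rejected locally at produce time
        // with MessageSizeTooLarge; the broker is never contacted.
        .set("message.max.bytes", "1000000")
        .create()
        .expect("failed to create producer")
}
```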