Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
fix MessageSizeTooLarge handling
Browse files · Browse the repository at this point in the history
  • Loading branch information
xvello committed Dec 7, 2023
1 parent 7ff5b80 commit f19a0db
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion capture/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl KafkaSink {
}) {
Ok(ack) => Ok(ack),
Err((e, _)) => match e.rdkafka_error_code() {
Some(RDKafkaErrorCode::InvalidMessageSize) => {
Some(RDKafkaErrorCode::MessageSizeTooLarge) => {
report_dropped_events("kafka_message_size", 1);
Err(CaptureError::EventTooBig)
}
Expand Down Expand Up @@ -297,6 +297,8 @@ mod tests {
use crate::partition_limits::PartitionLimiter;
use crate::sink::{EventSink, KafkaSink};
use crate::utils::uuid_v7;
use rand::distributions::Alphanumeric;
use rand::Rng;
use rdkafka::mocking::MockCluster;
use rdkafka::producer::DefaultProducerContext;
use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr};
Expand Down Expand Up @@ -358,6 +360,27 @@ mod tests {
.await
.expect("failed to send initial event batch");

// Producer should reject a 2MB message, twice the default `message.max.bytes`
let big_data = rand::thread_rng()
.sample_iter(Alphanumeric)
.take(2_000_000)
.map(char::from)
.collect();
let big_event: ProcessedEvent = ProcessedEvent {
uuid: uuid_v7(),
distinct_id: "id1".to_string(),
ip: "".to_string(),
data: big_data,
now: "".to_string(),
sent_at: None,
token: "token1".to_string(),
};
match sink.send(big_event).await {
Err(CaptureError::EventTooBig) => {} // Expected
Err(err) => panic!("wrong error code {}", err),
Ok(()) => panic!("should have errored"),
};

// Simulate unretriable errors
cluster.clear_request_errors(RDKafkaApiKey::Produce);
let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE; 1];
Expand Down

0 comments on commit f19a0db

Please sign in to comment.