Skip to content

Commit

Permalink
feat(udp-kafka-bridge): check backpressure (#20194)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongbo-miao authored Nov 6, 2024
1 parent 485200f commit 1f8a252
Showing 1 changed file with 39 additions and 5 deletions.
44 changes: 39 additions & 5 deletions hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ struct ProcessingContext {
fn create_producer(bootstrap_server: &str) -> FutureProducer {
ClientConfig::new()
.set("bootstrap.servers", bootstrap_server)
.set("batch.size", "20971520") // 20 MiB
.set("batch.size", "1048576") // 1 MiB
.set("queue.buffering.max.messages", "100000") // Limit in-flight messages
.set("queue.buffering.max.kbytes", "1048576") // 1GB max memory usage
.set("linger.ms", "2")
.set("compression.type", "zstd")
.create()
Expand All @@ -43,7 +45,6 @@ async fn process_message(
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut buf = Vec::new();
motor.encode(&mut buf)?;

let proto_payload = context
.encoder
.encode(
Expand All @@ -58,6 +59,7 @@ async fn process_message(
.id
.clone()
.unwrap_or_else(|| "unknown_motor".to_string());

context
.producer
.send(
Expand All @@ -70,7 +72,7 @@ async fn process_message(
.map_err(|(err, _)| Box::new(err) as Box<dyn Error + Send + Sync>)?;

let count = context.message_count.fetch_add(1, Ordering::Relaxed) + 1;
if count % 100000 == 0 {
if count % 100_000 == 0 {
let duration = context.start_time.elapsed();
println!(
"Total messages sent to Kafka: {}, Time elapsed: {:?}, Avg msg/sec: {:.2}",
Expand All @@ -87,10 +89,19 @@ async fn main() -> Result<(), Box<dyn Error>> {
let start_time = Instant::now();
println!("Starting UDP listener and Kafka producer...");

const CHANNEL_BUFFER_SIZE: usize = 10_000_000;
const LOW_CAPACITY_THRESHOLD: usize = CHANNEL_BUFFER_SIZE / 10; // 10% of buffer size

// Set up UDP socket
let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
socket.set_reuse_address(true)?;
socket.set_recv_buffer_size(20 * 1024 * 1024)?;

// Set and get receive buffer size
let desired_recv_buf_size = 20 * 1024 * 1024; // 20MB
socket.set_recv_buffer_size(desired_recv_buf_size)?;
let actual_recv_buf_size = socket.recv_buffer_size()?;
println!("UDP receive buffer size: {} bytes", actual_recv_buf_size);

let addr: SocketAddr = "127.0.0.1:50537".parse()?;
socket.bind(&addr.into())?;
let socket = UdpSocket::from_std(socket.into())?;
Expand All @@ -114,7 +125,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
});

// Create channel for message processing
let (tx, mut rx) = mpsc::channel::<Motor>(1000000); // Buffer size of 1000000 messages
let (tx, mut rx) = mpsc::channel::<Motor>(CHANNEL_BUFFER_SIZE);
println!(
"Channel buffer size: {}, Low capacity threshold: {}",
CHANNEL_BUFFER_SIZE, LOW_CAPACITY_THRESHOLD
);

// Spawn message processing task
let processing_context = context.clone();
Expand All @@ -136,8 +151,27 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Main UDP receiving loop
let mut buffer = vec![0u8; 65536];
let mut packet_count = 0;
loop {
let (amt, _src) = socket.recv_from(&mut buffer).await?;

// Monitor packet count and channel capacity
packet_count += 1;
if packet_count % 100_000 == 0 {
let current_capacity = tx.capacity();
println!(
"Processed {} packets, channel capacity: {} ({:.1}%)",
packet_count,
current_capacity,
(current_capacity as f64 / CHANNEL_BUFFER_SIZE as f64) * 100.0
);

// If channel capacity is low, this might indicate backpressure
if current_capacity < LOW_CAPACITY_THRESHOLD {
println!("Warning: Channel capacity running low - possible backpressure");
}
}

match Motor::decode(&buffer[..amt]) {
Ok(motor) => {
if let Err(e) = tx.send(motor).await {
Expand Down

0 comments on commit 1f8a252

Please sign in to comment.