From e6a9687050401c200719aceead83dd521ba093f0 Mon Sep 17 00:00:00 2001 From: Hongbo Miao <3375461+hongbo-miao@users.noreply.github.com> Date: Wed, 6 Nov 2024 12:54:38 -0800 Subject: [PATCH] feat(udp-kafka-bridge): check backpressure --- .../kafka-rust/udp-kafka-bridge/src/main.rs | 44 ++++++++++++++++--- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs b/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs index 253f008302..135b5e0f98 100644 --- a/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs +++ b/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs @@ -30,7 +30,9 @@ struct ProcessingContext { fn create_producer(bootstrap_server: &str) -> FutureProducer { ClientConfig::new() .set("bootstrap.servers", bootstrap_server) - .set("batch.size", "20971520") // 20 MiB + .set("batch.size", "1048576") // 1 MiB + .set("queue.buffering.max.messages", "100000") // Limit in-flight messages + .set("queue.buffering.max.kbytes", "1048576") // 1GB max memory usage .set("linger.ms", "2") .set("compression.type", "zstd") .create() @@ -43,7 +45,6 @@ async fn process_message( ) -> Result<(), Box> { let mut buf = Vec::new(); motor.encode(&mut buf)?; - let proto_payload = context .encoder .encode( @@ -58,6 +59,7 @@ async fn process_message( .id .clone() .unwrap_or_else(|| "unknown_motor".to_string()); + context .producer .send( @@ -70,7 +72,7 @@ async fn process_message( .map_err(|(err, _)| Box::new(err) as Box)?; let count = context.message_count.fetch_add(1, Ordering::Relaxed) + 1; - if count % 100000 == 0 { + if count % 100_000 == 0 { let duration = context.start_time.elapsed(); println!( "Total messages sent to Kafka: {}, Time elapsed: {:?}, Avg msg/sec: {:.2}", @@ -87,10 +89,19 @@ async fn main() -> Result<(), Box> { let start_time = Instant::now(); println!("Starting UDP listener and Kafka producer..."); + const CHANNEL_BUFFER_SIZE: usize = 10_000_000; + const LOW_CAPACITY_THRESHOLD: usize = CHANNEL_BUFFER_SIZE / 10; // 10% of buffer size + // Set up UDP socket let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?; socket.set_reuse_address(true)?; - socket.set_recv_buffer_size(20 * 1024 * 1024)?; + + // Set and get receive buffer size + let desired_recv_buf_size = 20 * 1024 * 1024; // 20MB + socket.set_recv_buffer_size(desired_recv_buf_size)?; + let actual_recv_buf_size = socket.recv_buffer_size()?; + println!("UDP receive buffer size: {} bytes", actual_recv_buf_size); + let addr: SocketAddr = "127.0.0.1:50537".parse()?; socket.bind(&addr.into())?; let socket = UdpSocket::from_std(socket.into())?; @@ -114,7 +125,11 @@ async fn main() -> Result<(), Box> { }); // Create channel for message processing - let (tx, mut rx) = mpsc::channel::(1000000); // Buffer size of 1000000 messages + let (tx, mut rx) = mpsc::channel::(CHANNEL_BUFFER_SIZE); + println!( + "Channel buffer size: {}, Low capacity threshold: {}", + CHANNEL_BUFFER_SIZE, LOW_CAPACITY_THRESHOLD + ); // Spawn message processing task let processing_context = context.clone(); @@ -136,8 +151,27 @@ async fn main() -> Result<(), Box> { // Main UDP receiving loop let mut buffer = vec![0u8; 65536]; + let mut packet_count = 0; loop { let (amt, _src) = socket.recv_from(&mut buffer).await?; + + // Monitor packet count and channel capacity + packet_count += 1; + if packet_count % 100_000 == 0 { + let current_capacity = tx.capacity(); + println!( + "Processed {} packets, channel capacity: {} ({:.1}%)", + packet_count, + current_capacity, + (current_capacity as f64 / CHANNEL_BUFFER_SIZE as f64) * 100.0 + ); + + // If channel capacity is low, this might indicate backpressure + if current_capacity < LOW_CAPACITY_THRESHOLD { + println!("Warning: Channel capacity running low - possible backpressure"); + } + } + match Motor::decode(&buffer[..amt]) { Ok(motor) => { if let Err(e) = tx.send(motor).await {