From 32574576b46c710bc41cf431d837bd8fb392c34a Mon Sep 17 00:00:00 2001
From: Hongbo Miao <3375461+hongbo-miao@users.noreply.github.com>
Date: Wed, 6 Nov 2024 22:28:28 -0800
Subject: [PATCH] feat(udp-kafka-bridge): flush remaining messages (#20195)

---
 .../kafka-rust/udp-kafka-bridge/src/main.rs | 93 ++++++++++++-------
 .../src/protos/production.iot.motor.proto   |  2 +-
 .../src/protos/production.iot.motor.proto   |  2 +-
 3 files changed, 61 insertions(+), 36 deletions(-)

diff --git a/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs b/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs
index 135b5e0f98..87a8fd728a 100644
--- a/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs
+++ b/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs
@@ -13,6 +13,7 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::net::UdpSocket;
 use tokio::sync::mpsc;
+use tokio::time::sleep;
 pub mod iot {
     include!(concat!(env!("OUT_DIR"), "/production.iot.rs"));
 }
@@ -23,7 +24,8 @@ struct ProcessingContext {
     producer: FutureProducer,
     encoder: EasyProtoRawEncoder,
     topic: String,
-    message_count: AtomicUsize,
+    messages_sent: AtomicUsize,
+    messages_received: AtomicUsize,
     start_time: Instant,
 }
 
@@ -71,11 +73,11 @@ async fn process_message(
         .await
         .map_err(|(err, _)| Box::new(err) as Box<dyn Error + Send + Sync>)?;
 
-    let count = context.message_count.fetch_add(1, Ordering::Relaxed) + 1;
+    let count = context.messages_sent.fetch_add(1, Ordering::Relaxed) + 1;
     if count % 100_000 == 0 {
         let duration = context.start_time.elapsed();
         println!(
-            "Total messages sent to Kafka: {}, Time elapsed: {:?}, Avg msg/sec: {:.2}",
+            "Total messages sent to Kafka: {}, Time elapsed: {:.2?}, Avg msg/sec: {:.2}",
             count,
             duration,
             count as f64 / duration.as_secs_f64()
@@ -91,13 +93,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
 
     const CHANNEL_BUFFER_SIZE: usize = 10_000_000;
     const LOW_CAPACITY_THRESHOLD: usize = CHANNEL_BUFFER_SIZE / 10; // 10% of buffer size
+    const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5); // Timeout for inactivity
 
     // Set up UDP socket
     let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
     socket.set_reuse_address(true)?;
 
     // Set and get receive buffer size
-    let desired_recv_buf_size = 20 * 1024 * 1024; // 20MB
+    let desired_recv_buf_size = 5 * 1024 * 1024; // 5MiB
     socket.set_recv_buffer_size(desired_recv_buf_size)?;
     let actual_recv_buf_size = socket.recv_buffer_size()?;
     println!("UDP receive buffer size: {} bytes", actual_recv_buf_size);
@@ -115,12 +118,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
     let encoder = EasyProtoRawEncoder::new(sr_settings);
     let topic = "production.iot.motor.proto".to_string();
 
-    // Create processing context with start_time
+    // Create processing context
     let context = Arc::new(ProcessingContext {
         producer,
         encoder,
         topic,
-        message_count: AtomicUsize::new(0),
+        messages_sent: AtomicUsize::new(0),
+        messages_received: AtomicUsize::new(0),
         start_time,
     });
 
@@ -131,56 +135,77 @@ async fn main() -> Result<(), Box<dyn Error>> {
         CHANNEL_BUFFER_SIZE, LOW_CAPACITY_THRESHOLD
     );
 
-    // Spawn message processing task
+    // Spawn message processing task with inactivity monitoring
     let processing_context = context.clone();
     tokio::spawn(async move {
+        let mut last_message_time = Instant::now();
         let mut handles = Vec::new();
 
-        while let Some(motor) = rx.recv().await {
-            let ctx = processing_context.clone();
-            handles.push(tokio::spawn(async move {
-                if let Err(e) = process_message(ctx, motor).await {
-                    eprintln!("Error processing message: {}", e);
+        loop {
+            tokio::select! {
+                Some(motor) = rx.recv() => {
+                    let ctx = processing_context.clone();
+                    last_message_time = Instant::now();
+                    handles.push(tokio::spawn(async move {
+                        if let Err(e) = process_message(ctx, motor).await {
+                            eprintln!("Error processing message: {}", e);
+                        }
+                    }));
+                    handles.retain(|handle| !handle.is_finished());
+                    while handles.len() >= 2000 {
+                        futures::future::join_all(handles.drain(..1000)).await;
+                    }
+                },
+                _ = sleep(INACTIVITY_TIMEOUT) => {
+                    if last_message_time.elapsed() >= INACTIVITY_TIMEOUT {
+                        println!("Inactivity detected, flushing remaining messages...");
+                        let received = processing_context.messages_received.load(Ordering::Relaxed);
+                        let sent = processing_context.messages_sent.load(Ordering::Relaxed);
+                        println!("Messages received: {}, Messages sent to Kafka: {}, Messages in flight: {}",
+                            received, sent, received - sent);
+                        futures::future::join_all(handles.drain(..)).await;
+                        let final_received = processing_context.messages_received.load(Ordering::Relaxed);
+                        let final_sent = processing_context.messages_sent.load(Ordering::Relaxed);
+                        println!("After flush - Messages received: {}, Messages sent to Kafka: {}, Messages in flight: {}",
+                            final_received, final_sent, final_received - final_sent);
+                    }
                 }
-            }));
-            handles.retain(|handle| !handle.is_finished());
-            while handles.len() >= 2000 {
-                futures::future::join_all(handles.drain(..1000)).await;
             }
         }
     });
 
     // Main UDP receiving loop
     let mut buffer = vec![0u8; 65536];
-    let mut packet_count = 0;
     loop {
         let (amt, _src) = socket.recv_from(&mut buffer).await?;
+        match Motor::decode(&buffer[..amt]) {
+            Ok(motor) => {
+                context.messages_received.fetch_add(1, Ordering::Relaxed);
+                if let Err(e) = tx.send(motor).await {
+                    eprintln!("Failed to send message to processing task: {}", e);
+                }
+            }
+            Err(e) => {
+                eprintln!("Failed to decode protobuf message: {}", e);
+            }
+        }
 
-        // Monitor packet count and channel capacity
-        packet_count += 1;
-        if packet_count % 100_000 == 0 {
+        // Print statistics every 100,000 messages
+        let received = context.messages_received.load(Ordering::Relaxed);
+        if received % 100_000 == 0 {
+            let sent = context.messages_sent.load(Ordering::Relaxed);
             let current_capacity = tx.capacity();
             println!(
-                "Processed {} packets, channel capacity: {} ({:.1}%)",
-                packet_count,
+                "Messages received: {}, Messages sent to Kafka: {}, Messages in flight: {}, Channel capacity: {} ({:.1}%)",
+                received,
+                sent,
+                received - sent,
                 current_capacity,
                 (current_capacity as f64 / CHANNEL_BUFFER_SIZE as f64) * 100.0
             );
-            // If channel capacity is low, this might indicate backpressure
             if current_capacity < LOW_CAPACITY_THRESHOLD {
                 println!("Warning: Channel capacity running low - possible backpressure");
            }
         }
-
-        match Motor::decode(&buffer[..amt]) {
-            Ok(motor) => {
-                if let Err(e) = tx.send(motor).await {
-                    eprintln!("Failed to send message to processing task: {}", e);
-                }
-            }
-            Err(e) => {
-                eprintln!("Failed to decode protobuf message: {}", e);
-            }
-        }
     }
 }
diff --git a/network-programmability/udp/udp-receiver/src/protos/production.iot.motor.proto b/network-programmability/udp/udp-receiver/src/protos/production.iot.motor.proto
index 7c62fd8733..d4c3df9bd7 100644
--- a/network-programmability/udp/udp-receiver/src/protos/production.iot.motor.proto
+++ b/network-programmability/udp/udp-receiver/src/protos/production.iot.motor.proto
@@ -9,4 +9,4 @@ message Motor {
   optional double temperature3 = 5;
   optional double temperature4 = 6;
   optional double temperature5 = 7;
-}
\ No newline at end of file
+}
diff --git a/network-programmability/udp/udp-sender/src/protos/production.iot.motor.proto b/network-programmability/udp/udp-sender/src/protos/production.iot.motor.proto
index 7c62fd8733..d4c3df9bd7 100644
--- a/network-programmability/udp/udp-sender/src/protos/production.iot.motor.proto
+++ b/network-programmability/udp/udp-sender/src/protos/production.iot.motor.proto
@@ -9,4 +9,4 @@ message Motor {
  optional double temperature3 = 5;
  optional double temperature4 = 6;
  optional double temperature5 = 7;
-}
\ No newline at end of file
+}
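
The standalone sketch below isolates the pattern the main.rs hunk introduces: a tokio::select! loop that races the channel receiver against an inactivity timer and joins all outstanding task handles once traffic goes quiet. It is an illustration rather than code from this repository; the String payload and handle_message function are hypothetical stand-ins for the Motor message and process_message, it assumes the tokio and futures crates as dependencies, and it breaks out of the loop after the flush so the example terminates, whereas the bridge keeps looping.

// Standalone sketch (not part of the patch): inactivity-based flush with tokio::select!.
// Assumes `tokio` (features = ["full"]) and `futures` as dependencies; `handle_message`
// is a hypothetical stand-in for process_message in main.rs.
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::time::sleep;

const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5);

async fn handle_message(message: String) {
    // Stand-in for per-message work such as producing to Kafka.
    println!("handled: {message}");
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(1024);

    // Simulated sender: a short burst of messages, then silence so the timeout path runs.
    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(format!("message {i}")).await.unwrap();
        }
    });

    let mut last_message_time = Instant::now();
    let mut handles = Vec::new();

    loop {
        tokio::select! {
            Some(message) = rx.recv() => {
                last_message_time = Instant::now();
                handles.push(tokio::spawn(handle_message(message)));
            }
            _ = sleep(INACTIVITY_TIMEOUT) => {
                if last_message_time.elapsed() >= INACTIVITY_TIMEOUT {
                    // No traffic for INACTIVITY_TIMEOUT: wait for every in-flight task, then stop.
                    futures::future::join_all(handles.drain(..)).await;
                    println!("flushed remaining messages");
                    break;
                }
            }
        }
    }
}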