diff --git a/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs b/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs index 87a8fd728a..ba9a2ae51e 100644 --- a/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs +++ b/hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs @@ -9,7 +9,7 @@ use socket2::{Domain, Socket, Type}; use std::error::Error; use std::net::SocketAddr; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use tokio::net::UdpSocket; use tokio::sync::mpsc; @@ -26,7 +26,8 @@ struct ProcessingContext { topic: String, messages_sent: AtomicUsize, messages_received: AtomicUsize, - start_time: Instant, + last_count: AtomicUsize, + last_time: Mutex, } fn create_producer(bootstrap_server: &str) -> FutureProducer { @@ -73,22 +74,12 @@ async fn process_message( .await .map_err(|(err, _)| Box::new(err) as Box)?; - let count = context.messages_sent.fetch_add(1, Ordering::Relaxed) + 1; - if count % 100_000 == 0 { - let duration = context.start_time.elapsed(); - println!( - "Total messages sent to Kafka: {}, Time elapsed: {:.2?}, Avg msg/sec: {:.2}", - count, - duration, - count as f64 / duration.as_secs_f64() - ); - } + context.messages_sent.fetch_add(1, Ordering::Relaxed); Ok(()) } #[tokio::main] async fn main() -> Result<(), Box> { - let start_time = Instant::now(); println!("Starting UDP listener and Kafka producer..."); const CHANNEL_BUFFER_SIZE: usize = 10_000_000; @@ -125,7 +116,8 @@ async fn main() -> Result<(), Box> { topic, messages_sent: AtomicUsize::new(0), messages_received: AtomicUsize::new(0), - start_time, + last_count: AtomicUsize::new(0), + last_time: Mutex::new(Instant::now()), }); // Create channel for message processing @@ -194,13 +186,24 @@ async fn main() -> Result<(), Box> { if received % 100_000 == 0 { let sent = context.messages_sent.load(Ordering::Relaxed); let current_capacity = tx.capacity(); + + let now = Instant::now(); + let mut last_time = context.last_time.lock().unwrap(); + let elapsed = now.duration_since(*last_time); + let interval_count = received - context.last_count.swap(received, Ordering::Relaxed); + let interval_speed = interval_count as f64 / elapsed.as_secs_f64(); + + *last_time = now; + drop(last_time); + println!( - "Messages received: {}, Messages sent to Kafka: {}, Messages in flight: {}, Channel capacity: {} ({:.1}%)", + "Messages received: {}, sent: {}, in flight: {}, Channel capacity: {} ({:.1}%)\nInterval speed: {:.2} msg/s", received, sent, received - sent, current_capacity, - (current_capacity as f64 / CHANNEL_BUFFER_SIZE as f64) * 100.0 + (current_capacity as f64 / CHANNEL_BUFFER_SIZE as f64) * 100.0, + interval_speed ); if current_capacity < LOW_CAPACITY_THRESHOLD { diff --git a/network-programmability/udp/udp-sender/src/main.rs b/network-programmability/udp/udp-sender/src/main.rs index e21ce02812..a7711e5c03 100644 --- a/network-programmability/udp/udp-sender/src/main.rs +++ b/network-programmability/udp/udp-sender/src/main.rs @@ -13,7 +13,7 @@ use iot::Motor; async fn main() -> Result<(), Box> { let start_time = Instant::now(); - let message_count = 10_000_000; + let message_count = 1_000_000; let socket = UdpSocket::bind("0.0.0.0:0").await?; let receiver_addr = "127.0.0.1:50537"; let mut rng = rand::thread_rng();