Skip to content

Commit

Permalink
feat(udp-kafka-bridge): change cumulative speed to internal speed (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
hongbo-miao authored Nov 15, 2024
1 parent 34f09be commit 9cea224
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 17 deletions.
35 changes: 19 additions & 16 deletions hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use socket2::{Domain, Socket, Type};
use std::error::Error;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
Expand All @@ -26,7 +26,8 @@ struct ProcessingContext {
topic: String,
messages_sent: AtomicUsize,
messages_received: AtomicUsize,
start_time: Instant,
last_count: AtomicUsize,
last_time: Mutex<Instant>,
}

fn create_producer(bootstrap_server: &str) -> FutureProducer {
Expand Down Expand Up @@ -73,22 +74,12 @@ async fn process_message(
.await
.map_err(|(err, _)| Box::new(err) as Box<dyn Error + Send + Sync>)?;

let count = context.messages_sent.fetch_add(1, Ordering::Relaxed) + 1;
if count % 100_000 == 0 {
let duration = context.start_time.elapsed();
println!(
"Total messages sent to Kafka: {}, Time elapsed: {:.2?}, Avg msg/sec: {:.2}",
count,
duration,
count as f64 / duration.as_secs_f64()
);
}
context.messages_sent.fetch_add(1, Ordering::Relaxed);
Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let start_time = Instant::now();
println!("Starting UDP listener and Kafka producer...");

const CHANNEL_BUFFER_SIZE: usize = 10_000_000;
Expand Down Expand Up @@ -125,7 +116,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
topic,
messages_sent: AtomicUsize::new(0),
messages_received: AtomicUsize::new(0),
start_time,
last_count: AtomicUsize::new(0),
last_time: Mutex::new(Instant::now()),
});

// Create channel for message processing
Expand Down Expand Up @@ -194,13 +186,24 @@ async fn main() -> Result<(), Box<dyn Error>> {
if received % 100_000 == 0 {
let sent = context.messages_sent.load(Ordering::Relaxed);
let current_capacity = tx.capacity();

let now = Instant::now();
let mut last_time = context.last_time.lock().unwrap();
let elapsed = now.duration_since(*last_time);
let interval_count = received - context.last_count.swap(received, Ordering::Relaxed);
let interval_speed = interval_count as f64 / elapsed.as_secs_f64();

*last_time = now;
drop(last_time);

println!(
"Messages received: {}, Messages sent to Kafka: {}, Messages in flight: {}, Channel capacity: {} ({:.1}%)",
"Messages received: {}, sent: {}, in flight: {}, Channel capacity: {} ({:.1}%)\nInterval speed: {:.2} msg/s",
received,
sent,
received - sent,
current_capacity,
(current_capacity as f64 / CHANNEL_BUFFER_SIZE as f64) * 100.0
(current_capacity as f64 / CHANNEL_BUFFER_SIZE as f64) * 100.0,
interval_speed
);

if current_capacity < LOW_CAPACITY_THRESHOLD {
Expand Down
2 changes: 1 addition & 1 deletion network-programmability/udp/udp-sender/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use iot::Motor;
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let start_time = Instant::now();

let message_count = 10_000_000;
let message_count = 1_000_000;
let socket = UdpSocket::bind("0.0.0.0:0").await?;
let receiver_addr = "127.0.0.1:50537";
let mut rng = rand::thread_rng();
Expand Down

0 comments on commit 9cea224

Please sign in to comment.