Skip to content

Commit

Permalink
feat(udp-kafka-bridge): flush remaining messages (#20195)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongbo-miao authored Nov 7, 2024
1 parent 1f8a252 commit 3257457
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 36 deletions.
93 changes: 59 additions & 34 deletions hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
use tokio::time::sleep;

pub mod iot {
include!(concat!(env!("OUT_DIR"), "/production.iot.rs"));
Expand All @@ -23,7 +24,8 @@ struct ProcessingContext {
producer: FutureProducer,
encoder: EasyProtoRawEncoder,
topic: String,
message_count: AtomicUsize,
messages_sent: AtomicUsize,
messages_received: AtomicUsize,
start_time: Instant,
}

Expand Down Expand Up @@ -71,11 +73,11 @@ async fn process_message(
.await
.map_err(|(err, _)| Box::new(err) as Box<dyn Error + Send + Sync>)?;

let count = context.message_count.fetch_add(1, Ordering::Relaxed) + 1;
let count = context.messages_sent.fetch_add(1, Ordering::Relaxed) + 1;
if count % 100_000 == 0 {
let duration = context.start_time.elapsed();
println!(
"Total messages sent to Kafka: {}, Time elapsed: {:?}, Avg msg/sec: {:.2}",
"Total messages sent to Kafka: {}, Time elapsed: {:.2?}, Avg msg/sec: {:.2}",
count,
duration,
count as f64 / duration.as_secs_f64()
Expand All @@ -91,13 +93,14 @@ async fn main() -> Result<(), Box<dyn Error>> {

const CHANNEL_BUFFER_SIZE: usize = 10_000_000;
const LOW_CAPACITY_THRESHOLD: usize = CHANNEL_BUFFER_SIZE / 10; // 10% of buffer size
const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5); // Timeout for inactivity

// Set up UDP socket
let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
socket.set_reuse_address(true)?;

// Set and get receive buffer size
let desired_recv_buf_size = 20 * 1024 * 1024; // 20MB
let desired_recv_buf_size = 5 * 1024 * 1024; // 5MiB
socket.set_recv_buffer_size(desired_recv_buf_size)?;
let actual_recv_buf_size = socket.recv_buffer_size()?;
println!("UDP receive buffer size: {} bytes", actual_recv_buf_size);
Expand All @@ -115,12 +118,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
let encoder = EasyProtoRawEncoder::new(sr_settings);
let topic = "production.iot.motor.proto".to_string();

// Create processing context with start_time
// Create processing context
let context = Arc::new(ProcessingContext {
producer,
encoder,
topic,
message_count: AtomicUsize::new(0),
messages_sent: AtomicUsize::new(0),
messages_received: AtomicUsize::new(0),
start_time,
});

Expand All @@ -131,56 +135,77 @@ async fn main() -> Result<(), Box<dyn Error>> {
CHANNEL_BUFFER_SIZE, LOW_CAPACITY_THRESHOLD
);

// Spawn message processing task
// Spawn message processing task with inactivity monitoring
let processing_context = context.clone();
tokio::spawn(async move {
let mut last_message_time = Instant::now();
let mut handles = Vec::new();
while let Some(motor) = rx.recv().await {
let ctx = processing_context.clone();
handles.push(tokio::spawn(async move {
if let Err(e) = process_message(ctx, motor).await {
eprintln!("Error processing message: {}", e);
loop {
tokio::select! {
Some(motor) = rx.recv() => {
let ctx = processing_context.clone();
last_message_time = Instant::now();
handles.push(tokio::spawn(async move {
if let Err(e) = process_message(ctx, motor).await {
eprintln!("Error processing message: {}", e);
}
}));
handles.retain(|handle| !handle.is_finished());
while handles.len() >= 2000 {
futures::future::join_all(handles.drain(..1000)).await;
}
},
_ = sleep(INACTIVITY_TIMEOUT) => {
if last_message_time.elapsed() >= INACTIVITY_TIMEOUT {
println!("Inactivity detected, flushing remaining messages...");
let received = processing_context.messages_received.load(Ordering::Relaxed);
let sent = processing_context.messages_sent.load(Ordering::Relaxed);
println!("Messages received: {}, Messages sent to Kafka: {}, Messages in flight: {}",
received, sent, received - sent);
futures::future::join_all(handles.drain(..)).await;
let final_received = processing_context.messages_received.load(Ordering::Relaxed);
let final_sent = processing_context.messages_sent.load(Ordering::Relaxed);
println!("After flush - Messages received: {}, Messages sent to Kafka: {}, Messages in flight: {}",
final_received, final_sent, final_received - final_sent);
}
}
}));
handles.retain(|handle| !handle.is_finished());
while handles.len() >= 2000 {
futures::future::join_all(handles.drain(..1000)).await;
}
}
});

// Main UDP receiving loop
let mut buffer = vec![0u8; 65536];
let mut packet_count = 0;
loop {
let (amt, _src) = socket.recv_from(&mut buffer).await?;
match Motor::decode(&buffer[..amt]) {
Ok(motor) => {
context.messages_received.fetch_add(1, Ordering::Relaxed);
if let Err(e) = tx.send(motor).await {
eprintln!("Failed to send message to processing task: {}", e);
}
}
Err(e) => {
eprintln!("Failed to decode protobuf message: {}", e);
}
}

// Monitor packet count and channel capacity
packet_count += 1;
if packet_count % 100_000 == 0 {
// Print statistics every 100,000 messages
let received = context.messages_received.load(Ordering::Relaxed);
if received % 100_000 == 0 {
let sent = context.messages_sent.load(Ordering::Relaxed);
let current_capacity = tx.capacity();
println!(
"Processed {} packets, channel capacity: {} ({:.1}%)",
packet_count,
"Messages received: {}, Messages sent to Kafka: {}, Messages in flight: {}, Channel capacity: {} ({:.1}%)",
received,
sent,
received - sent,
current_capacity,
(current_capacity as f64 / CHANNEL_BUFFER_SIZE as f64) * 100.0
);

// If channel capacity is low, this might indicate backpressure
if current_capacity < LOW_CAPACITY_THRESHOLD {
println!("Warning: Channel capacity running low - possible backpressure");
}
}

match Motor::decode(&buffer[..amt]) {
Ok(motor) => {
if let Err(e) = tx.send(motor).await {
eprintln!("Failed to send message to processing task: {}", e);
}
}
Err(e) => {
eprintln!("Failed to decode protobuf message: {}", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ message Motor {
optional double temperature3 = 5;
optional double temperature4 = 6;
optional double temperature5 = 7;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ message Motor {
optional double temperature3 = 5;
optional double temperature4 = 6;
optional double temperature5 = 7;
}
}

0 comments on commit 3257457

Please sign in to comment.