Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udp-kafka-bridge): flush remaining messages #20195

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 59 additions & 34 deletions hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
use tokio::time::sleep;

pub mod iot {
include!(concat!(env!("OUT_DIR"), "/production.iot.rs"));
Expand All @@ -23,7 +24,8 @@ struct ProcessingContext {
producer: FutureProducer,
encoder: EasyProtoRawEncoder,
topic: String,
message_count: AtomicUsize,
messages_sent: AtomicUsize,
messages_received: AtomicUsize,
start_time: Instant,
}

Expand Down Expand Up @@ -71,11 +73,11 @@ async fn process_message(
.await
.map_err(|(err, _)| Box::new(err) as Box<dyn Error + Send + Sync>)?;

let count = context.message_count.fetch_add(1, Ordering::Relaxed) + 1;
let count = context.messages_sent.fetch_add(1, Ordering::Relaxed) + 1;
if count % 100_000 == 0 {
let duration = context.start_time.elapsed();
println!(
"Total messages sent to Kafka: {}, Time elapsed: {:?}, Avg msg/sec: {:.2}",
"Total messages sent to Kafka: {}, Time elapsed: {:.2?}, Avg msg/sec: {:.2}",
count,
duration,
count as f64 / duration.as_secs_f64()
Expand All @@ -91,13 +93,14 @@ async fn main() -> Result<(), Box<dyn Error>> {

const CHANNEL_BUFFER_SIZE: usize = 10_000_000;
const LOW_CAPACITY_THRESHOLD: usize = CHANNEL_BUFFER_SIZE / 10; // 10% of buffer size
const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5); // Timeout for inactivity

// Set up UDP socket
let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
socket.set_reuse_address(true)?;

// Set and get receive buffer size
let desired_recv_buf_size = 20 * 1024 * 1024; // 20MB
let desired_recv_buf_size = 5 * 1024 * 1024; // 5MiB
socket.set_recv_buffer_size(desired_recv_buf_size)?;
let actual_recv_buf_size = socket.recv_buffer_size()?;
println!("UDP receive buffer size: {} bytes", actual_recv_buf_size);
Expand All @@ -115,12 +118,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
let encoder = EasyProtoRawEncoder::new(sr_settings);
let topic = "production.iot.motor.proto".to_string();

// Create processing context with start_time
// Create processing context
let context = Arc::new(ProcessingContext {
producer,
encoder,
topic,
message_count: AtomicUsize::new(0),
messages_sent: AtomicUsize::new(0),
messages_received: AtomicUsize::new(0),
start_time,
});

Expand All @@ -131,56 +135,77 @@ async fn main() -> Result<(), Box<dyn Error>> {
CHANNEL_BUFFER_SIZE, LOW_CAPACITY_THRESHOLD
);

// Spawn message processing task
// Spawn message processing task with inactivity monitoring
let processing_context = context.clone();
tokio::spawn(async move {
let mut last_message_time = Instant::now();
let mut handles = Vec::new();
while let Some(motor) = rx.recv().await {
let ctx = processing_context.clone();
handles.push(tokio::spawn(async move {
if let Err(e) = process_message(ctx, motor).await {
eprintln!("Error processing message: {}", e);
loop {
tokio::select! {
Some(motor) = rx.recv() => {
let ctx = processing_context.clone();
last_message_time = Instant::now();
handles.push(tokio::spawn(async move {
if let Err(e) = process_message(ctx, motor).await {
eprintln!("Error processing message: {}", e);
}
}));
handles.retain(|handle| !handle.is_finished());
while handles.len() >= 2000 {
futures::future::join_all(handles.drain(..1000)).await;
}
},
_ = sleep(INACTIVITY_TIMEOUT) => {
if last_message_time.elapsed() >= INACTIVITY_TIMEOUT {
println!("Inactivity detected, flushing remaining messages...");
let received = processing_context.messages_received.load(Ordering::Relaxed);
let sent = processing_context.messages_sent.load(Ordering::Relaxed);
println!("Messages received: {}, Messages sent to Kafka: {}, Messages in flight: {}",
received, sent, received - sent);
futures::future::join_all(handles.drain(..)).await;
let final_received = processing_context.messages_received.load(Ordering::Relaxed);
let final_sent = processing_context.messages_sent.load(Ordering::Relaxed);
println!("After flush - Messages received: {}, Messages sent to Kafka: {}, Messages in flight: {}",
final_received, final_sent, final_received - final_sent);
}
}
}));
handles.retain(|handle| !handle.is_finished());
while handles.len() >= 2000 {
futures::future::join_all(handles.drain(..1000)).await;
}
}
});

// Main UDP receiving loop
let mut buffer = vec![0u8; 65536];
let mut packet_count = 0;
loop {
let (amt, _src) = socket.recv_from(&mut buffer).await?;
match Motor::decode(&buffer[..amt]) {
Ok(motor) => {
context.messages_received.fetch_add(1, Ordering::Relaxed);
if let Err(e) = tx.send(motor).await {
eprintln!("Failed to send message to processing task: {}", e);
}
}
Err(e) => {
eprintln!("Failed to decode protobuf message: {}", e);
}
}

// Monitor packet count and channel capacity
packet_count += 1;
if packet_count % 100_000 == 0 {
// Print statistics every 100,000 messages
let received = context.messages_received.load(Ordering::Relaxed);
if received % 100_000 == 0 {
let sent = context.messages_sent.load(Ordering::Relaxed);
let current_capacity = tx.capacity();
println!(
"Processed {} packets, channel capacity: {} ({:.1}%)",
packet_count,
"Messages received: {}, Messages sent to Kafka: {}, Messages in flight: {}, Channel capacity: {} ({:.1}%)",
received,
sent,
received - sent,
current_capacity,
(current_capacity as f64 / CHANNEL_BUFFER_SIZE as f64) * 100.0
);

// If channel capacity is low, this might indicate backpressure
if current_capacity < LOW_CAPACITY_THRESHOLD {
println!("Warning: Channel capacity running low - possible backpressure");
}
}

match Motor::decode(&buffer[..amt]) {
Ok(motor) => {
if let Err(e) = tx.send(motor).await {
eprintln!("Failed to send message to processing task: {}", e);
}
}
Err(e) => {
eprintln!("Failed to decode protobuf message: {}", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ message Motor {
optional double temperature3 = 5;
optional double temperature4 = 6;
optional double temperature5 = 7;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ message Motor {
optional double temperature3 = 5;
optional double temperature4 = 6;
optional double temperature5 = 7;
}
}
Loading