Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udp-kafka-bridge): check backpressure #20194

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions hm-kafka/kafka-client/kafka-rust/udp-kafka-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ struct ProcessingContext {
fn create_producer(bootstrap_server: &str) -> FutureProducer {
ClientConfig::new()
.set("bootstrap.servers", bootstrap_server)
.set("batch.size", "20971520") // 20 MiB
.set("batch.size", "1048576") // 1 MiB
.set("queue.buffering.max.messages", "100000") // Limit in-flight messages
.set("queue.buffering.max.kbytes", "1048576") // 1GB max memory usage
.set("linger.ms", "2")
.set("compression.type", "zstd")
.create()
Expand All @@ -43,7 +45,6 @@ async fn process_message(
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut buf = Vec::new();
motor.encode(&mut buf)?;

let proto_payload = context
.encoder
.encode(
Expand All @@ -58,6 +59,7 @@ async fn process_message(
.id
.clone()
.unwrap_or_else(|| "unknown_motor".to_string());

context
.producer
.send(
Expand All @@ -70,7 +72,7 @@ async fn process_message(
.map_err(|(err, _)| Box::new(err) as Box<dyn Error + Send + Sync>)?;

let count = context.message_count.fetch_add(1, Ordering::Relaxed) + 1;
if count % 100000 == 0 {
if count % 100_000 == 0 {
let duration = context.start_time.elapsed();
println!(
"Total messages sent to Kafka: {}, Time elapsed: {:?}, Avg msg/sec: {:.2}",
Expand All @@ -87,10 +89,19 @@ async fn main() -> Result<(), Box<dyn Error>> {
let start_time = Instant::now();
println!("Starting UDP listener and Kafka producer...");

const CHANNEL_BUFFER_SIZE: usize = 10_000_000;
const LOW_CAPACITY_THRESHOLD: usize = CHANNEL_BUFFER_SIZE / 10; // 10% of buffer size

// Set up UDP socket
let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
socket.set_reuse_address(true)?;
socket.set_recv_buffer_size(20 * 1024 * 1024)?;

// Set and get receive buffer size
let desired_recv_buf_size = 20 * 1024 * 1024; // 20MB
socket.set_recv_buffer_size(desired_recv_buf_size)?;
let actual_recv_buf_size = socket.recv_buffer_size()?;
println!("UDP receive buffer size: {} bytes", actual_recv_buf_size);

let addr: SocketAddr = "127.0.0.1:50537".parse()?;
socket.bind(&addr.into())?;
let socket = UdpSocket::from_std(socket.into())?;
Expand All @@ -114,7 +125,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
});

// Create channel for message processing
let (tx, mut rx) = mpsc::channel::<Motor>(1000000); // Buffer size of 1000000 messages
let (tx, mut rx) = mpsc::channel::<Motor>(CHANNEL_BUFFER_SIZE);
println!(
"Channel buffer size: {}, Low capacity threshold: {}",
CHANNEL_BUFFER_SIZE, LOW_CAPACITY_THRESHOLD
);

// Spawn message processing task
let processing_context = context.clone();
Expand All @@ -136,8 +151,27 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Main UDP receiving loop
let mut buffer = vec![0u8; 65536];
let mut packet_count = 0;
loop {
let (amt, _src) = socket.recv_from(&mut buffer).await?;

// Monitor packet count and channel capacity
packet_count += 1;
if packet_count % 100_000 == 0 {
let current_capacity = tx.capacity();
println!(
"Processed {} packets, channel capacity: {} ({:.1}%)",
packet_count,
current_capacity,
(current_capacity as f64 / CHANNEL_BUFFER_SIZE as f64) * 100.0
);

// If channel capacity is low, this might indicate backpressure
if current_capacity < LOW_CAPACITY_THRESHOLD {
println!("Warning: Channel capacity running low - possible backpressure");
}
}

match Motor::decode(&buffer[..amt]) {
Ok(motor) => {
if let Err(e) = tx.send(motor).await {
Expand Down
Loading