diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs index 805ad45c..22e6991b 100644 --- a/rust-arroyo/src/backends/kafka/mod.rs +++ b/rust-arroyo/src/backends/kafka/mod.rs @@ -4,6 +4,7 @@ use super::CommitOffsets; use super::Consumer as ArroyoConsumer; use super::ConsumerError; use crate::backends::kafka::types::KafkaPayload; +use crate::gauge; use crate::types::{BrokerMessage, Partition, Topic}; use chrono::{DateTime, NaiveDateTime, Utc}; use parking_lot::Mutex; @@ -15,6 +16,7 @@ use rdkafka::error::KafkaError; use rdkafka::message::{BorrowedMessage, Message}; use rdkafka::topic_partition_list::{Offset, TopicPartitionList}; use rdkafka::types::{RDKafkaErrorCode, RDKafkaRespErr}; +use rdkafka::Statistics; use sentry::Hub; use std::collections::HashMap; use std::collections::HashSet; @@ -167,6 +169,13 @@ impl ClientContext for CustomContext { tracing::error!(error, "librdkafka: {error}: {reason}"); }) } + + fn stats(&self, stats: Statistics) { + gauge!( + "arroyo.consumer.librdkafka.total_queue_size", + stats.replyq as u64, + ); + } } impl ConsumerContext for CustomContext {