From 368d3ae38a0129a901e1dba6194c686ee5e9c24a Mon Sep 17 00:00:00 2001
From: Runji Wang
Date: Wed, 8 May 2024 14:49:54 +0800
Subject: [PATCH] fix(rdkafka): add missing methods in ClientContext. (#208)

* add missing methods in `ClientContext`.

Signed-off-by: Runji Wang

* extract rdkafka changelog

Signed-off-by: Runji Wang

* add missing `timeout` argument in `BaseConsumer::poll`.

Signed-off-by: Runji Wang

* bump version

Signed-off-by: Runji Wang

---------

Signed-off-by: Runji Wang
---
 CHANGELOG.md                                  | 48 ----------
 madsim-rdkafka/CHANGELOG.md                   | 89 +++++++++++++++++++
 madsim-rdkafka/Cargo.toml                     |  2 +-
 madsim-rdkafka/src/sim/client.rs              | 67 +++++++++++++-
 madsim-rdkafka/src/sim/consumer.rs            |  5 +-
 .../src/sim/producer/future_producer.rs       | 35 +++++++-
 madsim-rdkafka/tests/test.rs                  |  2 +-
 7 files changed, 192 insertions(+), 56 deletions(-)
 create mode 100644 madsim-rdkafka/CHANGELOG.md

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ac18937..570d6c5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,30 +22,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 - Fix the problem that `getrandom` returns different values in multiple runs with the same seed.

-## rdkafka [0.3.4] - 2024-03-22
-
-### Fixed
-
-- Fix unintended drop of client in `fetch_watermarks`.
-
 ## madsim [0.2.26] - 2024-03-18

 ### Fixed

 - `sleep` and `sleep_until` now sleep for at least 1ms to be consistent with tokio's behavior.

-## rdkafka [0.3.3] - 2024-02-28
-
-### Changed
-
-- Wrap `fetch_watermarks` in `tokio::task::spawn_blocking`.
-
-## rdkafka [0.3.2] - 2024-02-28
-
-### Changed
-
-- Update librdkafka to v2.3.0.
-
 ## tonic [0.4.2] tonic-build [0.4.3] - 2024-02-27

 ### Changed
@@ -64,13 +46,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 - Fix intercepting time on x86_64 macOS, Rust 1.75.0.

-## rdkafka [0.3.1] - 2024-01-05
-
-### Fixed
-
-- Add `rdkafka::message::{Header, HeaderIter}` and `BorrowedHeaders::detach`.
-- Fix `rdkafka::message::Headers` trait.
-
 ## aws-sdk-s3 [0.4.0] - 2023-11-24

 ### Changed
@@ -95,21 +70,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 - tokio: Add `Runtime::enter` API.

-## rdkafka [0.3.0] - 2023-10-11
-
-### Added
-
-- Add statistics API.
-- Add future producer API.
-
-### Changed
-
-- Update `rdkafka` to v0.34.0 and `librdkafka` to v2.2.0.
-
-### Fixed
-
-- Fix the error type of `DeliveryFuture`.
-
 ## tonic-build [0.4.2] - 2023-10-08

 ### Added
@@ -188,7 +148,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ### Added

 - madsim: Add `restart_on_panic_matching` to support auto restarting on panic with certain messages.
-- rdkafka: Add `producer::DeliveryResult` and fix `ProducerContext`.

 ## [0.2.21] - 2023-04-14
@@ -291,12 +250,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 - etcd: Add `KeyValue::{lease, create_revision, mod_revision}` API.
 - etcd: Add maintenance `status` API.
-- rdkafka: Add `Timestamp::to_millis`.
-
-### Changed
-
-- rdkafka: update to rdkafka v0.29.0.
-  - The return type of `Producer::flush` changed to `KafkaResult<()>`.

 ### Fixed
@@ -346,7 +299,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ### Added

-- Add simulation crate of `rdkafka`.
 - etcd: Add lease and election API.
 - madsim: Expose `JoinHandle::cancel_on_drop`.
diff --git a/madsim-rdkafka/CHANGELOG.md b/madsim-rdkafka/CHANGELOG.md
new file mode 100644
index 0000000..dc17b0f
--- /dev/null
+++ b/madsim-rdkafka/CHANGELOG.md
@@ -0,0 +1,89 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+
+## [0.4.1] - 2024-05-08
+
+### Fixed
+
+- Add missing methods in `ClientContext`.
+- Add missing `timeout` argument in `BaseConsumer::poll`.
+
+## [0.4.0] - 2024-05-07
+
+### Changed
+
+- The associated constant `ClientContext::ENABLE_REFRESH_OAUTH_TOKEN` is changed to a function in order to make the trait object-safe.
+
+## [0.3.4] - 2024-03-22
+
+### Fixed
+
+- Fix unintended drop of client in `fetch_watermarks`.
+
+## [0.3.3] - 2024-02-28
+
+### Changed
+
+- Wrap `fetch_watermarks` in `tokio::task::spawn_blocking`.
+
+## [0.3.2] - 2024-02-28
+
+### Changed
+
+- Update librdkafka to v2.3.0.
+
+## [0.3.1] - 2024-01-05
+
+### Fixed
+
+- Add `rdkafka::message::{Header, HeaderIter}` and `BorrowedHeaders::detach`.
+- Fix `rdkafka::message::Headers` trait.
+
+## [0.3.0] - 2023-10-11
+
+### Added
+
+- Add statistics API.
+- Add future producer API.
+
+### Changed
+
+- Update `rdkafka` to v0.34.0 and `librdkafka` to v2.2.0.
+
+### Fixed
+
+- Fix the error type of `DeliveryFuture`.
+
+## [0.2.22] - 2023-04-19
+
+### Added
+
+- Add `producer::DeliveryResult` and fix `ProducerContext`.
+
+## [0.2.14] - 2023-01-30
+
+### Fixed
+
+- Resolve DNS on connection.
+
+## [0.2.13] - 2023-01-11
+
+### Added
+
+- Add `Timestamp::to_millis`.
+
+### Changed
+
+- update to rdkafka v0.29.0.
+  - The return type of `Producer::flush` changed to `KafkaResult<()>`.
+
+## [0.2.8] - 2022-09-26
+
+### Added
+
+- Add simulation crate of `rdkafka`.
diff --git a/madsim-rdkafka/Cargo.toml b/madsim-rdkafka/Cargo.toml
index 41072d4..d627f6d 100644
--- a/madsim-rdkafka/Cargo.toml
+++ b/madsim-rdkafka/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "madsim-rdkafka"
-version = "0.4.0+0.34.0"
+version = "0.4.1+0.34.0"
 edition = "2021"
 authors = ["Runji Wang "]
 description = "The rdkafka simulator on madsim."
diff --git a/madsim-rdkafka/src/sim/client.rs b/madsim-rdkafka/src/sim/client.rs
index 4f97bc2..d33805c 100644
--- a/madsim-rdkafka/src/sim/client.rs
+++ b/madsim-rdkafka/src/sim/client.rs
@@ -1,9 +1,38 @@
+use crate::config::RDKafkaLogLevel;
+use crate::error::KafkaError;
 use crate::Statistics;
-
-use tracing::info;
+use std::error::Error;
+use tracing::*;

 /// Client-level context.
 pub trait ClientContext: Send + Sync + 'static {
+    fn enable_refresh_oauth_token(&self) -> bool {
+        false
+    }
+
+    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
+        match level {
+            RDKafkaLogLevel::Emerg
+            | RDKafkaLogLevel::Alert
+            | RDKafkaLogLevel::Critical
+            | RDKafkaLogLevel::Error => {
+                error!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
+            }
+            RDKafkaLogLevel::Warning => {
+                warn!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
+            }
+            RDKafkaLogLevel::Notice => {
+                info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
+            }
+            RDKafkaLogLevel::Info => {
+                info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
+            }
+            RDKafkaLogLevel::Debug => {
+                debug!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
+            }
+        }
+    }
+
     /// Receives the decoded statistics of the librdkafka client. To enable, the
     /// `statistics.interval.ms` configuration parameter must be specified.
     ///
@@ -12,9 +41,27 @@ pub trait ClientContext: Send + Sync + 'static {
         info!("Client stats: {:?}", statistics);
     }

+    fn stats_raw(&self, statistics: &[u8]) {
+        match serde_json::from_slice(statistics) {
+            Ok(stats) => self.stats(stats),
+            Err(e) => error!("Could not parse statistics JSON: {}", e),
+        }
+    }
+
+    fn error(&self, error: KafkaError, reason: &str) {
+        error!("librdkafka: {}: {}", error, reason);
+    }
+
     fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
         addr
     }
+
+    fn generate_oauth_token(
+        &self,
+        _oauthbearer_config: Option<&str>,
+    ) -> Result<OAuthToken, Box<dyn Error>> {
+        Err("Default implementation of generate_oauth_token must be overridden".into())
+    }
 }

 /// An empty [`ClientContext`] that can be used when no customizations are needed.
@@ -32,3 +79,19 @@ pub struct BrokerAddr {
     /// the services database.
     pub port: String,
 }
+
+/// A generated OAuth token and its associated metadata.
+///
+/// When using the `OAUTHBEARER` SASL authentication method, this type is
+/// returned from [`ClientContext::generate_oauth_token`]. The token and
+/// principal name must not contain embedded null characters.
+///
+/// Specifying SASL extensions is not currently supported.
+pub struct OAuthToken {
+    /// The token value to set.
+    pub token: String,
+    /// The Kafka principal name associated with the token.
+    pub principal_name: String,
+    /// When the token expires, in number of milliseconds since the Unix epoch.
+    pub lifetime_ms: i64,
+}
diff --git a/madsim-rdkafka/src/sim/consumer.rs b/madsim-rdkafka/src/sim/consumer.rs
index 82d04f2..70ab45e 100644
--- a/madsim-rdkafka/src/sim/consumer.rs
+++ b/madsim-rdkafka/src/sim/consumer.rs
@@ -170,7 +170,10 @@ where
     C: ConsumerContext,
 {
     /// Polls the consumer for new messages.
-    pub async fn poll(&self) -> Option<KafkaResult<BorrowedMessage<'_>>> {
+    pub async fn poll(
+        &self,
+        _timeout: impl Into<Timeout>, // TODO: timeout
+    ) -> Option<KafkaResult<BorrowedMessage<'_>>> {
         self.poll_internal()
             .await
             .map(|res| res.map(|msg| msg.borrow()))
diff --git a/madsim-rdkafka/src/sim/producer/future_producer.rs b/madsim-rdkafka/src/sim/producer/future_producer.rs
index fd569c7..38c8219 100644
--- a/madsim-rdkafka/src/sim/producer/future_producer.rs
+++ b/madsim-rdkafka/src/sim/producer/future_producer.rs
@@ -1,15 +1,16 @@
 use super::{BaseRecord, DeliveryResult, IntoOpaque, ProducerContext, ThreadedProducer};
 use crate::{
-    client::{BrokerAddr, DefaultClientContext},
-    config::{FromClientConfig, FromClientConfigAndContext},
+    client::{BrokerAddr, DefaultClientContext, OAuthToken},
+    config::{FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel},
     error::{KafkaError, KafkaResult},
     message::{Message, OwnedHeaders, OwnedMessage, ToBytes},
     util::Timeout,
-    ClientConfig, ClientContext,
+    ClientConfig, ClientContext, Statistics,
 };

 use futures_channel::oneshot;
 use futures_util::FutureExt;
+use std::error::Error;
 use std::future::Future;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -125,9 +126,37 @@ pub type OwnedDeliveryResult = Result<(i32, i64), (KafkaError, OwnedMessage)>;

 // Delegates all the methods calls to the wrapped context.
 impl<C: ClientContext + 'static> ClientContext for FutureProducerContext<C> {
+    fn enable_refresh_oauth_token(&self) -> bool {
+        self.wrapped_context.enable_refresh_oauth_token()
+    }
+
+    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
+        self.wrapped_context.log(level, fac, log_message);
+    }
+
+    fn stats(&self, statistics: Statistics) {
+        self.wrapped_context.stats(statistics);
+    }
+
+    fn stats_raw(&self, statistics: &[u8]) {
+        self.wrapped_context.stats_raw(statistics)
+    }
+
+    fn error(&self, error: KafkaError, reason: &str) {
+        self.wrapped_context.error(error, reason);
+    }
+
     fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
         self.wrapped_context.rewrite_broker_addr(addr)
     }
+
+    fn generate_oauth_token(
+        &self,
+        oauthbearer_config: Option<&str>,
+    ) -> Result<OAuthToken, Box<dyn Error>> {
+        self.wrapped_context
+            .generate_oauth_token(oauthbearer_config)
+    }
 }

 impl<C: ClientContext + 'static> ProducerContext for FutureProducerContext<C> {
diff --git a/madsim-rdkafka/tests/test.rs b/madsim-rdkafka/tests/test.rs
index c7901d7..5aef61e 100644
--- a/madsim-rdkafka/tests/test.rs
+++ b/madsim-rdkafka/tests/test.rs
@@ -134,7 +134,7 @@ async fn test() {
     consumer.assign(&assignment).expect("failed to assign");

     loop {
-        let msg = match consumer.poll().await {
+        let msg = match consumer.poll(None).await {
             None => {
                 madsim::time::sleep(Duration::from_millis(100)).await;
                 continue;
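
Note for downstream users: the practical effect of the client.rs change is that a custom `ClientContext` can now override the same hooks as the real rdkafka crate (`log`, `stats`, `stats_raw`, `error`, `enable_refresh_oauth_token`, `generate_oauth_token`) instead of only `stats` and `rewrite_broker_addr`. A minimal sketch of such a context follows; the `StatsContext` type and the method bodies are illustrative only (not part of this patch), and the import paths assume the crate is used under its own name rather than renamed to `rdkafka`.

    use std::error::Error;

    use madsim_rdkafka::client::OAuthToken;
    use madsim_rdkafka::error::KafkaError;
    use madsim_rdkafka::{ClientContext, Statistics};

    // Hypothetical context: overrides a few hooks and keeps the new defaults
    // (log, stats_raw, rewrite_broker_addr) added by this patch.
    struct StatsContext;

    impl ClientContext for StatsContext {
        // Decoded statistics, delivered when `statistics.interval.ms` is configured.
        fn stats(&self, statistics: Statistics) {
            println!("client stats: {:?}", statistics);
        }

        // Client-level errors reported by the (simulated) client.
        fn error(&self, error: KafkaError, reason: &str) {
            eprintln!("kafka error: {}: {}", error, reason);
        }

        // Opt in to OAUTHBEARER token refresh; the default added by this patch is `false`.
        fn enable_refresh_oauth_token(&self) -> bool {
            true
        }

        // Called when a token is needed; the default implementation only returns an error.
        fn generate_oauth_token(
            &self,
            _oauthbearer_config: Option<&str>,
        ) -> Result<OAuthToken, Box<dyn Error>> {
            Ok(OAuthToken {
                token: "dummy-token".to_string(),
                principal_name: "user".to_string(),
                lifetime_ms: i64::MAX,
            })
        }
    }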
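
The consumer change is source-breaking for callers: `BaseConsumer::poll` now takes a timeout argument to match upstream rdkafka's signature, although the simulator currently ignores the value (note the `_timeout` binding and the `TODO` in the hunk above). Call sites therefore need an extra argument, as the updated test shows. Below is a hedged sketch of a caller, assuming the simulator mirrors rdkafka's `consumer` module layout and `Message::payload` accessor; `poll_once` is a hypothetical helper, not part of this patch.

    use madsim_rdkafka::consumer::{BaseConsumer, ConsumerContext};
    use madsim_rdkafka::message::Message;

    // Hypothetical helper: fetch at most one message from the (simulated) broker.
    // `None` is accepted because it converts into a `Timeout`, mirroring the test;
    // the value is currently ignored by the simulation.
    async fn poll_once<C: ConsumerContext>(consumer: &BaseConsumer<C>) -> Option<Vec<u8>> {
        match consumer.poll(None).await {
            Some(Ok(msg)) => msg.payload().map(|p| p.to_vec()),
            Some(Err(e)) => {
                eprintln!("poll error: {}", e);
                None
            }
            // No message available yet; callers may sleep and retry, as the test does.
            None => None,
        }
    }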