Skip to content

Commit

Permalink
fix(rdkafka): add missing methods in ClientContext. (#208)
Browse files Browse the repository at this point in the history
* add missing methods in `ClientContext`.

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* extract rdkafka changelog

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* add missing `timeout` argument in `BaseConsumer::poll`.

Signed-off-by: Runji Wang <wangrunji0408@163.com>

* bump version

Signed-off-by: Runji Wang <wangrunji0408@163.com>

---------

Signed-off-by: Runji Wang <wangrunji0408@163.com>
  • Loading branch information
wangrunji0408 committed May 8, 2024
1 parent e1f6d8b commit 368d3ae
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 56 deletions.
48 changes: 0 additions & 48 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Fix the problem that `getrandom` returns different values in multiple runs with the same seed.

## rdkafka [0.3.4] - 2024-03-22

### Fixed

- Fix unintended drop of client in `fetch_watermarks`.

## madsim [0.2.26] - 2024-03-18

### Fixed

- `sleep` and `sleep_until` now sleep for at least 1ms to be consistent with tokio's behavior.

## rdkafka [0.3.3] - 2024-02-28

### Changed

- Wrap `fetch_watermarks` in `tokio::task::spawn_blocking`.

## rdkafka [0.3.2] - 2024-02-28

### Changed

- Update librdkafka to v2.3.0.

## tonic [0.4.2] tonic-build [0.4.3] - 2024-02-27

### Changed
Expand All @@ -64,13 +46,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Fix intercepting time on x86_64 macOS, Rust 1.75.0.

## rdkafka [0.3.1] - 2024-01-05

### Fixed

- Add `rdkafka::message::{Header, HeaderIter}` and `BorrowedHeaders::detach`.
- Fix `rdkafka::message::Headers` trait.

## aws-sdk-s3 [0.4.0] - 2023-11-24

### Changed
Expand All @@ -95,21 +70,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- tokio: Add `Runtime::enter` API.

## rdkafka [0.3.0] - 2023-10-11

### Added

- Add statistics API.
- Add future producer API.

### Changed

- Update `rdkafka` to v0.34.0 and `librdkafka` to v2.2.0.

### Fixed

- Fix the error type of `DeliveryFuture`.

## tonic-build [0.4.2] - 2023-10-08

### Added
Expand Down Expand Up @@ -188,7 +148,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- madsim: Add `restart_on_panic_matching` to support auto restarting on panic with certain messages.
- rdkafka: Add `producer::DeliveryResult` and fix `ProducerContext`.


## [0.2.21] - 2023-04-14
Expand Down Expand Up @@ -291,12 +250,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- etcd: Add `KeyValue::{lease, create_revision, mod_revision}` API.
- etcd: Add maintenance `status` API.
- rdkafka: Add `Timestamp::to_millis`.

### Changed

- rdkafka: update to rdkafka v0.29.0.
- The return type of `Producer::flush` changed to `KafkaResult<()>`.

### Fixed

Expand Down Expand Up @@ -346,7 +299,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add simulation crate of `rdkafka`.
- etcd: Add lease and election API.
- madsim: Expose `JoinHandle::cancel_on_drop`.

Expand Down
89 changes: 89 additions & 0 deletions madsim-rdkafka/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [0.4.1] - 2024-05-08

### Fixed

- Add missing methods in `ClientContext`.
- Add missing `timeout` argument in `BaseConsumer::poll`.

## [0.4.0] - 2024-05-07

### Changed

- The associated constant `ClientContext::ENABLE_REFRESH_OAUTH_TOKEN` is changed to a function in order to make the trait object-safe.

## [0.3.4] - 2024-03-22

### Fixed

- Fix unintended drop of client in `fetch_watermarks`.

## [0.3.3] - 2024-02-28

### Changed

- Wrap `fetch_watermarks` in `tokio::task::spawn_blocking`.

## [0.3.2] - 2024-02-28

### Changed

- Update librdkafka to v2.3.0.

## [0.3.1] - 2024-01-05

### Fixed

- Add `rdkafka::message::{Header, HeaderIter}` and `BorrowedHeaders::detach`.
- Fix `rdkafka::message::Headers` trait.

## [0.3.0] - 2023-10-11

### Added

- Add statistics API.
- Add future producer API.

### Changed

- Update `rdkafka` to v0.34.0 and `librdkafka` to v2.2.0.

### Fixed

- Fix the error type of `DeliveryFuture`.

## [0.2.22] - 2023-04-19

### Added

- Add `producer::DeliveryResult` and fix `ProducerContext`.

## [0.2.14] - 2023-01-30

### Fixed

- Resolve DNS on connection.

## [0.2.13] - 2023-01-11

### Added

- Add `Timestamp::to_millis`.

### Changed

- update to rdkafka v0.29.0.
- The return type of `Producer::flush` changed to `KafkaResult<()>`.

## [0.2.8] - 2022-09-26

### Added

- Add simulation crate of `rdkafka`.
2 changes: 1 addition & 1 deletion madsim-rdkafka/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "madsim-rdkafka"
version = "0.4.0+0.34.0"
version = "0.4.1+0.34.0"
edition = "2021"
authors = ["Runji Wang <wangrunji0408@163.com>"]
description = "The rdkafka simulator on madsim."
Expand Down
67 changes: 65 additions & 2 deletions madsim-rdkafka/src/sim/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,38 @@
use crate::config::RDKafkaLogLevel;
use crate::error::KafkaError;
use crate::Statistics;

use tracing::info;
use std::error::Error;
use tracing::*;

/// Client-level context.
pub trait ClientContext: Send + Sync + 'static {
fn enable_refresh_oauth_token(&self) -> bool {
false
}

fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
match level {
RDKafkaLogLevel::Emerg
| RDKafkaLogLevel::Alert
| RDKafkaLogLevel::Critical
| RDKafkaLogLevel::Error => {
error!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
}
RDKafkaLogLevel::Warning => {
warn!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
}
RDKafkaLogLevel::Notice => {
info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
}
RDKafkaLogLevel::Info => {
info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
}
RDKafkaLogLevel::Debug => {
debug!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
}
}
}

/// Receives the decoded statistics of the librdkafka client. To enable, the
/// `statistics.interval.ms` configuration parameter must be specified.
///
Expand All @@ -12,9 +41,27 @@ pub trait ClientContext: Send + Sync + 'static {
info!("Client stats: {:?}", statistics);
}

fn stats_raw(&self, statistics: &[u8]) {
match serde_json::from_slice(statistics) {
Ok(stats) => self.stats(stats),
Err(e) => error!("Could not parse statistics JSON: {}", e),
}
}

fn error(&self, error: KafkaError, reason: &str) {
error!("librdkafka: {}: {}", error, reason);
}

fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
addr
}

fn generate_oauth_token(
&self,
_oauthbearer_config: Option<&str>,
) -> Result<OAuthToken, Box<dyn Error>> {
Err("Default implementation of generate_oauth_token must be overridden".into())
}
}

/// An empty [`ClientContext`] that can be used when no customizations are needed.
Expand All @@ -32,3 +79,19 @@ pub struct BrokerAddr {
/// the services database.
pub port: String,
}

/// A generated OAuth token and its associated metadata.
///
/// When using the `OAUTHBEARER` SASL authentication method, this type is
/// returned from [`ClientContext::generate_oauth_token`]. The token and
/// principal name must not contain embedded null characters.
///
/// Specifying SASL extensions is not currently supported.
pub struct OAuthToken {
/// The token value to set.
pub token: String,
/// The Kafka principal name associated with the token.
pub principal_name: String,
/// When the token expires, in number of milliseconds since the Unix epoch.
pub lifetime_ms: i64,
}
5 changes: 4 additions & 1 deletion madsim-rdkafka/src/sim/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ where
C: ConsumerContext,
{
/// Polls the consumer for new messages.
pub async fn poll(&self) -> Option<KafkaResult<BorrowedMessage<'_>>> {
pub async fn poll(
&self,
_timeout: impl Into<Timeout>, // TODO: timeout
) -> Option<KafkaResult<BorrowedMessage<'_>>> {
self.poll_internal()
.await
.map(|res| res.map(|msg| msg.borrow()))
Expand Down
35 changes: 32 additions & 3 deletions madsim-rdkafka/src/sim/producer/future_producer.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use super::{BaseRecord, DeliveryResult, IntoOpaque, ProducerContext, ThreadedProducer};
use crate::{
client::{BrokerAddr, DefaultClientContext},
config::{FromClientConfig, FromClientConfigAndContext},
client::{BrokerAddr, DefaultClientContext, OAuthToken},
config::{FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel},
error::{KafkaError, KafkaResult},
message::{Message, OwnedHeaders, OwnedMessage, ToBytes},
util::Timeout,
ClientConfig, ClientContext,
ClientConfig, ClientContext, Statistics,
};
use futures_channel::oneshot;
use futures_util::FutureExt;

use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
Expand Down Expand Up @@ -125,9 +126,37 @@ pub type OwnedDeliveryResult = Result<(i32, i64), (KafkaError, OwnedMessage)>;

// Delegates all the methods calls to the wrapped context.
impl<C: ClientContext + 'static> ClientContext for FutureProducerContext<C> {
fn enable_refresh_oauth_token(&self) -> bool {
self.wrapped_context.enable_refresh_oauth_token()
}

fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
self.wrapped_context.log(level, fac, log_message);
}

fn stats(&self, statistics: Statistics) {
self.wrapped_context.stats(statistics);
}

fn stats_raw(&self, statistics: &[u8]) {
self.wrapped_context.stats_raw(statistics)
}

fn error(&self, error: KafkaError, reason: &str) {
self.wrapped_context.error(error, reason);
}

fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
self.wrapped_context.rewrite_broker_addr(addr)
}

fn generate_oauth_token(
&self,
oauthbearer_config: Option<&str>,
) -> Result<OAuthToken, Box<dyn Error>> {
self.wrapped_context
.generate_oauth_token(oauthbearer_config)
}
}

impl<C: ClientContext + 'static> ProducerContext for FutureProducerContext<C> {
Expand Down
2 changes: 1 addition & 1 deletion madsim-rdkafka/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async fn test() {
consumer.assign(&assignment).expect("failed to assign");

loop {
let msg = match consumer.poll().await {
let msg = match consumer.poll(None).await {
None => {
madsim::time::sleep(Duration::from_millis(100)).await;
continue;
Expand Down

0 comments on commit 368d3ae

Please sign in to comment.