Merge pull request #617 from fede1024/scanterog/event-based-client
Use rdkafka event API instead of the callback API
davidblewett authored Nov 8, 2023
2 parents c87c1e7 + 978c964 commit c719e55
Showing 12 changed files with 600 additions and 317 deletions.
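
This commit replaces the C callbacks previously registered on the client configuration (rd_kafka_conf_set_log_cb, rd_kafka_conf_set_stats_cb, rd_kafka_conf_set_error_cb, rd_kafka_conf_set_oauthbearer_token_refresh_cb) with librdkafka's event API: the client now polls a native event queue and forwards each event to the ClientContext. The sketch below illustrates that dispatch pattern directly against rdkafka-sys. It is a simplified illustration, not code from this diff: dispatch_one_event and the 100 ms timeout are invented for the example, and rd_kafka_queue_poll, rd_kafka_event_destroy, and rd_kafka_queue_destroy are standard librdkafka calls that the crate wraps elsewhere.

```rust
// Minimal sketch of event-based dispatch with rdkafka-sys (illustrative only).
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

/// Poll the client's main event queue once and route the event by type.
/// `dispatch_one_event` is a hypothetical helper, not part of rust-rdkafka.
unsafe fn dispatch_one_event(rk: *mut RDKafka) {
    let queue = rdsys::rd_kafka_queue_get_main(rk);
    // Block for up to 100 ms waiting for the next event.
    let event = rdsys::rd_kafka_queue_poll(queue, 100);
    if !event.is_null() {
        match rdsys::rd_kafka_event_type(event) {
            rdsys::RD_KAFKA_EVENT_LOG => { /* forward to ClientContext::log */ }
            rdsys::RD_KAFKA_EVENT_STATS => { /* forward to ClientContext::stats_raw */ }
            rdsys::RD_KAFKA_EVENT_ERROR => { /* forward to ClientContext::error */ }
            rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH => {
                /* forward to ClientContext::generate_oauth_token */
            }
            _ => { /* other events (fetches, delivery reports, ...) go back to the caller */ }
        }
        rdsys::rd_kafka_event_destroy(event);
    }
    rdsys::rd_kafka_queue_destroy(queue);
}
```

In the crate itself this loop lives in Client::poll_event (see src/client.rs below), which handles the housekeeping events and returns the rest, such as consumer errors and delivery reports, to the caller instead of discarding them.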
2 changes: 1 addition & 1 deletion src/admin.rs
@@ -403,7 +403,7 @@ fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> J
.expect("Failed to start polling thread")
}

type NativeEvent = NativePtr<RDKafkaEvent>;
pub(crate) type NativeEvent = NativePtr<RDKafkaEvent>;

unsafe impl KafkaDrop for RDKafkaEvent {
const TYPE: &'static str = "event";
244 changes: 130 additions & 114 deletions src/client.rs
@@ -11,19 +11,19 @@
//! [`consumer`]: crate::consumer
//! [`producer`]: crate::producer
use std::convert::TryFrom;
use std::error::Error;
use std::ffi::{CStr, CString};
use std::mem::ManuallyDrop;
use std::os::raw::{c_char, c_void};
use std::os::raw::c_char;
use std::ptr;
use std::slice;
use std::string::ToString;
use std::sync::Arc;

use libc::c_void;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::admin::NativeEvent;
use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel};
use crate::consumer::RebalanceProtocol;
use crate::error::{IsError, KafkaError, KafkaResult};
@@ -239,21 +239,6 @@ impl<C: ClientContext> Client<C> {
Arc::as_ptr(&context) as *mut c_void,
)
};
unsafe { rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::<C>)) };
unsafe {
rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::<C>))
};
unsafe {
rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::<C>))
};
if C::ENABLE_REFRESH_OAUTH_TOKEN {
unsafe {
rdsys::rd_kafka_conf_set_oauthbearer_token_refresh_cb(
native_config.ptr(),
Some(native_oauth_refresh_cb::<C>),
)
};
}

let client_ptr = unsafe {
let native_config = ManuallyDrop::new(native_config);
@@ -293,6 +278,128 @@ impl<C: ClientContext> Client<C> {
&self.context
}

pub(crate) fn poll_event(&self, queue: &NativeQueue, timeout: Timeout) -> Option<NativeEvent> {
let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) };
if let Some(ev) = event {
let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) };
match evtype {
rdsys::RD_KAFKA_EVENT_LOG => self.handle_log_event(ev.ptr()),
rdsys::RD_KAFKA_EVENT_STATS => self.handle_stats_event(ev.ptr()),
rdsys::RD_KAFKA_EVENT_ERROR => {
// rdkafka reports consumer errors via RD_KAFKA_EVENT_ERROR, while producer errors are
// embedded in the ack returned via RD_KAFKA_EVENT_DR. Hence we need to return this event
// in the consumer case so that the error reaches the user.
self.handle_error_event(ev.ptr());
return Some(ev);
}
rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH => {
if C::ENABLE_REFRESH_OAUTH_TOKEN {
self.handle_oauth_refresh_event(ev.ptr());
}
}
_ => {
return Some(ev);
}
}
}
None
}

fn handle_log_event(&self, event: *mut RDKafkaEvent) {
let mut fac: *const c_char = std::ptr::null();
let mut str_: *const c_char = std::ptr::null();
let mut level: i32 = 0;
let result = unsafe { rdsys::rd_kafka_event_log(event, &mut fac, &mut str_, &mut level) };
if result == 0 {
let fac = unsafe { CStr::from_ptr(fac).to_string_lossy() };
let log_message = unsafe { CStr::from_ptr(str_).to_string_lossy() };
self.context().log(
RDKafkaLogLevel::from_int(level),
fac.trim(),
log_message.trim(),
);
}
}

fn handle_stats_event(&self, event: *mut RDKafkaEvent) {
let json = unsafe { CStr::from_ptr(rdsys::rd_kafka_event_stats(event)) };
self.context().stats_raw(json.to_bytes());
}

fn handle_error_event(&self, event: *mut RDKafkaEvent) {
let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event) };
let error = KafkaError::Global(rdkafka_err.into());
let reason =
unsafe { CStr::from_ptr(rdsys::rd_kafka_event_error_string(event)).to_string_lossy() };
self.context().error(error, reason.trim());
}

fn handle_oauth_refresh_event(&self, event: *mut RDKafkaEvent) {
let oauthbearer_config = unsafe { rdsys::rd_kafka_event_config_string(event) };
let res: Result<_, Box<dyn Error>> = (|| {
let oauthbearer_config = match oauthbearer_config.is_null() {
true => None,
false => unsafe { Some(util::cstr_to_owned(oauthbearer_config)) },
};
let token_info = self
.context()
.generate_oauth_token(oauthbearer_config.as_deref())?;
let token = CString::new(token_info.token)?;
let principal_name = CString::new(token_info.principal_name)?;
Ok((token, principal_name, token_info.lifetime_ms))
})();
match res {
Ok((token, principal_name, lifetime_ms)) => {
let mut err_buf = ErrBuf::new();
let code = unsafe {
rdkafka_sys::rd_kafka_oauthbearer_set_token(
self.native_ptr(),
token.as_ptr(),
lifetime_ms,
principal_name.as_ptr(),
ptr::null_mut(),
0,
err_buf.as_mut_ptr(),
err_buf.capacity(),
)
};
if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
debug!("successfully set refreshed OAuth token");
} else {
debug!(
"failed to set refreshed OAuth token (code {:?}): {}",
code, err_buf
);
unsafe {
rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(
self.native_ptr(),
err_buf.as_mut_ptr(),
)
};
}
}
Err(e) => {
debug!("failed to refresh OAuth token: {}", e);
let message = match CString::new(e.to_string()) {
Ok(message) => message,
Err(e) => {
error!("error message generated while refreshing OAuth token has embedded null character: {}", e);
CString::new(
"error while refreshing OAuth token has embedded null character",
)
.expect("known to be a valid CString")
}
};
unsafe {
rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(
self.native_ptr(),
message.as_ptr(),
)
};
}
}
}

/// Returns the metadata information for the specified topic, or for all topics in the cluster
/// if no topic is specified.
pub fn fetch_metadata<T: Into<Timeout>>(
@@ -442,6 +549,11 @@ impl<C: ClientContext> Client<C> {
pub(crate) fn consumer_queue(&self) -> Option<NativeQueue> {
unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_consumer(self.native_ptr())) }
}

/// Returns a NativeQueue for the main librdkafka event queue from the current client.
pub(crate) fn main_queue(&self) -> NativeQueue {
unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_main(self.native_ptr())).unwrap() }
}
}

pub(crate) type NativeTopic = NativePtr<RDKafkaTopic>;
@@ -471,48 +583,6 @@ impl NativeQueue {
}
}

pub(crate) unsafe extern "C" fn native_log_cb<C: ClientContext>(
client: *const RDKafka,
level: i32,
fac: *const c_char,
buf: *const c_char,
) {
let fac = CStr::from_ptr(fac).to_string_lossy();
let log_message = CStr::from_ptr(buf).to_string_lossy();

let context = &mut *(rdsys::rd_kafka_opaque(client) as *mut C);
context.log(
RDKafkaLogLevel::from_int(level),
fac.trim(),
log_message.trim(),
);
}

pub(crate) unsafe extern "C" fn native_stats_cb<C: ClientContext>(
_conf: *mut RDKafka,
json: *mut c_char,
json_len: usize,
opaque: *mut c_void,
) -> i32 {
let context = &mut *(opaque as *mut C);
context.stats_raw(slice::from_raw_parts(json as *mut u8, json_len));
0 // librdkafka will free the json buffer
}

pub(crate) unsafe extern "C" fn native_error_cb<C: ClientContext>(
_client: *mut RDKafka,
err: i32,
reason: *const c_char,
opaque: *mut c_void,
) {
let err = RDKafkaRespErr::try_from(err).expect("global error not an rd_kafka_resp_err_t");
let error = KafkaError::Global(err.into());
let reason = CStr::from_ptr(reason).to_string_lossy();

let context = &mut *(opaque as *mut C);
context.error(error, reason.trim());
}

/// A generated OAuth token and its associated metadata.
///
/// When using the `OAUTHBEARER` SASL authentication method, this type is
@@ -529,60 +599,6 @@ pub struct OAuthToken {
pub lifetime_ms: i64,
}

pub(crate) unsafe extern "C" fn native_oauth_refresh_cb<C: ClientContext>(
client: *mut RDKafka,
oauthbearer_config: *const c_char,
opaque: *mut c_void,
) {
let res: Result<_, Box<dyn Error>> = (|| {
let context = &mut *(opaque as *mut C);
let oauthbearer_config = match oauthbearer_config.is_null() {
true => None,
false => Some(util::cstr_to_owned(oauthbearer_config)),
};
let token_info = context.generate_oauth_token(oauthbearer_config.as_deref())?;
let token = CString::new(token_info.token)?;
let principal_name = CString::new(token_info.principal_name)?;
Ok((token, principal_name, token_info.lifetime_ms))
})();
match res {
Ok((token, principal_name, lifetime_ms)) => {
let mut err_buf = ErrBuf::new();
let code = rdkafka_sys::rd_kafka_oauthbearer_set_token(
client,
token.as_ptr(),
lifetime_ms,
principal_name.as_ptr(),
ptr::null_mut(),
0,
err_buf.as_mut_ptr(),
err_buf.capacity(),
);
if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
debug!("successfully set refreshed OAuth token");
} else {
debug!(
"failed to set refreshed OAuth token (code {:?}): {}",
code, err_buf
);
rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, err_buf.as_mut_ptr());
}
}
Err(e) => {
debug!("failed to refresh OAuth token: {}", e);
let message = match CString::new(e.to_string()) {
Ok(message) => message,
Err(e) => {
error!("error message generated while refreshing OAuth token has embedded null character: {}", e);
CString::new("error while refreshing OAuth token has embedded null character")
.expect("known to be a valid CString")
}
};
rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, message.as_ptr());
}
}
}

#[cfg(test)]
mod tests {
// Just call everything to test there are no panics by default, behavior
