From 3302a9535039ce7d01a8903fd4f4135d7da7eaee Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Thu, 20 Aug 2020 19:25:34 +0300 Subject: [PATCH 01/15] Add consumer group metadata type --- rdkafka-sys/src/types.rs | 2 ++ src/consumer/base_consumer.rs | 11 ++++++++++- src/consumer/mod.rs | 30 +++++++++++++++++++++++++++++- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index c403173ae..f13981ef0 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -81,6 +81,8 @@ pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t; /// Native rdkafka topic result. pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t; +pub type RDKafkaConsumerGroupMetadata = bindings::rd_kafka_consumer_group_metadata_t; + // ENUMS /// Client types. diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 1e46980ee..d0e44a529 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -14,7 +14,9 @@ use rdkafka_sys::types::*; use crate::client::{Client, NativeClient, NativeQueue}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; -use crate::consumer::{CommitMode, Consumer, ConsumerContext, DefaultConsumerContext}; +use crate::consumer::{ + CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, +}; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::groups::GroupList; use crate::message::{BorrowedMessage, Message}; @@ -537,6 +539,13 @@ impl Consumer for BaseConsumer { }; Ok(()) } + + fn group_metadata(&self) -> ConsumerGroupMetadata { + unsafe { + let ptr = rdsys::rd_kafka_consumer_group_metadata(self.client.native_ptr()); + ConsumerGroupMetadata::from_ptr(ptr) + } + } } impl Drop for BaseConsumer { diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 9160bfada..b2e6fa206 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -14,7 +14,7 @@ use crate::groups::GroupList; use crate::message::BorrowedMessage; use crate::metadata::Metadata; use crate::topic_partition_list::{Offset, TopicPartitionList}; -use crate::util::{cstr_to_owned, Timeout}; +use crate::util::{cstr_to_owned, KafkaDrop, NativePtr, Timeout}; pub mod base_consumer; pub mod stream_consumer; @@ -143,6 +143,29 @@ pub enum CommitMode { Async = 1, } +/// Consumer group metadata used to send offsets to producer transactions. +pub struct ConsumerGroupMetadata { + ptr: NativePtr, +} + +unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata { + const TYPE: &'static str = "consumer_group_metadata"; + const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_consumer_group_metadata_destroy; +} + +impl ConsumerGroupMetadata { + pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConsumerGroupMetadata) -> ConsumerGroupMetadata { + ConsumerGroupMetadata { + ptr: NativePtr::from_ptr(ptr).unwrap(), + } + } + + /// Returns the pointer to the internal librdkafka structure. + pub fn ptr(&self) -> *mut RDKafkaConsumerGroupMetadata { + self.ptr.ptr() + } +} + /// Common trait for all consumers. /// /// # Note about object safety @@ -339,4 +362,9 @@ pub trait Consumer { fn client(&self) -> &Client { self.get_base_consumer().client() } + + /// Returns the consumer group metadata needed to send offsets to producer transactions. 
+ fn group_metadata(&self) -> ConsumerGroupMetadata { + self.get_base_consumer().group_metadata() + } } From d81e1e0342252226117175e57246b3c610a2f719 Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Thu, 20 Aug 2020 19:25:56 +0300 Subject: [PATCH 02/15] Add BaseProducer transaction methods --- src/error.rs | 5 +++ src/producer/base_producer.rs | 77 +++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/src/error.rs b/src/error.rs index e299d3bd5..33c8122ed 100644 --- a/src/error.rs +++ b/src/error.rs @@ -78,6 +78,8 @@ pub enum KafkaError { StoreOffset(RDKafkaError), /// Subscription creation failed. Subscription(String), + /// Transaction error. + Transaction(RDKafkaError), } impl fmt::Debug for KafkaError { @@ -129,6 +131,7 @@ impl fmt::Debug for KafkaError { KafkaError::Subscription(ref err) => { write!(f, "KafkaError (Subscription error: {})", err) } + KafkaError::Transaction(err) => write!(f, "KafkaError (Transaction error: {})", err), } } } @@ -162,6 +165,7 @@ impl fmt::Display for KafkaError { KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err), KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err), KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err), + KafkaError::Transaction(err) => write!(f, "Transaction error: {}", err), } } } @@ -213,6 +217,7 @@ impl KafkaError { KafkaError::SetPartitionOffset(err) => Some(err), KafkaError::StoreOffset(err) => Some(err), KafkaError::Subscription(_) => None, + KafkaError::Transaction(err) => Some(err), } } } diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 324c72504..8d7258b6c 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -58,9 +58,11 @@ use rdkafka_sys::types::*; use crate::client::{Client, ClientContext}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; +use crate::consumer::ConsumerGroupMetadata; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::message::{BorrowedMessage, OwnedHeaders, ToBytes}; use crate::util::{IntoOpaque, Timeout}; +use crate::TopicPartitionList; pub use crate::message::DeliveryResult; @@ -426,6 +428,81 @@ impl BaseProducer { pub fn client(&self) -> &Client { &*self.client_arc } + + /// Initialize transactions for the producer instance. + pub fn init_transactions>(&self, timeout: T) -> KafkaResult<()> { + let init_error = unsafe { + let err = + rdsys::rd_kafka_init_transactions(self.native_ptr(), timeout.into().as_millis()); + rdsys::rd_kafka_error_code(err) + }; + if init_error.is_error() { + return Err(KafkaError::Transaction(init_error.into())); + } + Ok(()) + } + + /// Begin a new transaction. + pub fn begin_transaction(&self) -> KafkaResult<()> { + let begin_error = unsafe { + let err = rdsys::rd_kafka_begin_transaction(self.native_ptr()); + rdsys::rd_kafka_error_code(err) + }; + if begin_error.is_error() { + return Err(KafkaError::Transaction(begin_error.into())); + } + Ok(()) + } + + /// Sends a list of topic partition offsets to the consumer group coordinator for `cgm`, and + /// marks the offsets as part part of the current transaction. These offsets will be considered + /// committed only if the transaction is committed successfully. 
+ pub fn send_offsets_to_transaction>( + &self, + offsets: &TopicPartitionList, + cgm: &ConsumerGroupMetadata, + timeout: T, + ) -> KafkaResult<()> { + let send_error = unsafe { + let err = rdsys::rd_kafka_send_offsets_to_transaction( + self.native_ptr(), + offsets.ptr(), + cgm.ptr(), + timeout.into().as_millis(), + ); + rdsys::rd_kafka_error_code(err) + }; + if send_error.is_error() { + return Err(KafkaError::Transaction(send_error.into())); + } + Ok(()) + } + + /// Commit the current transaction. + pub fn commit_transaction>(&self, timeout: T) -> KafkaResult<()> { + let commit_error = unsafe { + let err = + rdsys::rd_kafka_commit_transaction(self.native_ptr(), timeout.into().as_millis()); + rdsys::rd_kafka_error_code(err) + }; + if commit_error.is_error() { + return Err(KafkaError::Transaction(commit_error.into())); + } + Ok(()) + } + + /// Aborts the ongoing transaction. + pub fn abort_transaction>(&self, timeout: T) -> KafkaResult<()> { + let abort_error = unsafe { + let err = + rdsys::rd_kafka_abort_transaction(self.native_ptr(), timeout.into().as_millis()); + rdsys::rd_kafka_error_code(err) + }; + if abort_error.is_error() { + return Err(KafkaError::Transaction(abort_error.into())); + } + Ok(()) + } } impl Clone for BaseProducer { From c35310475db8ac86a3f7c993487d7b690d3023e6 Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Tue, 25 Aug 2020 16:23:28 +0300 Subject: [PATCH 03/15] Rename RDKafkaError to RDKafkaErrorCode --- rdkafka-sys/src/helpers.rs | 6 +++--- rdkafka-sys/src/types.rs | 14 ++++++------ src/admin.rs | 10 ++++----- src/client.rs | 2 +- src/error.rs | 38 ++++++++++++++++----------------- src/message.rs | 4 ++-- src/producer/future_producer.rs | 6 +++--- tests/test_admin.rs | 26 +++++++++++++--------- tests/test_high_producers.rs | 4 ++-- tests/test_low_producers.rs | 13 +++++++---- 10 files changed, 67 insertions(+), 56 deletions(-) diff --git a/rdkafka-sys/src/helpers.rs b/rdkafka-sys/src/helpers.rs index f0865d8de..c2a8203e7 100644 --- a/rdkafka-sys/src/helpers.rs +++ b/rdkafka-sys/src/helpers.rs @@ -1,11 +1,11 @@ //! Utility functions. -use crate::types::RDKafkaError; -use crate::types::RDKafkaError::*; +use crate::types::RDKafkaErrorCode; +use crate::types::RDKafkaErrorCode::*; use crate::types::RDKafkaRespErr; use crate::types::RDKafkaRespErr::*; -pub fn rd_kafka_resp_err_t_to_rdkafka_error(err: RDKafkaRespErr) -> RDKafkaError { +pub fn rd_kafka_resp_err_t_to_rdkafka_error(err: RDKafkaRespErr) -> RDKafkaErrorCode { match err { RD_KAFKA_RESP_ERR__BEGIN => Begin, RD_KAFKA_RESP_ERR__BAD_MSG => BadMessage, diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index f13981ef0..141dd54f6 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -105,9 +105,9 @@ pub use bindings::rd_kafka_ConfigSource_t as RDKafkaConfigSource; // Errors enum -/// Native rdkafka error. +/// Native rdkafka error code. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum RDKafkaError { +pub enum RDKafkaErrorCode { #[doc(hidden)] Begin = -200, /// Received message is incorrect. 
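For illustration only — not part of this patch — a call site looks the same after this rename; only the type name changes. A minimal sketch, assuming a `BaseProducer` is in scope and following the retry-on-`QueueFull` pattern that the updated tests further down exercise (the helper name `send_with_retry` is invented):

    use std::time::Duration;

    use rdkafka::error::{KafkaError, RDKafkaErrorCode};
    use rdkafka::producer::{BaseProducer, BaseRecord};

    // Retry a send while librdkafka reports a full local queue; the matching
    // pattern is unchanged by this patch, only `RDKafkaError` becomes
    // `RDKafkaErrorCode`.
    fn send_with_retry(
        producer: &BaseProducer,
        mut record: BaseRecord<'_, str, str>,
    ) -> Result<(), KafkaError> {
        loop {
            match producer.send(record) {
                Ok(()) => return Ok(()),
                Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), rec)) => {
                    // Serve delivery callbacks to drain the queue, then retry
                    // with the record handed back by the failed send.
                    producer.poll(Duration::from_millis(100));
                    record = rec;
                }
                Err((e, _)) => return Err(e),
            }
        }
    }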
@@ -402,13 +402,13 @@ pub enum RDKafkaError { EndAll, } -impl From for RDKafkaError { - fn from(err: RDKafkaRespErr) -> RDKafkaError { +impl From for RDKafkaErrorCode { + fn from(err: RDKafkaRespErr) -> RDKafkaErrorCode { helpers::rd_kafka_resp_err_t_to_rdkafka_error(err) } } -impl fmt::Display for RDKafkaError { +impl fmt::Display for RDKafkaErrorCode { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let description = match RDKafkaRespErr::try_from(*self as i32) { Ok(err) => { @@ -424,7 +424,7 @@ impl fmt::Display for RDKafkaError { } } -impl error::Error for RDKafkaError { +impl error::Error for RDKafkaErrorCode { fn description(&self) -> &str { "Error from underlying rdkafka library" } @@ -436,7 +436,7 @@ mod tests { #[test] fn test_display_error() { - let error: RDKafkaError = RDKafkaRespErr::RD_KAFKA_RESP_ERR__PARTITION_EOF.into(); + let error: RDKafkaErrorCode = RDKafkaRespErr::RD_KAFKA_RESP_ERR__PARTITION_EOF.into(); assert_eq!( "PartitionEOF (Broker: No more messages)", format!("{}", error) diff --git a/src/admin.rs b/src/admin.rs index b57e90e18..cde4d5312 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -526,8 +526,8 @@ type NativeAdminOptions = NativePtr; fn check_rdkafka_invalid_arg(res: RDKafkaRespErr, err_buf: &ErrBuf) -> KafkaResult<()> { match res.into() { - RDKafkaError::NoError => Ok(()), - RDKafkaError::InvalidArgument => { + RDKafkaErrorCode::NoError => Ok(()), + RDKafkaErrorCode::InvalidArgument => { let msg = if err_buf.len() == 0 { "invalid argument".into() } else { @@ -548,7 +548,7 @@ fn check_rdkafka_invalid_arg(res: RDKafkaRespErr, err_buf: &ErrBuf) -> KafkaResu /// The result of an individual CreateTopic, DeleteTopic, or /// CreatePartition operation. -pub type TopicResult = Result; +pub type TopicResult = Result; fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Vec { let mut out = Vec::with_capacity(n); @@ -863,7 +863,7 @@ impl Future for CreatePartitionsFuture { // /// The result of an individual DescribeConfig operation. -pub type ConfigResourceResult = Result; +pub type ConfigResourceResult = Result; /// Specification of a configurable resource. #[derive(Copy, Clone, Debug, Eq, PartialEq)] @@ -1072,7 +1072,7 @@ impl Future for DescribeConfigsFuture { /// The result of an individual AlterConfig operation. pub type AlterConfigsResult = - Result; + Result; /// Configuration for an AlterConfig operation. pub struct AlterConfig<'a> { diff --git a/src/client.rs b/src/client.rs index bc760ae9e..01f0f9be8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -310,7 +310,7 @@ impl Client { /// /// This function is intended to be used with idempotent producers, where /// some errors must logically be considered fatal to retain consistency. - pub fn fatal_error(&self) -> Option<(RDKafkaError, String)> { + pub fn fatal_error(&self) -> Option<(RDKafkaErrorCode, String)> { const LEN: usize = 512; let mut buf = [0; LEN]; let code = unsafe { rdsys::rd_kafka_fatal_error(self.native_ptr(), buf.as_mut_ptr(), LEN) }; diff --git a/src/error.rs b/src/error.rs index 33c8122ed..145762336 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,8 +5,8 @@ use std::{error, ffi, fmt}; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; -// Re-export rdkafka error -pub use rdsys::types::RDKafkaError; +// Re-export rdkafka error code +pub use rdsys::types::RDKafkaErrorCode; /// Kafka result. pub type KafkaResult = Result; @@ -35,13 +35,13 @@ impl IsError for RDKafkaConfRes { /// Represents all possible Kafka errors. 
/// -/// If applicable, check the underlying [`RDKafkaError`] to get details. +/// If applicable, check the underlying [`RDKafkaErrorCode`] to get details. #[derive(Clone, PartialEq, Eq)] pub enum KafkaError { /// Creation of admin operation failed. AdminOpCreation(String), /// The admin operation itself failed. - AdminOp(RDKafkaError), + AdminOp(RDKafkaErrorCode), /// The client was dropped before the operation completed. Canceled, /// Invalid client configuration. @@ -49,23 +49,23 @@ pub enum KafkaError { /// Client creation failed. ClientCreation(String), /// Consumer commit failed. - ConsumerCommit(RDKafkaError), + ConsumerCommit(RDKafkaErrorCode), /// Global error. - Global(RDKafkaError), + Global(RDKafkaErrorCode), /// Group list fetch failed. - GroupListFetch(RDKafkaError), + GroupListFetch(RDKafkaErrorCode), /// Message consumption failed. - MessageConsumption(RDKafkaError), + MessageConsumption(RDKafkaErrorCode), /// Message production error. - MessageProduction(RDKafkaError), + MessageProduction(RDKafkaErrorCode), /// Metadata fetch error. - MetadataFetch(RDKafkaError), + MetadataFetch(RDKafkaErrorCode), /// No message was received. NoMessageReceived, /// Unexpected null pointer Nul(ffi::NulError), /// Offset fetch failed. - OffsetFetch(RDKafkaError), + OffsetFetch(RDKafkaErrorCode), /// End of partition reached. PartitionEOF(i32), /// Pause/Resume failed. @@ -73,13 +73,13 @@ pub enum KafkaError { /// Seeking a partition failed. Seek(String), /// Setting partition offset failed. - SetPartitionOffset(RDKafkaError), + SetPartitionOffset(RDKafkaErrorCode), /// Offset store failed. - StoreOffset(RDKafkaError), + StoreOffset(RDKafkaErrorCode), /// Subscription creation failed. Subscription(String), /// Transaction error. - Transaction(RDKafkaError), + Transaction(RDKafkaErrorCode), } impl fmt::Debug for KafkaError { @@ -172,7 +172,7 @@ impl fmt::Display for KafkaError { impl error::Error for KafkaError { fn source(&self) -> Option<&(dyn error::Error + 'static)> { - self.rdkafka_error() + self.rdkafka_error_code() .map(|e| e as &(dyn error::Error + 'static)) } } @@ -187,15 +187,15 @@ impl KafkaError { /// Returns if an error is `Fatal` and requires reinitialisation. /// for details see https://docs.confluent.io/5.5.0/clients/librdkafka/rdkafka_8h.html pub fn is_fatal(&self) -> bool { - match self.rdkafka_error() { - Some(RDKafkaError::Fatal) => true, + match self.rdkafka_error_code() { + Some(RDKafkaErrorCode::Fatal) => true, _ => false, } } - /// Returns the [`RDKafkaError`] underlying this error, if any. + /// Returns the [`RDKafkaErrorCode`] underlying this error, if any. 
#[allow(clippy::match_same_arms)] - pub fn rdkafka_error(&self) -> Option<&RDKafkaError> { + pub fn rdkafka_error_code(&self) -> Option<&RDKafkaErrorCode> { match self { KafkaError::AdminOp(_) => None, KafkaError::AdminOpCreation(_) => None, diff --git a/src/message.rs b/src/message.rs index aca25b927..3010bbbf3 100644 --- a/src/message.rs +++ b/src/message.rs @@ -361,10 +361,10 @@ impl<'a> Message for BorrowedMessage<'a> { unsafe { let err = rdsys::rd_kafka_message_headers(self.ptr.ptr(), &mut native_headers_ptr); match err.into() { - RDKafkaError::NoError => { + RDKafkaErrorCode::NoError => { Some(BorrowedHeaders::from_native_ptr(self, native_headers_ptr)) } - RDKafkaError::NoEnt => None, + RDKafkaErrorCode::NoEnt => None, _ => None, } } diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs index 711357f58..e969c9988 100644 --- a/src/producer/future_producer.rs +++ b/src/producer/future_producer.rs @@ -14,7 +14,7 @@ use futures::FutureExt; use crate::client::{Client, ClientContext, DefaultClientContext}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel}; -use crate::error::{KafkaError, KafkaResult, RDKafkaError}; +use crate::error::{KafkaError, KafkaResult, RDKafkaErrorCode}; use crate::message::{Message, OwnedHeaders, OwnedMessage, Timestamp, ToBytes}; use crate::producer::{BaseRecord, DeliveryResult, ProducerContext, ThreadedProducer}; use crate::statistics::Statistics; @@ -234,7 +234,7 @@ impl FutureProducer { /// The `queue_timeout` parameter controls how long to retry for if the /// librdkafka producer queue is full. Set it to `Timeout::Never` to retry /// forever or `Timeout::After(0)` to never block. If the timeout is reached - /// and the queue is still full, an [`RDKafkaError::QueueFull`] error will + /// and the queue is still full, an [`RDKafkaErrorCode::QueueFull`] error will /// be reported in the [`OwnedDeliveryResult`]. /// /// Keep in mind that `queue_timeout` only applies to the first phase of the @@ -293,7 +293,7 @@ impl FutureProducer { loop { match self.producer.send(base_record) { Err((e, record)) - if e == KafkaError::MessageProduction(RDKafkaError::QueueFull) + if e == KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) && can_retry() => { base_record = record; diff --git a/tests/test_admin.rs b/tests/test_admin.rs index ebff81ceb..a425d3c25 100644 --- a/tests/test_admin.rs +++ b/tests/test_admin.rs @@ -10,7 +10,7 @@ use rdkafka::admin::{ }; use rdkafka::client::DefaultClientContext; use rdkafka::consumer::{BaseConsumer, Consumer, DefaultConsumerContext}; -use rdkafka::error::{KafkaError, RDKafkaError}; +use rdkafka::error::{KafkaError, RDKafkaErrorCode}; use rdkafka::metadata::Metadata; use rdkafka::ClientConfig; @@ -239,7 +239,10 @@ async fn test_topics() { .create_partitions(&[partitions], &opts) .await .expect("partition creation failed"); - assert_eq!(res, &[Err((name, RDKafkaError::InvalidReplicaAssignment))],); + assert_eq!( + res, + &[Err((name, RDKafkaErrorCode::InvalidReplicaAssignment))], + ); } // Verify that deleting a non-existent topic fails. 
@@ -249,7 +252,10 @@ async fn test_topics() { .delete_topics(&[&name], &opts) .await .expect("delete topics failed"); - assert_eq!(res, &[Err((name, RDKafkaError::UnknownTopicOrPartition))]); + assert_eq!( + res, + &[Err((name, RDKafkaErrorCode::UnknownTopicOrPartition))] + ); } // Verify that mixed-success operations properly report the successful and @@ -275,7 +281,7 @@ async fn test_topics() { assert_eq!( res, &[ - Err((name1.clone(), RDKafkaError::TopicAlreadyExists)), + Err((name1.clone(), RDKafkaErrorCode::TopicAlreadyExists)), Ok(name2.clone()) ] ); @@ -296,7 +302,7 @@ async fn test_topics() { res, &[ Ok(name2.clone()), - Err((name1.clone(), RDKafkaError::UnknownTopicOrPartition)) + Err((name1.clone(), RDKafkaErrorCode::UnknownTopicOrPartition)) ] ); } @@ -392,30 +398,30 @@ async fn test_event_errors() { let res = admin_client.create_topics(&[], &opts).await; assert_eq!( res, - Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) + Err(KafkaError::AdminOp(RDKafkaErrorCode::OperationTimedOut)) ); let res = admin_client.create_partitions(&[], &opts).await; assert_eq!( res, - Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) + Err(KafkaError::AdminOp(RDKafkaErrorCode::OperationTimedOut)) ); let res = admin_client.delete_topics(&[], &opts).await; assert_eq!( res, - Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) + Err(KafkaError::AdminOp(RDKafkaErrorCode::OperationTimedOut)) ); let res = admin_client.describe_configs(&[], &opts).await; assert_eq!( res.err(), - Some(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) + Some(KafkaError::AdminOp(RDKafkaErrorCode::OperationTimedOut)) ); let res = admin_client.alter_configs(&[], &opts).await; assert_eq!( res, - Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut)) + Err(KafkaError::AdminOp(RDKafkaErrorCode::OperationTimedOut)) ); } diff --git a/tests/test_high_producers.rs b/tests/test_high_producers.rs index cc5628ed3..84c9689df 100644 --- a/tests/test_high_producers.rs +++ b/tests/test_high_producers.rs @@ -7,7 +7,7 @@ use futures::stream::{FuturesUnordered, StreamExt}; use rdkafka::client::DefaultClientContext; use rdkafka::config::ClientConfig; -use rdkafka::error::{KafkaError, RDKafkaError}; +use rdkafka::error::{KafkaError, RDKafkaErrorCode}; use rdkafka::message::{Headers, Message, OwnedHeaders}; use rdkafka::producer::future_producer::FutureRecord; use rdkafka::producer::FutureProducer; @@ -75,7 +75,7 @@ async fn test_future_producer_send_full() { .await; match res { Ok(_) => panic!("send unexpectedly succeeded"), - Err((KafkaError::MessageProduction(RDKafkaError::QueueFull), _)) => start.elapsed(), + Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => start.elapsed(), Err((e, _)) => panic!("got incorrect error: {}", e), } }; diff --git a/tests/test_low_producers.rs b/tests/test_low_producers.rs index 90b5351b0..c271f5934 100644 --- a/tests/test_low_producers.rs +++ b/tests/test_low_producers.rs @@ -8,7 +8,7 @@ use std::sync::Mutex; use std::time::Duration; use rdkafka::config::ClientConfig; -use rdkafka::error::{KafkaError, RDKafkaError}; +use rdkafka::error::{KafkaError, RDKafkaErrorCode}; use rdkafka::message::{Headers, Message, OwnedHeaders, OwnedMessage}; use rdkafka::producer::{ BaseProducer, BaseRecord, DeliveryResult, ProducerContext, ThreadedProducer, @@ -143,7 +143,7 @@ fn test_base_producer_queue_full() { let errors = results .iter() .filter(|&e| { - if let &Err((KafkaError::MessageProduction(RDKafkaError::QueueFull), _)) = e { + if let 
&Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) = e { true } else { false @@ -190,7 +190,9 @@ fn test_base_producer_timeout() { assert_eq!(message.key_view::(), Some(Ok("B"))); assert_eq!( error, - &Some(KafkaError::MessageProduction(RDKafkaError::MessageTimedOut)) + &Some(KafkaError::MessageProduction( + RDKafkaErrorCode::MessageTimedOut + )) ); ids.insert(id); } @@ -333,6 +335,9 @@ fn test_fatal_errors() { assert_eq!( producer.client().fatal_error(), - Some((RDKafkaError::OutOfOrderSequenceNumber, "test_fatal_error: fake error".into())) + Some(( + RDKafkaErrorCode::OutOfOrderSequenceNumber, + "test_fatal_error: fake error".into() + )) ) } From 9bc954b784f61a76c0e9cb2dae6ec728f49dd94b Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Tue, 25 Aug 2020 18:39:50 +0300 Subject: [PATCH 04/15] Add new RDKafkaError --- rdkafka-sys/src/types.rs | 51 +++++++++++++++++++++++++++ src/error.rs | 66 ++++++++++++++++++++++++++--------- src/producer/base_producer.rs | 41 +++++++++------------- 3 files changed, 117 insertions(+), 41 deletions(-) diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index 141dd54f6..2ab90a72d 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -83,6 +83,57 @@ pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t; pub type RDKafkaConsumerGroupMetadata = bindings::rd_kafka_consumer_group_metadata_t; +/// Native rdkafka error. +pub type RDKafkaError = bindings::rd_kafka_error_t; + +impl RDKafkaError { + pub fn code(&self) -> RDKafkaErrorCode { + unsafe { bindings::rd_kafka_error_code(self).into() } + } + + pub fn name(&self) -> String { + let cstr = unsafe { bindings::rd_kafka_error_name(self) }; + unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() } + } + + pub fn string(&self) -> String { + let cstr = unsafe { bindings::rd_kafka_error_string(self) }; + unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() } + } + + pub fn is_fatal(&self) -> bool { + unsafe { bindings::rd_kafka_error_is_fatal(self) == 1 } + } + + pub fn is_retriable(&self) -> bool { + unsafe { bindings::rd_kafka_error_is_retriable(self) == 1 } + } + + pub fn txn_requires_abort(&self) -> bool { + unsafe { bindings::rd_kafka_error_txn_requires_abort(self) == 1 } + } +} + +impl PartialEq for RDKafkaError { + fn eq(&self, other: &RDKafkaError) -> bool { + self.code() == other.code() + } +} + +impl Eq for RDKafkaError {} + +impl fmt::Display for RDKafkaError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.code().fmt(f) + } +} + +impl error::Error for RDKafkaError { + fn description(&self) -> &str { + "Error from underlying rdkafka library" + } +} + // ENUMS /// Client types. diff --git a/src/error.rs b/src/error.rs index 145762336..573c3a296 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,8 +5,10 @@ use std::{error, ffi, fmt}; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; -// Re-export rdkafka error code -pub use rdsys::types::RDKafkaErrorCode; +use crate::util::KafkaDrop; + +// Re-export rdkafka error types +pub use rdsys::types::{RDKafkaError, RDKafkaErrorCode}; /// Kafka result. pub type KafkaResult = Result; @@ -31,6 +33,17 @@ impl IsError for RDKafkaConfRes { } } +impl IsError for *const RDKafkaError { + fn is_error(self) -> bool { + self.is_null() + } +} + +unsafe impl KafkaDrop for RDKafkaError { + const TYPE: &'static str = "error"; + const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_error_destroy; +} + // TODO: consider using macro /// Represents all possible Kafka errors. 
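A hedged sketch of how the accessors on the new `RDKafkaError` type are meant to be consumed once a transactional call fails; the `TxnOutcome` enum and `classify` helper are illustrative only, not part of the patch:

    use rdkafka::error::RDKafkaError;

    // Possible reactions to a failed transactional operation.
    enum TxnOutcome {
        Retry, // transient: repeat the same call
        Abort, // abort_transaction(), then begin a new transaction
        Fatal, // tear down the producer and reinitialize
    }

    // Triage a rich error object using the accessors added in this patch.
    fn classify(err: &RDKafkaError) -> TxnOutcome {
        if err.is_fatal() {
            TxnOutcome::Fatal
        } else if err.txn_requires_abort() {
            TxnOutcome::Abort
        } else if err.is_retriable() {
            TxnOutcome::Retry
        } else {
            // Unclassified errors are safest treated as abortable.
            TxnOutcome::Abort
        }
    }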
@@ -79,7 +92,7 @@ pub enum KafkaError { /// Subscription creation failed. Subscription(String), /// Transaction error. - Transaction(RDKafkaErrorCode), + Transaction(RDKafkaError), } impl fmt::Debug for KafkaError { @@ -172,8 +185,29 @@ impl fmt::Display for KafkaError { impl error::Error for KafkaError { fn source(&self) -> Option<&(dyn error::Error + 'static)> { - self.rdkafka_error_code() - .map(|e| e as &(dyn error::Error + 'static)) + match self { + KafkaError::AdminOp(_) => None, + KafkaError::AdminOpCreation(_) => None, + KafkaError::Canceled => None, + KafkaError::ClientConfig(_, _, _, _) => None, + KafkaError::ClientCreation(_) => None, + KafkaError::ConsumerCommit(err) => Some(err), + KafkaError::Global(err) => Some(err), + KafkaError::GroupListFetch(err) => Some(err), + KafkaError::MessageConsumption(err) => Some(err), + KafkaError::MessageProduction(err) => Some(err), + KafkaError::MetadataFetch(err) => Some(err), + KafkaError::NoMessageReceived => None, + KafkaError::Nul(_) => None, + KafkaError::OffsetFetch(err) => Some(err), + KafkaError::PartitionEOF(_) => None, + KafkaError::PauseResume(_) => None, + KafkaError::Seek(_) => None, + KafkaError::SetPartitionOffset(err) => Some(err), + KafkaError::StoreOffset(err) => Some(err), + KafkaError::Subscription(_) => None, + KafkaError::Transaction(err) => Some(err), + } } } @@ -195,29 +229,29 @@ impl KafkaError { /// Returns the [`RDKafkaErrorCode`] underlying this error, if any. #[allow(clippy::match_same_arms)] - pub fn rdkafka_error_code(&self) -> Option<&RDKafkaErrorCode> { + pub fn rdkafka_error_code(&self) -> Option { match self { KafkaError::AdminOp(_) => None, KafkaError::AdminOpCreation(_) => None, KafkaError::Canceled => None, KafkaError::ClientConfig(_, _, _, _) => None, KafkaError::ClientCreation(_) => None, - KafkaError::ConsumerCommit(err) => Some(err), - KafkaError::Global(err) => Some(err), - KafkaError::GroupListFetch(err) => Some(err), - KafkaError::MessageConsumption(err) => Some(err), - KafkaError::MessageProduction(err) => Some(err), - KafkaError::MetadataFetch(err) => Some(err), + KafkaError::ConsumerCommit(err) => Some(*err), + KafkaError::Global(err) => Some(*err), + KafkaError::GroupListFetch(err) => Some(*err), + KafkaError::MessageConsumption(err) => Some(*err), + KafkaError::MessageProduction(err) => Some(*err), + KafkaError::MetadataFetch(err) => Some(*err), KafkaError::NoMessageReceived => None, KafkaError::Nul(_) => None, - KafkaError::OffsetFetch(err) => Some(err), + KafkaError::OffsetFetch(err) => Some(*err), KafkaError::PartitionEOF(_) => None, KafkaError::PauseResume(_) => None, KafkaError::Seek(_) => None, - KafkaError::SetPartitionOffset(err) => Some(err), - KafkaError::StoreOffset(err) => Some(err), + KafkaError::SetPartitionOffset(err) => Some(*err), + KafkaError::StoreOffset(err) => Some(*err), KafkaError::Subscription(_) => None, - KafkaError::Transaction(err) => Some(err), + KafkaError::Transaction(err) => Some(err.code()), } } } diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 8d7258b6c..13248fff3 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -431,25 +431,21 @@ impl BaseProducer { /// Initialize transactions for the producer instance. 
pub fn init_transactions>(&self, timeout: T) -> KafkaResult<()> { - let init_error = unsafe { - let err = - rdsys::rd_kafka_init_transactions(self.native_ptr(), timeout.into().as_millis()); - rdsys::rd_kafka_error_code(err) + let init_error: *const RDKafkaError = unsafe { + rdsys::rd_kafka_init_transactions(self.native_ptr(), timeout.into().as_millis()) }; if init_error.is_error() { - return Err(KafkaError::Transaction(init_error.into())); + return unsafe { Err(KafkaError::Transaction(*init_error)) }; } Ok(()) } /// Begin a new transaction. pub fn begin_transaction(&self) -> KafkaResult<()> { - let begin_error = unsafe { - let err = rdsys::rd_kafka_begin_transaction(self.native_ptr()); - rdsys::rd_kafka_error_code(err) - }; + let begin_error: *const RDKafkaError = + unsafe { rdsys::rd_kafka_begin_transaction(self.native_ptr()) }; if begin_error.is_error() { - return Err(KafkaError::Transaction(begin_error.into())); + return unsafe { Err(KafkaError::Transaction(*begin_error)) }; } Ok(()) } @@ -463,43 +459,38 @@ impl BaseProducer { cgm: &ConsumerGroupMetadata, timeout: T, ) -> KafkaResult<()> { - let send_error = unsafe { - let err = rdsys::rd_kafka_send_offsets_to_transaction( + let send_error: *const RDKafkaError = unsafe { + rdsys::rd_kafka_send_offsets_to_transaction( self.native_ptr(), offsets.ptr(), cgm.ptr(), timeout.into().as_millis(), - ); - rdsys::rd_kafka_error_code(err) + ) }; if send_error.is_error() { - return Err(KafkaError::Transaction(send_error.into())); + return unsafe { Err(KafkaError::Transaction(*send_error)) }; } Ok(()) } /// Commit the current transaction. pub fn commit_transaction>(&self, timeout: T) -> KafkaResult<()> { - let commit_error = unsafe { - let err = - rdsys::rd_kafka_commit_transaction(self.native_ptr(), timeout.into().as_millis()); - rdsys::rd_kafka_error_code(err) + let commit_error: *const RDKafkaError = unsafe { + rdsys::rd_kafka_commit_transaction(self.native_ptr(), timeout.into().as_millis()) }; if commit_error.is_error() { - return Err(KafkaError::Transaction(commit_error.into())); + return unsafe { Err(KafkaError::Transaction(*commit_error)) }; } Ok(()) } /// Aborts the ongoing transaction. pub fn abort_transaction>(&self, timeout: T) -> KafkaResult<()> { - let abort_error = unsafe { - let err = - rdsys::rd_kafka_abort_transaction(self.native_ptr(), timeout.into().as_millis()); - rdsys::rd_kafka_error_code(err) + let abort_error: *const RDKafkaError = unsafe { + rdsys::rd_kafka_abort_transaction(self.native_ptr(), timeout.into().as_millis()) }; if abort_error.is_error() { - return Err(KafkaError::Transaction(abort_error.into())); + return unsafe { Err(KafkaError::Transaction(*abort_error)) }; } Ok(()) } From 31d1bc463cd0369a7c06c289b0db37806306efad Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Wed, 26 Aug 2020 09:38:03 +0300 Subject: [PATCH 05/15] Check KafkaError::Transaction in KafkaError::is_fatal --- src/error.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/error.rs b/src/error.rs index 573c3a296..7ab10cb43 100644 --- a/src/error.rs +++ b/src/error.rs @@ -218,12 +218,15 @@ impl From for KafkaError { } impl KafkaError { - /// Returns if an error is `Fatal` and requires reinitialisation. + /// Returns if an error is fatal and requires reinitialisation. 
/// for details see https://docs.confluent.io/5.5.0/clients/librdkafka/rdkafka_8h.html pub fn is_fatal(&self) -> bool { - match self.rdkafka_error_code() { - Some(RDKafkaErrorCode::Fatal) => true, - _ => false, + if let Some(RDKafkaErrorCode::Fatal) = self.rdkafka_error_code() { + true + } else if let KafkaError::Transaction(err) = self { + err.is_fatal() + } else { + false } } From 357c357a7d65eded19b19e20861c9525131acf22 Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Wed, 26 Aug 2020 11:46:48 +0300 Subject: [PATCH 06/15] Fix RDKafkaError::is_error --- src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 7ab10cb43..02898c54f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,7 +35,7 @@ impl IsError for RDKafkaConfRes { impl IsError for *const RDKafkaError { fn is_error(self) -> bool { - self.is_null() + !self.is_null() } } From c45aa5d8fe94338d43ccfd8a7862fc3181aec741 Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Wed, 26 Aug 2020 15:12:56 +0300 Subject: [PATCH 07/15] Typo --- src/producer/base_producer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 13248fff3..253d5dcc0 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -451,7 +451,7 @@ impl BaseProducer { } /// Sends a list of topic partition offsets to the consumer group coordinator for `cgm`, and - /// marks the offsets as part part of the current transaction. These offsets will be considered + /// marks the offsets as part of the current transaction. These offsets will be considered /// committed only if the transaction is committed successfully. pub fn send_offsets_to_transaction>( &self, From 3444cb618a47a1b1c700aa5832ecc320aa3996c6 Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Wed, 26 Aug 2020 15:31:11 +0300 Subject: [PATCH 08/15] Add ThreadedProducer transaction methods --- src/producer/base_producer.rs | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 253d5dcc0..5521bad36 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -609,6 +609,39 @@ impl ThreadedProducer { pub fn client(&self) -> &Client { self.producer.client() } + + /// Initialize transactions for the producer instance. + pub fn init_transactions>(&self, timeout: T) -> KafkaResult<()> { + self.producer.init_transactions(timeout) + } + + /// Begin a new transaction. + pub fn begin_transaction(&self) -> KafkaResult<()> { + self.producer.begin_transaction() + } + + /// Sends a list of topic partition offsets to the consumer group coordinator for `cgm`, and + /// marks the offsets as part of the current transaction. These offsets will be considered + /// committed only if the transaction is committed successfully. + pub fn send_offsets_to_transaction>( + &self, + offsets: &TopicPartitionList, + cgm: &ConsumerGroupMetadata, + timeout: T, + ) -> KafkaResult<()> { + self.producer + .send_offsets_to_transaction(offsets, cgm, timeout) + } + + /// Commit the current transaction. + pub fn commit_transaction>(&self, timeout: T) -> KafkaResult<()> { + self.producer.commit_transaction(timeout) + } + + /// Abort the ongoing transaction. 
+ pub fn abort_transaction>(&self, timeout: T) -> KafkaResult<()> { + self.producer.abort_transaction(timeout) + } } impl Drop for ThreadedProducer { From bf5d1f0f6c40d18a13095e76f5673920fcd85c61 Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Tue, 1 Sep 2020 15:05:03 +0300 Subject: [PATCH 09/15] Add FutureProducer transaction methods --- src/producer/future_producer.rs | 35 +++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs index e969c9988..5238c25e9 100644 --- a/src/producer/future_producer.rs +++ b/src/producer/future_producer.rs @@ -14,10 +14,12 @@ use futures::FutureExt; use crate::client::{Client, ClientContext, DefaultClientContext}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel}; +use crate::consumer::ConsumerGroupMetadata; use crate::error::{KafkaError, KafkaResult, RDKafkaErrorCode}; use crate::message::{Message, OwnedHeaders, OwnedMessage, Timestamp, ToBytes}; use crate::producer::{BaseRecord, DeliveryResult, ProducerContext, ThreadedProducer}; use crate::statistics::Statistics; +use crate::topic_partition_list::TopicPartitionList; #[cfg(feature = "tokio")] use crate::util::TokioRuntime; use crate::util::{AsyncRuntime, IntoOpaque, Timeout}; @@ -367,6 +369,39 @@ impl FutureProducer { pub fn client(&self) -> &Client> { self.producer.client() } + + /// Initialize transactions for the producer instance. + pub fn init_transactions>(&self, timeout: T) -> KafkaResult<()> { + self.producer.init_transactions(timeout) + } + + /// Begin a new transaction. + pub fn begin_transaction(&self) -> KafkaResult<()> { + self.producer.begin_transaction() + } + + /// Sends a list of topic partition offsets to the consumer group coordinator for `cgm`, and + /// marks the offsets as part of the current transaction. These offsets will be considered + /// committed only if the transaction is committed successfully. + pub fn send_offsets_to_transaction>( + &self, + offsets: &TopicPartitionList, + cgm: &ConsumerGroupMetadata, + timeout: T, + ) -> KafkaResult<()> { + self.producer + .send_offsets_to_transaction(offsets, cgm, timeout) + } + + /// Commit the current transaction. + pub fn commit_transaction>(&self, timeout: T) -> KafkaResult<()> { + self.producer.commit_transaction(timeout) + } + + /// Abort the ongoing transaction. + pub fn abort_transaction>(&self, timeout: T) -> KafkaResult<()> { + self.producer.abort_transaction(timeout) + } } #[cfg(test)] From 41d7988b1e13181ddde78653015acb6701205526 Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Tue, 1 Sep 2020 15:05:22 +0300 Subject: [PATCH 10/15] Add RDKafkaConsumerGroupMetadata docs --- rdkafka-sys/src/types.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index 2ab90a72d..69633e7e3 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -81,6 +81,7 @@ pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t; /// Native rdkafka topic result. pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t; +/// Native rdkafka consumer group metadata. pub type RDKafkaConsumerGroupMetadata = bindings::rd_kafka_consumer_group_metadata_t; /// Native rdkafka error. 
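With patches 08–10, every producer flavor exposes the same five transaction calls. As a checkpoint before the integration tests that follow, a minimal read-process-write sketch against the `FutureProducer` methods above — assuming `init_transactions` has already run once at startup and `offsets` holds the next offset to consume per partition; `commit_unit` is an invented name:

    use std::time::Duration;

    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::error::KafkaResult;
    use rdkafka::producer::FutureProducer;
    use rdkafka::topic_partition_list::TopicPartitionList;

    // One transactional unit of work: records produced within this scope and
    // the consumer's offsets commit or abort together.
    fn commit_unit(
        producer: &FutureProducer,
        consumer: &BaseConsumer,
        offsets: &TopicPartitionList, // next offset to consume, per partition
    ) -> KafkaResult<()> {
        let timeout = Duration::from_secs(10);
        producer.begin_transaction()?;
        // ... produce the output records for this unit here ...
        producer.send_offsets_to_transaction(offsets, &consumer.group_metadata(), timeout)?;
        producer.commit_transaction(timeout)
    }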
From 15a73644a57e8997d24250fd9850574feb8cf10c Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Tue, 15 Sep 2020 10:13:40 +0300 Subject: [PATCH 11/15] Add transaction integration tests --- docker-compose.yaml | 2 + tests/test_transactions.rs | 159 +++++++++++++++++++++++++++++++++++++ tests/utils.rs | 8 ++ 3 files changed, 169 insertions(+) create mode 100644 tests/test_transactions.rs diff --git a/docker-compose.yaml b/docker-compose.yaml index baee434c4..85bba5efe 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -8,6 +8,8 @@ services: - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 - KAFKA_NUM_PARTITIONS=3 - CONFLUENT_SUPPORT_METRICS_ENABLE=0 ports: ["9092:9092"] diff --git a/tests/test_transactions.rs b/tests/test_transactions.rs new file mode 100644 index 000000000..0da298c3b --- /dev/null +++ b/tests/test_transactions.rs @@ -0,0 +1,159 @@ +//! Test transactions using low level consumer and producer. + +use std::collections::HashMap; +use std::time::Duration; + +use rdkafka::config::ClientConfig; +use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, DefaultConsumerContext}; +use rdkafka::error::KafkaError; +use rdkafka::producer::{BaseProducer, BaseRecord, DefaultProducerContext}; +use rdkafka::topic_partition_list::{Offset, TopicPartitionList}; + +use crate::utils::*; + +mod utils; + +const TIMEOUT: Duration = Duration::from_secs(10); +const POPULATE_COUNT: i32 = 30; +const COMMIT_OFFSET: i64 = 10; +const SEND_OFFSET: i64 = 20; +const PRODUCE_COUNT: usize = 20; + +fn create_base_consumer(group_id: &str) -> BaseConsumer { + consumer_config( + group_id, + Some(map!( + "isolation.level" => "read_committed", + "enable.partition.eof" => "true" + )), + ) + .create() + .expect("Consumer creation failed") +} + +fn create_base_producer(transactional_id: &str) -> BaseProducer { + let mut config = ClientConfig::new(); + config + .set("bootstrap.servers", &get_bootstrap_server()) + .set("message.timeout.ms", "5000") + .set("enable.idempotence", "true") + .set("transactional.id", transactional_id) + .set("debug", "eos"); + config.set_log_level(rdkafka::config::RDKafkaLogLevel::Debug); + config.create().expect("Producer creation failed") +} + +async fn prepare_transaction( + consumer: &BaseConsumer, + producer: &BaseProducer, + consume_topic: &str, + produce_topic: &str, +) { + // populate consume_topic + populate_topic( + &consume_topic, + POPULATE_COUNT, + &value_fn, + &key_fn, + Some(0), + None, + ) + .await; + consumer.subscribe(&[&consume_topic]).unwrap(); + consumer.poll(TIMEOUT).unwrap().unwrap(); + + // commit initial consumer offset + let mut commit_tpl = TopicPartitionList::new(); + commit_tpl.add_partition_offset(&consume_topic, 0, Offset::Offset(COMMIT_OFFSET)); + consumer.commit(&commit_tpl, CommitMode::Sync).unwrap(); + + // start transaction + producer.init_transactions(TIMEOUT).unwrap(); + producer.begin_transaction().unwrap(); + + // send offsets to transaction + let cgm = consumer.group_metadata(); + let mut txn_tpl = TopicPartitionList::new(); + txn_tpl.add_partition_offset(consume_topic, 0, Offset::Offset(SEND_OFFSET)); + producer + .send_offsets_to_transaction(&txn_tpl, &cgm, TIMEOUT) + .unwrap(); + + // produce records in transaction + for _ in 0..PRODUCE_COUNT { + producer + .send( + BaseRecord::to(produce_topic) + .payload("A") + .key("B") + .partition(0), + ) + 
.unwrap(); + } +} + +fn assert_transaction( + consumer: &BaseConsumer, + consume_topic: &str, + produce_topic: &str, + consumer_offset: i64, + produced_count: usize, +) { + // check consumer committed offset + let committed = consumer.committed(TIMEOUT).unwrap(); + assert_eq!( + committed.find_partition(consume_topic, 0).unwrap().offset(), + Offset::Offset(consumer_offset) + ); + + // check how many records have been produced after the transaction + let txn_consumer = create_base_consumer(&rand_test_group()); + let mut tpl = TopicPartitionList::new(); + tpl.add_partition(produce_topic, 0); + txn_consumer.assign(&tpl).unwrap(); + + // count all messages in produce_topic + let consumed = txn_consumer + .iter() + .take_while(|r| !matches!(r, Err(KafkaError::PartitionEOF(_)))) + .count(); + + assert_eq!(consumed, produced_count); +} + +#[tokio::test] +async fn test_transaction_abort() { + let consumer = create_base_consumer(&rand_test_group()); + let producer = create_base_producer(&rand_test_transactional_id()); + + let consume_topic = rand_test_topic(); + let produce_topic = rand_test_topic(); + + prepare_transaction(&consumer, &producer, &consume_topic, &produce_topic).await; + + producer.flush(TIMEOUT); + producer.abort_transaction(TIMEOUT).unwrap(); + + assert_transaction(&consumer, &consume_topic, &produce_topic, COMMIT_OFFSET, 0); +} + +#[tokio::test] +async fn test_transaction_commit() { + let consumer = create_base_consumer(&rand_test_group()); + let producer = create_base_producer(&rand_test_transactional_id()); + + let consume_topic = rand_test_topic(); + let produce_topic = rand_test_topic(); + + prepare_transaction(&consumer, &producer, &consume_topic, &produce_topic).await; + + producer.commit_transaction(TIMEOUT).unwrap(); + + assert_transaction( + &consumer, + &consume_topic, + &produce_topic, + SEND_OFFSET, + PRODUCE_COUNT, + ); +} diff --git a/tests/utils.rs b/tests/utils.rs index 6eef5f217..8257913dc 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -46,6 +46,14 @@ pub fn rand_test_group() -> String { format!("__test_{}", id) } +pub fn rand_test_transactional_id() -> String { + let id = rand::thread_rng() + .gen_ascii_chars() + .take(10) + .collect::(); + format!("__test_{}", id) +} + pub fn get_bootstrap_server() -> String { env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost:9092".to_owned()) } From 5b55930a8a6af5b93ba3b59a66296969a8d234a0 Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Tue, 15 Sep 2020 16:53:35 +0300 Subject: [PATCH 12/15] Add transactions example --- examples/transactions.rs | 210 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 examples/transactions.rs diff --git a/examples/transactions.rs b/examples/transactions.rs new file mode 100644 index 000000000..1c3c04440 --- /dev/null +++ b/examples/transactions.rs @@ -0,0 +1,210 @@ +use std::time::Duration; + +use clap::{App, Arg}; +use futures::StreamExt; +use log::{info, warn}; + +use rdkafka::config::ClientConfig; +use rdkafka::consumer::stream_consumer::StreamConsumer; +use rdkafka::consumer::Consumer; +use rdkafka::message::Message; +use rdkafka::producer::future_producer::{FutureProducer, FutureRecord}; +use rdkafka::util::get_rdkafka_version; + +use crate::example_utils::setup_logger; + +mod example_utils; + +const TIMEOUT: Duration = Duration::from_secs(5); + +// Start a consumer and a transactional producer. 
+// The consumer will subscribe to all in topics and the producer will forward +// all incoming messages to the out topic, committing a transaction for every +// 100 consumed and produced (processed) messages. +async fn process( + brokers: &str, + group_id: &str, + in_topics: &[&str], + transactional_id: &str, + out_topic: &str, +) { + let consumer: StreamConsumer = ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("group.id", group_id) + .set("enable.partition.eof", "false") + .set("enable.auto.commit", "false") + .create() + .expect("Consumer creation failed"); + + let producer: FutureProducer = ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("transactional.id", transactional_id) + .set("message.timeout.ms", "5000") + .set("transaction.timeout.ms", "900000") + .create() + .expect("Producer creation failed"); + + consumer + .subscribe(in_topics) + .expect("Can't subscribe to in topics"); + + producer + .init_transactions(TIMEOUT) + .expect("Can't init transactions"); + + producer + .begin_transaction() + .expect("Can't begin transaction"); + + let mut message_stream = consumer.start(); + + let mut processed = 0; + + while let Some(message) = message_stream.next().await { + match message { + Err(e) => { + warn!("Kafka error consuming messages: {}", e); + reset_transaction(&producer, &consumer); + } + + Ok(m) => { + let record = FutureRecord::to(out_topic) + .key(m.key().unwrap_or(b"")) + .payload(m.payload().unwrap_or(b"")); + + match producer.send(record, TIMEOUT).await { + Err((e, _)) => { + warn!("Kafka error producing message: {}", e); + reset_transaction(&producer, &consumer); + } + Ok(_) => { + processed += 1; + println!("sent message to {} ", out_topic); + } + } + } + } + + if processed >= 100 { + // send current consumer position to transaction + let position = &consumer.position().expect("Can't get consumer position"); + + producer + .send_offsets_to_transaction(position, &consumer.group_metadata(), TIMEOUT) + .expect("Can't send consumer offsets to transaction"); + + producer + .commit_transaction(TIMEOUT) + .expect("Can't commit transaction"); + + // start a new transaction + producer + .begin_transaction() + .expect("Can't begin transaction"); + + processed = 0; + } + } +} + +// Abort the current transaction, seek consumer to last committed offsets and +// start a new transaction. 
+fn reset_transaction(producer: &FutureProducer, consumer: &StreamConsumer) { + producer + .abort_transaction(TIMEOUT) + .expect("Can't abort transaction"); + + seek_to_committed(consumer); + + producer + .begin_transaction() + .expect("Can't begin transaction"); +} + +fn seek_to_committed(consumer: &StreamConsumer) { + let committed = consumer + .committed(TIMEOUT) + .expect("Can't get consumer committed offsets"); + + for e in committed.elements().iter() { + consumer + .seek(e.topic(), e.partition(), e.offset(), TIMEOUT) + .expect(&format!( + "Can't seek consumer to {}, {}: {:?}", + e.topic(), + e.partition(), + e.offset() + )); + } +} + +#[tokio::main] +async fn main() { + let matches = App::new("transactions example") + .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) + .about("Command line transactional message processor") + .arg( + Arg::with_name("brokers") + .short("b") + .long("brokers") + .help("Broker list in kafka format") + .takes_value(true) + .default_value("localhost:9092"), + ) + .arg( + Arg::with_name("log-conf") + .long("log-conf") + .help("Configure the logging format (example: 'rdkafka=trace')") + .takes_value(true), + ) + .arg( + Arg::with_name("group-id") + .short("g") + .long("group-id") + .help("Consumer group id") + .takes_value(true) + .default_value("example_transaction_consumer_group_id"), + ) + .arg( + Arg::with_name("in-topics") + .short("i") + .long("in-topics") + .help("Topic list to consume from") + .takes_value(true) + .multiple(true) + .required(true), + ) + .arg( + Arg::with_name("transactional-id") + .short("t") + .long("transactional-id") + .help("Producer transactional id") + .takes_value(true) + .default_value("example_transaction_producer_id"), + ) + .arg( + Arg::with_name("out-topic") + .short("o") + .long("out-topic") + .help("Topic to produce to") + .takes_value(true) + .required(true), + ) + .get_matches(); + + setup_logger(true, matches.value_of("log-conf")); + + let (version_n, version_s) = get_rdkafka_version(); + info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s); + + let brokers = matches.value_of("brokers").unwrap(); + let group_id = matches.value_of("group-id").unwrap(); + let in_topics = matches + .values_of("in-topics") + .unwrap() + .collect::>(); + let transactional_id = matches.value_of("transactional-id").unwrap(); + let out_topic = matches.value_of("out-topic").unwrap(); + + process(brokers, group_id, &in_topics, transactional_id, out_topic).await; +} From 5f5ee88585a7288aae45becb66fef5eea1453ec8 Mon Sep 17 00:00:00 2001 From: Robert Ignat Date: Tue, 15 Sep 2020 17:12:55 +0300 Subject: [PATCH 13/15] Improve transactions documentation --- src/consumer/mod.rs | 3 ++- src/lib.rs | 14 ++++++++++ src/producer/base_producer.rs | 47 ++++++++++++++++++++++++++++----- src/producer/future_producer.rs | 22 ++++++++++++--- src/producer/mod.rs | 40 ++++++++++++++++++++++++++++ 5 files changed, 116 insertions(+), 10 deletions(-) diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index b2e6fa206..74ce6e355 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -363,7 +363,8 @@ pub trait Consumer { self.get_base_consumer().client() } - /// Returns the consumer group metadata needed to send offsets to producer transactions. + /// Returns the consumer group metadata needed to send offsets to producer + /// transactions. 
fn group_metadata(&self) -> ConsumerGroupMetadata { self.get_base_consumer().group_metadata() } diff --git a/src/lib.rs b/src/lib.rs index 9d7190d52..fbc80bbb0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,6 +31,7 @@ //! - Access to group metadata (list groups, list members of groups, hostnames, //! etc.). //! - Access to producer and consumer metrics, errors and callbacks. +//! - Transactional producers. //! //! ### One million messages per second //! @@ -108,6 +109,17 @@ //! delivery semantics, check the [message delivery semantics] chapter in the //! Kafka documentation. //! +//! ### Exactly-once semantics +//! +//! Exactly-once semantics (EOS) can be achieved by using transactional producers and +//! transaction aware consumers. This allows produced records and consumer +//! offsets to be committed or aborted atomically and only committed messages +//! to be read by the consumer. EOS is useful in read-process-write scenarios +//! that require messages to be processed exactly once. +//! +//! To learn more about using transactions in `rdkafka` check the +//! [Transactional producer API documentation] and the [transactions example]. +//! //! ### Users //! //! Here are some of the projects using rust-rdkafka: @@ -210,11 +222,13 @@ //! [`Stream`]: https://docs.rs/futures/*/futures/stream/trait.Stream.html //! [`StreamConsumer`]: https://docs.rs/rdkafka/*/rdkafka/consumer/stream_consumer/struct.StreamConsumer.html //! [`ThreadedProducer`]: https://docs.rs/rdkafka/*/rdkafka/producer/base_producer/struct.ThreadedProducer.html +//! [Transactional producer API documentation]: https://docs.rs/rdkafka/*/rdkafka/producer/index.html#transactional-producer-api //! [`log`]: https://docs.rs/log //! [`env_logger`]: https://docs.rs/env_logger //! [Apache Kafka]: https://kafka.apache.org //! [asynchronous processing example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/asynchronous_processing.rs //! [at-least-once delivery example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/at_least_once.rs +//! [transactions example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/transactions.rs //! [smol runtime example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/smol_runtime.rs //! [broker-compat]: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility //! [`examples`]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/ diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 5521bad36..f3360dddf 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -430,6 +430,9 @@ impl BaseProducer { } /// Initialize transactions for the producer instance. + /// + /// Requires that a `transactional.id` be set for the producer and must be + /// called before any other transaction or send operations. pub fn init_transactions>(&self, timeout: T) -> KafkaResult<()> { let init_error: *const RDKafkaError = unsafe { rdsys::rd_kafka_init_transactions(self.native_ptr(), timeout.into().as_millis()) @@ -441,6 +444,10 @@ impl BaseProducer { } /// Begin a new transaction. + /// + /// A successful call to `init_transactions` must be made before calling + /// `begin_transaction`. All records produced and consumer offsets sent + /// after calling will be committed or aborted atomically. 
pub fn begin_transaction(&self) -> KafkaResult<()> { let begin_error: *const RDKafkaError = unsafe { rdsys::rd_kafka_begin_transaction(self.native_ptr()) }; @@ -450,9 +457,16 @@ impl BaseProducer { Ok(()) } - /// Sends a list of topic partition offsets to the consumer group coordinator for `cgm`, and - /// marks the offsets as part of the current transaction. These offsets will be considered - /// committed only if the transaction is committed successfully. + /// Sends a list of topic partition offsets to the consumer group + /// coordinator for `cgm`, and marks the offsets as part of the current + /// transaction. These offsets will be considered committed only if the + /// transaction is committed successfully. + /// + /// The offsets should be the next message your application will consume, + /// i.e.: the last processed message's offset + 1 for each partition. + /// + /// To get `ConsumerGroupMetadata` use `Consumer::group_metadata` on the + /// consumer that offsets are being sent for. pub fn send_offsets_to_transaction>( &self, offsets: &TopicPartitionList, @@ -474,6 +488,8 @@ impl BaseProducer { } /// Commit the current transaction. + /// + /// Flushes the producer before actually committing. pub fn commit_transaction>(&self, timeout: T) -> KafkaResult<()> { let commit_error: *const RDKafkaError = unsafe { rdsys::rd_kafka_commit_transaction(self.native_ptr(), timeout.into().as_millis()) @@ -485,6 +501,9 @@ impl BaseProducer { } /// Aborts the ongoing transaction. + /// + /// Should be used when receiving non-fatal abortable transaction errors. + /// Outstanding messages will be purged. pub fn abort_transaction>(&self, timeout: T) -> KafkaResult<()> { let abort_error: *const RDKafkaError = unsafe { rdsys::rd_kafka_abort_transaction(self.native_ptr(), timeout.into().as_millis()) @@ -611,18 +630,29 @@ impl ThreadedProducer { } /// Initialize transactions for the producer instance. + /// + /// Requires that a `transactional.id` be set for the producer and must be + /// called before any other transaction or send operations. pub fn init_transactions>(&self, timeout: T) -> KafkaResult<()> { self.producer.init_transactions(timeout) } /// Begin a new transaction. + /// + /// A successful call to `init_transactions` must be made before calling + /// `begin_transaction`. All records produced and consumer offsets sent + /// after calling will be committed or aborted atomically. pub fn begin_transaction(&self) -> KafkaResult<()> { self.producer.begin_transaction() } - /// Sends a list of topic partition offsets to the consumer group coordinator for `cgm`, and - /// marks the offsets as part of the current transaction. These offsets will be considered - /// committed only if the transaction is committed successfully. + /// Sends a list of topic partition offsets to the consumer group + /// coordinator for `cgm`, and marks the offsets as part of the current + /// transaction. These offsets will be considered committed only if the + /// transaction is committed successfully. + /// + /// The offsets should be the next message your application will consume, + /// i.e.: the last processed message's offset + 1 for each partition. pub fn send_offsets_to_transaction>( &self, offsets: &TopicPartitionList, @@ -634,11 +664,16 @@ impl ThreadedProducer { } /// Commit the current transaction. + /// + /// Flushes the producer before actually committing. pub fn commit_transaction>(&self, timeout: T) -> KafkaResult<()> { self.producer.commit_transaction(timeout) } /// Abort the ongoing transaction. 
diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs
index 5238c25e9..96bb30bed 100644
--- a/src/producer/future_producer.rs
+++ b/src/producer/future_producer.rs
@@ -371,18 +371,29 @@
     }
 
     /// Initialize transactions for the producer instance.
+    ///
+    /// Requires that a `transactional.id` be set in the producer configuration.
+    /// Must be called before any other transactional method or send operation.
     pub fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
         self.producer.init_transactions(timeout)
     }
 
     /// Begin a new transaction.
+    ///
+    /// A successful call to `init_transactions` must be made before calling
+    /// `begin_transaction`. All records produced and consumer offsets sent
+    /// after this call will be committed or aborted atomically.
     pub fn begin_transaction(&self) -> KafkaResult<()> {
         self.producer.begin_transaction()
     }
 
-    /// Sends a list of topic partition offsets to the consumer group coordinator for `cgm`, and
-    /// marks the offsets as part of the current transaction. These offsets will be considered
-    /// committed only if the transaction is committed successfully.
+    /// Sends a list of topic partition offsets to the consumer group
+    /// coordinator for `cgm`, and marks the offsets as part of the current
+    /// transaction. These offsets will be considered committed only if the
+    /// transaction is committed successfully.
+    ///
+    /// Each offset should point to the next message your application will
+    /// consume, i.e. the last processed message's offset + 1 for each partition.
     pub fn send_offsets_to_transaction<T: Into<Timeout>>(
         &self,
         offsets: &TopicPartitionList,
@@ -394,11 +405,16 @@
     }
 
     /// Commit the current transaction.
+    ///
+    /// Flushes all outstanding messages before actually committing.
     pub fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
         self.producer.commit_transaction(timeout)
     }
 
     /// Abort the ongoing transaction.
+    ///
+    /// Should be used when a non-fatal, abortable transaction error is
+    /// received. Any outstanding messages will be purged.
    pub fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
         self.producer.abort_transaction(timeout)
     }
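For reference while reading the module docs in the next hunk, a minimal sketch of one complete transaction using the methods added above. The topic name, key, payloads, and timeout are placeholders; abort and retry handling is deliberately left out here, and a sketch of those paths follows the error discussion at the end of the series:

```rust
use std::time::Duration;

use rdkafka::error::KafkaResult;
use rdkafka::producer::{BaseProducer, BaseRecord};

// One complete transaction: everything sent between `begin_transaction`
// and `commit_transaction` is committed atomically.
fn produce_batch(producer: &BaseProducer, payloads: &[&str]) -> KafkaResult<()> {
    producer.begin_transaction()?;
    for payload in payloads {
        producer
            .send(BaseRecord::to("output").key("k").payload(*payload))
            .map_err(|(err, _record)| err)?;
    }
    // `commit_transaction` flushes any outstanding messages before committing.
    producer.commit_transaction(Duration::from_secs(30))
}
```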
diff --git a/src/producer/mod.rs b/src/producer/mod.rs
index cb30aef14..589cc8175 100644
--- a/src/producer/mod.rs
+++ b/src/producer/mod.rs
@@ -75,6 +75,46 @@
 //! available (for more information, check the documentation of the futures
 //! crate).
 //!
+//! ## Transactional producer API
+//!
+//! All `rust-rdkafka` producers support transactions. To configure a producer
+//! for transactions, set `transactional.id` to an identifier unique to the
+//! application.
+//!
+//! Transactional producers work together with transaction-aware consumers
+//! configured with `isolation.level = read_committed` (the default).
+//!
+//! After creating a transactional producer, it must be initialized with
+//! `init_transactions`.
+//!
+//! To start a new transaction, use `begin_transaction`.
+//! There can be **only one ongoing transaction** at a time per producer. All
+//! records sent after starting a transaction and before committing or aborting
+//! it will be part of the current transaction.
+//!
+//! Consumer offsets can be sent as part of the ongoing transaction using
+//! `send_offsets_to_transaction` and will be committed atomically with the
+//! other records sent in the transaction.
+//!
+//! The current transaction can be committed with `commit_transaction` or
+//! aborted using `abort_transaction`. Afterwards, a new transaction can begin.
+//!
+//! ### Errors
+//!
+//! Errors returned by transaction methods may:
+//! * be retriable (`RDKafkaError::is_retriable`): the operation may be
+//!   retried.
+//! * require abort (`RDKafkaError::txn_requires_abort`): the current
+//!   transaction must be aborted and a new one may be started.
+//! * be fatal (`KafkaError::is_fatal` or `RDKafkaError::is_fatal`): the
+//!   producer must be stopped and the application terminated.
+//!
+//! For more details about transactions, check the [librdkafka documentation],
+//! "Transactional producer API" section.
+//!
+//! [librdkafka documentation]:
+//! https://docs.confluent.io/5.5.1/clients/librdkafka/rdkafka_8h.html
+//!
 //! ## Configuration
 //!
 //! ### Producer configuration

From f8b369448ee86d8b839d1f9fa2160881cd9e987e Mon Sep 17 00:00:00 2001
From: Robert Ignat
Date: Wed, 30 Sep 2020 10:23:35 +0300
Subject: [PATCH 14/15] impl Send Sync for ConsumerGroupMetadata

---
 src/consumer/mod.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs
index 74ce6e355..230113cc4 100644
--- a/src/consumer/mod.rs
+++ b/src/consumer/mod.rs
@@ -166,6 +166,9 @@ impl ConsumerGroupMetadata {
     }
 }
 
+unsafe impl Send for ConsumerGroupMetadata {}
+unsafe impl Sync for ConsumerGroupMetadata {}
+
 /// Common trait for all consumers.
 ///
 /// # Note about object safety

From 39c057814a9a4fe48efafaa2c26fbf1a5684363d Mon Sep 17 00:00:00 2001
From: Robert Ignat
Date: Tue, 17 Nov 2020 10:40:32 +0200
Subject: [PATCH 15/15] Fix KafkaError methods

---
 src/error.rs | 27 ++++++++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/src/error.rs b/src/error.rs
index d2c994d22..02898c54f 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -185,8 +185,29 @@ impl fmt::Display for KafkaError {
 
 impl error::Error for KafkaError {
     fn source(&self) -> Option<&(dyn error::Error + 'static)> {
-        self.rdkafka_error_code()
-            .map(|e| e as &(dyn error::Error + 'static))
+        match self {
+            KafkaError::AdminOp(_) => None,
+            KafkaError::AdminOpCreation(_) => None,
+            KafkaError::Canceled => None,
+            KafkaError::ClientConfig(_, _, _, _) => None,
+            KafkaError::ClientCreation(_) => None,
+            KafkaError::ConsumerCommit(err) => Some(err),
+            KafkaError::Global(err) => Some(err),
+            KafkaError::GroupListFetch(err) => Some(err),
+            KafkaError::MessageConsumption(err) => Some(err),
+            KafkaError::MessageProduction(err) => Some(err),
+            KafkaError::MetadataFetch(err) => Some(err),
+            KafkaError::NoMessageReceived => None,
+            KafkaError::Nul(_) => None,
+            KafkaError::OffsetFetch(err) => Some(err),
+            KafkaError::PartitionEOF(_) => None,
+            KafkaError::PauseResume(_) => None,
+            KafkaError::Seek(_) => None,
+            KafkaError::SetPartitionOffset(err) => Some(err),
+            KafkaError::StoreOffset(err) => Some(err),
+            KafkaError::Subscription(_) => None,
+            KafkaError::Transaction(err) => Some(err),
+        }
     }
 }
 
@@ -211,7 +232,7 @@
 
     /// Returns the [`RDKafkaErrorCode`] underlying this error, if any.
     #[allow(clippy::match_same_arms)]
-    pub fn rdkafka_error_code(&self) -> Option<&RDKafkaErrorCode> {
+    pub fn rdkafka_error_code(&self) -> Option<RDKafkaErrorCode> {
         match self {
             KafkaError::AdminOp(_) => None,
             KafkaError::AdminOpCreation(_) => None,
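To close, a sketch of how a caller might apply the error classification from the producer module docs, using the `KafkaError::Transaction` variant these patches thread through `src/error.rs`. The single retry and the 30-second timeouts are illustrative policy, not part of the patches:

```rust
use std::time::Duration;

use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::producer::BaseProducer;

// Retry retriable errors once, abort the transaction on abortable errors,
// and propagate everything else (including fatal errors) to the caller.
fn commit_with_recovery(producer: &BaseProducer) -> KafkaResult<()> {
    match producer.commit_transaction(Duration::from_secs(30)) {
        Err(KafkaError::Transaction(e)) if e.is_retriable() => {
            // The operation failed transiently and may be retried.
            producer.commit_transaction(Duration::from_secs(30))
        }
        Err(KafkaError::Transaction(e)) if e.txn_requires_abort() => {
            // The current transaction must be aborted; a new one may then begin.
            producer.abort_transaction(Duration::from_secs(30))?;
            Err(KafkaError::Transaction(e))
        }
        // Ok, or a fatal error: the producer must be stopped by the caller.
        other => other,
    }
}
```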