From 9ccdd35833aef85adcb05175b9ff89efde0d5667 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Tue, 5 Jan 2021 01:54:30 -0500 Subject: [PATCH] Add transactional producer support Co-authored-by: Robert Ignat Closes #289. Closes #293. --- README.md | 16 +++ changelog.md | 8 ++ docker-compose.yaml | 2 + rdkafka-sys/src/types.rs | 12 +- src/consumer/base_consumer.rs | 13 ++- src/consumer/mod.rs | 32 +++++- src/consumer/stream_consumer.rs | 8 +- src/error.rs | 168 +++++++++++++++++++++++---- src/lib.rs | 16 +++ src/producer/base_producer.rs | 103 ++++++++++++++++- src/producer/future_producer.rs | 28 +++++ src/producer/mod.rs | 184 +++++++++++++++++++++++++++++- tests/test_transactions.rs | 195 ++++++++++++++++++++++++++++++++ tests/utils.rs | 8 ++ 14 files changed, 758 insertions(+), 35 deletions(-) create mode 100644 tests/test_transactions.rs diff --git a/README.md b/README.md index 1e326a4b8..f83f2ffd0 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,8 @@ The main features provided at the moment are: - Access to group metadata (list groups, list members of groups, hostnames, etc.). - Access to producer and consumer metrics, errors and callbacks. +- Exactly-once semantics (EOS) via idempotent and transactional producers + and read-committed consumers. ### One million messages per second @@ -116,6 +118,19 @@ To see how to implement at-least-once delivery with `rdkafka`, check out the delivery semantics, check the [message delivery semantics] chapter in the Kafka documentation. +### Exactly-once semantics + +Exactly-once semantics (EOS) can be achieved using transactional producers, +which allow produced records and consumer offsets to be committed or aborted +atomically. Consumers that set their `isolation.level` to `read_committed` +will only observe committed messages. + +EOS is useful in read-process-write scenarios that require messages to be +processed exactly once. + +To learn more about using transactions in rust-rdkafka, see the +[Transactions](producer-transactions) section of the producer documentation. + ### Users Here are some of the projects using rust-rdkafka: @@ -239,6 +254,7 @@ logging framework. [librdkafka]: https://github.com/edenhill/librdkafka [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md [message delivery semantics]: https://kafka.apache.org/0101/documentation.html#semantics +[producer-transactions]: https://docs.rs/rdkafka/*/rdkafka/producer/#transactions [rdkafka-sys-features]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#features [rdkafka-sys-known-issues]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#known-issues [smol]: https://docs.rs/smol diff --git a/changelog.md b/changelog.md index 7a54a0179..0a3cb9772 100644 --- a/changelog.md +++ b/changelog.md @@ -5,6 +5,13 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md). ## 0.25.0 (Unreleased) +* Add support for transactional producers. The new methods are + `Producer::init_transactions`, `Producer::begin_transaction`, + `Producer::commit_transaction`, `Producer::abort_transaction`, and + `Producer::send_offsets_to_transaction`. + + Thanks to [@roignpar] for the implementation. + * **Breaking change.** Rename `RDKafkaError` to `RDKafkaErrorCode`. This makes space for the new `RDKafkaError` type, which mirrors the `rd_kafka_error_t` type added to librdkafka in v1.4.0. @@ -84,6 +91,7 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md). when the `tokio` feature is disabled. [@Marwes]: https://github.com/Marwes +[@roignpar]: https://github.com/roignpar ## 0.24.0 (2020-07-08) diff --git a/docker-compose.yaml b/docker-compose.yaml index baee434c4..85bba5efe 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -8,6 +8,8 @@ services: - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 - KAFKA_NUM_PARTITIONS=3 - CONFLUENT_SUPPORT_METRICS_ENABLE=0 ports: ["9092:9092"] diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index 45fb1f51e..32fef8c02 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -1,8 +1,9 @@ //! Aliases for types defined in the auto-generated bindings. use std::convert::TryFrom; +use std::error::Error; use std::ffi::CStr; -use std::{error, fmt}; +use std::fmt; use crate::bindings; use crate::helpers; @@ -42,6 +43,9 @@ pub type RDKafkaMetadataPartition = bindings::rd_kafka_metadata_partition_t; /// Native rdkafka broker information. pub type RDKafkaMetadataBroker = bindings::rd_kafka_metadata_broker_t; +/// Native rdkafka consumer group metadata. +pub type RDKafkaConsumerGroupMetadata = bindings::rd_kafka_consumer_group_metadata_t; + /// Native rdkafka state. pub type RDKafkaState = bindings::rd_kafka_s; @@ -435,11 +439,7 @@ impl fmt::Display for RDKafkaErrorCode { } } -impl error::Error for RDKafkaErrorCode { - fn description(&self) -> &str { - "Error from underlying rdkafka library" - } -} +impl Error for RDKafkaErrorCode {} #[cfg(test)] mod tests { diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index de46aaa23..fa0cab416 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -14,7 +14,9 @@ use rdkafka_sys::types::*; use crate::client::{Client, NativeClient, NativeQueue}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; -use crate::consumer::{CommitMode, Consumer, ConsumerContext, DefaultConsumerContext}; +use crate::consumer::{ + CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, +}; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::groups::GroupList; use crate::message::{BorrowedMessage, Message}; @@ -271,6 +273,15 @@ where &self.client } + fn group_metadata(&self) -> Option { + let ptr = unsafe { + NativePtr::from_ptr(rdsys::rd_kafka_consumer_group_metadata( + self.client.native_ptr(), + )) + }?; + Some(ConsumerGroupMetadata(ptr)) + } + fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> { let mut tpl = TopicPartitionList::new(); for topic in topics { diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index ae78ca141..ce064cceb 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -15,7 +15,7 @@ use crate::groups::GroupList; use crate::message::BorrowedMessage; use crate::metadata::Metadata; use crate::topic_partition_list::{Offset, TopicPartitionList}; -use crate::util::{cstr_to_owned, Timeout}; +use crate::util::{cstr_to_owned, KafkaDrop, NativePtr, Timeout}; pub mod base_consumer; pub mod stream_consumer; @@ -146,6 +146,27 @@ pub enum CommitMode { Async = 1, } +/// Consumer group metadata. +/// +/// For use with [`Producer::send_offsets_to_transaction`]. +/// +/// [`Producer::send_offsets_to_transaction`]: crate::producer::Producer::send_offsets_to_transaction +pub struct ConsumerGroupMetadata(NativePtr); + +impl ConsumerGroupMetadata { + pub(crate) fn ptr(&self) -> *const RDKafkaConsumerGroupMetadata { + self.0.ptr() + } +} + +unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata { + const TYPE: &'static str = "consumer_group_metadata"; + const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_consumer_group_metadata_destroy; +} + +unsafe impl Send for ConsumerGroupMetadata {} +unsafe impl Sync for ConsumerGroupMetadata {} + /// Common trait for all consumers. /// /// # Note about object safety @@ -167,6 +188,15 @@ where self.client().context() } + /// Returns the current consumer group metadata associated with the + /// consumer. + /// + /// If the consumer was not configured with a `group.id`, returns `None`. + /// For use with [`Producer::send_offsets_to_transaction`]. + /// + /// [`Producer::send_offsets_to_transaction`]: crate::producer::Producer::send_offsets_to_transaction + fn group_metadata(&self) -> Option; + /// Subscribes the consumer to a list of topics. fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>; diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index ae99658d7..4a8242a5e 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -19,7 +19,9 @@ use rdkafka_sys::types::*; use crate::client::{Client, ClientContext, NativeClient}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel}; use crate::consumer::base_consumer::BaseConsumer; -use crate::consumer::{CommitMode, Consumer, ConsumerContext, DefaultConsumerContext, Rebalance}; +use crate::consumer::{ + CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, Rebalance, +}; use crate::error::{KafkaError, KafkaResult}; use crate::groups::GroupList; use crate::message::BorrowedMessage; @@ -345,6 +347,10 @@ where self.base.client() } + fn group_metadata(&self) -> Option { + self.base.group_metadata() + } + fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> { self.base.subscribe(topics) } diff --git a/src/error.rs b/src/error.rs index d1b2c76e6..db9c2cf00 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,10 +1,16 @@ //! Error manipulations. -use std::{error, ffi, fmt}; +use std::error::Error; +use std::ffi::{self, CStr}; +use std::fmt; +use std::ptr; +use std::sync::Arc; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; +use crate::util::{KafkaDrop, NativePtr}; + // Re-export rdkafka error code pub use rdsys::types::RDKafkaErrorCode; @@ -16,21 +22,111 @@ pub type KafkaResult = Result; /// Some librdkafka codes are informational, rather than true errors. pub trait IsError { /// Reports whether the value represents an error. - fn is_error(self) -> bool; + fn is_error(&self) -> bool; } impl IsError for RDKafkaRespErr { - fn is_error(self) -> bool { - self as i32 != RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR as i32 + fn is_error(&self) -> bool { + *self as i32 != RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR as i32 } } impl IsError for RDKafkaConfRes { - fn is_error(self) -> bool { - self as i32 != RDKafkaConfRes::RD_KAFKA_CONF_OK as i32 + fn is_error(&self) -> bool { + *self as i32 != RDKafkaConfRes::RD_KAFKA_CONF_OK as i32 + } +} + +impl IsError for RDKafkaError { + fn is_error(&self) -> bool { + self.0.is_some() + } +} + +/// Native rdkafka error. +#[derive(Clone)] +pub struct RDKafkaError(Option>>); + +unsafe impl KafkaDrop for rdsys::rd_kafka_error_t { + const TYPE: &'static str = "error"; + const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_error_destroy; +} + +unsafe impl Send for RDKafkaError {} +unsafe impl Sync for RDKafkaError {} + +impl RDKafkaError { + pub(crate) unsafe fn from_ptr(ptr: *mut rdsys::rd_kafka_error_t) -> RDKafkaError { + RDKafkaError(NativePtr::from_ptr(ptr).map(Arc::new)) + } + + fn ptr(&self) -> *const rdsys::rd_kafka_error_t { + match &self.0 { + None => ptr::null(), + Some(p) => p.ptr(), + } + } + + /// Returns the error code or [`RDKafkaErrorCode::NoError`] if the error is + /// null. + pub fn code(&self) -> RDKafkaErrorCode { + unsafe { rdsys::rd_kafka_error_code(self.ptr()).into() } + } + + /// Returns the error code name, e.g., "ERR_UNKNOWN_MEMBER_ID" or an empty + /// string if the error is null. + pub fn name(&self) -> String { + let cstr = unsafe { rdsys::rd_kafka_error_name(self.ptr()) }; + unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() } + } + + /// Returns a human readable error string or an empty string if the error is + /// null. + pub fn string(&self) -> String { + let cstr = unsafe { rdsys::rd_kafka_error_string(self.ptr()) }; + unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() } + } + + /// Reports whether the error is a fatal error. + /// + /// A fatal error indicates that the client instance is no longer usable. + pub fn is_fatal(&self) -> bool { + unsafe { rdsys::rd_kafka_error_is_fatal(self.ptr()) != 0 } + } + + /// Reports whether the operation that encountered the error can be retried. + pub fn is_retriable(&self) -> bool { + unsafe { rdsys::rd_kafka_error_is_retriable(self.ptr()) != 0 } + } + + /// Reports whether the error is an abortable transaction error. + pub fn txn_requires_abort(&self) -> bool { + unsafe { rdsys::rd_kafka_error_txn_requires_abort(self.ptr()) != 0 } + } +} + +impl PartialEq for RDKafkaError { + fn eq(&self, other: &RDKafkaError) -> bool { + self.code() == other.code() } } +impl Eq for RDKafkaError {} + +impl fmt::Debug for RDKafkaError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "RDKafkaError({})", self) + } +} + +impl fmt::Display for RDKafkaError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.string()) + } +} + +impl Error for RDKafkaError {} + // TODO: consider using macro /// Represents all possible Kafka errors. @@ -78,11 +174,13 @@ pub enum KafkaError { StoreOffset(RDKafkaErrorCode), /// Subscription creation failed. Subscription(String), + /// Transaction error. + Transaction(RDKafkaError), } impl fmt::Debug for KafkaError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { + match self { KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err), KafkaError::AdminOpCreation(ref err) => { write!(f, "KafkaError (Admin operation creation error: {})", err) @@ -129,13 +227,14 @@ impl fmt::Debug for KafkaError { KafkaError::Subscription(ref err) => { write!(f, "KafkaError (Subscription error: {})", err) } + KafkaError::Transaction(err) => write!(f, "KafkaError (Transaction error: {})", err), } } } impl fmt::Display for KafkaError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { + match self { KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err), KafkaError::AdminOpCreation(ref err) => { write!(f, "Admin operation creation error: {}", err) @@ -162,14 +261,36 @@ impl fmt::Display for KafkaError { KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err), KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err), KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err), + KafkaError::Transaction(err) => write!(f, "Transaction error: {}", err), } } } -impl error::Error for KafkaError { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - self.rdkafka_error_code() - .map(|e| e as &(dyn error::Error + 'static)) +impl Error for KafkaError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + KafkaError::AdminOp(_) => None, + KafkaError::AdminOpCreation(_) => None, + KafkaError::Canceled => None, + KafkaError::ClientConfig(..) => None, + KafkaError::ClientCreation(_) => None, + KafkaError::ConsumerCommit(err) => Some(err), + KafkaError::Global(err) => Some(err), + KafkaError::GroupListFetch(err) => Some(err), + KafkaError::MessageConsumption(err) => Some(err), + KafkaError::MessageProduction(err) => Some(err), + KafkaError::MetadataFetch(err) => Some(err), + KafkaError::NoMessageReceived => None, + KafkaError::Nul(_) => None, + KafkaError::OffsetFetch(err) => Some(err), + KafkaError::PartitionEOF(_) => None, + KafkaError::PauseResume(_) => None, + KafkaError::Seek(_) => None, + KafkaError::SetPartitionOffset(err) => Some(err), + KafkaError::StoreOffset(err) => Some(err), + KafkaError::Subscription(_) => None, + KafkaError::Transaction(err) => Some(err), + } } } @@ -182,28 +303,29 @@ impl From for KafkaError { impl KafkaError { /// Returns the [`RDKafkaErrorCode`] underlying this error, if any. #[allow(clippy::match_same_arms)] - pub fn rdkafka_error_code(&self) -> Option<&RDKafkaErrorCode> { + pub fn rdkafka_error_code(&self) -> Option { match self { KafkaError::AdminOp(_) => None, KafkaError::AdminOpCreation(_) => None, KafkaError::Canceled => None, - KafkaError::ClientConfig(_, _, _, _) => None, + KafkaError::ClientConfig(..) => None, KafkaError::ClientCreation(_) => None, - KafkaError::ConsumerCommit(err) => Some(err), - KafkaError::Global(err) => Some(err), - KafkaError::GroupListFetch(err) => Some(err), - KafkaError::MessageConsumption(err) => Some(err), - KafkaError::MessageProduction(err) => Some(err), - KafkaError::MetadataFetch(err) => Some(err), + KafkaError::ConsumerCommit(err) => Some(*err), + KafkaError::Global(err) => Some(*err), + KafkaError::GroupListFetch(err) => Some(*err), + KafkaError::MessageConsumption(err) => Some(*err), + KafkaError::MessageProduction(err) => Some(*err), + KafkaError::MetadataFetch(err) => Some(*err), KafkaError::NoMessageReceived => None, KafkaError::Nul(_) => None, - KafkaError::OffsetFetch(err) => Some(err), + KafkaError::OffsetFetch(err) => Some(*err), KafkaError::PartitionEOF(_) => None, KafkaError::PauseResume(_) => None, KafkaError::Seek(_) => None, - KafkaError::SetPartitionOffset(err) => Some(err), - KafkaError::StoreOffset(err) => Some(err), + KafkaError::SetPartitionOffset(err) => Some(*err), + KafkaError::StoreOffset(err) => Some(*err), KafkaError::Subscription(_) => None, + KafkaError::Transaction(err) => Some(err.code()), } } } diff --git a/src/lib.rs b/src/lib.rs index 728e23cc7..b81b89112 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,6 +31,8 @@ //! - Access to group metadata (list groups, list members of groups, hostnames, //! etc.). //! - Access to producer and consumer metrics, errors and callbacks. +//! - Exactly-once semantics (EOS) via idempotent and transactional producers +//! and read-committed consumers. //! //! ### One million messages per second //! @@ -108,6 +110,19 @@ //! delivery semantics, check the [message delivery semantics] chapter in the //! Kafka documentation. //! +//! ### Exactly-once semantics +//! +//! Exactly-once semantics (EOS) can be achieved using transactional producers, +//! which allow produced records and consumer offsets to be committed or aborted +//! atomically. Consumers that set their `isolation.level` to `read_committed` +//! will only observe committed messages. +//! +//! EOS is useful in read-process-write scenarios that require messages to be +//! processed exactly once. +//! +//! To learn more about using transactions in rust-rdkafka, see the +//! [Transactions](producer-transactions) section of the producer documentation. +//! //! ### Users //! //! Here are some of the projects using rust-rdkafka: @@ -231,6 +246,7 @@ //! [librdkafka]: https://github.com/edenhill/librdkafka //! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md //! [message delivery semantics]: https://kafka.apache.org/0101/documentation.html#semantics +//! [producer-transactions]: https://docs.rs/rdkafka/*/rdkafka/producer/#transactions //! [rdkafka-sys-features]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#features //! [rdkafka-sys-known-issues]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#known-issues //! [smol]: https://docs.rs/smol diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 2fc51da3d..d52ccf4ef 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -58,9 +58,11 @@ use rdkafka_sys::types::*; use crate::client::Client; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; -use crate::error::{IsError, KafkaError, KafkaResult}; +use crate::consumer::ConsumerGroupMetadata; +use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError}; use crate::message::{BorrowedMessage, OwnedHeaders, ToBytes}; use crate::producer::{DefaultProducerContext, Producer, ProducerContext}; +use crate::topic_partition_list::TopicPartitionList; use crate::util::{IntoOpaque, Timeout}; pub use crate::message::DeliveryResult; @@ -390,6 +392,79 @@ where fn in_flight_count(&self) -> i32 { unsafe { rdsys::rd_kafka_outq_len(self.native_ptr()) } } + + fn init_transactions>(&self, timeout: T) -> KafkaResult<()> { + let ret = unsafe { + RDKafkaError::from_ptr(rdsys::rd_kafka_init_transactions( + self.native_ptr(), + timeout.into().as_millis(), + )) + }; + if ret.is_error() { + Err(KafkaError::Transaction(ret)) + } else { + Ok(()) + } + } + + fn begin_transaction(&self) -> KafkaResult<()> { + let ret = + unsafe { RDKafkaError::from_ptr(rdsys::rd_kafka_begin_transaction(self.native_ptr())) }; + if ret.is_error() { + Err(KafkaError::Transaction(ret)) + } else { + Ok(()) + } + } + + fn send_offsets_to_transaction>( + &self, + offsets: &TopicPartitionList, + cgm: &ConsumerGroupMetadata, + timeout: T, + ) -> KafkaResult<()> { + let ret = unsafe { + RDKafkaError::from_ptr(rdsys::rd_kafka_send_offsets_to_transaction( + self.native_ptr(), + offsets.ptr(), + cgm.ptr(), + timeout.into().as_millis(), + )) + }; + if ret.is_error() { + Err(KafkaError::Transaction(ret)) + } else { + Ok(()) + } + } + + fn commit_transaction>(&self, timeout: T) -> KafkaResult<()> { + let ret = unsafe { + RDKafkaError::from_ptr(rdsys::rd_kafka_commit_transaction( + self.native_ptr(), + timeout.into().as_millis(), + )) + }; + if ret.is_error() { + Err(KafkaError::Transaction(ret)) + } else { + Ok(()) + } + } + + fn abort_transaction>(&self, timeout: T) -> KafkaResult<()> { + let ret = unsafe { + RDKafkaError::from_ptr(rdsys::rd_kafka_abort_transaction( + self.native_ptr(), + timeout.into().as_millis(), + )) + }; + if ret.is_error() { + Err(KafkaError::Transaction(ret)) + } else { + Ok(()) + } + } } impl Clone for BaseProducer @@ -514,6 +589,32 @@ where fn in_flight_count(&self) -> i32 { self.producer.in_flight_count() } + + fn init_transactions>(&self, timeout: T) -> KafkaResult<()> { + self.producer.init_transactions(timeout) + } + + fn begin_transaction(&self) -> KafkaResult<()> { + self.producer.begin_transaction() + } + + fn send_offsets_to_transaction>( + &self, + offsets: &TopicPartitionList, + cgm: &ConsumerGroupMetadata, + timeout: T, + ) -> KafkaResult<()> { + self.producer + .send_offsets_to_transaction(offsets, cgm, timeout) + } + + fn commit_transaction>(&self, timeout: T) -> KafkaResult<()> { + self.producer.commit_transaction(timeout) + } + + fn abort_transaction>(&self, timeout: T) -> KafkaResult<()> { + self.producer.abort_transaction(timeout) + } } impl Drop for ThreadedProducer diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs index 0b7141a98..36b215707 100644 --- a/src/producer/future_producer.rs +++ b/src/producer/future_producer.rs @@ -15,10 +15,12 @@ use futures::FutureExt; use crate::client::{Client, ClientContext, DefaultClientContext}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel}; +use crate::consumer::ConsumerGroupMetadata; use crate::error::{KafkaError, KafkaResult, RDKafkaErrorCode}; use crate::message::{Message, OwnedHeaders, OwnedMessage, Timestamp, ToBytes}; use crate::producer::{BaseRecord, DeliveryResult, Producer, ProducerContext, ThreadedProducer}; use crate::statistics::Statistics; +use crate::topic_partition_list::TopicPartitionList; use crate::util::{AsyncRuntime, DefaultRuntime, IntoOpaque, Timeout}; // @@ -357,6 +359,32 @@ where fn in_flight_count(&self) -> i32 { self.producer.in_flight_count() } + + fn init_transactions>(&self, timeout: T) -> KafkaResult<()> { + self.producer.init_transactions(timeout) + } + + fn begin_transaction(&self) -> KafkaResult<()> { + self.producer.begin_transaction() + } + + fn send_offsets_to_transaction>( + &self, + offsets: &TopicPartitionList, + cgm: &ConsumerGroupMetadata, + timeout: T, + ) -> KafkaResult<()> { + self.producer + .send_offsets_to_transaction(offsets, cgm, timeout) + } + + fn commit_transaction>(&self, timeout: T) -> KafkaResult<()> { + self.producer.commit_transaction(timeout) + } + + fn abort_transaction>(&self, timeout: T) -> KafkaResult<()> { + self.producer.abort_transaction(timeout) + } } #[cfg(test)] diff --git a/src/producer/mod.rs b/src/producer/mod.rs index c7fc5fd95..9f873b493 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -18,8 +18,8 @@ //! regular intervals to process those events; the thread calling `poll` will be //! the one executing the user-specified delivery callback for every delivery //! event. If `poll` is not called, or not frequently enough, the producer will -//! return a `RDKafkaError::QueueFull` error and it won't be able to send any -//! other message until more delivery event are processed via `poll`. The +//! return a [`RDKafkaErrorCode::QueueFull`] error and it won't be able to send +//! any other message until more delivery event are processed via `poll`. The //! `QueueFull` error can also be returned if Kafka is not able to receive the //! messages quickly enough. //! @@ -75,6 +75,47 @@ //! available (for more information, check the documentation of the futures //! crate). //! +//! ## Transactions +//! +//! All rust-rdkafka producers support transactions. Transactional producers +//! work together with transaction-aware consumers configured with the default +//! `isolation.level` of `read_committed`. +//! +//! To configure a producer for transactions set `transactional.id` to an +//! identifier unique to the application when creating the producer. After +//! creating the producer, you must initialize it with +//! [`Producer::init_transactions`]. +//! +//! To start a new transaction use [`Producer::begin_transaction`]. There can be +//! **only one ongoing transaction** at a time per producer. All records sent +//! after starting a transaction and before committing or aborting it will +//! automatically be associated with that transaction. +//! +//! Once you have initialized transactions on a producer, you are not permitted +//! to produce messages outside of a transaction. +//! +//! Consumer offsets can be sent as part of the ongoing transaction using +//! `send_offsets_to_transaction` and will be committed atomically with the +//! other records sent in the transaction. +//! +//! The current transaction can be committed with +//! [`Producer::commit_transaction`] or aborted using +//! [`Producer::abort_transaction`]. Afterwards, a new transaction can begin. +//! +//! ### Errors +//! +//! Errors returned by transaction methods may: +//! +//! * be retriable ([`RDKafkaError::is_retriable`]), in which case the operation +//! that encountered the error may be retried. +//! * require abort ([`RDKafkaError::txn_requires_abort`], in which case the +//! current transaction must be aborted and a new transaction begun. +//! * be fatal ([`RDKafkaError::is_fatal`]), in which case the producer must be +//! stopped and the application terminated. +//! +//! For more details about transactions, see the [Transactional Producer] +//! section of the librdkafka introduction. +//! //! ## Configuration //! //! ### Producer configuration @@ -112,10 +153,18 @@ //! locally and limits the time a produced message waits for successful //! delivery. A time of 0 is infinite. Default: 300000. //! +//! [`RDKafkaErrorCode::QueueFull`]: crate::error::RDKafkaErrorCode::QueueFull +//! [`RDKafkaError::is_retriable`]: crate::error::RDKafkaError::is_retriable +//! [`RDKafkaError::txn_requires_abort`]: crate::error::RDKafkaError::txn_requires_abort +//! [`RDKafkaError::is_fatal`]: crate::error::RDKafkaError::is_fatal +//! [Transactional Producer]: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#transactional-producer use std::sync::Arc; use crate::client::{Client, ClientContext}; +use crate::consumer::ConsumerGroupMetadata; +use crate::error::KafkaResult; +use crate::topic_partition_list::TopicPartitionList; use crate::util::{IntoOpaque, Timeout}; pub mod base_producer; @@ -187,4 +236,135 @@ where /// This method should be called before termination to ensure delivery of /// all enqueued messages. It will call `poll()` internally. fn flush>(&self, timeout: T); + + /// Enable sending transactions with this producer. + /// + /// # Prerequisites + /// + /// * The configuration used to create the producer must include a + /// `transactional.id` setting. + /// * You must not have sent any messages or called any of the other + /// transaction-related functions. + /// + /// # Details + /// + /// This function ensures any transactions initiated by previous producers + /// with the same `transactional.id` are completed. Any transactions left + /// open by any such previous producers will be aborted. + /// + /// Once previous transactions have been fenced, this function acquires an + /// internal producer ID and epoch that will be used by all transactional + /// messages sent by this producer. + /// + /// If this function returns successfully, messages may only be sent to this + /// producer when a transaction is active. See + /// [`Producer::begin_transaction`]. + /// + /// This function may block for the specified `timeout`. + fn init_transactions>(&self, timeout: T) -> KafkaResult<()>; + + /// Begins a new transaction. + /// + /// # Prerequisites + /// + /// You must have successfully called [`Producer::init_transactions`]. + /// + /// # Details + /// + /// This function begins a new transaction, and implicitly associates that + /// open transaction with this producer. + /// + /// After a successful call to this function, any messages sent via this + /// producer or any calls to [`Producer::send_offsets_to_transaction`] will + /// be implicitly associated with this transaction, until the transaction is + /// finished. + /// + /// Finish the transaction by calling [`Producer::commit_transaction`] or + /// [`Producer::abort_transaction`]. + /// + /// While a transaction is open, you must perform at least one transaction + /// operation every `transaction.timeout.ms` to avoid timing out the + /// transaction on the broker. + fn begin_transaction(&self) -> KafkaResult<()>; + + /// Associates an offset commit operation with this transaction. + /// + /// # Prerequisites + /// + /// The producer must have an open transaction via a call to + /// [`Producer::begin_transaction`]. + /// + /// # Details + /// + /// Sends a list of topic partition offsets to the consumer group + /// coordinator for `cgm`, and marks the offsets as part of the current + /// transaction. These offsets will be considered committed only if the + /// transaction is committed successfully. + /// + /// The offsets should be the next message your application will consume, + /// i.e., one greater than the the last processed message's offset for each + /// partition. + /// + /// Use this method at the end of a consume-transform-produce loop, prior to + /// comitting the transaction with [`Producer::commit_transaction`]. + /// + /// This function may block for the specified `timeout`. + /// + /// # Hints + /// + /// To obtain the correct consumer group metadata, call + /// [`Consumer::group_metadata`] on the consumer for which offsets are being + /// committed. + /// + /// The consumer must not have automatic commits enabled. + /// + /// [`Consumer::group_metadata`]: crate::consumer::Consumer::group_metadata + fn send_offsets_to_transaction>( + &self, + offsets: &TopicPartitionList, + cgm: &ConsumerGroupMetadata, + timeout: T, + ) -> KafkaResult<()>; + + /// Commits the current transaction. + /// + /// # Prerequisites + /// + /// The producer must have an open transaction via a call to + /// [`Producer::begin_transaction`]. + /// + /// # Details + /// + /// Any outstanding messages will be flushed (i.e., delivered) before + /// actually committing the transaction. + /// + /// If any of the outstanding messages fail permanently, the current + /// transaction will enter an abortable error state and this function will + /// return an abortable error. You must then call + /// [`Producer::abort_transaction`] before attemping to create another + /// transaction. + /// + /// This function may block for the specified `timeout`. + fn commit_transaction>(&self, timeout: T) -> KafkaResult<()>; + + /// Aborts the current transaction. + /// + /// # Prerequisites + /// + /// The producer must have an open transaction via a call to + /// [`Producer::begin_transaction`]. + /// + /// # Details + /// + /// Any oustanding messages will be purged and failed with + /// [`RDKafkaErrorCode::PurgeInflight`] or [`RDKafkaErrorCode::PurgeQueue`]. + /// + /// This function should also be used to recover from non-fatal abortable + /// transaction errors. + /// + /// This function may block for the specified `timeout`. + /// + /// [`RDKafkaErrorCode::PurgeInflight`]: crate::error::RDKafkaErrorCode::PurgeInflight + /// [`RDKafkaErrorCode::PurgeQueue`]: crate::error::RDKafkaErrorCode::PurgeQueue + fn abort_transaction>(&self, timeout: T) -> KafkaResult<()>; } diff --git a/tests/test_transactions.rs b/tests/test_transactions.rs new file mode 100644 index 000000000..34a7f5055 --- /dev/null +++ b/tests/test_transactions.rs @@ -0,0 +1,195 @@ +//! Test transactions using the base consumer and producer. + +use std::collections::HashMap; +use std::error::Error; + +use rdkafka::config::ClientConfig; +use rdkafka::config::RDKafkaLogLevel; +use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer}; +use rdkafka::error::KafkaError; +use rdkafka::producer::{BaseProducer, BaseRecord, Producer}; +use rdkafka::topic_partition_list::{Offset, TopicPartitionList}; +use rdkafka::util::Timeout; + +use utils::*; + +mod utils; + +fn create_consumer( + config_overrides: Option>, +) -> Result { + consumer_config(&rand_test_group(), config_overrides).create() +} + +fn create_producer() -> Result { + let mut config = ClientConfig::new(); + config + .set("bootstrap.servers", &get_bootstrap_server()) + .set("message.timeout.ms", "5000") + .set("enable.idempotence", "true") + .set("transactional.id", &rand_test_transactional_id()) + .set("debug", "eos"); + config.set_log_level(RDKafkaLogLevel::Debug); + config.create() +} + +enum IsolationLevel { + ReadUncommitted, + ReadCommitted, +} + +fn count_records(topic: &str, iso: IsolationLevel) -> Result { + let consumer = create_consumer(Some(map!( + "isolation.level" => match iso { + IsolationLevel::ReadUncommitted => "read_uncommitted", + IsolationLevel::ReadCommitted => "read_committed", + }, + "enable.partition.eof" => "true" + )))?; + let mut tpl = TopicPartitionList::new(); + tpl.add_partition(topic, 0); + consumer.assign(&tpl)?; + let mut count = 0; + for message in consumer.iter() { + match message { + Ok(_) => count += 1, + Err(KafkaError::PartitionEOF(_)) => break, + Err(e) => return Err(e), + } + } + Ok(count) +} + +#[tokio::test] +async fn test_transaction_abort() -> Result<(), Box> { + let consume_topic = rand_test_topic(); + let produce_topic = rand_test_topic(); + + populate_topic(&consume_topic, 30, &value_fn, &key_fn, Some(0), None).await; + + // Create consumer and subscribe to `consume_topic`. + let consumer = create_consumer(None)?; + consumer.subscribe(&[&consume_topic])?; + consumer.poll(Timeout::Never).unwrap()?; + + // Commit the first 10 messages. + let mut commit_tpl = TopicPartitionList::new(); + commit_tpl.add_partition_offset(&consume_topic, 0, Offset::Offset(10)); + consumer.commit(&commit_tpl, CommitMode::Sync).unwrap(); + + // Create a producer and start a transaction. + let producer = create_producer()?; + producer.init_transactions(Timeout::Never)?; + producer.begin_transaction()?; + + // Tie the commit of offset 20 to the transaction. + let cgm = consumer.group_metadata().unwrap(); + let mut txn_tpl = TopicPartitionList::new(); + txn_tpl.add_partition_offset(&consume_topic, 0, Offset::Offset(20)); + producer.send_offsets_to_transaction(&txn_tpl, &cgm, Timeout::Never)?; + + // Produce 10 records in the transaction. + for _ in 0..10 { + producer + .send( + BaseRecord::to(&produce_topic) + .payload("A") + .key("B") + .partition(0), + ) + .unwrap(); + } + + // Abort the transaction, but only after producing all messages. + producer.flush(Timeout::Never); + producer.abort_transaction(Timeout::Never)?; + + // Check that no records were produced in read committed mode, but that + // the records are visible in read uncommitted mode. + assert_eq!( + count_records(&produce_topic, IsolationLevel::ReadCommitted)?, + 0, + ); + assert_eq!( + count_records(&produce_topic, IsolationLevel::ReadUncommitted)?, + 10, + ); + + // Check that the consumer's committed offset is still 10. + let committed = consumer.committed(Timeout::Never)?; + assert_eq!( + committed + .find_partition(&consume_topic, 0) + .unwrap() + .offset(), + Offset::Offset(10) + ); + + Ok(()) +} + +#[tokio::test] +async fn test_transaction_commit() -> Result<(), Box> { + let consume_topic = rand_test_topic(); + let produce_topic = rand_test_topic(); + + populate_topic(&consume_topic, 30, &value_fn, &key_fn, Some(0), None).await; + + // Create consumer and subscribe to `consume_topic`. + let consumer = create_consumer(None)?; + consumer.subscribe(&[&consume_topic])?; + consumer.poll(Timeout::Never).unwrap()?; + + // Commit the first 10 messages. + let mut commit_tpl = TopicPartitionList::new(); + commit_tpl.add_partition_offset(&consume_topic, 0, Offset::Offset(10)); + consumer.commit(&commit_tpl, CommitMode::Sync).unwrap(); + + // Create a producer and start a transaction. + let producer = create_producer()?; + producer.init_transactions(Timeout::Never)?; + producer.begin_transaction()?; + + // Tie the commit of offset 20 to the transaction. + let cgm = consumer.group_metadata().unwrap(); + let mut txn_tpl = TopicPartitionList::new(); + txn_tpl.add_partition_offset(&consume_topic, 0, Offset::Offset(20)); + producer.send_offsets_to_transaction(&txn_tpl, &cgm, Timeout::Never)?; + + // Produce 10 records in the transaction. + for _ in 0..10 { + producer + .send( + BaseRecord::to(&produce_topic) + .payload("A") + .key("B") + .partition(0), + ) + .unwrap(); + } + + // Commit the transaction. + producer.commit_transaction(Timeout::Never)?; + + // Check that 10 records were produced. + assert_eq!( + count_records(&produce_topic, IsolationLevel::ReadUncommitted)?, + 10, + ); + assert_eq!( + count_records(&produce_topic, IsolationLevel::ReadCommitted)?, + 10, + ); + + // Check that the consumer's committed offset is now 20. + let committed = consumer.committed(Timeout::Never)?; + assert_eq!( + committed + .find_partition(&consume_topic, 0) + .unwrap() + .offset(), + Offset::Offset(20) + ); + + Ok(()) +} diff --git a/tests/utils.rs b/tests/utils.rs index 6eef5f217..8257913dc 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -46,6 +46,14 @@ pub fn rand_test_group() -> String { format!("__test_{}", id) } +pub fn rand_test_transactional_id() -> String { + let id = rand::thread_rng() + .gen_ascii_chars() + .take(10) + .collect::(); + format!("__test_{}", id) +} + pub fn get_bootstrap_server() -> String { env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost:9092".to_owned()) }