diff --git a/docker-compose.yaml b/docker-compose.yaml index baee434c4..85bba5efe 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -8,6 +8,8 @@ services: - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 - KAFKA_NUM_PARTITIONS=3 - CONFLUENT_SUPPORT_METRICS_ENABLE=0 ports: ["9092:9092"] diff --git a/examples/transactions.rs b/examples/transactions.rs new file mode 100644 index 000000000..1c3c04440 --- /dev/null +++ b/examples/transactions.rs @@ -0,0 +1,210 @@ +use std::time::Duration; + +use clap::{App, Arg}; +use futures::StreamExt; +use log::{info, warn}; + +use rdkafka::config::ClientConfig; +use rdkafka::consumer::stream_consumer::StreamConsumer; +use rdkafka::consumer::Consumer; +use rdkafka::message::Message; +use rdkafka::producer::future_producer::{FutureProducer, FutureRecord}; +use rdkafka::util::get_rdkafka_version; + +use crate::example_utils::setup_logger; + +mod example_utils; + +const TIMEOUT: Duration = Duration::from_secs(5); + +// Start a consumer and a transactional producer. +// The consumer will subscribe to all in topics and the producer will forward +// all incoming messages to the out topic, committing a transaction for every +// 100 consumed and produced (processed) messages. +async fn process( + brokers: &str, + group_id: &str, + in_topics: &[&str], + transactional_id: &str, + out_topic: &str, +) { + let consumer: StreamConsumer = ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("group.id", group_id) + .set("enable.partition.eof", "false") + .set("enable.auto.commit", "false") + .create() + .expect("Consumer creation failed"); + + let producer: FutureProducer = ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("transactional.id", transactional_id) + .set("message.timeout.ms", "5000") + .set("transaction.timeout.ms", "900000") + .create() + .expect("Producer creation failed"); + + consumer + .subscribe(in_topics) + .expect("Can't subscribe to in topics"); + + producer + .init_transactions(TIMEOUT) + .expect("Can't init transactions"); + + producer + .begin_transaction() + .expect("Can't begin transaction"); + + let mut message_stream = consumer.start(); + + let mut processed = 0; + + while let Some(message) = message_stream.next().await { + match message { + Err(e) => { + warn!("Kafka error consuming messages: {}", e); + reset_transaction(&producer, &consumer); + } + + Ok(m) => { + let record = FutureRecord::to(out_topic) + .key(m.key().unwrap_or(b"")) + .payload(m.payload().unwrap_or(b"")); + + match producer.send(record, TIMEOUT).await { + Err((e, _)) => { + warn!("Kafka error producing message: {}", e); + reset_transaction(&producer, &consumer); + } + Ok(_) => { + processed += 1; + println!("sent message to {} ", out_topic); + } + } + } + } + + if processed >= 100 { + // send current consumer position to transaction + let position = &consumer.position().expect("Can't get consumer position"); + + producer + .send_offsets_to_transaction(position, &consumer.group_metadata(), TIMEOUT) + .expect("Can't send consumer offsets to transaction"); + + producer + .commit_transaction(TIMEOUT) + .expect("Can't commit transaction"); + + // start a new transaction + producer + .begin_transaction() + .expect("Can't begin transaction"); + + processed = 0; + } + } +} + +// Abort the current transaction, seek consumer to last committed offsets and +// start a 
new transaction. +fn reset_transaction(producer: &FutureProducer, consumer: &StreamConsumer) { + producer + .abort_transaction(TIMEOUT) + .expect("Can't abort transaction"); + + seek_to_committed(consumer); + + producer + .begin_transaction() + .expect("Can't begin transaction"); +} + +fn seek_to_committed(consumer: &StreamConsumer) { + let committed = consumer + .committed(TIMEOUT) + .expect("Can't get consumer committed offsets"); + + for e in committed.elements().iter() { + consumer + .seek(e.topic(), e.partition(), e.offset(), TIMEOUT) + .expect(&format!( + "Can't seek consumer to {}, {}: {:?}", + e.topic(), + e.partition(), + e.offset() + )); + } +} + +#[tokio::main] +async fn main() { + let matches = App::new("transactions example") + .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) + .about("Command line transactional message processor") + .arg( + Arg::with_name("brokers") + .short("b") + .long("brokers") + .help("Broker list in kafka format") + .takes_value(true) + .default_value("localhost:9092"), + ) + .arg( + Arg::with_name("log-conf") + .long("log-conf") + .help("Configure the logging format (example: 'rdkafka=trace')") + .takes_value(true), + ) + .arg( + Arg::with_name("group-id") + .short("g") + .long("group-id") + .help("Consumer group id") + .takes_value(true) + .default_value("example_transaction_consumer_group_id"), + ) + .arg( + Arg::with_name("in-topics") + .short("i") + .long("in-topics") + .help("Topic list to consume from") + .takes_value(true) + .multiple(true) + .required(true), + ) + .arg( + Arg::with_name("transactional-id") + .short("t") + .long("transactional-id") + .help("Producer transactional id") + .takes_value(true) + .default_value("example_transaction_producer_id"), + ) + .arg( + Arg::with_name("out-topic") + .short("o") + .long("out-topic") + .help("Topic to produce to") + .takes_value(true) + .required(true), + ) + .get_matches(); + + setup_logger(true, matches.value_of("log-conf")); + + let (version_n, version_s) = get_rdkafka_version(); + info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s); + + let brokers = matches.value_of("brokers").unwrap(); + let group_id = matches.value_of("group-id").unwrap(); + let in_topics = matches + .values_of("in-topics") + .unwrap() + .collect::<Vec<&str>>(); + let transactional_id = matches.value_of("transactional-id").unwrap(); + let out_topic = matches.value_of("out-topic").unwrap(); + + process(brokers, group_id, &in_topics, transactional_id, out_topic).await; +} diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index d98c2a611..69633e7e3 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -81,6 +81,60 @@ pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t; /// Native rdkafka topic result. pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t; +/// Native rdkafka consumer group metadata. +pub type RDKafkaConsumerGroupMetadata = bindings::rd_kafka_consumer_group_metadata_t; + +/// Native rdkafka error.
+pub type RDKafkaError = bindings::rd_kafka_error_t; + +impl RDKafkaError { + pub fn code(&self) -> RDKafkaErrorCode { + unsafe { bindings::rd_kafka_error_code(self).into() } + } + + pub fn name(&self) -> String { + let cstr = unsafe { bindings::rd_kafka_error_name(self) }; + unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() } + } + + pub fn string(&self) -> String { + let cstr = unsafe { bindings::rd_kafka_error_string(self) }; + unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() } + } + + pub fn is_fatal(&self) -> bool { + unsafe { bindings::rd_kafka_error_is_fatal(self) == 1 } + } + + pub fn is_retriable(&self) -> bool { + unsafe { bindings::rd_kafka_error_is_retriable(self) == 1 } + } + + pub fn txn_requires_abort(&self) -> bool { + unsafe { bindings::rd_kafka_error_txn_requires_abort(self) == 1 } + } +} + +impl PartialEq for RDKafkaError { + fn eq(&self, other: &RDKafkaError) -> bool { + self.code() == other.code() + } +} + +impl Eq for RDKafkaError {} + +impl fmt::Display for RDKafkaError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.code().fmt(f) + } +} + +impl error::Error for RDKafkaError { + fn description(&self) -> &str { + "Error from underlying rdkafka library" + } +} + // ENUMS /// Client types. diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 1e46980ee..d0e44a529 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -14,7 +14,9 @@ use rdkafka_sys::types::*; use crate::client::{Client, NativeClient, NativeQueue}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; -use crate::consumer::{CommitMode, Consumer, ConsumerContext, DefaultConsumerContext}; +use crate::consumer::{ + CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, +}; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::groups::GroupList; use crate::message::{BorrowedMessage, Message}; @@ -537,6 +539,13 @@ impl Consumer for BaseConsumer { }; Ok(()) } + + fn group_metadata(&self) -> ConsumerGroupMetadata { + unsafe { + let ptr = rdsys::rd_kafka_consumer_group_metadata(self.client.native_ptr()); + ConsumerGroupMetadata::from_ptr(ptr) + } + } } impl Drop for BaseConsumer { diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 9160bfada..230113cc4 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -14,7 +14,7 @@ use crate::groups::GroupList; use crate::message::BorrowedMessage; use crate::metadata::Metadata; use crate::topic_partition_list::{Offset, TopicPartitionList}; -use crate::util::{cstr_to_owned, Timeout}; +use crate::util::{cstr_to_owned, KafkaDrop, NativePtr, Timeout}; pub mod base_consumer; pub mod stream_consumer; @@ -143,6 +143,32 @@ pub enum CommitMode { Async = 1, } +/// Consumer group metadata used to send offsets to producer transactions. +pub struct ConsumerGroupMetadata { + ptr: NativePtr<RDKafkaConsumerGroupMetadata>, +} + +unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata { + const TYPE: &'static str = "consumer_group_metadata"; + const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_consumer_group_metadata_destroy; +} + +impl ConsumerGroupMetadata { + pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConsumerGroupMetadata) -> ConsumerGroupMetadata { + ConsumerGroupMetadata { + ptr: NativePtr::from_ptr(ptr).unwrap(), + } + } + + /// Returns the pointer to the internal librdkafka structure.
+ pub fn ptr(&self) -> *mut RDKafkaConsumerGroupMetadata { + self.ptr.ptr() + } +} + +unsafe impl Send for ConsumerGroupMetadata {} +unsafe impl Sync for ConsumerGroupMetadata {} + /// Common trait for all consumers. /// /// # Note about object safety @@ -339,4 +365,10 @@ pub trait Consumer { fn client(&self) -> &Client { self.get_base_consumer().client() } + + /// Returns the consumer group metadata needed to send offsets to producer + /// transactions. + fn group_metadata(&self) -> ConsumerGroupMetadata { + self.get_base_consumer().group_metadata() + } } diff --git a/src/error.rs b/src/error.rs index 80d9505ee..02898c54f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,8 +5,10 @@ use std::{error, ffi, fmt}; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; -// Re-export rdkafka error code -pub use rdsys::types::RDKafkaErrorCode; +use crate::util::KafkaDrop; + +// Re-export rdkafka error types +pub use rdsys::types::{RDKafkaError, RDKafkaErrorCode}; /// Kafka result. pub type KafkaResult<T> = Result<T, KafkaError>; @@ -31,6 +33,17 @@ impl IsError for RDKafkaConfRes { } } +impl IsError for *const RDKafkaError { + fn is_error(self) -> bool { + !self.is_null() + } +} + +unsafe impl KafkaDrop for RDKafkaError { + const TYPE: &'static str = "error"; + const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_error_destroy; +} + // TODO: consider using macro /// Represents all possible Kafka errors. @@ -78,6 +91,8 @@ pub enum KafkaError { StoreOffset(RDKafkaErrorCode), /// Subscription creation failed. Subscription(String), + /// Transaction error. + Transaction(RDKafkaError), } impl fmt::Debug for KafkaError { @@ -129,6 +144,7 @@ impl fmt::Debug for KafkaError { KafkaError::Subscription(ref err) => { write!(f, "KafkaError (Subscription error: {})", err) } + KafkaError::Transaction(err) => write!(f, "KafkaError (Transaction error: {})", err), } } } @@ -162,14 +178,36 @@ impl fmt::Display for KafkaError { KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err), KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err), KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err), + KafkaError::Transaction(err) => write!(f, "Transaction error: {}", err), } } } impl error::Error for KafkaError { fn source(&self) -> Option<&(dyn error::Error + 'static)> { - self.rdkafka_error_code() - .map(|e| e as &(dyn error::Error + 'static)) + match self { + KafkaError::AdminOp(_) => None, + KafkaError::AdminOpCreation(_) => None, + KafkaError::Canceled => None, + KafkaError::ClientConfig(_, _, _, _) => None, + KafkaError::ClientCreation(_) => None, + KafkaError::ConsumerCommit(err) => Some(err), + KafkaError::Global(err) => Some(err), + KafkaError::GroupListFetch(err) => Some(err), + KafkaError::MessageConsumption(err) => Some(err), + KafkaError::MessageProduction(err) => Some(err), + KafkaError::MetadataFetch(err) => Some(err), + KafkaError::NoMessageReceived => None, + KafkaError::Nul(_) => None, + KafkaError::OffsetFetch(err) => Some(err), + KafkaError::PartitionEOF(_) => None, + KafkaError::PauseResume(_) => None, + KafkaError::Seek(_) => None, + KafkaError::SetPartitionOffset(err) => Some(err), + KafkaError::StoreOffset(err) => Some(err), + KafkaError::Subscription(_) => None, + KafkaError::Transaction(err) => Some(err), + } } } @@ -180,39 +218,43 @@ impl From<ffi::NulError> for KafkaError { } impl KafkaError { - /// Returns if an error is `Fatal` and requires reinitialisation. + /// Returns whether the error is fatal and requires reinitialisation.
/// For details, see https://docs.confluent.io/5.5.0/clients/librdkafka/rdkafka_8h.html pub fn is_fatal(&self) -> bool { - match self.rdkafka_error_code() { - Some(RDKafkaErrorCode::Fatal) => true, - _ => false, + if let Some(RDKafkaErrorCode::Fatal) = self.rdkafka_error_code() { + true + } else if let KafkaError::Transaction(err) = self { + err.is_fatal() + } else { + false } } /// Returns the [`RDKafkaErrorCode`] underlying this error, if any. #[allow(clippy::match_same_arms)] - pub fn rdkafka_error_code(&self) -> Option<&RDKafkaErrorCode> { + pub fn rdkafka_error_code(&self) -> Option<RDKafkaErrorCode> { match self { KafkaError::AdminOp(_) => None, KafkaError::AdminOpCreation(_) => None, KafkaError::Canceled => None, KafkaError::ClientConfig(_, _, _, _) => None, KafkaError::ClientCreation(_) => None, - KafkaError::ConsumerCommit(err) => Some(err), - KafkaError::Global(err) => Some(err), - KafkaError::GroupListFetch(err) => Some(err), - KafkaError::MessageConsumption(err) => Some(err), - KafkaError::MessageProduction(err) => Some(err), - KafkaError::MetadataFetch(err) => Some(err), + KafkaError::ConsumerCommit(err) => Some(*err), + KafkaError::Global(err) => Some(*err), + KafkaError::GroupListFetch(err) => Some(*err), + KafkaError::MessageConsumption(err) => Some(*err), + KafkaError::MessageProduction(err) => Some(*err), + KafkaError::MetadataFetch(err) => Some(*err), KafkaError::NoMessageReceived => None, KafkaError::Nul(_) => None, - KafkaError::OffsetFetch(err) => Some(err), + KafkaError::OffsetFetch(err) => Some(*err), KafkaError::PartitionEOF(_) => None, KafkaError::PauseResume(_) => None, KafkaError::Seek(_) => None, - KafkaError::SetPartitionOffset(err) => Some(err), - KafkaError::StoreOffset(err) => Some(err), + KafkaError::SetPartitionOffset(err) => Some(*err), + KafkaError::StoreOffset(err) => Some(*err), KafkaError::Subscription(_) => None, + KafkaError::Transaction(err) => Some(err.code()), } } } diff --git a/src/lib.rs b/src/lib.rs index 9d7190d52..fbc80bbb0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,6 +31,7 @@ //! - Access to group metadata (list groups, list members of groups, hostnames, //! etc.). //! - Access to producer and consumer metrics, errors and callbacks. +//! - Transactional producers. //! //! ### One million messages per second //! @@ -108,6 +109,17 @@ //! delivery semantics, check the [message delivery semantics] chapter in the //! Kafka documentation. //! +//! ### Exactly-once semantics +//! +//! Exactly-once semantics (EOS) can be achieved by using transactional producers and +//! transaction-aware consumers. This allows produced records and consumer +//! offsets to be committed or aborted atomically, and ensures that consumers +//! read only committed messages. EOS is useful in read-process-write scenarios +//! that require messages to be processed exactly once. +//! +//! To learn more about using transactions in `rdkafka`, see the +//! [Transactional producer API documentation] and the [transactions example]. +//! +//! ### Users +//! +//! Here are some of the projects using rust-rdkafka: @@ -210,11 +222,13 @@ //! [`Stream`]: https://docs.rs/futures/*/futures/stream/trait.Stream.html //! [`StreamConsumer`]: https://docs.rs/rdkafka/*/rdkafka/consumer/stream_consumer/struct.StreamConsumer.html //! [`ThreadedProducer`]: https://docs.rs/rdkafka/*/rdkafka/producer/base_producer/struct.ThreadedProducer.html +//! [Transactional producer API documentation]: https://docs.rs/rdkafka/*/rdkafka/producer/index.html#transactional-producer-api //! [`log`]: https://docs.rs/log //!
[`env_logger`]: https://docs.rs/env_logger //! [Apache Kafka]: https://kafka.apache.org //! [asynchronous processing example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/asynchronous_processing.rs //! [at-least-once delivery example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/at_least_once.rs +//! [transactions example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/transactions.rs //! [smol runtime example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/smol_runtime.rs //! [broker-compat]: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility //! [`examples`]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/ diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 324c72504..f3360dddf 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -58,9 +58,11 @@ use rdkafka_sys::types::*; use crate::client::{Client, ClientContext}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; +use crate::consumer::ConsumerGroupMetadata; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::message::{BorrowedMessage, OwnedHeaders, ToBytes}; use crate::util::{IntoOpaque, Timeout}; +use crate::TopicPartitionList; pub use crate::message::DeliveryResult; @@ -426,6 +428,91 @@ impl BaseProducer { pub fn client(&self) -> &Client { &*self.client_arc } + + /// Initialize transactions for the producer instance. + /// + /// Requires that a `transactional.id` be set for the producer and must be + /// called before any other transaction or send operations. + pub fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> { + let init_error: *const RDKafkaError = unsafe { + rdsys::rd_kafka_init_transactions(self.native_ptr(), timeout.into().as_millis()) + }; + if init_error.is_error() { + return unsafe { Err(KafkaError::Transaction(*init_error)) }; + } + Ok(()) + } + + /// Begin a new transaction. + /// + /// A successful call to `init_transactions` must be made before calling + /// `begin_transaction`. All records produced and consumer offsets sent + /// after this call will be committed or aborted atomically. + pub fn begin_transaction(&self) -> KafkaResult<()> { + let begin_error: *const RDKafkaError = + unsafe { rdsys::rd_kafka_begin_transaction(self.native_ptr()) }; + if begin_error.is_error() { + return unsafe { Err(KafkaError::Transaction(*begin_error)) }; + } + Ok(()) + } + + /// Sends a list of topic partition offsets to the consumer group + /// coordinator for `cgm`, and marks the offsets as part of the current + /// transaction. These offsets will be considered committed only if the + /// transaction is committed successfully. + /// + /// The offsets should be the next offsets your application will consume, + /// i.e.: the last processed message's offset + 1 for each partition. + /// + /// To get `ConsumerGroupMetadata`, use `Consumer::group_metadata` on the + /// consumer the offsets are being sent for. + pub fn send_offsets_to_transaction<T: Into<Timeout>>( + &self, + offsets: &TopicPartitionList, + cgm: &ConsumerGroupMetadata, + timeout: T, + ) -> KafkaResult<()> { + let send_error: *const RDKafkaError = unsafe { + rdsys::rd_kafka_send_offsets_to_transaction( + self.native_ptr(), + offsets.ptr(), + cgm.ptr(), + timeout.into().as_millis(), + ) + }; + if send_error.is_error() { + return unsafe { Err(KafkaError::Transaction(*send_error)) }; + } + Ok(()) + } + + /// Commit the current transaction.
+ /// + /// Flushes the producer before actually committing. + pub fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> { + let commit_error: *const RDKafkaError = unsafe { + rdsys::rd_kafka_commit_transaction(self.native_ptr(), timeout.into().as_millis()) + }; + if commit_error.is_error() { + return unsafe { Err(KafkaError::Transaction(*commit_error)) }; + } + Ok(()) + } + + /// Aborts the ongoing transaction. + /// + /// Should be used when receiving non-fatal abortable transaction errors. + /// Outstanding messages will be purged. + pub fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> { + let abort_error: *const RDKafkaError = unsafe { + rdsys::rd_kafka_abort_transaction(self.native_ptr(), timeout.into().as_millis()) + }; + if abort_error.is_error() { + return unsafe { Err(KafkaError::Transaction(*abort_error)) }; + } + Ok(()) + } } impl Clone for BaseProducer { @@ -541,6 +628,55 @@ impl ThreadedProducer { pub fn client(&self) -> &Client { self.producer.client() } + + /// Initialize transactions for the producer instance. + /// + /// Requires that a `transactional.id` be set for the producer and must be + /// called before any other transaction or send operations. + pub fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> { + self.producer.init_transactions(timeout) + } + + /// Begin a new transaction. + /// + /// A successful call to `init_transactions` must be made before calling + /// `begin_transaction`. All records produced and consumer offsets sent + /// after this call will be committed or aborted atomically. + pub fn begin_transaction(&self) -> KafkaResult<()> { + self.producer.begin_transaction() + } + + /// Sends a list of topic partition offsets to the consumer group + /// coordinator for `cgm`, and marks the offsets as part of the current + /// transaction. These offsets will be considered committed only if the + /// transaction is committed successfully. + /// + /// The offsets should be the next offsets your application will consume, + /// i.e.: the last processed message's offset + 1 for each partition. + pub fn send_offsets_to_transaction<T: Into<Timeout>>( + &self, + offsets: &TopicPartitionList, + cgm: &ConsumerGroupMetadata, + timeout: T, + ) -> KafkaResult<()> { + self.producer + .send_offsets_to_transaction(offsets, cgm, timeout) + } + + /// Commit the current transaction. + /// + /// Flushes the producer before actually committing. + pub fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> { + self.producer.commit_transaction(timeout) + } + + /// Abort the ongoing transaction. + /// + /// Should be used when receiving non-fatal abortable transaction errors. + /// Outstanding messages will be purged.
+ pub fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> { + self.producer.abort_transaction(timeout) + } } impl Drop for ThreadedProducer { diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs index e969c9988..96bb30bed 100644 --- a/src/producer/future_producer.rs +++ b/src/producer/future_producer.rs @@ -14,10 +14,12 @@ use futures::FutureExt; use crate::client::{Client, ClientContext, DefaultClientContext}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel}; +use crate::consumer::ConsumerGroupMetadata; use crate::error::{KafkaError, KafkaResult, RDKafkaErrorCode}; use crate::message::{Message, OwnedHeaders, OwnedMessage, Timestamp, ToBytes}; use crate::producer::{BaseRecord, DeliveryResult, ProducerContext, ThreadedProducer}; use crate::statistics::Statistics; +use crate::topic_partition_list::TopicPartitionList; #[cfg(feature = "tokio")] use crate::util::TokioRuntime; use crate::util::{AsyncRuntime, IntoOpaque, Timeout}; @@ -367,6 +369,55 @@ impl FutureProducer { pub fn client(&self) -> &Client<FutureProducerContext<C>> { self.producer.client() } + + /// Initialize transactions for the producer instance. + /// + /// Requires that a `transactional.id` be set for the producer and must be + /// called before any other transaction or send operations. + pub fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> { + self.producer.init_transactions(timeout) + } + + /// Begin a new transaction. + /// + /// A successful call to `init_transactions` must be made before calling + /// `begin_transaction`. All records produced and consumer offsets sent + /// after this call will be committed or aborted atomically. + pub fn begin_transaction(&self) -> KafkaResult<()> { + self.producer.begin_transaction() + } + + /// Sends a list of topic partition offsets to the consumer group + /// coordinator for `cgm`, and marks the offsets as part of the current + /// transaction. These offsets will be considered committed only if the + /// transaction is committed successfully. + /// + /// The offsets should be the next offsets your application will consume, + /// i.e.: the last processed message's offset + 1 for each partition. + pub fn send_offsets_to_transaction<T: Into<Timeout>>( + &self, + offsets: &TopicPartitionList, + cgm: &ConsumerGroupMetadata, + timeout: T, + ) -> KafkaResult<()> { + self.producer + .send_offsets_to_transaction(offsets, cgm, timeout) + } + + /// Commit the current transaction. + /// + /// Flushes the producer before actually committing. + pub fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> { + self.producer.commit_transaction(timeout) + } + + /// Abort the ongoing transaction. + /// + /// Should be used when receiving non-fatal abortable transaction errors. + /// Outstanding messages will be purged. + pub fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> { + self.producer.abort_transaction(timeout) + } } #[cfg(test)] diff --git a/src/producer/mod.rs b/src/producer/mod.rs index cb30aef14..589cc8175 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -75,6 +75,46 @@ //! available (for more information, check the documentation of the futures //! crate). //! +//! ## Transactional producer API +//! +//! All `rust-rdkafka` producers support transactions. To configure a producer +//! for transactions, set `transactional.id` to an identifier unique to the +//! application. +//! +//! Transactional producers work together with transaction-aware consumers +//! configured with `isolation.level = read_committed` (the default).
+//! +//! After creating a transactional producer, it must be initialized with +//! `init_transactions`. +//! +//! To start a new transaction use `begin_transaction`. +//! There can be **only one ongoing transaction** at a time per producer. All +//! records sent after starting a transaction and before committing or aborting +//! it will be part of the current transaction. +//! +//! Consumer offsets can be sent as part of the ongoing transaction using +//! `send_offsets_to_transaction` and will be committed atomically with the +//! other records sent in the transaction. +//! +//! The current transaction can be committed with `commit_transaction` or +//! aborted using `abort_transaction`. Afterwards, a new transaction can begin. +//! +//! ### Errors +//! +//! Errors returned by transaction methods may: +//! * be retriable, `RDKafkaError::is_retriable`: indicates that the operation +//! may be retried. +//! * require abort, `RDKafkaError::txn_requires_abort`: the current transaction +//! must be aborted and a new one may begin. +//! * be fatal, `KafkaError::is_fatal` or `RDKafkaError::is_fatal`: the producer +//! must be stopped and the application terminated. +//! +//! For more details about transactions check the [librdkafka documentation], +//! "Transactional producer API" section. +//! +//! [librdkafka documentation]: +//! https://docs.confluent.io/5.5.1/clients/librdkafka/rdkafka_8h.html +//! //! ## Configuration //! //! ### Producer configuration diff --git a/tests/test_transactions.rs b/tests/test_transactions.rs new file mode 100644 index 000000000..0da298c3b --- /dev/null +++ b/tests/test_transactions.rs @@ -0,0 +1,159 @@ +//! Test transactions using low level consumer and producer. + +use std::collections::HashMap; +use std::time::Duration; + +use rdkafka::config::ClientConfig; +use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, DefaultConsumerContext}; +use rdkafka::error::KafkaError; +use rdkafka::producer::{BaseProducer, BaseRecord, DefaultProducerContext}; +use rdkafka::topic_partition_list::{Offset, TopicPartitionList}; + +use crate::utils::*; + +mod utils; + +const TIMEOUT: Duration = Duration::from_secs(10); +const POPULATE_COUNT: i32 = 30; +const COMMIT_OFFSET: i64 = 10; +const SEND_OFFSET: i64 = 20; +const PRODUCE_COUNT: usize = 20; + +fn create_base_consumer(group_id: &str) -> BaseConsumer { + consumer_config( + group_id, + Some(map!( + "isolation.level" => "read_committed", + "enable.partition.eof" => "true" + )), + ) + .create() + .expect("Consumer creation failed") +} + +fn create_base_producer(transactional_id: &str) -> BaseProducer { + let mut config = ClientConfig::new(); + config + .set("bootstrap.servers", &get_bootstrap_server()) + .set("message.timeout.ms", "5000") + .set("enable.idempotence", "true") + .set("transactional.id", transactional_id) + .set("debug", "eos"); + config.set_log_level(rdkafka::config::RDKafkaLogLevel::Debug); + config.create().expect("Producer creation failed") +} + +async fn prepare_transaction( + consumer: &BaseConsumer, + producer: &BaseProducer, + consume_topic: &str, + produce_topic: &str, +) { + // populate consume_topic + populate_topic( + &consume_topic, + POPULATE_COUNT, + &value_fn, + &key_fn, + Some(0), + None, + ) + .await; + consumer.subscribe(&[&consume_topic]).unwrap(); + consumer.poll(TIMEOUT).unwrap().unwrap(); + + // commit initial consumer offset + let mut commit_tpl = TopicPartitionList::new(); + commit_tpl.add_partition_offset(&consume_topic, 0, Offset::Offset(COMMIT_OFFSET)); + consumer.commit(&commit_tpl, 
CommitMode::Sync).unwrap(); + + // start transaction + producer.init_transactions(TIMEOUT).unwrap(); + producer.begin_transaction().unwrap(); + + // send offsets to transaction + let cgm = consumer.group_metadata(); + let mut txn_tpl = TopicPartitionList::new(); + txn_tpl.add_partition_offset(consume_topic, 0, Offset::Offset(SEND_OFFSET)); + producer + .send_offsets_to_transaction(&txn_tpl, &cgm, TIMEOUT) + .unwrap(); + + // produce records in transaction + for _ in 0..PRODUCE_COUNT { + producer + .send( + BaseRecord::to(produce_topic) + .payload("A") + .key("B") + .partition(0), + ) + .unwrap(); + } +} + +fn assert_transaction( + consumer: &BaseConsumer, + consume_topic: &str, + produce_topic: &str, + consumer_offset: i64, + produced_count: usize, +) { + // check consumer committed offset + let committed = consumer.committed(TIMEOUT).unwrap(); + assert_eq!( + committed.find_partition(consume_topic, 0).unwrap().offset(), + Offset::Offset(consumer_offset) + ); + + // check how many records have been produced after the transaction + let txn_consumer = create_base_consumer(&rand_test_group()); + let mut tpl = TopicPartitionList::new(); + tpl.add_partition(produce_topic, 0); + txn_consumer.assign(&tpl).unwrap(); + + // count all messages in produce_topic + let consumed = txn_consumer + .iter() + .take_while(|r| !matches!(r, Err(KafkaError::PartitionEOF(_)))) + .count(); + + assert_eq!(consumed, produced_count); +} + +#[tokio::test] +async fn test_transaction_abort() { + let consumer = create_base_consumer(&rand_test_group()); + let producer = create_base_producer(&rand_test_transactional_id()); + + let consume_topic = rand_test_topic(); + let produce_topic = rand_test_topic(); + + prepare_transaction(&consumer, &producer, &consume_topic, &produce_topic).await; + + producer.flush(TIMEOUT); + producer.abort_transaction(TIMEOUT).unwrap(); + + assert_transaction(&consumer, &consume_topic, &produce_topic, COMMIT_OFFSET, 0); +} + +#[tokio::test] +async fn test_transaction_commit() { + let consumer = create_base_consumer(&rand_test_group()); + let producer = create_base_producer(&rand_test_transactional_id()); + + let consume_topic = rand_test_topic(); + let produce_topic = rand_test_topic(); + + prepare_transaction(&consumer, &producer, &consume_topic, &produce_topic).await; + + producer.commit_transaction(TIMEOUT).unwrap(); + + assert_transaction( + &consumer, + &consume_topic, + &produce_topic, + SEND_OFFSET, + PRODUCE_COUNT, + ); +} diff --git a/tests/utils.rs b/tests/utils.rs index 6eef5f217..8257913dc 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -46,6 +46,14 @@ pub fn rand_test_group() -> String { format!("__test_{}", id) } +pub fn rand_test_transactional_id() -> String { + let id = rand::thread_rng() + .gen_ascii_chars() + .take(10) + .collect::<String>(); + format!("__test_{}", id) +} + pub fn get_bootstrap_server() -> String { env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost:9092".to_owned()) }
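The producer module documentation added in this diff classifies transaction errors as retriable, abortable (`txn_requires_abort`), or fatal. The following is a minimal sketch, not part of the diff, of how a caller might react to those categories when committing; `commit_with_recovery` is a hypothetical helper, and a real application would also re-seek its consumer after an abort, as `reset_transaction` does in examples/transactions.rs.

```rust
use std::time::Duration;

use rdkafka::error::KafkaError;
use rdkafka::producer::BaseProducer;

const TIMEOUT: Duration = Duration::from_secs(5);

// Hypothetical helper: commit the current transaction and react to the
// error classes described in the producer module docs.
fn commit_with_recovery(producer: &BaseProducer) {
    match producer.commit_transaction(TIMEOUT) {
        Ok(()) => (),
        // Retriable: the same call may simply be attempted again.
        Err(KafkaError::Transaction(ref e)) if e.is_retriable() => {
            producer
                .commit_transaction(TIMEOUT)
                .expect("retried commit failed");
        }
        // Abortable: give up on this transaction and start a fresh one.
        Err(KafkaError::Transaction(ref e)) if e.txn_requires_abort() => {
            producer.abort_transaction(TIMEOUT).expect("abort failed");
            producer.begin_transaction().expect("begin failed");
        }
        // Fatal (or unexpected): the producer can no longer be used.
        Err(e) => panic!("fatal transaction error: {}", e),
    }
}
```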
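The `send_offsets_to_transaction` docs above ask for the next offsets the application will consume (last processed offset + 1 per partition). This is a hedged sketch of building that `TopicPartitionList` explicitly, assuming the API added in this diff; `send_processed_offset` is a hypothetical helper and expects a producer that has already initialized and begun a transaction.

```rust
use std::time::Duration;

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::producer::BaseProducer;
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};

const TIMEOUT: Duration = Duration::from_secs(5);

// Hypothetical helper: after processing a message from `topic`/`partition`
// at `last_processed_offset`, attach the *next* offset to the ongoing
// transaction together with the consumer's group metadata.
fn send_processed_offset(
    producer: &BaseProducer,
    consumer: &BaseConsumer,
    topic: &str,
    partition: i32,
    last_processed_offset: i64,
) -> KafkaResult<()> {
    let mut offsets = TopicPartitionList::new();
    offsets.add_partition_offset(topic, partition, Offset::Offset(last_processed_offset + 1));
    producer.send_offsets_to_transaction(&offsets, &consumer.group_metadata(), TIMEOUT)
}
```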