Skip to content

Commit

Permalink
Add transactional producer support
Browse files Browse the repository at this point in the history
Co-authored-by: Robert Ignat <robert.ignat91@gmail.com>

Closes fede1024#289.
Closes fede1024#293.
  • Loading branch information
benesch committed Jan 5, 2021
1 parent 293e67e commit 60da85b
Show file tree
Hide file tree
Showing 14 changed files with 758 additions and 35 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ The main features provided at the moment are:
- Access to group metadata (list groups, list members of groups, hostnames,
etc.).
- Access to producer and consumer metrics, errors and callbacks.
- Exactly-once semantics (EOS) via idempotent and transactional producers
and read-committed consumers.

### One million messages per second

Expand Down Expand Up @@ -116,6 +118,19 @@ To see how to implement at-least-once delivery with `rdkafka`, check out the
delivery semantics, check the [message delivery semantics] chapter in the
Kafka documentation.

### Exactly-once semantics

Exactly-once semantics (EOS) can be achieved using transactional producers,
which allow produced records and consumer offsets to be committed or aborted
atomically. Consumers that set their `isolation.level` to `read_committed`
will only observe committed messages.

EOS is useful in read-process-write scenarios that require messages to be
processed exactly once.

To learn more about using transactions in rust-rdkafka, see the
[Transactions](producer-transactions) section of the producer documentation.

### Users

Here are some of the projects using rust-rdkafka:
Expand Down Expand Up @@ -239,6 +254,7 @@ logging framework.
[librdkafka]: https://github.com/edenhill/librdkafka
[librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
[message delivery semantics]: https://kafka.apache.org/0101/documentation.html#semantics
[producer-transactions]: https://docs.rs/rdkafka/*/rdkafka/producer/#transactions
[rdkafka-sys-features]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#features
[rdkafka-sys-known-issues]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#known-issues
[smol]: https://docs.rs/smol
Expand Down
8 changes: 8 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
<a name="0.25.0"></a>
## 0.25.0 (Unreleased)

* Add support for transactional producers. The new methods are
`Producer::init_transactions`, `Producer::begin_transaction`,
`Producer::commit_transaction`, `Producer::abort_transaction`, and
`Producer::send_offsets_to_transaction`.

Thanks to [@roignpar] for the implementation.

* **Breaking change.** Rename `RDKafkaError` to `RDKafkaErrorCode`. This makes
space for the new `RDKafkaError` type, which mirrors the `rd_kafka_error_t`
type added to librdkafka in v1.4.0.
Expand Down Expand Up @@ -84,6 +91,7 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
when the `tokio` feature is disabled.

[@Marwes]: https://github.com/Marwes
[@roignpar]: https://github.com/roignpar

<a name="0.24.0"></a>
## 0.24.0 (2020-07-08)
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ services:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
- KAFKA_NUM_PARTITIONS=3
- CONFLUENT_SUPPORT_METRICS_ENABLE=0
ports: ["9092:9092"]
Expand Down
12 changes: 6 additions & 6 deletions rdkafka-sys/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! Aliases for types defined in the auto-generated bindings.
use std::convert::TryFrom;
use std::error::Error;
use std::ffi::CStr;
use std::{error, fmt};
use std::fmt;

use crate::bindings;
use crate::helpers;
Expand Down Expand Up @@ -42,6 +43,9 @@ pub type RDKafkaMetadataPartition = bindings::rd_kafka_metadata_partition_t;
/// Native rdkafka broker information.
pub type RDKafkaMetadataBroker = bindings::rd_kafka_metadata_broker_t;

/// Native rdkafka consumer group metadata.
pub type RDKafkaConsumerGroupMetadata = bindings::rd_kafka_consumer_group_metadata_t;

/// Native rdkafka state.
pub type RDKafkaState = bindings::rd_kafka_s;

Expand Down Expand Up @@ -435,11 +439,7 @@ impl fmt::Display for RDKafkaErrorCode {
}
}

impl error::Error for RDKafkaErrorCode {
fn description(&self) -> &str {
"Error from underlying rdkafka library"
}
}
impl Error for RDKafkaErrorCode {}

#[cfg(test)]
mod tests {
Expand Down
13 changes: 12 additions & 1 deletion src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use rdkafka_sys::types::*;

use crate::client::{Client, NativeClient, NativeQueue};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::consumer::{CommitMode, Consumer, ConsumerContext, DefaultConsumerContext};
use crate::consumer::{
CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::message::{BorrowedMessage, Message};
Expand Down Expand Up @@ -271,6 +273,15 @@ where
&self.client
}

fn group_metadata(&self) -> Option<ConsumerGroupMetadata> {
let ptr = unsafe {
NativePtr::from_ptr(rdsys::rd_kafka_consumer_group_metadata(
self.client.native_ptr(),
))
}?;
Some(ConsumerGroupMetadata(ptr))
}

fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
let mut tpl = TopicPartitionList::new();
for topic in topics {
Expand Down
32 changes: 31 additions & 1 deletion src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::groups::GroupList;
use crate::message::BorrowedMessage;
use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{cstr_to_owned, Timeout};
use crate::util::{cstr_to_owned, KafkaDrop, NativePtr, Timeout};

pub mod base_consumer;
pub mod stream_consumer;
Expand Down Expand Up @@ -146,6 +146,27 @@ pub enum CommitMode {
Async = 1,
}

/// Consumer group metadata.
///
/// For use with [`Producer::send_offsets_to_transaction`].
///
/// [`Producer::send_offsets_to_transaction`]: crate::producer::Producer::send_offsets_to_transaction
pub struct ConsumerGroupMetadata(NativePtr<RDKafkaConsumerGroupMetadata>);

impl ConsumerGroupMetadata {
pub(crate) fn ptr(&self) -> *const RDKafkaConsumerGroupMetadata {
self.0.ptr()
}
}

unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata {
const TYPE: &'static str = "consumer_group_metadata";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_consumer_group_metadata_destroy;
}

unsafe impl Send for ConsumerGroupMetadata {}
unsafe impl Sync for ConsumerGroupMetadata {}

/// Common trait for all consumers.
///
/// # Note about object safety
Expand All @@ -167,6 +188,15 @@ where
self.client().context()
}

/// Returns the current consumer group metadata associated with the
/// consumer.
///
/// If the consumer was not configured with a `group.id`, returns `None`.
/// For use with [`Producer::send_offsets_to_transaction`].
///
/// [`Producer::send_offsets_to_transaction`]: crate::producer::Producer::send_offsets_to_transaction
fn group_metadata(&self) -> Option<ConsumerGroupMetadata>;

/// Subscribes the consumer to a list of topics.
fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>;

Expand Down
8 changes: 7 additions & 1 deletion src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use rdkafka_sys::types::*;
use crate::client::{Client, ClientContext, NativeClient};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel};
use crate::consumer::base_consumer::BaseConsumer;
use crate::consumer::{CommitMode, Consumer, ConsumerContext, DefaultConsumerContext, Rebalance};
use crate::consumer::{
CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, Rebalance,
};
use crate::error::{KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::message::BorrowedMessage;
Expand Down Expand Up @@ -345,6 +347,10 @@ where
self.base.client()
}

fn group_metadata(&self) -> Option<ConsumerGroupMetadata> {
self.base.group_metadata()
}

fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
self.base.subscribe(topics)
}
Expand Down
Loading

0 comments on commit 60da85b

Please sign in to comment.