Add producer transaction support #293

Closed · wants to merge 16 commits
2 changes: 2 additions & 0 deletions docker-compose.yaml
@@ -8,6 +8,8 @@ services:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
- KAFKA_NUM_PARTITIONS=3
- CONFLUENT_SUPPORT_METRICS_ENABLE=0
ports: ["9092:9092"]
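
The two new settings lower the requirements for the transaction coordinator's internal `__transaction_state` topic, whose defaults assume replication factor 3 and min ISR 2, so that transactions work on this single-broker development cluster. To try the new example against it (the topic names below are placeholders):

$ docker-compose up -d
$ cargo run --example transactions -- \
    --in-topics my-input-topic \
    --out-topic my-output-topic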
210 changes: 210 additions & 0 deletions examples/transactions.rs
@@ -0,0 +1,210 @@
use std::time::Duration;

use clap::{App, Arg};
use futures::StreamExt;
use log::{info, warn};

use rdkafka::config::ClientConfig;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::Consumer;
use rdkafka::message::Message;
use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
use rdkafka::util::get_rdkafka_version;

use crate::example_utils::setup_logger;

mod example_utils;

const TIMEOUT: Duration = Duration::from_secs(5);

// Start a consumer and a transactional producer.
// The consumer subscribes to the given input topics, and the producer
// forwards every incoming message to the output topic, committing a
// transaction after every 100 processed (consumed and produced) messages.
async fn process(
brokers: &str,
group_id: &str,
in_topics: &[&str],
transactional_id: &str,
out_topic: &str,
) {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("group.id", group_id)
.set("enable.partition.eof", "false")
.set("enable.auto.commit", "false")
.create()
.expect("Consumer creation failed");

let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("transactional.id", transactional_id)
.set("message.timeout.ms", "5000")
.set("transaction.timeout.ms", "900000")
.create()
.expect("Producer creation failed");

consumer
.subscribe(in_topics)
.expect("Can't subscribe to in topics");

producer
.init_transactions(TIMEOUT)
.expect("Can't init transactions");

producer
.begin_transaction()
.expect("Can't begin transaction");

let mut message_stream = consumer.start();

let mut processed = 0;

while let Some(message) = message_stream.next().await {
match message {
Err(e) => {
warn!("Kafka error consuming messages: {}", e);
reset_transaction(&producer, &consumer);
processed = 0; // the aborted messages will be redelivered after the seek
}

Ok(m) => {
let record = FutureRecord::to(out_topic)
.key(m.key().unwrap_or(b""))
.payload(m.payload().unwrap_or(b""));

match producer.send(record, TIMEOUT).await {
Err((e, _)) => {
warn!("Kafka error producing message: {}", e);
reset_transaction(&producer, &consumer);
processed = 0; // the aborted messages will be redelivered after the seek
}
Ok(_) => {
processed += 1;
info!("sent message to {}", out_topic);
}
}
}
}

if processed >= 100 {
// send current consumer position to transaction
let position = &consumer.position().expect("Can't get consumer position");

producer
.send_offsets_to_transaction(position, &consumer.group_metadata(), TIMEOUT)
.expect("Can't send consumer offsets to transaction");

producer
.commit_transaction(TIMEOUT)
.expect("Can't commit transaction");

// start a new transaction
producer
.begin_transaction()
.expect("Can't begin transaction");

processed = 0;
}
}
}

// Abort the current transaction, seek the consumer back to the last
// committed offsets, and start a new transaction.
fn reset_transaction(producer: &FutureProducer, consumer: &StreamConsumer) {
producer
.abort_transaction(TIMEOUT)
.expect("Can't abort transaction");

seek_to_committed(consumer);

producer
.begin_transaction()
.expect("Can't begin transaction");
}

fn seek_to_committed(consumer: &StreamConsumer) {
let committed = consumer
.committed(TIMEOUT)
.expect("Can't get consumer committed offsets");

for e in committed.elements().iter() {
consumer
.seek(e.topic(), e.partition(), e.offset(), TIMEOUT)
.expect(&format!(
"Can't seek consumer to {}, {}: {:?}",
e.topic(),
e.partition(),
e.offset()
));
}
}

#[tokio::main]
async fn main() {
let matches = App::new("transactions example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Command line transactional message processor")
.arg(
Arg::with_name("brokers")
.short("b")
.long("brokers")
.help("Broker list in kafka format")
.takes_value(true)
.default_value("localhost:9092"),
)
.arg(
Arg::with_name("log-conf")
.long("log-conf")
.help("Configure the logging format (example: 'rdkafka=trace')")
.takes_value(true),
)
.arg(
Arg::with_name("group-id")
.short("g")
.long("group-id")
.help("Consumer group id")
.takes_value(true)
.default_value("example_transaction_consumer_group_id"),
)
.arg(
Arg::with_name("in-topics")
.short("i")
.long("in-topics")
.help("Topic list to consume from")
.takes_value(true)
.multiple(true)
.required(true),
)
.arg(
Arg::with_name("transactional-id")
.short("t")
.long("transactional-id")
.help("Producer transactional id")
.takes_value(true)
.default_value("example_transaction_producer_id"),
)
.arg(
Arg::with_name("out-topic")
.short("o")
.long("out-topic")
.help("Topic to produce to")
.takes_value(true)
.required(true),
)
.get_matches();

setup_logger(true, matches.value_of("log-conf"));

let (version_n, version_s) = get_rdkafka_version();
info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);

let brokers = matches.value_of("brokers").unwrap();
let group_id = matches.value_of("group-id").unwrap();
let in_topics = matches
.values_of("in-topics")
.unwrap()
.collect::<Vec<&str>>();
let transactional_id = matches.value_of("transactional-id").unwrap();
let out_topic = matches.value_of("out-topic").unwrap();

process(brokers, group_id, &in_topics, transactional_id, out_topic).await;
}
54 changes: 54 additions & 0 deletions rdkafka-sys/src/types.rs
@@ -81,6 +81,60 @@ pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t;
/// Native rdkafka topic result.
pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t;

/// Native rdkafka consumer group metadata.
pub type RDKafkaConsumerGroupMetadata = bindings::rd_kafka_consumer_group_metadata_t;

/// Native rdkafka error.
pub type RDKafkaError = bindings::rd_kafka_error_t;

impl RDKafkaError {
/// Returns the error code, mapped to an `RDKafkaErrorCode`.
pub fn code(&self) -> RDKafkaErrorCode {
unsafe { bindings::rd_kafka_error_code(self).into() }
}

/// Returns the name of the error's code constant.
pub fn name(&self) -> String {
let cstr = unsafe { bindings::rd_kafka_error_name(self) };
unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
}

/// Returns a human-readable description of the error.
pub fn string(&self) -> String {
let cstr = unsafe { bindings::rd_kafka_error_string(self) };
unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
}

/// Reports whether the error is fatal to the client instance.
pub fn is_fatal(&self) -> bool {
unsafe { bindings::rd_kafka_error_is_fatal(self) == 1 }
}

/// Reports whether the failed operation may be retried.
pub fn is_retriable(&self) -> bool {
unsafe { bindings::rd_kafka_error_is_retriable(self) == 1 }
}

/// Reports whether the error requires the current transaction to be aborted.
pub fn txn_requires_abort(&self) -> bool {
unsafe { bindings::rd_kafka_error_txn_requires_abort(self) == 1 }
}
}

impl PartialEq for RDKafkaError {
fn eq(&self, other: &RDKafkaError) -> bool {
self.code() == other.code()
}
}

impl Eq for RDKafkaError {}

impl fmt::Display for RDKafkaError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.code().fmt(f)
}
}

impl error::Error for RDKafkaError {
fn description(&self) -> &str {
"Error from underlying rdkafka library"
}
}

// ENUMS

/// Client types.
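A hypothetical sketch of how a caller might branch on the three classification methods added above when a transactional operation fails (`handle_txn_error` and the recovery comments are illustrative, not part of this PR):

fn handle_txn_error(err: &RDKafkaError) {
    if err.txn_requires_abort() {
        // Only the current transaction is poisoned: abort it, rewind the
        // consumer, and begin a new transaction (see reset_transaction in
        // examples/transactions.rs).
    } else if err.is_retriable() {
        // Transient failure: the same operation can simply be retried.
    } else if err.is_fatal() {
        // The client instance is unusable: log err.name() and err.string(),
        // then tear down and re-create the producer.
    }
}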
11 changes: 10 additions & 1 deletion src/consumer/base_consumer.rs
@@ -14,7 +14,9 @@ use rdkafka_sys::types::*;

use crate::client::{Client, NativeClient, NativeQueue};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::consumer::{CommitMode, Consumer, ConsumerContext, DefaultConsumerContext};
use crate::consumer::{
CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::message::{BorrowedMessage, Message};
@@ -537,6 +539,13 @@ impl<C: ConsumerContext> Consumer<C> for BaseConsumer<C> {
};
Ok(())
}

fn group_metadata(&self) -> ConsumerGroupMetadata {
unsafe {
let ptr = rdsys::rd_kafka_consumer_group_metadata(self.client.native_ptr());
ConsumerGroupMetadata::from_ptr(ptr)
}
}
}

impl<C: ConsumerContext> Drop for BaseConsumer<C> {
34 changes: 33 additions & 1 deletion src/consumer/mod.rs
@@ -14,7 +14,7 @@ use crate::groups::GroupList;
use crate::message::BorrowedMessage;
use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{cstr_to_owned, Timeout};
use crate::util::{cstr_to_owned, KafkaDrop, NativePtr, Timeout};

pub mod base_consumer;
pub mod stream_consumer;
@@ -143,6 +143,32 @@ pub enum CommitMode {
Async = 1,
}

/// Consumer group metadata used to send offsets to producer transactions.
pub struct ConsumerGroupMetadata {
ptr: NativePtr<RDKafkaConsumerGroupMetadata>,
}

unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata {
const TYPE: &'static str = "consumer_group_metadata";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_consumer_group_metadata_destroy;
}

impl ConsumerGroupMetadata {
pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConsumerGroupMetadata) -> ConsumerGroupMetadata {
ConsumerGroupMetadata {
ptr: NativePtr::from_ptr(ptr).unwrap(),
}
}

/// Returns the pointer to the internal librdkafka structure.
pub fn ptr(&self) -> *mut RDKafkaConsumerGroupMetadata {
self.ptr.ptr()
}
}

unsafe impl Send for ConsumerGroupMetadata {}
unsafe impl Sync for ConsumerGroupMetadata {}

/// Common trait for all consumers.
///
/// # Note about object safety
@@ -339,4 +365,10 @@ pub trait Consumer<C: ConsumerContext = DefaultConsumerContext> {
fn client(&self) -> &Client<C> {
self.get_base_consumer().client()
}

/// Returns the consumer group metadata needed to send offsets to producer
/// transactions.
fn group_metadata(&self) -> ConsumerGroupMetadata {
self.get_base_consumer().group_metadata()
}
}
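
Combined with the producer half of this PR, the intended use of `group_metadata` is roughly the following (a sketch, assuming a transactional `producer` and the `send_offsets_to_transaction` call demonstrated in examples/transactions.rs):

// Commit the consumed offsets atomically with the messages produced
// in the current transaction.
let position = consumer.position()?;
producer.send_offsets_to_transaction(&position, &consumer.group_metadata(), TIMEOUT)?;
producer.commit_transaction(TIMEOUT)?;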