Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transactional producer support #323

Merged
merged 2 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ The main features provided at the moment are:
- Access to group metadata (list groups, list members of groups, hostnames,
etc.).
- Access to producer and consumer metrics, errors and callbacks.
- Exactly-once semantics (EOS) via idempotent and transactional producers
and read-committed consumers.

### One million messages per second

Expand Down Expand Up @@ -116,6 +118,19 @@ To see how to implement at-least-once delivery with `rdkafka`, check out the
delivery semantics, check the [message delivery semantics] chapter in the
Kafka documentation.

### Exactly-once semantics

Exactly-once semantics (EOS) can be achieved using transactional producers,
which allow produced records and consumer offsets to be committed or aborted
atomically. Consumers that set their `isolation.level` to `read_committed`
will only observe committed messages.

EOS is useful in read-process-write scenarios that require messages to be
processed exactly once.

To learn more about using transactions in rust-rdkafka, see the
[Transactions](producer-transactions) section of the producer documentation.

### Users

Here are some of the projects using rust-rdkafka:
Expand Down Expand Up @@ -239,6 +254,7 @@ logging framework.
[librdkafka]: https://github.com/edenhill/librdkafka
[librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
[message delivery semantics]: https://kafka.apache.org/0101/documentation.html#semantics
[producer-transactions]: https://docs.rs/rdkafka/*/rdkafka/producer/#transactions
[rdkafka-sys-features]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#features
[rdkafka-sys-known-issues]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#known-issues
[smol]: https://docs.rs/smol
Expand Down
8 changes: 8 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
<a name="0.25.0"></a>
## 0.25.0 (Unreleased)

* Add support for transactional producers. The new methods are
`Producer::init_transactions`, `Producer::begin_transaction`,
`Producer::commit_transaction`, `Producer::abort_transaction`, and
`Producer::send_offsets_to_transaction`.

Thanks to [@roignpar] for the implementation.

* **Breaking change.** Rename `RDKafkaError` to `RDKafkaErrorCode`. This makes
space for the new `RDKafkaError` type, which mirrors the `rd_kafka_error_t`
type added to librdkafka in v1.4.0.
Expand Down Expand Up @@ -84,6 +91,7 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
when the `tokio` feature is disabled.

[@Marwes]: https://github.com/Marwes
[@roignpar]: https://github.com/roignpar

<a name="0.24.0"></a>
## 0.24.0 (2020-07-08)
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ services:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
- KAFKA_NUM_PARTITIONS=3
- CONFLUENT_SUPPORT_METRICS_ENABLE=0
ports: ["9092:9092"]
Expand Down
142 changes: 121 additions & 21 deletions rdkafka-sys/src/bindings.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,53 @@
/* automatically generated by rust-bindgen */
/* automatically generated by rust-bindgen 0.56.0 */

use libc::{c_char, c_int, c_void, size_t, sockaddr, ssize_t, FILE};
use num_enum::TryFromPrimitive;

pub const RD_KAFKA_VERSION: u32 = 17040127;
pub const RD_KAFKA_DEBUG_CONTEXTS : & 'static [ u8 ; 124usize ] = b"all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock\0" ;
#[repr(C)]
pub struct __BindgenUnionField<T>(::std::marker::PhantomData<T>);
impl<T> __BindgenUnionField<T> {
#[inline]
pub const fn new() -> Self {
__BindgenUnionField(::std::marker::PhantomData)
}
#[inline]
pub unsafe fn as_ref(&self) -> &T {
::std::mem::transmute(self)
}
#[inline]
pub unsafe fn as_mut(&mut self) -> &mut T {
::std::mem::transmute(self)
}
}
impl<T> ::std::default::Default for __BindgenUnionField<T> {
#[inline]
fn default() -> Self {
Self::new()
}
}
impl<T> ::std::clone::Clone for __BindgenUnionField<T> {
#[inline]
fn clone(&self) -> Self {
Self::new()
}
}
impl<T> ::std::marker::Copy for __BindgenUnionField<T> {}
impl<T> ::std::fmt::Debug for __BindgenUnionField<T> {
fn fmt(&self, fmt: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
fmt.write_str("__BindgenUnionField")
}
}
impl<T> ::std::hash::Hash for __BindgenUnionField<T> {
fn hash<H: ::std::hash::Hasher>(&self, _state: &mut H) {}
}
impl<T> ::std::cmp::PartialEq for __BindgenUnionField<T> {
fn eq(&self, _other: &__BindgenUnionField<T>) -> bool {
true
}
}
impl<T> ::std::cmp::Eq for __BindgenUnionField<T> {}
pub const RD_KAFKA_VERSION: u32 = 17105919;
pub const RD_KAFKA_DEBUG_CONTEXTS : & 'static [u8 ; 124usize] = b"all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock\0" ;
pub const RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE: u32 = 8;
pub const RD_KAFKA_OFFSET_BEGINNING: i32 = -2;
pub const RD_KAFKA_OFFSET_END: i32 = -1;
Expand Down Expand Up @@ -39,13 +82,13 @@ extern "C" {
pub fn rd_kafka_version_str() -> *const c_char;
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum rd_kafka_type_t {
RD_KAFKA_PRODUCER = 0,
RD_KAFKA_CONSUMER = 1,
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum rd_kafka_timestamp_type_t {
RD_KAFKA_TIMESTAMP_NOT_AVAILABLE = 0,
RD_KAFKA_TIMESTAMP_CREATE_TIME = 1,
Expand Down Expand Up @@ -108,8 +151,14 @@ pub struct rd_kafka_error_s {
_unused: [u8; 0],
}
pub type rd_kafka_error_t = rd_kafka_error_s;
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct rd_kafka_headers_s {
_unused: [u8; 0],
}
pub type rd_kafka_headers_t = rd_kafka_headers_s;
#[repr(i32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, TryFromPrimitive)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, TryFromPrimitive)]
pub enum rd_kafka_resp_err_t {
RD_KAFKA_RESP_ERR__BEGIN = -200,
RD_KAFKA_RESP_ERR__BAD_MSG = -199,
Expand Down Expand Up @@ -254,7 +303,13 @@ pub enum rd_kafka_resp_err_t {
RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80,
RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED = 81,
RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID = 82,
RD_KAFKA_RESP_ERR_END_ALL = 83,
RD_KAFKA_RESP_ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE = 83,
RD_KAFKA_RESP_ERR_ELECTION_NOT_NEEDED = 84,
RD_KAFKA_RESP_ERR_NO_REASSIGNMENT_IN_PROGRESS = 85,
RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC = 86,
RD_KAFKA_RESP_ERR_INVALID_RECORD = 87,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT = 88,
RD_KAFKA_RESP_ERR_END_ALL = 89,
}
#[repr(C)]
#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -414,7 +469,7 @@ extern "C" {
);
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum rd_kafka_vtype_t {
RD_KAFKA_VTYPE_END = 0,
RD_KAFKA_VTYPE_TOPIC = 1,
Expand All @@ -429,11 +484,36 @@ pub enum rd_kafka_vtype_t {
RD_KAFKA_VTYPE_HEADERS = 10,
}
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct rd_kafka_headers_s {
_unused: [u8; 0],
pub struct rd_kafka_vu_s {
pub vtype: rd_kafka_vtype_t,
pub u: rd_kafka_vu_s__bindgen_ty_1,
}
pub type rd_kafka_headers_t = rd_kafka_headers_s;
#[repr(C)]
pub struct rd_kafka_vu_s__bindgen_ty_1 {
pub cstr: __BindgenUnionField<*const c_char>,
pub rkt: __BindgenUnionField<*mut rd_kafka_topic_t>,
pub i: __BindgenUnionField<c_int>,
pub i32_: __BindgenUnionField<i32>,
pub i64_: __BindgenUnionField<i64>,
pub mem: __BindgenUnionField<rd_kafka_vu_s__bindgen_ty_1__bindgen_ty_1>,
pub header: __BindgenUnionField<rd_kafka_vu_s__bindgen_ty_1__bindgen_ty_2>,
pub headers: __BindgenUnionField<*mut rd_kafka_headers_t>,
pub ptr: __BindgenUnionField<*mut c_void>,
pub _pad: __BindgenUnionField<[c_char; 64usize]>,
pub bindgen_union_field: [u64; 8usize],
}
#[repr(C)]
pub struct rd_kafka_vu_s__bindgen_ty_1__bindgen_ty_1 {
pub ptr: *mut c_void,
pub size: size_t,
}
#[repr(C)]
pub struct rd_kafka_vu_s__bindgen_ty_1__bindgen_ty_2 {
pub name: *const c_char,
pub val: *const c_void,
pub size: ssize_t,
}
pub type rd_kafka_vu_t = rd_kafka_vu_s;
extern "C" {
pub fn rd_kafka_headers_new(initial_count: size_t) -> *mut rd_kafka_headers_t;
}
Expand Down Expand Up @@ -500,6 +580,9 @@ pub type rd_kafka_message_t = rd_kafka_message_s;
extern "C" {
pub fn rd_kafka_message_destroy(rkmessage: *mut rd_kafka_message_t);
}
extern "C" {
pub fn rd_kafka_message_errstr(rkmessage: *const rd_kafka_message_t) -> *const c_char;
}
extern "C" {
pub fn rd_kafka_message_timestamp(
rkmessage: *const rd_kafka_message_t,
Expand All @@ -509,6 +592,9 @@ extern "C" {
extern "C" {
pub fn rd_kafka_message_latency(rkmessage: *const rd_kafka_message_t) -> i64;
}
extern "C" {
pub fn rd_kafka_message_broker_id(rkmessage: *const rd_kafka_message_t) -> i32;
}
extern "C" {
pub fn rd_kafka_message_headers(
rkmessage: *const rd_kafka_message_t,
Expand All @@ -531,7 +617,7 @@ extern "C" {
pub fn rd_kafka_header_cnt(hdrs: *const rd_kafka_headers_t) -> size_t;
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum rd_kafka_msg_status_t {
RD_KAFKA_MSG_STATUS_NOT_PERSISTED = 0,
RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED = 1,
Expand All @@ -541,7 +627,7 @@ extern "C" {
pub fn rd_kafka_message_status(rkmessage: *const rd_kafka_message_t) -> rd_kafka_msg_status_t;
}
#[repr(i32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum rd_kafka_conf_res_t {
RD_KAFKA_CONF_UNKNOWN = -2,
RD_KAFKA_CONF_INVALID = -1,
Expand Down Expand Up @@ -769,15 +855,15 @@ extern "C" {
) -> rd_kafka_conf_res_t;
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum rd_kafka_cert_type_t {
RD_KAFKA_CERT_PUBLIC_KEY = 0,
RD_KAFKA_CERT_PRIVATE_KEY = 1,
RD_KAFKA_CERT_CA = 2,
RD_KAFKA_CERT__CNT = 3,
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum rd_kafka_cert_enc_t {
RD_KAFKA_CERT_ENC_PKCS12 = 0,
RD_KAFKA_CERT_ENC_DER = 1,
Expand Down Expand Up @@ -1318,6 +1404,13 @@ extern "C" {
extern "C" {
pub fn rd_kafka_producev(rk: *mut rd_kafka_t, ...) -> rd_kafka_resp_err_t;
}
extern "C" {
pub fn rd_kafka_produceva(
rk: *mut rd_kafka_t,
vus: *const rd_kafka_vu_t,
cnt: size_t,
) -> *mut rd_kafka_error_t;
}
extern "C" {
pub fn rd_kafka_produce_batch(
rkt: *mut rd_kafka_topic_t,
Expand Down Expand Up @@ -1466,7 +1559,7 @@ extern "C" {
pub fn rd_kafka_thread_cnt() -> c_int;
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum rd_kafka_thread_type_t {
RD_KAFKA_THREAD_MAIN = 0,
RD_KAFKA_THREAD_BACKGROUND = 1,
Expand Down Expand Up @@ -1523,10 +1616,17 @@ extern "C" {
pub fn rd_kafka_event_log(
rkev: *mut rd_kafka_event_t,
fac: *mut *const c_char,
str: *mut *const c_char,
str_: *mut *const c_char,
level: *mut c_int,
) -> c_int;
}
extern "C" {
pub fn rd_kafka_event_debug_contexts(
rkev: *mut rd_kafka_event_t,
dst: *mut c_char,
dstsize: size_t,
) -> c_int;
}
extern "C" {
pub fn rd_kafka_event_stats(rkev: *mut rd_kafka_event_t) -> *const c_char;
}
Expand Down Expand Up @@ -1788,7 +1888,7 @@ extern "C" {
pub fn rd_kafka_topic_result_name(topicres: *const rd_kafka_topic_result_t) -> *const c_char;
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum rd_kafka_admin_op_t {
RD_KAFKA_ADMIN_OP_ANY = 0,
RD_KAFKA_ADMIN_OP_CREATETOPICS = 1,
Expand Down Expand Up @@ -1989,7 +2089,7 @@ extern "C" {
) -> *mut *const rd_kafka_topic_result_t;
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum rd_kafka_ConfigSource_t {
RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG = 0,
RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG = 1,
Expand Down Expand Up @@ -2038,7 +2138,7 @@ extern "C" {
) -> *mut *const rd_kafka_ConfigEntry_t;
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum rd_kafka_ResourceType_t {
RD_KAFKA_RESOURCE_UNKNOWN = 0,
RD_KAFKA_RESOURCE_ANY = 1,
Expand Down
6 changes: 6 additions & 0 deletions rdkafka-sys/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ pub fn rd_kafka_resp_err_t_to_rdkafka_error(err: RDKafkaRespErr) -> RDKafkaError
RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE => PreferredLeaderNotAvailable,
RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED => GroupMaxSizeReached,
RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID => FencedInstanceId,
RD_KAFKA_RESP_ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE => EligibleLeadersNotAvailable,
RD_KAFKA_RESP_ERR_ELECTION_NOT_NEEDED => ElectionNotNeeded,
RD_KAFKA_RESP_ERR_NO_REASSIGNMENT_IN_PROGRESS => NoReassignmentInProgress,
RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC => GroupSubscribedToTopic,
RD_KAFKA_RESP_ERR_INVALID_RECORD => InvalidRecord,
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT => UnstableOffsetCommit,
RD_KAFKA_RESP_ERR_END_ALL => EndAll,
}
}
Loading