
Transaction support #289

Closed
roignpar opened this issue Aug 13, 2020 · 5 comments · Fixed by #323

Comments

@roignpar
Contributor

Now that librdkafka supports transactions, are there any plans to make them available in rust-rdkafka as well?

I need to use transactions in a read-process-write scenario and would be happy to help with the implementation.

@benesch
Collaborator

benesch commented Aug 13, 2020

Yes, I'd love to see this! I haven't thought at all about what the API would need to look like. The right place to start is probably a proposal for/prototype of the API.

@roignpar
Contributor Author

Looking at librdkafka and some of its wrappers for other languages (Python, Go, .NET), I think the way to go would be to add:

  • these methods on BaseProducer, ThreadedProducer, and FutureProducer:
fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;
fn begin_transaction(&self) -> KafkaResult<()>;
fn send_offsets_to_transaction<T: Into<Timeout>>(
    &self,
    offsets: &TopicPartitionList,
    metadata: &ConsumerGroupMetadata,
    timeout: T,
) -> KafkaResult<()>;
fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;
fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;

Maybe it would make sense to make some of these async for the FutureProducer.

  • a ConsumerGroupMetadata struct;

  • a method on Consumer and BaseConsumer to get the consumer group metadata:

fn group_metadata(&self) -> ConsumerGroupMetadata;

  • possibly another KafkaError variant: Transaction;

  • anything else I may not be aware of at this time.
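To make the proposed flow concrete, here is a minimal, self-contained sketch of the read-process-write loop these methods would serve. It deliberately does not depend on rust-rdkafka. `Transactional`, `MockProducer`, `GroupMetadata`, `Offsets`, `process_batch`, and the `send` method are hypothetical stand-ins. The methods take `&mut self` and a plain `Duration` instead of `&self` and `Into<Timeout>`, errors are `String`s rather than `KafkaError`, and `init_transactions` (called once at startup) is omitted. The sketch only illustrates the intended call order: begin the transaction, produce the output, add the consumed offsets with `send_offsets_to_transaction`, commit, and abort if any step fails.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Hypothetical stand-in for a TopicPartitionList: (topic, partition, next offset).
pub type Offsets = Vec<(String, i32, i64)>;

/// Hypothetical stand-in for the proposed ConsumerGroupMetadata struct.
pub struct GroupMetadata {
    pub group_id: String,
}

/// The proposed producer methods, simplified for this sketch. `send` is a
/// placeholder for producing one record; init_transactions is omitted.
pub trait Transactional {
    fn begin_transaction(&mut self) -> Result<(), String>;
    fn send(&mut self, topic: &str, payload: &str) -> Result<(), String>;
    fn send_offsets_to_transaction(&mut self, offsets: &Offsets, metadata: &GroupMetadata, timeout: Duration) -> Result<(), String>;
    fn commit_transaction(&mut self, timeout: Duration) -> Result<(), String>;
    fn abort_transaction(&mut self, timeout: Duration) -> Result<(), String>;
}

/// Produces one output record per consumed (partition, offset, payload) from
/// topic "input", then adds the consumed offsets and commits the transaction.
fn produce_and_commit<P: Transactional>(
    producer: &mut P, records: &[(i32, i64, String)], metadata: &GroupMetadata, timeout: Duration,
) -> Result<(), String> {
    let mut next: BTreeMap<i32, i64> = BTreeMap::new();
    for (partition, offset, payload) in records {
        // "Process" the record and write the result to the output topic.
        producer.send("output", &payload.to_uppercase())?;
        // Track the next offset to consume, per partition.
        let entry = next.entry(*partition).or_insert(0);
        *entry = (*entry).max(offset + 1);
    }
    let offsets: Offsets = next
        .into_iter()
        .map(|(partition, offset)| ("input".to_string(), partition, offset))
        .collect();
    producer.send_offsets_to_transaction(&offsets, metadata, timeout)?;
    producer.commit_transaction(timeout)
}

/// One read-process-write cycle: begin, produce and commit, abort on failure.
pub fn process_batch<P: Transactional>(
    producer: &mut P, records: &[(i32, i64, String)], metadata: &GroupMetadata,
) -> Result<(), String> {
    let timeout = Duration::from_secs(10);
    producer.begin_transaction()?;
    if let Err(e) = produce_and_commit(producer, records, metadata, timeout) {
        // Abort so that neither the output records nor the offsets take effect.
        producer.abort_transaction(timeout)?;
        return Err(e);
    }
    Ok(())
}

/// Mock producer that records each call instead of talking to a broker.
#[derive(Default)]
pub struct MockProducer {
    pub calls: Vec<String>,
    pub fail_sends: bool,
}

impl MockProducer {
    fn log(&mut self, entry: String) -> Result<(), String> {
        self.calls.push(entry);
        Ok(())
    }
}

impl Transactional for MockProducer {
    fn begin_transaction(&mut self) -> Result<(), String> {
        self.log("begin".to_string())
    }
    fn send(&mut self, topic: &str, payload: &str) -> Result<(), String> {
        if self.fail_sends {
            return Err("send failed".to_string());
        }
        self.log(format!("send {} {}", topic, payload))
    }
    fn send_offsets_to_transaction(&mut self, offsets: &Offsets, metadata: &GroupMetadata, _timeout: Duration) -> Result<(), String> {
        for (topic, partition, offset) in offsets {
            self.log(format!("offset {} {}/{}@{}", metadata.group_id, topic, partition, offset))?;
        }
        Ok(())
    }
    fn commit_transaction(&mut self, _timeout: Duration) -> Result<(), String> {
        self.log("commit".to_string())
    }
    fn abort_transaction(&mut self, _timeout: Duration) -> Result<(), String> {
        self.log("abort".to_string())
    }
}
```

Committing the consumed offsets inside the same transaction is what makes the pattern useful: either the output records and the new input offsets both take effect, or neither does. For simplicity, the sketch aborts on every failure. A real application would also need to distinguish retriable errors from fatal ones.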

What do you think?

@benesch
Collaborator

benesch commented Aug 20, 2020

That looks wonderful to me! Agreed that you'd want those methods to be async fns on the FutureProducer, but that looks like it'll be tricky given the C API that librdkafka exposes. I'd recommend starting with the BaseProducer/ThreadedProducer. Now that I see it all written out, it looks like it might be pretty straightforward!

@roignpar
Contributor Author

After taking a closer look at error handling: librdkafka 1.4.0 added a new error type, rd_kafka_error_t, which is returned by the transaction-related functions and "provides error attributes such as indicating if an error is retriable". rust-rdkafka doesn't seem to use it so far. The plain error code (rd_kafka_resp_err_t) can be extracted with rd_kafka_error_code. Even so, I believe a complete solution needs the new error type, because it indicates whether the operation should be retried, the transaction aborted, or the transactional producer terminated (see the docs, Transactional producer API section).
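The distinction described above can be sketched as a small decision function. `ErrorAttrs`, `NextStep`, and `next_step` are hypothetical names, not librdkafka or rust-rdkafka API. The three flags correspond to the attributes librdkafka exposes via `rd_kafka_error_is_retriable`, `rd_kafka_error_txn_requires_abort`, and `rd_kafka_error_is_fatal`. `code` is a placeholder integer standing in for `rd_kafka_resp_err_t`. Nothing in `next_step` reads `code`, which illustrates why the bare code alone is not enough.

```rust
/// Hypothetical Rust mirror of what an rd_kafka_error_t carries: the plain
/// error code plus three flags. Not an actual binding.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ErrorAttrs {
    pub code: i32,
    pub retriable: bool,
    pub txn_requires_abort: bool,
    pub fatal: bool,
}

/// What a transactional application should do next.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum NextStep {
    /// Call the same transactional function again.
    Retry,
    /// Call abort_transaction, then start a new transaction.
    AbortTransaction,
    /// The producer is unusable and must be terminated.
    TerminateProducer,
    /// No transactional hint: hand the error back to the caller.
    ReturnError,
}

pub fn next_step(err: &ErrorAttrs) -> NextStep {
    // Check fatal first: once the producer is unusable, the other flags are moot.
    if err.fatal {
        NextStep::TerminateProducer
    } else if err.retriable {
        NextStep::Retry
    } else if err.txn_requires_abort {
        NextStep::AbortTransaction
    } else {
        NextStep::ReturnError
    }
}
```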

Some ways I can think of to handle this:

  1. rename RDKafkaError to RDKafkaErrorCode and reuse the RDKafkaError name for the new error type; add to KafkaError a Transaction variant containing an RDKafkaError, plus is_retriable, is_abortable, and is_fatal methods that would always return false for all other variants (it is unclear what to do with "other" errors in this case);

  2. add a new error enum with Retriable, Abortable, Fatal, and Other variants; I'm not sure what a good name for it would be, since TransactionError isn't appropriate: librdkafka may use this error type in other contexts in the future;

  3. a combination of 1 and 2: add a Transaction variant to KafkaError containing another enum with the variants from 2.
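The following is a minimal sketch of the type layout option 1 implies. It assumes a heavily trimmed `KafkaError`: `MessageProduction` stands in for all the existing variants, and `MessageTimedOut` and `Fenced` are two illustrative codes. This is not rust-rdkafka's actual definition.

```rust
/// Option 1, step 1: the existing code enum is renamed. Only two illustrative
/// variants are shown; the real enum mirrors every rd_kafka_resp_err_t value.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RDKafkaErrorCode {
    MessageTimedOut,
    Fenced,
}

/// Option 1, step 2: the freed-up name is reused for the rich error type.
#[derive(Debug, Clone, PartialEq)]
pub struct RDKafkaError {
    pub code: RDKafkaErrorCode,
    pub retriable: bool,
    pub abortable: bool,
    pub fatal: bool,
}

/// Option 1, step 3: a trimmed-down KafkaError with one placeholder for the
/// existing variants plus the proposed Transaction variant.
#[derive(Debug, Clone, PartialEq)]
pub enum KafkaError {
    MessageProduction(RDKafkaErrorCode),
    Transaction(RDKafkaError),
}

impl KafkaError {
    /// Always false for non-Transaction variants, as described in option 1.
    pub fn is_retriable(&self) -> bool {
        matches!(self, KafkaError::Transaction(e) if e.retriable)
    }
    pub fn is_abortable(&self) -> bool {
        matches!(self, KafkaError::Transaction(e) if e.abortable)
    }
    pub fn is_fatal(&self) -> bool {
        matches!(self, KafkaError::Transaction(e) if e.fatal)
    }
}
```

A non-transactional error answers `false` to all three questions. This is the ambiguity about "other" errors noted in option 1: `false` here means "no information", not "definitely not retriable".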

I can't say I particularly like any of them. How would you approach error handling in this situation? Do you see other possible solutions?

@benesch
Collaborator

benesch commented Aug 20, 2020

Oh, hrm. I think I like (1) the best. Unfortunate that we'll need to break backwards compatibility, but I don't see any other way to prevent the names of these error types from being massively confusing.

I'd just add is_retriable, is_abortable, and is_fatal methods to the (new) RDKafkaError type, without trying to figure out how they map onto KafkaError. Possibly we should make init_transactions, begin_transaction, etc. use RDKafkaError instead of KafkaError as their return type. Actually, I think it's more in line with the existing APIs to just return KafkaError. Requiring the user to match out the internal RDKafkaError if they want details seems fine to me for now.
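Here is a hedged sketch of the approach described above. The `is_retriable`, `is_abortable`, and `is_fatal` methods live on `RDKafkaError`, transactional calls still return `KafkaError`, and callers that want the details match out `KafkaError::Transaction`. `KafkaError::Other`, `CommitOutcome`, `commit_with_retry`, and the closures standing in for `commit_transaction` and `abort_transaction` are hypothetical. The retry policy (fatal checked first, then retriable, then abortable) is one reasonable choice, not a prescribed one.

```rust
/// Rich error type; the flag accessors live here rather than on KafkaError.
#[derive(Debug, Clone, PartialEq)]
pub struct RDKafkaError {
    retriable: bool,
    abortable: bool,
    fatal: bool,
}

impl RDKafkaError {
    pub fn new(retriable: bool, abortable: bool, fatal: bool) -> Self {
        RDKafkaError { retriable, abortable, fatal }
    }
    pub fn is_retriable(&self) -> bool { self.retriable }
    pub fn is_abortable(&self) -> bool { self.abortable }
    pub fn is_fatal(&self) -> bool { self.fatal }
}

/// Transactional methods still return KafkaError. `Other` is a hypothetical
/// placeholder for all of its remaining variants.
#[derive(Debug, Clone, PartialEq)]
pub enum KafkaError {
    Transaction(RDKafkaError),
    Other(String),
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CommitOutcome {
    Committed,
    Aborted,
    Fatal,
    Failed,
}

/// Calls `commit` (standing in for commit_transaction) and reacts to the
/// RDKafkaError matched out of KafkaError::Transaction.
pub fn commit_with_retry<C, A>(mut commit: C, mut abort: A, max_attempts: u32) -> CommitOutcome
where
    C: FnMut() -> Result<(), KafkaError>,
    A: FnMut() -> Result<(), KafkaError>,
{
    for _ in 0..max_attempts {
        match commit() {
            Ok(()) => return CommitOutcome::Committed,
            Err(KafkaError::Transaction(e)) if e.is_fatal() => return CommitOutcome::Fatal,
            Err(KafkaError::Transaction(e)) if e.is_retriable() => continue,
            Err(KafkaError::Transaction(e)) if e.is_abortable() => {
                return match abort() {
                    Ok(()) => CommitOutcome::Aborted,
                    Err(_) => CommitOutcome::Failed,
                };
            }
            // Non-transactional error, or no hint at all: give up.
            Err(_) => return CommitOutcome::Failed,
        }
    }
    // Still retriable after max_attempts tries.
    CommitOutcome::Failed
}
```

Callers that don't need the details can keep propagating the `KafkaError` with `?`, as they do today.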

Hopefully in a few librdkafka releases it becomes more clear how they intend to use this new complex error type in other APIs, and we can think about redesigning rust-rdkafka's error handling to be a bit more user friendly. The big KafkaError enum is not scaling very well.

benesch added a commit to benesch/rust-rdkafka that referenced this issue Jan 5, 2021
Co-authored-by: Robert Ignat <robert.ignat91@gmail.com>

Closes fede1024#289.
Closes fede1024#293.