Achieving committing offsets only after successful productions #89

Closed
blt opened this issue Jul 3, 2018 · 5 comments · Fixed by #376

Comments

blt commented Jul 3, 2018

I've got myself worked into a corner and I'm not sure how to work back out with the API as-is. I could use a little guidance. My basic ambition is laid out in an issue on the python client. That is, I'd like to write a process that reads from some input topic, writes to an output topic, and commits offsets to input only once the record has been successfully produced to output.

As of 0.17, rdkafka's ProducerContext::delivery is called with a DeliveryResult that contains a BorrowedMessage for the record published to output, not the BorrowedMessage that came in from input. But Consumer::store_offset requires that a &BorrowedMessage be passed, specifically the same one that came in from the Consumer.

Okay, so to accomplish my aim I think I have to get the original &BorrowedMessage through to ProducerContext::delivery along with a reference to the Consumer. It occurred to me that I might pass a (&BorrowedMessage, &Consumer) through as a ProducerContext::DeliveryOpaque, but that seems excessively complicated. Not to mention, there will need to be a rectification layer in there somewhere so that offsets are only committed in order, and I'm really not sure how to fit that in.

I noticed that examples/at_least_once.rs accomplishes this by auto-committing explicitly stored offsets, but it waits synchronously for each produced record to signal success. That's close to what I'd like to accomplish, but the throughput is a little disappointing, what with the serialization on every produce.

I guess my question is: is there a more throughput-oriented way to commit consumed offsets only after the records that result from them have been successfully produced, other than serializing on writes? It feels like there's something close with ProducerContext::delivery, but I'm not quite finding it.
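
To make the DeliveryOpaque route concrete, here is roughly the shape I have in mind. The type names are placeholders of my own and I haven't compiled this, so treat it as a sketch rather than working code; it deliberately copies the consumed message's coordinates out of the BorrowedMessage instead of trying to carry the message itself:

    use rdkafka::message::{BorrowedMessage, DeliveryResult};
    use rdkafka::producer::ProducerContext;
    use rdkafka::{ClientContext, Message};

    /// Coordinates of the consumed record that an outgoing record was derived
    /// from. (Placeholder type; it just copies out what the BorrowedMessage
    /// already carries, so nothing borrowed from the consumer has to outlive
    /// the poll loop.)
    struct SourceOffset {
        topic: String,
        partition: i32,
        offset: i64,
    }

    impl SourceOffset {
        fn from_message(m: &BorrowedMessage<'_>) -> Self {
            SourceOffset {
                topic: m.topic().to_owned(),
                partition: m.partition(),
                offset: m.offset(),
            }
        }
    }

    struct MirrorContext;

    impl ClientContext for MirrorContext {}

    impl ProducerContext for MirrorContext {
        // Each record produced to output carries the coordinates of the input
        // record it came from.
        type DeliveryOpaque = Box<SourceOffset>;

        fn delivery(&self, result: &DeliveryResult<'_>, source: Self::DeliveryOpaque) {
            if result.is_ok() {
                // This is where the offset for `source` would need to be stored,
                // which still requires a handle back to the consumer plus the
                // in-order bookkeeping described above.
                println!(
                    "produced ok, source {}/{}@{}",
                    source.topic, source.partition, source.offset
                );
            }
        }
    }

Each record sent to output would get a SourceOffset::from_message(&input_msg) as its delivery opaque; the unresolved part is still how the delivery callback reaches the consumer and keeps the stores in order.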


blt commented Jul 4, 2018

I spent a few days with rdkafka-sys and I wonder if an API adjustment for this crate wouldn't do the trick. BaseConsumer::store_offset is:

    fn store_offset(&self, message: &BorrowedMessage) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset())
        };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

Storing an offset is done with only the information the BorrowedMessage carries around. Would it make sense to expose a store_offset on BorrowedMessage? Alternatively, I wonder about creating a Topic trait to contain the user's offset-storing logic, ensure messages are committed in the right order, and so forth. Introducing a new Topic -- bound to the lifetime of its parent Consumer -- has some appeal in that I need some way to link the production of a Record to the BorrowedMessage that spawned it.
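
To sketch what I mean, the two options might look roughly like these signatures. Everything here is hypothetical, names included, and the real implementations would have to live inside the crate to reach librdkafka's offset store:

    use rdkafka::error::KafkaResult;
    use rdkafka::message::BorrowedMessage;

    // Option A (hypothetical): let the message store its own offset. It already
    // knows its topic, partition, and offset, so no extra arguments are needed.
    trait StoreOwnOffset {
        fn store_offset(&self) -> KafkaResult<()>;
    }

    // Option B (hypothetical): a Topic handle bound to the lifetime of its
    // parent Consumer that owns the bookkeeping needed to only store offsets
    // in consumption order.
    trait MirrorTopic {
        /// Record that the produce derived from `source` succeeded; the offset
        /// is stored only once every earlier offset on that partition has also
        /// succeeded.
        fn produced_ok(&self, source: &BorrowedMessage<'_>) -> KafkaResult<()>;
    }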


fede1024 commented Jul 4, 2018

Sorry for the late reply. Yes, we could add such a method to the Consumer trait. I'm working on an example solution for your problem based on the feature/threadless_futures branch. Hopefully I'll be able to share it in the next few days (I'm also making some changes to the API to make it easier to use).


blt commented Jul 7, 2018

I apologize for my late reply; I was on vacation.

Into the Consumer trait? That's distinct from what I was thinking, but I look forward to your example. Please let me know if I can help out in any way. I'll be back to work on Monday and would be pleased to pitch in.


fede1024 commented Jul 9, 2018

Unfortunately I didn't have as much time to dedicate to this as I thought. However, this is what I've got so far. I've changed the API to make this use case a bit easier to implement. This approach is similar to the first one you mentioned, so it will likely commit out of order and potentially cause data loss. rust-rdkafka at the moment doesn't provide a way to commit only in order, but it shouldn't be too hard to implement some additional logic to do that.
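
For the in-order part, something like the following bookkeeping could sit between the consumer and the delivery callback. This is only a sketch of that additional logic, not anything that exists in rust-rdkafka, and all the names are made up; it remembers offsets in consumption order per partition and only reports an offset as safe to store once everything consumed before it has also been produced successfully:

    use std::collections::{HashMap, VecDeque};

    /// Per-partition record of consumed offsets, in consumption order, each
    /// flagged once its corresponding produce has succeeded.
    #[derive(Default)]
    struct PartitionState {
        in_flight: VecDeque<(i64, bool)>,
    }

    /// Tracks which consumed offsets are safe to store, per (topic, partition).
    #[derive(Default)]
    struct OffsetTracker {
        partitions: HashMap<(String, i32), PartitionState>,
    }

    impl OffsetTracker {
        /// Call when a record is consumed from the input topic.
        fn consumed(&mut self, topic: &str, partition: i32, offset: i64) {
            self.partitions
                .entry((topic.to_owned(), partition))
                .or_default()
                .in_flight
                .push_back((offset, false));
        }

        /// Call when the produce derived from (topic, partition, offset) has
        /// succeeded. Returns the newest offset that is now safe to store --
        /// the end of the fully acknowledged prefix -- if that prefix grew.
        fn produced(&mut self, topic: &str, partition: i32, offset: i64) -> Option<i64> {
            let state = self.partitions.get_mut(&(topic.to_owned(), partition))?;
            if let Some(entry) = state.in_flight.iter_mut().find(|(o, _)| *o == offset) {
                entry.1 = true;
            }
            let mut newly_safe = None;
            while matches!(state.in_flight.front(), Some(&(_, true))) {
                newly_safe = state.in_flight.pop_front().map(|(o, _)| o);
            }
            newly_safe
        }
    }

The consumer loop would call consumed() before producing each record, the delivery callback would call produced() on success, and whenever produced() returns Some(offset) that offset (the same value at_least_once.rs stores) can be handed to the consumer's offset store. A real version would also need to handle produce failures and partition rebalances.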

blt pushed a commit to blt/rust-rdkafka that referenced this issue Jul 9, 2018
This commit adjusts the topic_mirror example to include retry
in the case of enqueuing failure and removes the TopicPartitionList
in favor of performing offset commits directly with BorrowedMessage.
It's possible I've not understood the purpose of TopicPartitionList.

As it is now, I think I've saved one allocation per incoming message
by removing TopicPartitionList, have imposed additional allocations per
failed enqueuing, and have open questions around retrying when production
fails.

Relevant issue: fede1024#89

Signed-off-by: Brian L. Troutwine <briant@unity3d.com>

blt commented Jul 9, 2018

@fede1024 I had some questions and comments on things I didn't understand. I figured the easiest way to carry on the conversation would be a PR, opened as #90. I've embedded my questions there.

benesch added a commit to benesch/rust-rdkafka that referenced this issue Jun 30, 2021
benesch added a commit to benesch/rust-rdkafka that referenced this issue Oct 16, 2021
benesch added a commit to benesch/rust-rdkafka that referenced this issue Oct 16, 2021
benesch added a commit to benesch/rust-rdkafka that referenced this issue Oct 16, 2021
rodoyle pushed a commit to getditto/rust-rdkafka that referenced this issue Oct 15, 2022