
Modify topic_mirror to include questions, notes #90

Closed · blt wants to merge 6 commits

Conversation

@blt commented Jul 9, 2018

This commit adjusts the topic_mirror example to include retry in the case of enqueue failure, and removes the TopicPartitionList in favor of performing offset commits directly with the BorrowedMessage. It's possible I've not understood the purpose of TopicPartitionList.

As it stands, I think I've saved one allocation per incoming message by removing the TopicPartitionList, imposed additional allocations per failed enqueue, and left open questions around retrying when production fails.

Apologies for some of the rustfmt reshuffling.

Relevant issue: #89

Signed-off-by: Brian L. Troutwine <briant@unity3d.com>
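For reference, the enqueue-retry pattern the description mentions might look like the sketch below, written against the present-day BaseProducer API. The helper name, the error handling, and the 100 ms poll interval are mine, not the PR's; the send signature shown here is the current crate's, not necessarily the 2018-era one.

```rust
use std::time::Duration;

use rdkafka::error::KafkaError;
use rdkafka::producer::{BaseProducer, BaseRecord};
use rdkafka::types::RDKafkaErrorCode;

// Retry while the local queue is full, polling so delivery callbacks
// can drain it. `send` hands the record back on failure, so retrying
// needs no extra allocation.
fn send_with_retry(producer: &BaseProducer, topic: &str, key: &[u8], payload: &[u8]) {
    let mut record = BaseRecord::to(topic).key(key).payload(payload);
    loop {
        match producer.send(record) {
            Ok(()) => return,
            Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), rec)) => {
                record = rec;
                // Give librdkafka a chance to fire delivery callbacks
                // and free queue space before the next attempt.
                producer.poll(Duration::from_millis(100));
            }
            Err((e, _)) => panic!("unrecoverable produce error: {}", e),
        }
    }
}
```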

```rust
//
// Either approach, potentially, interacts with the retry
// question below.
while let Err(_) = self.source_consumer.store_offset(&m) {
```
@fede1024 (Owner) commented:

Here `m` is the message that was just sent to Kafka (the copy, not the original one), so you'd be committing the offset of the copy, which is not what you want. The `TopicPartitionList` stores the location (offset and partition) of the original message, which is the one you want to commit.

@blt (Author) replied:

Ah, d'oh. Of course.
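Concretely, the fix the review points at might look like this sketch (the helper name is mine; it assumes the current rdkafka API, and the `+ 1` follows Kafka's convention that a committed offset points at the next message to read):

```rust
use rdkafka::message::{BorrowedMessage, Message};
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};

// Capture the *original* message's position before producing the
// copy, so the delivery callback can commit the right offset.
fn position_of(original: &BorrowedMessage) -> TopicPartitionList {
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset(
        original.topic(),
        original.partition(),
        Offset::Offset(original.offset() + 1),
    )
    .expect("valid offset");
    tpl
}
```

From there, the list can be committed via the source consumer, so the commit tracks the source message's position rather than the copy's.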

```rust
// data loss. Should, then, the author cobble together a MinHeap
// (or similar) approach to commit only when all the suitable
// offsets have come through -- implying storage here -- or rely
// on rdkafka to take care of this?
```
@fede1024 (Owner) commented:

librdkafka doesn't provide a way to commit in order, so yes, we should implement a different way to keep track of them and only periodically commit the lowest one per partition.
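A minimal sketch of that bookkeeping, with all names mine rather than the crate's: track a per-partition floor plus a min-heap of delivered offsets, and advance the floor across any consecutive run.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

// Per-partition bookkeeping: `floor` is the next offset we expect to
// retire; `pending` holds offsets delivered out of order.
struct PartitionTracker {
    floor: i64,
    pending: BinaryHeap<Reverse<i64>>, // Reverse turns the max-heap into a min-heap
}

impl PartitionTracker {
    fn new(first_offset: i64) -> Self {
        PartitionTracker { floor: first_offset, pending: BinaryHeap::new() }
    }

    /// Record a delivered offset; return the watermark below which
    /// every offset has been delivered and is therefore safe to commit.
    fn deliver(&mut self, offset: i64) -> i64 {
        self.pending.push(Reverse(offset));
        while self.pending.peek() == Some(&Reverse(self.floor)) {
            self.pending.pop();
            self.floor += 1;
        }
        self.floor
    }
}

// One tracker per (topic, partition), committed periodically at each
// partition's watermark.
type TopicOffsetMap = HashMap<(String, i32), PartitionTracker>;
```

For example, with a floor of 3, `deliver(5)` parks 5 and returns 3; `deliver(3)` advances the floor to 4; `deliver(4)` then sweeps through both 4 and 5 and returns 6.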

@blt (Author) replied:

Here, by rdkafka, I meant this crate, not librdkafka.

@blt (Author) added:

Whether rdkafka would be the right place for this functionality or not, I'm still a touch confused about how to pull this off, mostly down to lifetime management.

In this example, the TopicPartitionList will always have one originating message associated with it. My initial idea was to use TopicPartitionList::elements to retrieve the single TopicPartitionListElem and store that in a BinaryHeap. There's no Ord impl for TopicPartitionListElem yet (resolvable), but the key difficulty is the lack of Sync on *mut rdkafka_sys::bindings::macos_64::rd_kafka_topic_partition_list_s. My latest commit has the full code changes to reproduce, but here's a sample of the compilation error:

```
error[E0277]: `*mut rdkafka_sys::bindings::macos_64::rd_kafka_topic_partition_list_s` cannot be shared between threads safely
  --> examples/topic_mirror.rs:70:10
   |
70 | impl<'a> ProducerContext for MirrorProducerContext<'a> {
   |          ^^^^^^^^^^^^^^^ `*mut rdkafka_sys::bindings::macos_64::rd_kafka_topic_partition_list_s` cannot be shared between threads safely
   |
   = help: within `rdkafka::topic_partition_list::TopicPartitionListElem<'a>`, the trait `std::marker::Sync` is not implemented for `*mut rdkafka_sys::bindings::macos_64::rd_kafka_topic_partition_list_s`
   = note: required because it appears within the type `rdkafka::TopicPartitionList`
   = note: required because it appears within the type `&'a rdkafka::TopicPartitionList`
   = note: required because it appears within the type `rdkafka::topic_partition_list::TopicPartitionListElem<'a>`
   = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<rdkafka::topic_partition_list::TopicPartitionListElem<'a>>`
   = note: required because it appears within the type `alloc::raw_vec::RawVec<rdkafka::topic_partition_list::TopicPartitionListElem<'a>>`
   = note: required because it appears within the type `std::vec::Vec<rdkafka::topic_partition_list::TopicPartitionListElem<'a>>`
   = note: required because it appears within the type `std::collections::BinaryHeap<rdkafka::topic_partition_list::TopicPartitionListElem<'a>>`
   = note: required because it appears within the type `MirrorProducerContext<'a>`
```
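One way out of that error, offered as a suggestion rather than what the PR did at this point, is to stop storing the borrowed elem at all: copy its fields into an owned struct, which can derive Ord and is Send + Sync for free.

```rust
use std::collections::BinaryHeap;

// An owned snapshot of a TopicPartitionListElem. No raw pointer into
// the underlying librdkafka list, so no Sync problem, and the derived
// Ord (topic, then partition, then offset) is enough for a heap.
// Wrap in std::cmp::Reverse if min-heap ordering is wanted.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct OwnedElem {
    topic: String,
    partition: i32,
    offset: i64,
}

// The context then needs no lifetime parameter.
struct MirrorProducerContext {
    delivered: BinaryHeap<OwnedElem>,
}
```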

As per @fede1024's review, I had indeed misunderstood the purpose of the TopicPartitionList. It is now re-introduced, allowing the program to correctly commit offsets.

Signed-off-by: Brian L. Troutwine <briant@unity3d.com>
Brian L. Troutwine added 3 commits July 10, 2018 12:12
This current commit fails to compile owing to lifetime issues with an underlying raw pointer in the TopicPartitionListElem. The notion, however, is to have the producer store up a MinHeap of known-good productions and only commit them once enough have built up for consistency.

Signed-off-by: Brian L. Troutwine <briant@unity3d.com>
This commit introduces TopicOffsetMap, a structure which records the
'floor' of a topic/partition and a MinHeap of offsets, allowing the
producer to record offsets on delivery and only commit those which
are sequential from the floor up.

This doesn't work, most obviously because `ProducerContext::delivery`
takes a `&self` rather than `&mut self`. It's also possible that this
behaviour should be included somewhere else.

Signed-off-by: Brian L. Troutwine <briant@unity3d.com>
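On the `&self` point: interior mutability is one way through without changing the trait. A sketch, assuming the PartitionTracker/TopicOffsetMap types from the earlier sketch and the current crate's trait shape:

```rust
use std::sync::Mutex;

use rdkafka::producer::{DeliveryResult, ProducerContext};
use rdkafka::ClientContext;

// `delivery` only gets `&self`, so the mutable bookkeeping lives
// behind a Mutex. TopicOffsetMap and PartitionTracker are the
// illustrative types from the earlier sketch, not crate items.
struct MirrorProducerContext {
    offsets: Mutex<TopicOffsetMap>,
}

impl ClientContext for MirrorProducerContext {}

impl ProducerContext for MirrorProducerContext {
    // Carry the *original* message's position through librdkafka.
    type DeliveryOpaque = Box<(String, i32, i64)>;

    fn delivery(&self, result: &DeliveryResult, opaque: Self::DeliveryOpaque) {
        if result.is_ok() {
            let (topic, partition, offset) = *opaque;
            let mut map = self.offsets.lock().unwrap();
            let watermark = map
                .entry((topic, partition))
                .or_insert_with(|| PartitionTracker::new(offset))
                .deliver(offset);
            // Everything below `watermark` is safe to commit; the
            // commit itself can happen here or in the poll loop.
            let _ = watermark;
        }
    }
}
```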
This commit includes changes that force the MirrorProducerContext to commit offsets in order, from least to greatest, regardless of the order in which messages arrive in `delivery`. The implementation is a little grody but seems, overall, to work. The implementation of Ord/PartialOrd for Offset is suspicious, as is TopicOffsetMap in terms of CPU time.

I kinda wonder if the TopicOffsetMap isn't something that would be best
pushed into a Consumer variant.

Signed-off-by: Brian L. Troutwine <briant@unity3d.com>
@blt commented Jul 12, 2018

@fede1024 I've added in-order offset commits to my most recent commit. It's a little gross as-is and I'm not sure if the mutability modifications I've made to BaseProducer are acceptable. After doing this I have a vague notion that this might live best in a Consumer somewhere.

@blt commented Jul 13, 2018

I've realized now that I've got this wired up backward: the consumer needs to be the one to keep the min-heap of offset runs, not the producer.

Consider the use case where the user is not trying to establish a mirror but to do filtering in some fashion. There, the calling code will need to signal to the Consumer that an offset has been processed (via store_offset, I guess) without involving the Producer.

I'll adjust my PR appropriately.

@blt commented Jul 14, 2018

Er, no. Just moving the TopicOffsetMap into Consumer suffers the same problem of needing to adjust the mutability of self. So, I started down the road of wrapping LoggingConsumer in a struct that would hold a mutable TopicOffsetMap and expose the appropriate higher-level API for cutting the Producer out when needed. This approach is going to run into issues with rebalancing. It's possible to extend the ConsumerContext impl to update the TopicOffsetMap, but that fights against the wrapping-struct approach.

If the ConsumerContext functions were adjusted to take &mut self, then the wrapping struct could be abandoned and rebalance events coped with directly. So far so good. That leaves committing offsets in order. At present there's a ConsumerContext::commit_callback which does post-commit work. What about renaming that to post_commit_callback and introducing a pre_commit_callback? The pre version would be responsible for properly setting the offsets in the TopicPartitionList prior to commit, in this use case.

When I left off in my last commit I thought it would be possible to push offset ordering into some kind of consumer, or into a consumer context. That proved to be very difficult, owing to where the offsets are needed to make decisions.

Now, I've made a mutex-protected OffsetStore that is available inside the producer context, consumer context, etc. The idea here is that, whenever a rebalance happens, the internal map of OffsetStore is updated; the polling function can make its own decisions about what to commit without involving the producer (as noted in comments); and the producer does basically what it used to.

This is a little involved but essentially works. Or, should. I've run
short of time today and haven't fully tested it out.

I need to make further commits to undo the fiddling I did in rdkafka.
All of that turned out to be a dead-end.

Signed-off-by: Brian L. Troutwine <briant@unity3d.com>
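The sharing arrangement described here might be shaped roughly like this (a sketch only; OffsetStore's internals are assumed to be the tracker map from the earlier sketches, and all names follow the commit message or are mine):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// One mutex-protected store, with handles cloned into the producer
// context, the consumer context, and the poll loop. `PartitionTracker`
// is the illustrative type from the earlier sketch.
type OffsetStore = Arc<Mutex<HashMap<(String, i32), PartitionTracker>>>;

struct MirrorProducerContext {
    offsets: OffsetStore, // records delivered offsets
}

struct MirrorConsumerContext {
    offsets: OffsetStore, // resets trackers when a rebalance reassigns partitions
}

fn wire_up() -> (MirrorProducerContext, MirrorConsumerContext, OffsetStore) {
    let store: OffsetStore = Arc::new(Mutex::new(HashMap::new()));
    (
        MirrorProducerContext { offsets: Arc::clone(&store) },
        MirrorConsumerContext { offsets: Arc::clone(&store) },
        // The poll loop keeps its own handle and decides what to
        // commit without involving the producer.
        store,
    )
}
```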
@blt closed this Nov 19, 2020