Added support for custom IPartitionEventHandler instances in subscriptions #154
Conversation
Got "Unexpected Kafka message" here - see #99
src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs
@Aaronontheweb I think we can merge this?
@IgorFedchenko need to add some documentation explaining how and when to use this feature. For the time being you can add it to the README.
src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs
@Aaronontheweb Done
This is great. Thanks for adding this. I didn't look into this, but does the Kafka lib support making these handlers async?
Sometimes you may need to add custom handling for partition events, like a partition being assigned to the consumer. To do that, you will need:

1. To write a custom implementation of the `IPartitionEventHandler` interface:

```c#
class CustomEventsHandler : IPartitionEventHandler
{
    /// <inheritdoc />
    public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
    {
        // Your code here
    }

    // ... (remaining lines of this hunk not shown in the review excerpt)
}
```
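For context, a rough sketch of how such a handler might be attached to a subscription. `WithPartitionEventsHandler` is the method this PR appears to add, and `KafkaConsumer.PlainSource` is the existing source API; the exact names here are assumptions and may differ slightly from the merged code:

```c#
// Sketch only: WithPartitionEventsHandler and the surrounding wiring are
// assumptions based on this PR's description, not verified API.
var handler = new CustomEventsHandler();

var subscription = Subscriptions.Topics("my-topic")
    .WithPartitionEventsHandler(handler); // attach the custom handler

KafkaConsumer.PlainSource(consumerSettings, subscription)
    .Select(record => record.Value)
    .RunForeach(value => Console.WriteLine(value), materializer);
```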
How can we use this to ensure that:
- the stream has been drained
- it is safe to clear all state in the actor system (for example, kill all of the shadow actors) before allowing reassignment?

Can you add details around the timing of when `OnRevoke` is called in relation to the stream? Does it mean that no more new messages are being sent by the source when this is called?

One potential solution I could take would be to clean up state within `OnAssign` instead. Thoughts?
Well... Let's assume that you are using only one partition that is assigned/revoked from your consumer (otherwise you will need to keep track of which partitions are assigned/revoked).

I do not fully understand your use case, but `OnRevoke` is called once Kafka tells the consumer that the partition is revoked - which means you are not going to receive any more messages from Kafka.

But - since all of this is eventually consistent, in theory you can receive some messages from the stream after `OnRevoke` was invoked. Our internal Kafka consumer will not receive messages anymore, but some messages could already have been received/buffered and be halfway to your actors.

So you cannot rely on `OnRevoke` as a signal that the stream is drained. But you do have `revokedTopicPartitions`, which contains the partition offsets. I think you can wait until you receive a message whose offset matches the revoked partition's offset - that is going to be the last message from the partition. After that you will not receive new messages until the next partition assignment.

Doing cleanup in the `OnAssign` method also sounds good to me. But if cleanup may take a while, you should probably do it in the background, and make sure that you do not start handling new messages until cleanup is finished.
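To make the offset-matching idea above concrete, here is a sketch of a small helper (hypothetical, not part of the library) that records the offsets reported at revocation and flags when the last in-flight message for a revoked partition has been seen. `TopicPartitionOffset` is the Confluent.Kafka type used in the handler signature:

```c#
// Hypothetical helper illustrating the "wait for the matching offset" idea.
public class RevocationTracker
{
    private readonly Dictionary<(string Topic, int Partition), long> _lastOffsets
        = new Dictionary<(string, int), long>();

    // Call from OnRevoke with the offsets Kafka reported at revocation time.
    public void MarkRevoked(IEnumerable<TopicPartitionOffset> revoked)
    {
        foreach (var tpo in revoked)
            _lastOffsets[(tpo.Topic, tpo.Partition.Value)] = tpo.Offset.Value;
    }

    // Call for each message still arriving from the stream; returns true once
    // a message at (or past) the revoked offset has been seen, i.e. the
    // partition can be considered drained. Exact off-by-one semantics depend
    // on what Kafka reports at revocation time - verify before relying on it.
    public bool IsDrained(string topic, int partition, long offset)
        => _lastOffsets.TryGetValue((topic, partition), out var last)
           && offset >= last;
}
```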
@ayoung I do not think so - these partition event handler callbacks are invoked in synchronous handlers which are set up for the Kafka client we use internally. Since it does not support async callbacks, we do not support async handlers. But I want to note that these callbacks are executed by the consumer on the same thread where it handles messages and other Kafka events, so it is not desirable to do anything heavy or time-consuming here. Using some background task/actor to handle these events without blocking the consumer is probably the way to go.
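A sketch of that "hand off to an actor" approach: the handler does nothing but `Tell`, so the consumer thread is never blocked. Only `OnRevoke`'s signature appears in the diff above; the `OnAssign`/`OnStop` members and the message class here are assumptions:

```c#
// Hypothetical message wrapping the revoked partitions for an actor.
public sealed class PartitionsRevoked
{
    public PartitionsRevoked(IImmutableSet<TopicPartitionOffset> partitions)
        => Partitions = partitions;
    public IImmutableSet<TopicPartitionOffset> Partitions { get; }
}

public class ForwardingPartitionEventHandler : IPartitionEventHandler
{
    private readonly IActorRef _target;

    public ForwardingPartitionEventHandler(IActorRef target) => _target = target;

    public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
                         IRestrictedConsumer consumer)
        => _target.Tell(new PartitionsRevoked(revokedTopicPartitions)); // Tell never blocks

    // Assumed interface members - forward similar messages without blocking.
    public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions,
                         IRestrictedConsumer consumer)
    {
        // _target.Tell(new PartitionsAssigned(assignedTopicPartitions));
    }

    public void OnStop(IImmutableSet<TopicPartition> topicPartitions,
                       IRestrictedConsumer consumer)
    {
        // _target.Tell(new PartitionsStopped(topicPartitions));
    }
}
```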
any thoughts on these?
@Aaronontheweb Already responded here: #154 (comment)
I don't think my use case is unique. Clean up must include draining the stream. Let me explain.

Clean up will take as long as it takes to kill the shadow actors. I don't see any other external I/O needing to happen during the clean up routine. Killing actors is quick. BUT, as said before, we do have to ensure that the stream has been drained before cleaning up. If we clean up too soon, we may be killing an actor that still has messages in flight.

This is a problem because the time between the partition revocation and assignment is the synchronization point between all of the consumers in the group. So in order to guarantee ordering semantics, revocation means that the consumer must give up ownership of the partition. That is, it must not continue to process messages in the stream, even async in the background. This prevents partition assignment and consumption occurring on the new consumer in the group while the old consumer is still draining messages in the stream, which would cause concurrency issues WRT ordering. This info was taken from the Java Kafka client docs, but I'm confident that this applies in .NET as well.

This is only possible if the stream has finished draining before reassignment occurs.
@ayoung All I was saying is that you are not working with Kafka directly - you are working with our actors that are consuming messages from Kafka. And this

If I got your point right, you have actors that are using

If you need to drain the stream during cleanup, you can use

If it will be too complicated to make cleanup in an eventually consistent way (because you can get

Makes sense? Or am I still missing something in what you are trying to achieve? I am describing the general solution as I can see it.
I get the problem that @ayoung is trying to solve - but I don't think there's a method for 100% globally cleaning up in-flight messages on re-assigned partitions, without explicitly checking for them between the source stage and whichever stage does the commit, other than failing the upstream stage and recreating the entire graph - which would be heavy-handed.

If I were an end-user trying to solve this problem, the best type of tool for handling something like this would be a

That's just an idea - not something we should implement on this PR. @ayoung, does merging this solve some of your problem?
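One way to sketch that "check between the source stage and the commit stage" idea with plain graph stages: a `Where` stage that consults a shared assignment set maintained by the event handler. All names here are hypothetical; `assigned` would be populated from `OnAssign` and cleared from `OnRevoke`:

```c#
// Hypothetical: `assigned` is a thread-safe set updated from the
// OnAssign/OnRevoke callbacks of a custom IPartitionEventHandler.
var assigned = new ConcurrentDictionary<TopicPartition, byte>();

KafkaConsumer.PlainSource(consumerSettings, subscription)
    // Drop any buffered/in-flight message from a partition this consumer
    // no longer owns before it reaches the processing or commit stages.
    .Where(record => assigned.ContainsKey(record.TopicPartition))
    .RunForeach(record => Process(record), materializer); // Process is hypothetical
```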
@IgorFedchenko on the JVM, wasn't there a built-in actor type that gets used to subscribe to these events and produce notifications? Or was that just an example? |
@Aaronontheweb No, there is no built-in actor for this - I just gave an example of how to handle callbacks without blocking the consumer thread. I could implement something around that, but it feels like it will be quicker for users to implement their own class than to find and read about a built-in one in the docs :D
It certainly gives me more to think about how I might be able to solve the problem, but as of this very moment I don't see a clear path to a solution even with this PR.

Regarding

I understand your proposal and appreciate the suggestion. However, I still see a problem here that can only be solved by knowing when the stream is completely drained. Yes, by comparing the last seen offset with the one given in
@ayoung Hmm... If I understand you right, you want to:

Is it right? If so, I am not sure that you can disallow reassignment other than by shutting down the stream (and consumer).

If you are fine with reassignment in general, but need to know that you will not consume new messages until cleanup is finished, you may think about making no demand from downstream for new messages. That is, something like

Well, my idea about
A

Edit: that way you could simply drop any buffered messages from partitions that are no longer assigned to this consumer before the offsets get stored.
Close #149