Added support for custom IPartitionEventHandler instances in subscriptions #154

Merged
6 commits merged into akkadotnet:dev from the partition-events branch on Sep 2, 2020

Conversation

IgorFedchenko
Contributor

Close #149

@IgorFedchenko IgorFedchenko marked this pull request as draft August 23, 2020 12:51
@IgorFedchenko IgorFedchenko marked this pull request as ready for review August 23, 2020 13:10
@IgorFedchenko
Contributor Author

Got "Unexpected Kafka message" here - see #99

@IgorFedchenko
Contributor Author

@Aaronontheweb I think we can merge this?

Member

@Aaronontheweb left a comment


@IgorFedchenko need to add some documentation explaining how and when to use this feature. For the time being you can add it to the README.

@IgorFedchenko
Contributor Author

@Aaronontheweb Done

@ayoung

ayoung commented Aug 25, 2020

This is great. Thanks for adding this. I didn’t look into this but does the Kafka lib support making these async?

Comment on lines +403 to +413

Sometimes you may need custom handling for partition events, such as partitions being assigned to or revoked from the consumer. To do that, you will need:

1. To write a custom implementation of the `IPartitionEventHandler` interface:
```c#
class CustomEventsHandler : IPartitionEventHandler
{
    /// <inheritdoc />
    public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
    {
        // Your code here
    }

    // ... remaining IPartitionEventHandler members
}
```
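(For context, a handler like this is then attached to the subscription. The following is a minimal sketch only, assuming the subscription extension method added by this PR is named `WithPartitionEventsHandler` and that `consumerSettings` is already configured; the exact API name may differ, so check the README.)

```c#
// Sketch only: attach the custom handler when building the subscription.
// WithPartitionEventsHandler is assumed here from the PR description.
var customHandler = new CustomEventsHandler();
var subscription = Subscriptions.Topics("my-topic").WithPartitionEventsHandler(customHandler);

// The subscription is then used as usual, e.g.:
var source = KafkaConsumer.PlainSource(consumerSettings, subscription);
```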
@ayoung commented Aug 25, 2020

How can we use this to ensure that:

  1. the stream has been drained
  2. it is safe to clear all state in the actor system (for example, kill all of the shadow actors) before allowing reassignment?

Can you add details around the timing of when OnRevoke is called in relation to the stream? Does it mean that no more new messages are being sent by the source when this is called?

One potential solution I could take would be to clean up state within OnAssign instead. Thoughts?

@IgorFedchenko
Contributor Author

Well... Let's assume that only one partition is assigned to / revoked from your consumer (otherwise you will need to keep track of which partitions are assigned/revoked).

I do not fully understand your use case, but OnRevoke is called once Kafka tells the consumer that the partition is revoked - which means that you are not going to receive any more messages from Kafka.
But since everything here is eventually consistent, in theory you can still receive some messages from the stream after OnRevoke has been invoked. Our internal Kafka consumer will not receive any more messages, but some messages could already have been received/buffered and be halfway to your actors.

So you cannot rely on OnRevoke as a signal that the stream is drained. But you do have revokedTopicPartitions, which contains the partition offsets. I think you can wait until you receive a message whose offset matches the revoked partition's offset - that is going to be the last message from that partition. After that you will not receive new messages until the next partition assignment.

Doing cleanup in the OnAssign method also sounds good to me. But if cleanup may take a while, you should probably do it in the background and make sure that you do not start handling new messages until cleanup is finished.
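A rough sketch of that offset-matching idea (illustrative only, not part of this PR): the handler records the offsets reported for revoked partitions, and the application checks each message from the stream against them. The OnAssign/OnStop signatures shown here are assumed from the rest of this thread and may differ from the actual interface.

```c#
using System.Collections.Concurrent;
using System.Collections.Immutable;
using Confluent.Kafka;
// plus the Akka.Streams.Kafka namespace that defines IPartitionEventHandler / IRestrictedConsumer

// Sketch only: remember the offsets reported for revoked partitions so the
// application can recognize the last in-flight message of each partition.
class RevocationTracker : IPartitionEventHandler
{
    private readonly ConcurrentDictionary<TopicPartition, Offset> _revokedAt =
        new ConcurrentDictionary<TopicPartition, Offset>();

    public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
    {
        foreach (var tpo in revokedTopicPartitions)
            _revokedAt[tpo.TopicPartition] = tpo.Offset;
    }

    public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
    {
        // Once the partition is assigned again, the old "last offset" marker is stale.
        foreach (var tp in assignedTopicPartitions)
            _revokedAt.TryRemove(tp, out _);
    }

    public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
    {
        // No-op in this sketch.
    }

    // Called by application code for each message pulled from the stream.
    public bool IsLastMessageOfRevokedPartition(TopicPartitionOffset messageOffset) =>
        _revokedAt.TryGetValue(messageOffset.TopicPartition, out var last) &&
        messageOffset.Offset.Value >= last.Value;
}
```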

@IgorFedchenko
Contributor Author

This is great. Thanks for adding this. I didn’t look into this but does the Kafka lib support making these async?

@ayoung I do not think so - these partition event handler callbacks are invoked from the synchronous handlers that are set up on the Kafka client we use internally. Since it does not support async callbacks, we do not support async handlers.

Note that these callbacks are executed by the consumer on the same thread it uses to handle messages and other Kafka events, so it is not desirable to do anything heavy or time-consuming here. Using a background task or actor to handle these events without blocking the consumer is probably the way to go.
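For example, a handler along these lines just forwards the events and returns immediately. This is a sketch only; the coordinator actor and message types are illustrative application code, and the OnAssign/OnStop signatures are assumed as above.

```c#
using System.Collections.Immutable;
using Akka.Actor;
using Confluent.Kafka;
// plus the Akka.Streams.Kafka namespace that defines IPartitionEventHandler / IRestrictedConsumer

// Hypothetical messages understood by the application's own coordinator actor.
public sealed class PartitionsRevoked
{
    public PartitionsRevoked(IImmutableSet<TopicPartitionOffset> partitions) => Partitions = partitions;
    public IImmutableSet<TopicPartitionOffset> Partitions { get; }
}

public sealed class PartitionsAssigned
{
    public PartitionsAssigned(IImmutableSet<TopicPartition> partitions) => Partitions = partitions;
    public IImmutableSet<TopicPartition> Partitions { get; }
}

// Forwards partition events to an actor with Tell, so the consumer thread is never blocked.
public class ActorForwardingHandler : IPartitionEventHandler
{
    private readonly IActorRef _coordinator;

    public ActorForwardingHandler(IActorRef coordinator) => _coordinator = coordinator;

    public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
        => _coordinator.Tell(new PartitionsRevoked(revokedTopicPartitions));

    public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
        => _coordinator.Tell(new PartitionsAssigned(assignedTopicPartitions));

    public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
    {
        // No-op in this sketch.
    }
}
```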

@Aaronontheweb
Member

@IgorFedchenko

Can you add details around the timing of when OnRevoke is called in relation to the stream? Does it mean that no more new messages are being sent by the source when this is called?

One potential solution I could take would be to clean up state within OnAssign instead. Thoughts?

any thoughts on these?

@IgorFedchenko
Contributor Author

@Aaronontheweb Already responded here: #154 (comment)

@ayoung

ayoung commented Aug 29, 2020

@IgorFedchenko

"I don't fully understand your use case..."

I don't think my use case is unique. Clean up must include draining the stream. Let me explain.

Clean up will take as long as it takes to kill the shadow actors. I don't see any other external I/O needing to happen during the clean up routine. Killing actors is quick. BUT, as said before, we do have to ensure that the stream has been drained before cleaning up. If we clean up too soon, we may be killing an actor that still has messages in flight.

This is a problem because the time between the partition revocation and assignment is the synchronization point between all of the consumers in the group. So in order to guarantee ordering semantics, revocation means that the consumer must give up ownership of the partition. That is, it must not continue to process messages in the stream, even asynchronously in the background. This prevents partition assignment and consumption from occurring on the new consumer in the group while the old consumer is still draining messages in the stream, which would cause concurrency issues WRT ordering.

This info was taken from the java kafka client docs but I'm confident that this applies in .NET as well.

Under normal conditions, if a partition is reassigned from one consumer to another, then the old consumer will always invoke onPartitionsRevoked for that partition prior to the new consumer invoking onPartitionsAssigned for the same partition. So if offsets or other state is saved in the onPartitionsRevoked call by one consumer member, it will be always accessible by the time the other consumer member taking over that partition and triggering its onPartitionsAssigned callback to load the state.

This is only possible if the stream has finished draining before reassignment occurs.

@IgorFedchenko
Contributor Author

@ayoung All I was saying is that you are not working with Kafka directly - you are working with our actors, which consume messages from Kafka. The OnPartitionRevoked callback occurs at the Kafka layer, which you will have access to after this PR. Once this callback has been called, no more messages will be consumed from Kafka by our actors until the next OnPartitionAssigned callback is invoked. But some messages might already have been consumed by our actors and be waiting for your actors to pull them.

If I got your point right, you have actors that use KafkaConsumer.PlainSource (or another source) to get messages, and you want to do cleanup there when partitions are revoked. If so, you can handle OnPartitionRevoked and tell your actors what offset was reached in each of the revoked partitions. Once the last message they received has the same offset, you can safely clean up state, kill processing actors, etc. - but not right after handling OnPartitionRevoked, because some messages could still be in flight inside our actors and about to be delivered to your actors.

If you need to drain the stream during cleanup, you can use DrainingControl as you were doing here: #129 (comment) - there is a method you can call that starts the draining task (I guess it is Shutdown() or something like that).

If it turns out to be too complicated to do cleanup in an eventually consistent way (because you can get OnPartitionAssigned while performing cleanup), I guess you can try the Ask pattern to perform all cleanup (including waiting for and consuming all pending messages) inside the OnPartitionRevoked callback.

Does that make sense? Or am I still missing something in what you are trying to achieve? I am describing the general solution as I see it.
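A sketch of the draining side of that discussion. Names like `Process`, `consumerSettings`, `subscription`, and `materializer` are assumed application code, and the usual Akka.Streams / Akka.Streams.Kafka usings are omitted; the comment above mentions DrainingControl, which wraps the same control/completion pair, but the raw control handle is used here to keep the sketch short.

```c#
// Sketch only: keep the materialized control of the Kafka source so the
// stream can be shut down and awaited from the cleanup routine.
var materialized = KafkaConsumer.PlainSource(consumerSettings, subscription)
    .Select(msg =>
    {
        Process(msg);        // application-specific processing (assumed)
        return Done.Instance;
    })
    .ToMaterialized(Sink.Ignore<Done>(), Keep.Both)
    .Run(materializer);

var control = materialized.Item1;          // control handle of the Kafka source stage
var streamCompletion = materialized.Item2; // completes when the stream finishes

// From the cleanup path: ask the source to shut down, then wait until the
// already-buffered elements have flowed through and the stream has completed.
await control.Shutdown();
await streamCompletion;
```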

@Aaronontheweb
Member

I get the problem that @ayoung is trying to solve - but I don't think there's a method for 100% globally cleaning up in-flight messages on re-assigned partitions, without explicitly checking for them between the source stage and whichever stage does the commit, other than failing the upstream stage and recreating the entire graph - which would be heavy-handed.

If I were an end-user trying to solve this problem, the best type of tool for handling something like this would be a KillSwitch-like stage that behaves like a Where filter - one that can be activated by the OnPartitionRevoked callback and can filter out any downstream messages that come from the revoked partition.

That's just an idea - not something we should implement on this PR.

@ayoung , does merging this solve some of your problem?

@Aaronontheweb
Member

@IgorFedchenko on the JVM, wasn't there a built-in actor type that gets used to subscribe to these events and produce notifications? Or was that just an example?

@IgorFedchenko
Contributor Author

@Aaronontheweb No, there is no built-in actor for this - that was just an example of how to handle callbacks without blocking the consumer thread. I could implement something along those lines, but it feels like it would be quicker for users to implement their own class than to find and read about a built-in one in the docs :D

@ayoung

ayoung commented Aug 31, 2020

@Aaronontheweb,

does merging this solve some of your problem?

It certainly gives me more to think about in terms of how I might solve the problem, but as of this moment I don't see a clear path to a solution, even with this PR.

Regarding IKillSwitch - Doesn't the completion of the stream also stop and dispose of the internal Kafka Consumer? If it does, that would further exacerbate the situation, because not only is the Consumer Group being rebalanced (that's probably what caused the revocation in the first place), but now we're additionally going to stop the consumer during the whole rebalancing act. Ideally, we would like to keep the consumers running and, by consequence, keep the stream alive.

@IgorFedchenko,

I understand your proposal and appreciate the suggestion. However, I still see a problem here that can only be solved by knowing when the stream is completely drained. Yes, by comparing the last seen offset with the one given in revokedTopicPartitions, I may know when to clean up, but I won't know when I'm ready for reassignment. I will only be ready for reassignment once the stream is drained and all flow elements have been committed with Committer.Sink(). Reassignment can't happen any sooner than that.

@IgorFedchenko
Contributor Author

@ayoung Hmm... If I understand you right, you want to:

  1. Make sure you will not get any more messages from the source (that's what you call "draining the stream").
  2. Make sure that all consumed messages have been processed by all stream stages (up to Committer.Sink()).
  3. Allow partition reassignment only after that, and not allow reassignment (or getting new messages from newly assigned partitions via the source) before points 1 and 2 are done.

Is that right? If so, I am not sure you can disallow reassignment other than by shutting down the stream (and the consumer). If you are fine with reassignment in general, but need to know that you will not consume new messages until cleanup is finished, you might consider not signalling downstream demand for new messages. That is, something like KafkaConsumer.PlainSource(...).Select(m => _manualEventSetWhenCleanupFinished.WaitOne()) (or something more elegant). This way, even after reassignment, the internal Kafka consumer will not pull new messages from Kafka, since there is no demand for them (the downstream is not ready and does not request new messages).

Well, my idea with Select and a manual event is flawed, because you need to consume the last message from the revoked partition and only after that remove the demand for new messages until cleanup is finished... There is probably another flow stage more appropriate for this, but I hope you get the idea.
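A slightly less hacky variant of the same idea, as a sketch only: `cleanupGate` is a hypothetical application-owned gate that the cleanup routine acquires when OnRevoke fires and releases once cleanup finishes; `consumerSettings` and `subscription` are assumed as usual (usings omitted).

```c#
// Sketch only: while cleanup holds the gate, no element completes here, so no
// new demand reaches the Kafka source and the internal consumer stops pulling.
var cleanupGate = new SemaphoreSlim(1, 1);

var source = KafkaConsumer.PlainSource(consumerSettings, subscription)
    .SelectAsync(1, async msg =>
    {
        await cleanupGate.WaitAsync();
        cleanupGate.Release();
        return msg;
    });
```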

@Aaronontheweb
Member

Aaronontheweb commented Sep 1, 2020

Regarding IKillSwitch - Doesn't the completion of the stream also stop and dispose of the internal Kafka Consumer? If it does, that would further exacerbate the situation, because not only is the Consumer Group being rebalanced (that's probably what caused the revocation in the first place), but now we're additionally going to stop the consumer during the whole rebalancing act. Ideally, we would like to keep the consumers running and, by consequence, keep the stream alive.

A KillSwitch would - but what I was proposing was something similar that simply invokes a Where filter instead of killing the pipeline. The key feature of a KillSwitch is that it's a Flow<T,T> stage that can be activated from outside the stream.

Edit: that way you could simply drop any buffered messages from the partition that are no longer assigned to this consumer before the offsets get stored.
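A sketch of that idea (illustrative only, not part of this PR): the partition event handler records revoked partitions in shared state, and a Where stage drops any buffered messages from those partitions before their offsets reach the committing stage. `consumerSettings` and `subscription` are assumed as usual, and the handler wiring (OnRevoke adds entries, OnAssign removes them) is left as comments.

```c#
using System.Collections.Concurrent;

// Hypothetical shared state: partitions this consumer no longer owns.
// The custom IPartitionEventHandler's OnRevoke adds entries here,
// and OnAssign removes them once the partition is owned again.
var revoked = new ConcurrentDictionary<TopicPartition, byte>();

// The Where stage acts like the "soft kill switch" described above: buffered
// messages from revoked partitions are dropped before they can be processed
// or have their offsets stored.
var filtered = KafkaConsumer.PlainSource(consumerSettings, subscription)
    .Where(msg => !revoked.ContainsKey(msg.TopicPartition));
```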

@Aaronontheweb Aaronontheweb merged commit 3dd904e into akkadotnet:dev Sep 2, 2020
@IgorFedchenko IgorFedchenko deleted the partition-events branch October 28, 2020 15:08

Successfully merging this pull request may close these issues.

Add PartitionAssignmentHandler public API support
3 participants