feat(consumer): Support incremental cooperative rebalancing #53

Merged
lynnagara merged 10 commits into main on Apr 1, 2022

Member

@lynnagara lynnagara commented Mar 30, 2022

KafkaConsumer now supports the cooperative-sticky partition assignment strategy.
The goal of this change is to reduce the number of unnecessary rebalances
that happen during Kubernetes rolling deployments.

KafkaConsumer now takes an `incremental` argument. If set to true,
the cooperative-sticky assignor is used. The goal of this change is to
reduce the number of unnecessary rebalances that happen during
Kubernetes rolling deployments.
@lynnagara lynnagara requested a review from mwarkentin March 31, 2022 05:37
@@ -246,40 +252,63 @@ def assignment_callback(
) -> None:
    self.__state = KafkaConsumerState.ASSIGNING

    try:
        assignment: MutableSequence[ConfluentTopicPartition] = []
        if self.__incremental_cooperative is True:
Member Author

This diff got pretty weird because of indentation. This block from here to line 273 is new though, and could use a close review. It's quite different to what happens with eager rebalancing (lines 276-305), which is essentially unchanged.
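To make the difference between the two branches concrete, here is a minimal sketch of how an assignment callback might dispatch between them. `assign` and `incremental_assign` are real method names on confluent_kafka's `Consumer`, but `FakeConsumer`, `on_assign`, and the tuple-based partitions are hypothetical stand-ins so the example runs without a broker. It is not the code in this PR.

```python
from typing import List, Sequence, Tuple

# (topic, partition index); a stand-in for ConfluentTopicPartition.
Partition = Tuple[str, int]


class FakeConsumer:
    """Hypothetical stand-in mimicking assign() / incremental_assign() semantics."""

    def __init__(self) -> None:
        self.assigned: List[Partition] = []

    def assign(self, partitions: Sequence[Partition]) -> None:
        # Eager protocol: the new assignment replaces the old one entirely.
        self.assigned = list(partitions)

    def incremental_assign(self, partitions: Sequence[Partition]) -> None:
        # Cooperative protocol: new partitions are added to the existing set.
        self.assigned.extend(p for p in partitions if p not in self.assigned)


def on_assign(
    consumer: FakeConsumer, partitions: Sequence[Partition], incremental: bool
) -> None:
    """Dispatch to the assignment call that matches the rebalance protocol."""
    if incremental:
        consumer.incremental_assign(partitions)
    else:
        consumer.assign(partitions)


eager = FakeConsumer()
on_assign(eager, [("events", 0), ("events", 1)], incremental=False)
on_assign(eager, [("events", 0)], incremental=False)  # full assignment is re-sent

cooperative = FakeConsumer()
on_assign(cooperative, [("events", 0)], incremental=True)
on_assign(cooperative, [("events", 1)], incremental=True)  # only the delta is sent
```

In the eager case the second callback overwrites the first, while in the cooperative case the two callbacks accumulate.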

@lynnagara lynnagara marked this pull request as ready for review March 31, 2022 06:02
@lynnagara lynnagara requested a review from a team as a code owner March 31, 2022 06:02
Contributor

@fpacifici fpacifici left a comment

Have you tried testing locally how this performs?
Specifically:

  • How long does it take to assign a partition to a new consumer that joins the group?
  • How long does it take to revoke all remaining partitions (if any) from a consumer that is leaving the group? Does the group wait scheduled.rebalance.max.delay.ms before reassigning the partitions to a new consumer?
  • How long does it take to have partitions reassigned when a node leaves and rejoins?

I think we should test this locally before doing it in prod.

Comment on lines 186 to 189
self.__incremental_cooperative = incremental_cooperative

if self.__incremental_cooperative is True:
    configuration["partition.assignment.strategy"] = "cooperative-sticky"
Contributor

Why provide incremental_cooperative as a dedicated flag instead of passing it through the configuration mapping like all the other config parameters?

Member Author

@lynnagara lynnagara Mar 31, 2022

I had it the other way originally, but figured it was easier to pass a boolean than to remember these strings and then check for them to decide whether to apply incremental_assign. Thinking about it again, I might switch back to avoid changing the interface of this class though.

Contributor

> I might switch back to avoid changing the interface of this class though.

Also, we would not reintroduce two separate ways of providing Kafka config, which took us a long time to clean up.

Member Author

There was only one way. Passing the flag was the only way that would have worked.

Contributor

I am talking about all those fields we were passing via CLI that we are slowly moving to settings.

Member Author

@lynnagara lynnagara Apr 1, 2022

Unfortunately passing via CLI is still the easiest option currently as we don't have a mechanism yet to provide different settings for consumers/producers of a topic via settings and this is a consumer-only configuration.
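For reference, the configuration-mapping alternative raised in this thread could look roughly like the sketch below. `partition.assignment.strategy` and `cooperative-sticky` are the key and value used in the diff above; the helper name `uses_cooperative_rebalancing` and the `COOPERATIVE_STRATEGIES` constant are hypothetical and not part of this PR.

```python
from typing import Any, Mapping

# Assumption: cooperative-sticky is the only cooperative assignor we care about.
COOPERATIVE_STRATEGIES = frozenset({"cooperative-sticky"})


def uses_cooperative_rebalancing(configuration: Mapping[str, Any]) -> bool:
    """Return True if every configured assignor is a cooperative one.

    The setting may hold a comma-separated list of assignors, so each
    entry is checked individually.
    """
    raw = configuration.get("partition.assignment.strategy", "")
    strategies = [s.strip() for s in str(raw).split(",") if s.strip()]
    return bool(strategies) and all(s in COOPERATIVE_STRATEGIES for s in strategies)


cooperative_config = {"partition.assignment.strategy": "cooperative-sticky"}
eager_config = {"partition.assignment.strategy": "range,roundrobin"}
```

With a helper like this the consumer could decide between `assign` and `incremental_assign` without a separate constructor flag, at the cost of the caller having to remember the strategy string.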

Comment on lines 497 to 499
for partition, offset in offsets.items():
    self.__consumer.seek(
        ConfluentTopicPartition(partition.topic.name, partition.index, offset)
Contributor

@fpacifici fpacifici Mar 31, 2022

Why this change? Are we never going to reach this method during assignment?

Member Author

I found it pretty unintuitive that the same __seek implementation was being called from both seek() and the assign callback, and then checked KafkaConsumerState to effectively determine which one was making the call. Now the assignment function is separate and never calls __seek.

Contributor

Also, this method is now only called by the public seek method. Why not inline it there?

Member Author

I was avoiding touching a lot of code earlier, but yes, it is better inlined since nothing else calls it now. Updated.

Contributor

@fpacifici fpacifici left a comment

Mostly some questions. I think this should work.

ConfluentTopicPartition
] = []

for partition in partitions:
Contributor

If I am getting this right, you are no longer asking the broker for the last committed offset.
This means that you are invoking incremental_assign with the offset you received in the assignment_callback call.
Is this intentional? In stop-the-world rebalancing, fetching the last committed offset and seeking to it was meant to ensure all partitions had the same behavior (whether or not they were previously assigned to the same consumer). All partitions would be reset to the last committed offset.
I would expect it to be impossible, in the incremental case, to receive a partition you already owned (unless the consumer restarted). Is this guaranteed, or can you still be assigned a partition you already owned before rebalancing?
If it is guaranteed and all partitions you receive are "new" to this consumer, are you not resetting the offsets because the offsets you receive are supposed to already be the last committed ones?

Member Author

I don't think you can receive a partition you already owned, but in any case I don't quite get why we would need to check the offset since the offset you are receiving should already be the correct one. The other issue I noticed with committed() is that it hangs forever (or times out if you pass it a timeout) for the cooperative-sticky strategy in the scenario where it's a new consumer group and no offset has been committed yet.

Contributor

This will be important to verify.
The reason for getting the committed() offsets is so that all partitions start in a consistent way from the last committed offset after a rebalance (without doing so, the partitions that are reassigned to you would start from the last consumed offset).
This behavior is visible to the application, but if we commit during revoke we should not have any issue, as the two behaviors would be identical.
If we never receive a partition we already owned and we consistently commit on revoke, we are good. But if we fail to commit during revoke and we get the same partition we owned in the past, I don't know what would happen if we do not reset the offsets.
If committed() is taking much longer, then it is probably a non-starter. Could you please verify that, even if we did receive the same partition again on subscriptions (where we are testing this), we will not risk missing messages?

Member Author

@lynnagara lynnagara Mar 31, 2022

> The reason for getting the committed() offsets is so that all partitions start in a consistent way from the last committed offset after a rebalance (without doing so the partitions that are reassigned to you would start from the last consumed offset).

I don't think this is true. The offsets passed to the assignment callback are already the committed ones, not just the ones that are consumed.

Kafka guarantees that

  • only new partitions are passed to this callback, not previously owned ones
  • the revoke callback is always triggered before the assignment one.

The reason for committing offsets during revoke is to avoid double processing, so that the new consumer doesn't receive the same offsets again. Either way we wouldn't skip a message; we would only process it once or twice.

Contributor

> The offsets passed to the assignment callback are already the committed ones not just the ones that are consumed.

Are you sure? I thought this was the reason for making this consumer behave differently from the standard Kafka consumer (https://github.com/getsentry/arroyo/blob/main/arroyo/backends/kafka/consumer.py#L109-L118).

Anyway, if Kafka guarantees that only new partitions are passed and not previously owned ones, we should be good.

Member Author

@lynnagara lynnagara Apr 1, 2022

Yes, but this only applies to the partitions that are being newly assigned to the consumer. Previously all of the partitions (including those already assigned to the consumer) would be provided in the callback, now only the incremental ones are. So we are no longer rewinding back those partitions which the consumer is maintaining across a rebalance like we used to, those just continue from the same place.
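The behavioral change described in this thread can be illustrated with a toy model. It is a sketch under stated assumptions, not a reproduction of the consumer: `positions_after_rebalance` is a hypothetical function, partitions are plain integers, and it assumes that partitions delivered in the assignment callback start from their committed offset while retained partitions are left untouched.

```python
from typing import Dict, Set


def positions_after_rebalance(
    consumed: Dict[int, int],
    committed: Dict[int, int],
    new_assignment: Set[int],
    incremental: bool,
) -> Dict[int, int]:
    """Toy model: the offset each partition resumes from after a rebalance."""
    positions: Dict[int, int] = {}
    for partition in sorted(new_assignment):
        if incremental and partition in consumed:
            # Retained partition: not passed to the assignment callback,
            # so the consumer keeps reading from where it was.
            positions[partition] = consumed[partition]
        else:
            # Partition delivered in the assignment callback: start from
            # the last committed offset.
            positions[partition] = committed[partition]
    return positions


consumed = {0: 15}         # the consumer has read partition 0 up to offset 15
committed = {0: 10, 1: 7}  # but only offset 10 was committed for partition 0

eager = positions_after_rebalance(consumed, committed, {0, 1}, incremental=False)
cooperative = positions_after_rebalance(consumed, committed, {0, 1}, incremental=True)
```

In this model the eager consumer rewinds partition 0 to the committed offset, while the cooperative consumer continues partition 0 from its consumed position and only partition 1 starts from the committed offset.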

Comment on lines 309 to 319
self.__consumer.resume(
    [
        ConfluentTopicPartition(
            partition.topic.name, partition.index, offset
        )
        for partition, offset in offsets.items()
    ]
)

for partition in offsets:
    self.__paused.discard(partition)
Contributor

Is there any reason for relying on the resume method in the incremental case while reimplementing its logic here for stop-the-world rebalancing?

Member Author

No reason, I just mostly avoided touching this implementation (it's just indented differently so it shows up here). But yes it's cleaner to reuse resume everywhere, I'll switch it over.

Comment on lines 157 to 178
consumer_a = KafkaConsumer(
    {
        **configuration,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "enable.auto.offset.store": False,
        "group.id": group_id,
        "session.timeout.ms": 10000,
    },
    incremental_cooperative=True,
)
consumer_b = KafkaConsumer(
    {
        **configuration,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "enable.auto.offset.store": False,
        "group.id": group_id,
        "session.timeout.ms": 10000,
    },
    incremental_cooperative=True,
)
Contributor

Thanks.
By the way, what happens if some consumers are incremental and others are not? In other words, what will happen at the first deployment?

Member Author

We can't combine cooperative and non-cooperative consumers in the same group. I'm not aware of a solution other than stopping them all and restarting them all with the new configuration.

Contributor

OK, let's do it on a day when I am on PTO.
Out of curiosity, what happens if you add a consumer that uses incremental rebalancing to a group that is using the standard protocol?

KafkaPayload(None, f"{j}-{i}".encode("utf8"), []),
)

consumer_a.subscribe([topic])
Contributor

Why not provide callbacks so that you can assert that only one partition is transferred?

Member Author

@lynnagara lynnagara Mar 31, 2022

I guess that could've worked too. Still, checking the assigned partitions via consumer.tell() works well enough for verifying the same thing, and it's copied from the test of the eager consumer.

Contributor

Right, but there is no way to tell whether all partitions were revoked and then reassigned, or whether the incremental assignment actually works.

Member Author

@lynnagara lynnagara Mar 31, 2022

Yeah this was not about testing Kafka consumer internals, just that we end up with one partition assigned to each consumer and consuming on both works ok.
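For anyone who later wants the stricter check suggested in this thread, recording the callback history is one way to tell the two protocols apart. The sketch below is illustrative only: `RebalanceRecorder` and `simulate_second_consumer_joining` are hypothetical, partitions are plain integers, and the event sequences are hand-written assumptions about a two-partition topic rather than output captured from a broker.

```python
from typing import List, Sequence


class RebalanceRecorder:
    """Collects the partitions passed to each assignment/revocation callback."""

    def __init__(self) -> None:
        self.assigned: List[List[int]] = []
        self.revoked: List[List[int]] = []

    def on_assign(self, partitions: Sequence[int]) -> None:
        self.assigned.append(sorted(partitions))

    def on_revoke(self, partitions: Sequence[int]) -> None:
        self.revoked.append(sorted(partitions))


def simulate_second_consumer_joining(
    a: RebalanceRecorder, b: RebalanceRecorder, incremental: bool
) -> None:
    """Replay an assumed callback sequence for consumer B joining consumer A."""
    a.on_assign([0, 1])  # consumer A starts alone and owns both partitions
    if incremental:
        a.on_revoke([1])  # only the partition that moves is revoked
    else:
        a.on_revoke([0, 1])  # stop-the-world: everything is revoked...
        a.on_assign([0])  # ...and A's share is handed back
    b.on_assign([1])


eager_a, eager_b = RebalanceRecorder(), RebalanceRecorder()
simulate_second_consumer_joining(eager_a, eager_b, incremental=False)

coop_a, coop_b = RebalanceRecorder(), RebalanceRecorder()
simulate_second_consumer_joining(coop_a, coop_b, incremental=True)
```

Both runs end with one partition per consumer, which is why a check based on final ownership cannot distinguish them, whereas the recorded revocations can.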

@lynnagara
Member Author

> • how long it takes to assign a new partition to a new consumer showing up
> • how long it takes to revoke all remaining partitions (if any) from a consumer that is leaving the group? Does the group wait scheduled.rebalance.max.delay.ms before reassigning the partitions to a new consumer?
> • how long does it take to have partitions reassigned in case a node leaves and rejoins?

It's typically not as long as scheduled.rebalance.max.delay.ms, which is 5 minutes by default. All of these usually take around 3 seconds in my local testing with 2 consumers. The test case I added adds a new consumer to an existing group and waits for a partition to be assigned to it. We wait for up to 10 seconds and it generally passes.
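The bounded wait described here follows a common polling pattern, sketched below. `wait_until` is a hypothetical helper rather than the function used in the test, and the 10-second default simply mirrors the limit mentioned above.

```python
import time
from typing import Callable


def wait_until(
    condition: Callable[[], bool], timeout: float = 10.0, interval: float = 0.1
) -> bool:
    """Poll `condition` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    # One last check so a condition that became true during the final
    # sleep is not reported as a timeout.
    return condition()


# Example: a condition that becomes true on the third poll.
calls = {"count": 0}


def partition_assigned() -> bool:
    calls["count"] += 1
    return calls["count"] >= 3


became_true = wait_until(partition_assigned, timeout=2.0, interval=0.01)
timed_out = wait_until(lambda: False, timeout=0.05, interval=0.01)
```

In a real test the condition would inspect the consumer, for example by checking that it reports a position for one partition.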

Contributor

@fpacifici fpacifici left a comment

Seems good to try

@fpacifici
Contributor

I think you should update the docstring of the KafkaConsumer class so that it reflects the behavior under incremental cooperative rebalancing. Specifically this: https://github.com/getsentry/arroyo/blob/main/arroyo/backends/kafka/consumer.py#L109-L118

@lynnagara lynnagara merged commit 4854c67 into main Apr 1, 2022
@lynnagara lynnagara deleted the cooperative-rebalancing branch April 1, 2022 17:16
@mwarkentin
Member

@lynnagara 🎉

Let me know if / when you need some help testing this. :)
