Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cooperative Incremental sticky rebalance #364

Merged
merged 2 commits into from
Oct 16, 2021

Conversation

SreeniIO
Copy link
Contributor

Fixes: #363

@@ -91,6 +99,12 @@ pub trait ConsumerContext: ClientContext {
#[allow(unused_variables)]
fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}

/// Override this to return true when using cooperative-sticky option for
/// partition.assignment.strategy
fn is_incremental_assign(&self) -> bool {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function can be avoid if there is a way to read the config partition.assignment.strategy value

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I figured out how to do this automatically.

@SreeniIO SreeniIO marked this pull request as draft April 13, 2021 08:56
@SreeniIO SreeniIO marked this pull request as ready for review April 13, 2021 09:11
@benesch benesch force-pushed the sticky-rebalance branch 2 times, most recently from 52120dc to 13b3327 Compare June 30, 2021 05:14
@benesch
Copy link
Collaborator

benesch commented Jun 30, 2021

@SreeniIO I pushed a new implementation that auto-detects whether cooperative rebalancing is in use. Could you give this a spin and let me know if it works for your use case?

@SreeniIO
Copy link
Contributor Author

@SreeniIO I pushed a new implementation that auto-detects whether cooperative rebalancing is in use. Could you give this a spin and let me know if it works for your use case?

Yes, it works fine.

For Cooperative rebalancing, since not all partitions are revoked during a rebalance, can you add the TopicPartitionList being revoked as part of the Rebalance::Revoke enum?

pub enum Rebalance<'a> {
    /// A new partition assignment is received.
    Assign(&'a TopicPartitionList),
    /// All partitions are revoked.
    Revoke, // can this be changed to Revoke(&'a TopicPartitionList),
    /// Unexpected error from Kafka.
    Error(String),
}

@oronsh
Copy link

oronsh commented Sep 24, 2021

any news about this one? :)

@benesch
Copy link
Collaborator

benesch commented Oct 16, 2021

For Cooperative rebalancing, since not all partitions are revoked during a rebalance, can you add the TopicPartitionList being revoked as part of the Rebalance::Revoke enum?

pub enum Rebalance<'a> {
    /// A new partition assignment is received.
    Assign(&'a TopicPartitionList),
    /// All partitions are revoked.
    Revoke, // can this be changed to Revoke(&'a TopicPartitionList),
    /// Unexpected error from Kafka.
    Error(String),
}

Ack. I filed #398 for this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Cooperative Incremental sticky rebalance
3 participants