Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry ErrNotCoordinatorForConsumer in ConsumerGroup.newSession #1231

Merged
merged 2 commits into from
Apr 3, 2019

Conversation

thomaslee
Copy link
Contributor

Group coordinators seem to be selected from among those brokers which lead __consumer_offsets partitions, so when leadership of __consumer_offsets partitions move from one broker to another (e.g. due to an outage, or a controlled reassignment of partition replicas) the coordinator for a given group may move with it.

Sarama usually seems to handle these events by returning nil from ConsumerGroup.Consume() after a heartbeat fails due to a ErrNotCoordinatorForConsumer. In these cases it seems like you can just call ConsumerGroup.Consume() again to ask the consumer to resume where it left off. If you do this, what you typically see something like this in the logs when group coordinators move:

[sarama] 2018/12/04 01:32:58 consumer_group: <nil>
[sarama] 2018/12/04 01:32:58 consumer/broker/0 closed dead subscription to repro-topic/1
[sarama] 2018/12/04 01:32:58 consumer/broker/1 closed dead subscription to repro-topic/2
[sarama] 2018/12/04 01:32:58 consumer/broker/2 closed dead subscription to repro-topic/0
[sarama] 2018/12/04 01:32:58 client/coordinator requesting coordinator for consumergroup test-group from localhost:9292
[sarama] 2018/12/04 01:32:58 client/coordinator coordinator for consumergroup test-group is #1 (192.168.1.4:9192)
[sarama] 2018/12/04 01:32:58 calling Consume()
[sarama] 2018/12/04 01:32:59 client/metadata fetching metadata for [repro-topic] from broker localhost:9292
[sarama] 2018/12/04 01:32:59 client/coordinator requesting coordinator for consumergroup test-group from localhost:9292
[sarama] 2018/12/04 01:32:59 client/coordinator coordinator for consumergroup test-group is #1 (192.168.1.4:9192)
[sarama] 2018/12/04 01:32:59 consumer/broker/1 added subscription to repro-topic/2
[sarama] 2018/12/04 01:32:59 consumer/broker/0 added subscription to repro-topic/1
[sarama] 2018/12/04 01:32:59 consumer/broker/2 added subscription to repro-topic/0

However, if you're unlucky ConsumerGroup.Consume() can sometimes return ErrNotCoordinatorForConsumer & fail in such a way that the ConsumerGroup never looks up the new group coordinator:

[sarama] 2018/12/04 02:05:25 consumer_group: <nil>
[sarama] 2018/12/04 02:05:25 consumer/broker/0 closed dead subscription to repro-topic/1
[sarama] 2018/12/04 02:05:25 consumer/broker/1 closed dead subscription to repro-topic/2
[sarama] 2018/12/04 02:05:25 consumer/broker/2 closed dead subscription to repro-topic/0
[sarama] 2018/12/04 02:05:25 client/coordinator requesting coordinator for consumergroup test-group from localhost:9092
[sarama] 2018/12/04 02:05:25 client/coordinator coordinator for consumergroup test-group is #2 (192.168.1.4:9292)
[sarama] 2018/12/04 02:05:25 client/coordinator requesting coordinator for consumergroup test-group from localhost:9092
[sarama] 2018/12/04 02:05:25 client/coordinator coordinator for consumergroup test-group is #2 (192.168.1.4:9292)
[sarama] 2018/12/04 02:05:25 client/coordinator requesting coordinator for consumergroup test-group from localhost:9092
[sarama] 2018/12/04 02:05:25 client/coordinator coordinator for consumergroup test-group is #2 (192.168.1.4:9292)
[repro] 2018/12/04 02:05:25 calling Consume()
[sarama] 2018/12/04 02:05:25 client/metadata fetching metadata for [repro-topic] from broker localhost:9092
[repro] 2018/12/04 02:05:25 kafka server: Request was for a consumer group that is not coordinated by this broker.
*consumer stops*

At this point the ConsumerGroup seems to be in a broken state: checking the return value of Consume() for ErrNotCoordinatorForConsumer & simply retrying the call to Consume() doesn't help since repeated calls to ConsumerGroup.Consume() will continue to return ErrNotCoordinatorForConsumer forever. Something like this:

[sarama] 2018/12/04 02:18:48 client/metadata fetching metadata for [repro-topic] from broker localhost:9192
[repro] 2018/12/04 02:18:48 ErrNotCoordinatorForConsumer
[repro] 2018/12/04 02:18:48 calling Consume()
[sarama] 2018/12/04 02:18:48 client/metadata fetching metadata for [repro-topic] from broker localhost:9192
[repro] 2018/12/04 02:18:48 ErrNotCoordinatorForConsumer
[repro] 2018/12/04 02:18:48 calling Consume()

This is obviously not a great state for the consumer to be in & can obviously cause problems for consumers during basic cluster maintenance operations or broker outages/downtime.

We seem to get into this state because we're not attempting coordinator refreshes during calls to ConsumerGroup.newSession(), which is what I'm attempting to address in this PR. With this change applied, I can't reproduce the issue.

Example log output (prior to applying this change)

See this gist for an example of the problem this is trying to fix: https://gist.githubusercontent.com/thomaslee/b70498216fb04f6b02f26c21ef93a046/raw/6ac08ba203f6f86c75c65b9851acb7366c38fc7a/sarama-bug.txt

At the end of this output, the consumer process exits. (Note that the ** output here is from some light edits of vendored Sarama code for diagnostic purposes. Note too the whining about MaxWaitTime being low, which I don't believe is actually a prerequisite to reproducing this bug.)

Possible work-around

I haven't actually tried this myself, but it seems like until this PR is merged folks should be able to simply call Client.RefreshCoordinator(yourGroupHere) when ConsumerGroup.Consume() returns ErrNotCoordinatorForConsumer (perhaps with some backoff logic). Note that this requires users to create their ConsumerGroups using NewConsumerGroupFromClient.

@ghost ghost added the cla-needed label Dec 4, 2018
@thomaslee
Copy link
Contributor Author

Signed the CLA.

@ghost ghost removed the cla-needed label Dec 4, 2018
@ShaneSaww
Copy link

Something to possibly include in this would be that the heartbeatLoop can check for the ErrNotCoordinatorForConsumer and then run RefreshCoordinator to update the coordinator without having to drop out of the consume func.

It would change https://github.com/Shopify/sarama/blob/96e43a884d5ef985c98dc02e5ec6904a2b8b1d1c/consumer_group.go#L664 to look something like

switch resp.Err {
        case ErrNoError:
            retries = s.parent.config.Metadata.Retry.Max
        case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
            return
        case ErrNotCoordinatorForConsumer:
             err = s.parent.client.RefreshCoordinator(s.parent.groupID)
             if err != nil {
                s.parent.handleError(err,"",-1)
                return
            }
        default:
            s.parent.handleError(err, "", -1)
            return
        }

@prune998
Copy link
Contributor

no update ?

@bai bai requested review from varun06 and sam-obeid March 19, 2019 06:01
@sam-obeid
Copy link

This is reasonable and would improve consumer resiliency. @thomaslee Could you add a unit test to cover this use case?

@varun06
Copy link
Contributor

varun06 commented Mar 19, 2019

This is nice, please add a test and we can get it going.

@thomaslee
Copy link
Contributor Author

@sam-obeid @varun06 sure, but what's the best way to test this? consumer_group_test.go is a bit bare. I might be able to get functional_consumer_group_test.go to poke in the vicinity of this code by nudging ZK to trigger partition reassignments & force __consumer_offsets partitions to move around, but it's not guaranteed to hit the bug.

Copy link
Contributor

@varun06 varun06 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, this edge case is bit tricky to test. code looks good to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants