Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry ErrNotCoordinatorForConsumer in ConsumerGroup.newSession #1231

Merged
merged 2 commits into from
Apr 3, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,8 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
return err
}

// Get coordinator
coordinator, err := c.client.Coordinator(c.groupID)
if err != nil {
return err
}

// Init session
sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
if err == ErrClosedClient {
return ErrClosedConsumerGroup
} else if err != nil {
Expand All @@ -183,7 +177,33 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
return sess.release(true)
}

func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
select {
case <-c.closed:
return nil, ErrClosedConsumerGroup
case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
}

if refreshCoordinator {
err := c.client.RefreshCoordinator(c.groupID)
if err != nil {
return c.retryNewSession(ctx, topics, handler, retries, true)
}
}

return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
coordinator, err := c.client.Coordinator(c.groupID)
if err != nil {
if retries <= 0 {
return nil, err
}

return c.retryNewSession(ctx, topics, handler, retries, true)
}

// Join consumer group
join, err := c.joinGroupRequest(coordinator, topics)
if err != nil {
Expand All @@ -195,19 +215,19 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
c.memberID = join.MemberId
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, coordinator, topics, handler, retries)
case ErrRebalanceInProgress: // retry after backoff
return c.newSession(ctx, topics, handler, retries)
case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
if retries <= 0 {
return nil, join.Err
}

select {
case <-c.closed:
return nil, ErrClosedConsumerGroup
case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
return nil, join.Err
}

return c.newSession(ctx, coordinator, topics, handler, retries-1)
return c.retryNewSession(ctx, topics, handler, retries, false)
default:
return nil, join.Err
}
Expand Down Expand Up @@ -236,19 +256,19 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, coordinator, topics, handler, retries)
case ErrRebalanceInProgress: // retry after backoff
return c.newSession(ctx, topics, handler, retries)
case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
if retries <= 0 {
return nil, sync.Err
}

select {
case <-c.closed:
return nil, ErrClosedConsumerGroup
case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
return nil, sync.Err
}

return c.newSession(ctx, coordinator, topics, handler, retries-1)
return c.retryNewSession(ctx, topics, handler, retries, false)
default:
return nil, sync.Err
}
Expand Down