
Manual commits are not handled appropriately after a consumer rebalance #118

Closed
saurabhdaftary opened this issue Mar 24, 2016 · 1 comment


@saurabhdaftary

Maybe I am missing something, but this is the behavior I am observing with reactive-kafka 0.9.0 when using consumerWithOffsetSink:

On a consumer rebalance (say, when a new consumer node is added), the old consumer keeps committing offsets to a partition it no longer holds.

The rebalance seems to work fine from the broker's point of view: running ConsumerGroupCommand shows that the partitions are redistributed after I add a new consumer.

After the rebalance, the old consumer still commits the last offset it had for a partition that has now moved to the new consumer.

Ideally, since the offset did not increase, you would expect that partition-offset pair to be filtered out at the next commit flush interval (per the code below):

  def commitGatheredOffsets(): Unit = {
    log.debug("Flushing offsets to commit")
    scheduledFlush = None
    val offsetMapToFlush = partitionOffsetMap.diff(committedOffsetMap)
    if (offsetMapToFlush.nonEmpty)
      consumerActor ! CommitOffsets(offsetMapToFlush)
  }
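To make the expectation concrete, here is a minimal, self-contained sketch of how I understand the diff step is supposed to behave (plain Maps stand in for the library's OffsetMap; the diff function here is my own illustration, not the actual implementation):

```scala
object OffsetDiffSketch {
  // "topic-partition" -> offset; a simplified stand-in for OffsetMap.
  type Offsets = Map[String, Long]

  // Keep only entries that are absent from, or differ from, the committed map.
  def diff(gathered: Offsets, committed: Offsets): Offsets =
    gathered.filter { case (tp, offset) => committed.get(tp).forall(_ != offset) }

  def main(args: Array[String]): Unit = {
    val gathered  = Map("my_topic-46" -> 3740701L, "my_topic-67" -> 3727215L)
    val committed = Map("my_topic-46" -> 3740701L, "my_topic-67" -> 3727212L)
    // my_topic-46 is unchanged, so it should be filtered out before the commit:
    println(diff(gathered, committed)) // Map(my_topic-67 -> 3727215)
  }
}
```

If diff works like this, the unchanged (my_topic-46, 3740701) pair should never reach the commit request, which is what makes the logs below surprising.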

However, as the logs below show, even though committedOffsetMap contains an entry for this stale partition/offset pair, the consumer keeps committing it:

DEBUG [2016-03-24 00:33:25,343] [] [] [kafka-events-processor-akka.actor.default-dispatcher-120] c.s.r.k.c.ConsumerCommitter akka://kafka-events-processor/user/$o - Flushing offsets to commit
DEBUG [2016-03-24 00:33:25,344] [] [] [] o.a.k.c.c.i.ConsumerCoordinator   - Committed offset 3740702 for partition my_topic-46
DEBUG [2016-03-24 00:33:25,336] [] [] [kafka-events-processor-kafka-publisher-dispatcher-109] c.s.r.k.KafkaActorPublisher akka://kafka-events-processor/user/$l - committed offsets: OffsetMap(Map(my_topic-46 -> 3740701, my_topic-67 -> 3727212,...))
TRACE [2016-03-24 00:33:28,333] [] [] [] o.a.k.c.c.i.ConsumerCoordinator  - Sending offset-commit request with {my_topic-71=OffsetAndMetadata{offset=2778977, metadata=''}, my_topic-55=OffsetAndMetadata{offset=3734666, metadata=''}, ...} to Node(1440332752, 172.22.5.202, 9092)
DEBUG [2016-03-24 00:33:28,354] [] [] [kafka-events-processor-akka.actor.default-dispatcher-120] c.s.r.k.c.ConsumerCommitter akka://kafka-events-processor/user/$o - Flushing offsets to commit
DEBUG [2016-03-24 00:33:28,354] [] [] [] o.a.k.c.c.i.ConsumerCoordinator  - Committed offset 3740702 for partition my_topic-46
DEBUG [2016-03-24 00:33:28,355] [] [] [kafka-events-processor-kafka-publisher-dispatcher-103] c.s.r.k.KafkaActorPublisher akka://kafka-events-processor/user/$l - committed offsets: OffsetMap(Map(my_topic-46 -> 3740701, my_topic-67 -> 3727215,...))

From the logs above: shouldn't the offset entry (my_topic-46, 3740701) be removed from partitionOffsetMap before the second flush interval, given that we do a diff?

Additionally, I see that even though the Kafka consumer revokes a partition and no longer returns messages for it (because the newer consumer now owns it), we still try to commit that partition. Reference log:

DEBUG [2016-03-24 08:56:41,492] [] [] [] o.a.k.c.c.i.ConsumerCoordinator  - Revoking previously assigned partitions [my_topic-61, my_topic-46, ....]
DEBUG [2016-03-24 08:56:41,492] [] [] [] o.a.k.c.c.i.AbstractCoordinator  - (Re-)joining group my-consumer-group
DEBUG [2016-03-24 08:56:41,498] [] [] [] o.a.k.c.c.i.AbstractCoordinator  - Joined group: {error_code=0,generation_id=2,group_protocol=range....
DEBUG [2016-03-24 08:56:41,498] [] [] [] o.a.k.c.c.i.ConsumerCoordinator  - Performing range assignment for subscriptions {....}
DEBUG [2016-03-24 08:56:41,498] [] [] [] o.a.k.c.c.i.ConsumerCoordinator  - Finished assignment: {...}
DEBUG [2016-03-24 08:56:41,517] [] [] [] o.a.k.c.c.i.AbstractCoordinator  - Received successful sync group response for group my-consumer-group:{error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=212 cap=212]}
DEBUG [2016-03-24 08:56:41,517] [] [] [] o.a.k.c.c.i.ConsumerCoordinator  - Setting newly assigned partitions [my_topic-1, my_topic-2, ....]
DEBUG [2016-03-24 08:56:41,517] [] [] [] o.a.k.c.c.i.ConsumerCoordinator  - Fetching committed offsets for partitions: [my_topic-1, my_topic-2, ....]
DEBUG [2016-03-24 08:56:41,518] [] [] [] o.a.k.c.c.i.Fetcher  - Resetting offset for partition my_topic-1 to the committed offset 2720732
DEBUG [2016-03-24 08:56:41,518] [] [] [] o.a.k.c.c.i.Fetcher  - Resetting offset for partition my_topic-2 to the committed offset 2720433
.....
DEBUG [2016-03-24 08:56:41,652] [] [] [] o.a.k.c.c.i.Fetcher  - Ignoring fetched records for partition my_topic-46 since it is no longer fetchable
DEBUG [2016-03-24 08:56:41,899] [] [] [] o.a.k.c.c.i.ConsumerCoordinator  - Committed offset 3815926 for partition my_topic-46
DEBUG [2016-03-24 08:56:41,901] [] [] [kafka-events-processor-kafka-publisher-dispatcher-31] c.s.r.k.KafkaActorPublisher akka://kafka-events-processor/user/$d - committed offsets: OffsetMap(Map(my_topic-46 -> 3740701, my_topic-67 -> 3727215,...))

As the logs above show, the partition assignment after the rebalance takes place properly. However, the reactive-kafka client still tries to commit offsets for partitions it no longer holds.
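One way to avoid this, sketched below with assumed names (this is not reactive-kafka's API or its actual fix), is to prune revoked partitions from both the gathered and committed maps when the rebalance callback fires, so the next flush cannot re-commit them:

```scala
object RebalancePruneSketch {
  type Partition = String // simplified stand-in for Kafka's TopicPartition

  final case class CommitterState(gathered: Map[Partition, Long],
                                  committed: Map[Partition, Long]) {
    // Mirrors the intent of ConsumerRebalanceListener.onPartitionsRevoked:
    // drop all state for partitions this consumer no longer owns.
    def onPartitionsRevoked(revoked: Set[Partition]): CommitterState =
      CommitterState(gathered -- revoked, committed -- revoked)

    // What a commitGatheredOffsets-style flush would send after pruning.
    def toFlush: Map[Partition, Long] =
      gathered.filter { case (p, o) => committed.get(p).forall(_ != o) }
  }

  def main(args: Array[String]): Unit = {
    val before = CommitterState(
      gathered  = Map("my_topic-46" -> 3740701L, "my_topic-67" -> 3727215L),
      committed = Map("my_topic-67" -> 3727212L))
    val after = before.onPartitionsRevoked(Set("my_topic-46"))
    println(after.toFlush) // the revoked my_topic-46 no longer appears
  }
}
```

With real consumers, the hook point for this would be the ConsumerRebalanceListener passed to KafkaConsumer.subscribe, whose onPartitionsRevoked is invoked before ownership moves to another consumer.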

@kciesielski
Contributor

Fixed in 0.10.1
