You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
May be I am missing something but this is the behavior I am observing with 0.9.0 reactive-kafka when using consumerWithOffsetSink:
On a consumer rebalance say when you happen to add a new consumer node the old consumer still keeps committing offsets to partition it no longer holds after a rebalance.
The rebalance seem to work ok from brokers point of view - on running ConsumerGroupCommand I see that the partitions are redistributed after I add a new consumer.
After the rebalance the old existing consumer still commits the same last offset it had when it was holding on to a partition which has now moved to the new consumer.
Ideally, you would expect that given the offset did not increase it will filter that partition-offset pair out in its next commit flush interval (as per the below code):
def commitGatheredOffsets(): Unit = {
log.debug("Flushing offsets to commit")
scheduledFlush = None
val offsetMapToFlush = partitionOffsetMap.diff(committedOffsetMap)
if (offsetMapToFlush.nonEmpty)
consumerActor ! CommitOffsets(offsetMapToFlush)
}
However, as you can see from the below logs, even though the committedOffsetMap contains an entry for this stale partition/offset pair, the consumer still keeps committing it:
From the above logs - shouldn't the offset entry for (my_topic-46,3740701) be removed from the partitionOffsetMap before the second flush interval (given we do a diff)?
To add to this additionally, I also see that in spite of kafka consumer "revoking" a partition and no longer returning messages for that partition because the a newer consumer has ownership of them we still try and commit that partition. Reference log:
DEBUG [2016-03-24 08:56:41,492] [] [] [] o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [my_topic-61, my_topic-46, ....]
DEBUG [2016-03-24 08:56:41,492] [] [] [] o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group my-consumer-group
DEBUG [2016-03-24 08:56:41,498] [] [] [] o.a.k.c.c.i.AbstractCoordinator - Joined group: {error_code=0,generation_id=2,group_protocol=range....
DEBUG [2016-03-24 08:56:41,498] [] [] [] o.a.k.c.c.i.ConsumerCoordinator - Performing range assignment for subscriptions {....}
DEBUG [2016-03-24 08:56:41,498] [] [] [] o.a.k.c.c.i.ConsumerCoordinator - Finished assignment: {...}
EBUG [2016-03-24 08:56:41,517] [] [] [] o.a.k.c.c.i.AbstractCoordinator - Received successful sync group response for group my-consumer-group:{error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=212 cap=212]}
DEBUG [2016-03-24 08:56:41,517] [] [] [] o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [my_topic-1, my_topic-2, ....]
DEBUG [2016-03-24 08:56:41,517] [] [] [] o.a.k.c.c.i.ConsumerCoordinator - Fetching committed offsets for partitions: [my_topic-1, my_topic-2, ....]
DEBUG [2016-03-24 08:56:41,518] [] [] [] o.a.k.c.c.i.Fetcher - Resetting offset for partition my_topic-1 to the committed offset 2720732
DEBUG [2016-03-24 08:56:41,518] [] [] [] o.a.k.c.c.i.Fetcher - Resetting offset for partition my_topic-2 to the committed offset 2720433
.....
DEBUG [2016-03-24 08:56:41,652] [] [] [] o.a.k.c.c.i.Fetcher - Ignoring fetched records for partition my_topic-46 since it is no longer fetchable
DEBUG [2016-03-24 08:56:41,899] [] [] [] o.a.k.c.c.i.ConsumerCoordinator - Committed offset 3815926 for partition my_topic-46
DEBUG [2016-03-24 08:56:41,901] [] [] [kafka-events-processor-kafka-publisher-dispatcher-31] c.s.r.k.KafkaActorPublisher akka://kafka-events-processor/user/$d - committed offsets: OffsetMap(Map(my_topic-46 -> 3740701, my_topic-67 -> 3727215,...))
As you can see from the logs above the partitions assignment after the rebalance seems to take place properly. However, the reactive-client still tries to commit to partitions it no longer holds.
The text was updated successfully, but these errors were encountered:
May be I am missing something but this is the behavior I am observing with 0.9.0 reactive-kafka when using
consumerWithOffsetSink
:On a consumer rebalance say when you happen to add a new consumer node the old consumer still keeps committing offsets to partition it no longer holds after a rebalance.
The rebalance seem to work ok from brokers point of view - on running ConsumerGroupCommand I see that the partitions are redistributed after I add a new consumer.
After the rebalance the old existing consumer still commits the same last offset it had when it was holding on to a partition which has now moved to the new consumer.
Ideally, you would expect that given the offset did not increase it will filter that partition-offset pair out in its next commit flush interval (as per the below code):
However, as you can see from the below logs, even though the
committedOffsetMap
contains an entry for this stale partition/offset pair, the consumer still keeps committing it:From the above logs - shouldn't the offset entry for (my_topic-46,3740701) be removed from the
partitionOffsetMap
before the second flush interval (given we do adiff
)?To add to this additionally, I also see that in spite of kafka consumer "revoking" a partition and no longer returning messages for that partition because the a newer consumer has ownership of them we still try and commit that partition. Reference log:
As you can see from the logs above the partitions assignment after the rebalance seems to take place properly. However, the reactive-client still tries to commit to partitions it no longer holds.
The text was updated successfully, but these errors were encountered: