Always end streams in rebalance listener, support lost partitions
Previously, the rebalance listener only ended the streams of revoked partitions. Now it also ends streams there when `restartStreamsOnRebalancing` is used.

Lost partitions are no longer treated as revoked. Instead, the streams of lost partitions are interrupted, which prevents them from processing and committing any more data.
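
For illustration, here is a rough sketch of the revoked-versus-lost distinction at the level of Kafka's own `ConsumerRebalanceListener` (zio-kafka's real implementation lives in its Runloop and differs; `StreamControl` and both of its methods are hypothetical). Note that the Kafka interface itself treats lost partitions as revoked by default, which matches the old behavior:

```scala
import java.util.{ Collection => JCollection }
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

// Hypothetical hooks into the partition streams; zio-kafka's internals differ.
trait StreamControl {
  def endStream(tp: TopicPartition): Unit       // let the stream finish gracefully
  def interruptStream(tp: TopicPartition): Unit // stop it at once, no further commits
}

final class ExampleRebalanceListener(streams: StreamControl) extends ConsumerRebalanceListener {

  // Revoked partitions: end their streams so already-fetched records can still
  // be processed and committed before the partitions move to another consumer.
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    partitions.asScala.foreach(streams.endStream)

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    () // new streams are started elsewhere

  // Lost partitions: this consumer no longer owns them, so interrupt their
  // streams; processing or committing more data could conflict with the new
  // owner. Kafka's default onPartitionsLost delegates to onPartitionsRevoked,
  // which is the "lost treated as revoked" behavior this commit removes.
  override def onPartitionsLost(partitions: JCollection[TopicPartition]): Unit =
    partitions.asScala.foreach(streams.interruptStream)
}
```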

A nice side effect is that zio-kafka is now faster when the rebalance listener was _not_ called: the 'fast track'.

The main reason for this change is to prepare for awaiting commits from within the rebalance listener, which will prevent duplicate consumption of records (see #830).
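
The pattern #830 is aiming for can be sketched with the plain Kafka consumer API: the rebalance listener runs on the polling thread while the rebalance is in progress, so offsets committed from `onPartitionsRevoked` are in place before the partitions get a new owner. A minimal sketch, assuming a hypothetical `pendingOffsets` callback (zio-kafka's actual solution must route this through its internal commit machinery):

```scala
import java.util.{ Collection => JCollection, Map => JMap }
import org.apache.kafka.clients.consumer.{ Consumer, ConsumerRebalanceListener, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

// `pendingOffsets` is a hypothetical callback yielding offsets of records that
// were processed but whose commits have not completed yet.
final class CommitOnRevokeListener(
  consumer: Consumer[_, _],
  pendingOffsets: () => Map[TopicPartition, OffsetAndMetadata]
) extends ConsumerRebalanceListener {

  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = {
    // The rebalance does not proceed until this callback returns, so a
    // synchronous commit here lands before the partitions are reassigned,
    // and the next owner starts from these offsets instead of re-consuming.
    val toCommit: JMap[TopicPartition, OffsetAndMetadata] =
      pendingOffsets().filter { case (tp, _) => partitions.contains(tp) }.asJava
    if (!toCommit.isEmpty) consumer.commitSync(toCommit)
  }

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
}
```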

Also: fix test `restartStreamsOnRebalancing mode closes all partition streams` so that it detects rebalances properly on fast computers.
erikvanoosten committed Oct 28, 2023
1 parent 35fdcc9 commit 074ab47
Showing 2 changed files with 165 additions and 147 deletions.
28 changes: 17 additions & 11 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -734,6 +734,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
 test("restartStreamsOnRebalancing mode closes all partition streams") {
   val nrPartitions = 5
   val nrMessages = 100
+  val partitionIds = (0 until nrPartitions).toList
 
   for {
     // Produce messages on several partitions
@@ -749,9 +750,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
     }
 
     // Consume messages
-    messagesReceived <-
-      ZIO.foreach((0 until nrPartitions).toList)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap)
-    drainCount <- Ref.make(0)
+    // A map with partition as key, and a messages-received-counter Ref as value:
+    messagesReceived <- ZIO.foreach(partitionIds)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap)
+    drainCount <- Ref.make(0)
     subscription = Subscription.topics(topic)
     fib <- ZIO
              .logAnnotate("consumer", "1") {
@@ -765,9 +766,10 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
       .flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
         ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *>
           partitionStream.mapChunksZIO { records =>
-            OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition)
-              .update(_ + records.size)
-              .as(records)
+            for {
+              _ <- OffsetBatch(records.map(_.offset)).commit
+              _ <- messagesReceived(tp.partition).update(_ + records.size)
+            } yield records
           }
       }
       .runDrain
@@ -797,6 +799,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
     _ <- ZIO
            .foreach(messagesReceived.values)(_.get)
            .map(_.sum)
+           .debug("Messages received by Fib1: ")
            .repeat(Schedule.recurUntil((n: Int) => n == nrMessages) && Schedule.fixed(100.millis))
 
     // Starting a new consumer that will stop after receiving 20 messages,
@@ -819,17 +822,20 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
            .fork
 
     // Waiting until fib1's partition streams got restarted because of the rebalancing
-    _ <- drainCount.get.repeat(Schedule.recurUntil((n: Int) => n == 1) && Schedule.fixed(100.millis))
+    // Note: on fast computers we may not see `drainCount == 1` but `2` immediately, therefore
+    // we need to check for `drainCount >= 1`.
+    _ <- drainCount.get.repeat(Schedule.recurUntil((_: Int) >= 1) && Schedule.fixed(100.millis))
     _ <- ZIO.logDebug("Consumer 1 finished rebalancing")
 
     // All messages processed, the partition streams of fib are still running.
     // Saving the values and resetting the counters
     messagesReceived0 <-
       ZIO
-        .foreach((0 until nrPartitions).toList) { i =>
-          messagesReceived(i).get.flatMap { v =>
-            Ref.make(v).map(r => i -> r)
-          } <* messagesReceived(i).set(0)
+        .foreach(partitionIds) { i =>
+          for {
+            v <- messagesReceived(i).getAndSet(0)
+            p <- Ref.make(v).map(i -> _)
+          } yield p
         }
         .map(_.toMap)

