Always end streams in rebalance listener, support lost partitions #1089

Merged
merged 14 commits on Nov 4, 2023
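The title captures the core idea: when the group rebalances, the streams for partitions that are revoked or lost must be ended rather than left hanging. Below is a minimal, illustrative sketch of that idea against the plain Kafka client API; the `endStream` callback is hypothetical and this is not the PR's actual implementation.

import java.util.{ Collection => JCollection }
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

// Ends the stream of every partition that is taken away from this consumer, whether it
// was revoked in a rebalance or lost (for example after a session timeout).
final class EndStreamsListener(endStream: TopicPartition => Unit) extends ConsumerRebalanceListener {

  // With a cooperative assignor only the partitions that actually move are revoked,
  // so only those streams are ended here.
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    partitions.asScala.foreach(endStream)

  // Lost partitions may already be owned by another consumer and their offsets can no
  // longer be committed, so their streams must be ended as well.
  override def onPartitionsLost(partitions: JCollection[TopicPartition]): Unit =
    partitions.asScala.foreach(endStream)

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
}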
build.sbt (6 changes: 3 additions & 3 deletions)
@@ -3,9 +3,9 @@ import sbt.Def
lazy val kafkaVersion = "3.6.0"
lazy val embeddedKafkaVersion = "3.6.0" // Should be the same as kafkaVersion, except for the patch part

-lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
-lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
-lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11"
+lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
+lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
+lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11"

enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)

zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala (200 changes: 100 additions & 100 deletions)
@@ -738,8 +738,30 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assertCompletes
},
test("restartStreamsOnRebalancing mode closes all partition streams") {
// Test plan:
// - Throughout the test, continuously produce to all partitions of a topic.
// - Start consumer 1:
// - track which partitions are assigned after each rebalance,
// - track which streams stopped.
// - Start consumer 2, which stops after consuming just a few records. This causes 2 rebalances
//   for consumer 1: one when consumer 2 joins and one when it leaves.
// - Verify that in the first rebalance, consumer 1 ends the streams for _all_ partitions,
// and then starts them again.
//
// NOTE: we need to use the cooperative sticky assignor. The default (eager) assignor, `RangeAssignor`,
// revokes all partitions and re-assigns them on every rebalance. This means that all streams are restarted
// on every rebalance, which is exactly what `restartStreamOnRebalancing` would have caused anyway. In other
// words, with the default assignor the externally visible behavior is the same, regardless of whether
// `restartStreamOnRebalancing` is `true` or `false`.

val nrPartitions = 5
val nrMessages = 100
val partitionIds = Chunk.fromIterable(0 until nrPartitions)

def awaitRebalance[A](partitionAssignments: Ref[Chunk[A]], nr: Int): ZIO[Any, Nothing, Unit] =
partitionAssignments.get
.repeat(
Schedule.recurUntil((_: Chunk[A]).size >= nr) && Schedule.fixed(100.millis)
)
.unit

for {
// Produce messages on several partitions
@@ -750,116 +772,94 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
client2 <- randomClient

_ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions))
_ <- ZIO.foreachDiscard(1 to nrMessages) { i =>
produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i"))
}

// Consume messages
messagesReceived <-
ZIO.foreach((0 until nrPartitions).toList)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap)
drainCount <- Ref.make(0)
subscription = Subscription.topics(topic)
fib <- ZIO
.logAnnotate("consumer", "1") {
Consumer
.partitionedAssignmentStream(subscription, Serde.string, Serde.string)
.rechunk(1)
.mapZIO { partitions =>
ZIO.logDebug(s"Got partition assignment ${partitions.map(_._1).mkString(",")}") *>
ZStream
.fromIterable(partitions)
.flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *>
partitionStream.mapChunksZIO { records =>
OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition)
.update(_ + records.size)
.as(records)
}
}
.runDrain
}
.mapZIO(_ =>
drainCount.updateAndGet(_ + 1).flatMap {
case 2 => ZIO.logDebug("Stopping consumption") *> Consumer.stopConsumption
// 1: when consumer on fib2 starts
// 2: when consumer on fib2 stops, end of test
case _ => ZIO.unit
}
)
.runDrain
.provideSomeLayer[Kafka](
consumer(
client1,
Some(group),
clientInstanceId = Some("consumer1"),
restartStreamOnRebalancing = true,
properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "10")
)
)
// Continuously produce messages throughout the test
_ <- ZStream
.fromSchedule(Schedule.fixed(100.millis))
.mapZIO { i =>
ZIO.foreach(partitionIds) { p =>
produceMany(topic, p, Seq((s"key.$p.$i", s"msg.$p.$i")))
}
.fork
// fib is running, consuming all the published messages from all partitions.
// Waiting until it recorded all messages
_ <- ZIO
.foreach(messagesReceived.values)(_.get)
.map(_.sum)
.repeat(Schedule.recurUntil((n: Int) => n == nrMessages) && Schedule.fixed(100.millis))

// Starting a new consumer that will stop after receiving 20 messages,
// causing two rebalancing events for the consumer on fib: one on start and one on stop
fib2 <- ZIO
.logAnnotate("consumer", "2") {
}
.runDrain
.forkScoped

// Consumer 1
streamsStarted <- Ref.make[Chunk[Set[Int]]](Chunk.empty)
streamsStopped <- Ref.make[Chunk[Int]](Chunk.empty)
consumer1Settings <-
consumerSettings(
client1,
Some(group),
restartStreamOnRebalancing = true
).map {
_.withProperties(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
)
}
fib1 <- ZIO
.logAnnotate("consumer", "1") {
Consumer
.plainStream(subscription, Serde.string, Serde.string)
.take(20)
.partitionedAssignmentStream(Subscription.topics(topic), Serde.string, Serde.string)
.rechunk(1)
.mapZIO { assignments =>
ZIO.logDebug(s"Got partition assignment ${assignments.map(_._1).mkString(",")}") *>
streamsStarted.update(_ :+ assignments.map(_._1.partition()).toSet) *>
ZStream
.fromIterable(assignments)
.flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
ZStream.finalizer {
ZIO.logDebug(s"Stream for ${tp.toString} is done") *>
streamsStopped.update(_ :+ tp.partition())
} *>
partitionStream.mapChunksZIO { records =>
OffsetBatch(records.map(_.offset)).commit.as(records)
}
}
.runDrain
}
.runDrain
.provideSomeLayer[Kafka](
consumer(
client2,
Some(group),
clientInstanceId = Some("consumer2"),
properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "10")
)
ZLayer.succeed(consumer1Settings) >>> minimalConsumer()
)
}
.fork

// Waiting until fib's partition streams got restarted because of the rebalancing
_ <- drainCount.get.repeat(Schedule.recurUntil((n: Int) => n == 1) && Schedule.fixed(100.millis))
_ <- ZIO.logDebug("Consumer 1 finished rebalancing")
// Wait until consumer 1 was assigned some partitions
_ <- awaitRebalance(streamsStarted, 1)

// All messages processed, the partition streams of fib are still running.
// Saving the values and resetting the counters
messagesReceived0 <-
ZIO
.foreach((0 until nrPartitions).toList) { i =>
messagesReceived(i).get.flatMap { v =>
Ref.make(v).map(r => i -> r)
} <* messagesReceived(i).set(0)
}
.map(_.toMap)
// Consumer 2
// Stop after receiving 20 messages, causing two rebalancing events for consumer 1.
consumer2Settings <- consumerSettings(client2, Some(group))
_ <- ZIO
.logAnnotate("consumer", "2") {
Consumer
.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
.take(20)
.runDrain
.provideSomeLayer[Kafka](
ZLayer.succeed(consumer2Settings) >>> minimalConsumer()
)
}
.forkScoped

// Publishing another N messages - now they will be distributed among the two consumers until
// fib2 stops after 20 messages
_ <- ZIO.foreachDiscard((nrMessages + 1) to (2 * nrMessages)) { i =>
produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i"))
}
_ <- fib2.join
_ <- ZIO.logDebug("Consumer 2 done")
_ <- fib.join
_ <- ZIO.logDebug("Consumer 1 done")
// fib2 terminates after 20 messages, fib terminates after fib2 because of the rebalancing (drainCount==2)
messagesPerPartition0 <-
ZIO.foreach(messagesReceived0.values)(_.get) // counts from the first N messages (single consumer)
messagesPerPartition <-
ZIO.foreach(messagesReceived.values)(_.get) // counts from fib after the second consumer joined

// The first set must contain all the produced messages
// The second set must have at least one and at most N-20 messages (because fib2 stops after consuming 20);
// the exact count cannot be known because fib2's termination triggers fib's rebalancing asynchronously.
} yield assert(messagesPerPartition0)(forall(equalTo(nrMessages / nrPartitions))) &&
assert(messagesPerPartition.view.sum)(isGreaterThan(0) && isLessThanEqualTo(nrMessages - 20))
} @@ TestAspect.nonFlaky(3),
// Wait until consumer 1's partitions were revoked, and assigned again
_ <- awaitRebalance(streamsStarted, 3)
_ <- fib1.interrupt

// The started streams after each rebalance
streamsStarted <- streamsStarted.get
_ <- ZIO.logDebug(s"partitions for started streams: $streamsStarted")

streamsStopped <- streamsStopped.get
_ <- ZIO.logDebug(s"partitions for stopped streams: $streamsStopped")
} yield assertTrue(
// During the first rebalance, all partitions are stopped:
streamsStopped.take(nrPartitions).toSet == partitionIds.toSet,
// Some streams that were assigned at the beginning, are started after the first rebalance:
(streamsStarted(0) intersect streamsStarted(1)).nonEmpty
)
},
test("handles RebalanceInProgressExceptions transparently") {
val nrPartitions = 5
val nrMessages = 10000
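The NOTE in the `restartStreamsOnRebalancing` test above relies on the difference between eager and cooperative assignment. For reference, a minimal sketch of selecting the cooperative-sticky strategy on a plain Kafka consumer; the broker address and group id are placeholders.

import java.util.Properties
import org.apache.kafka.clients.consumer.{ ConsumerConfig, CooperativeStickyAssignor, KafkaConsumer }
import org.apache.kafka.common.serialization.StringDeserializer

object CooperativeStickyExample {
  def makeConsumer(): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")           // placeholder group id
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    // Cooperative-sticky: a rebalance revokes only the partitions that actually move between
    // members. An eager assignor such as RangeAssignor first revokes all partitions, which
    // restarts every partition stream on every rebalance.
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName)
    new KafkaConsumer[String, String](props)
  }
}

In the test itself the same property is set through `ConsumerSettings#withProperties`, as shown in the diff above.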
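The `awaitRebalance` helper in the same test is a small polling loop built from a schedule intersection. A self-contained sketch of that pattern follows; the demo program is illustrative and not part of the PR.

import zio._

object AwaitSizeDemo extends ZIOAppDefault {

  // Poll a Ref every 100 ms until it contains at least `nr` elements. `&&` is the
  // intersection of schedules: it keeps repeating only while BOTH schedules continue,
  // so polling stops as soon as `recurUntil` sees the condition satisfied.
  def awaitSize[A](ref: Ref[Chunk[A]], nr: Int): UIO[Unit] =
    ref.get
      .repeat(Schedule.recurUntil((_: Chunk[A]).size >= nr) && Schedule.fixed(100.millis))
      .unit

  val run =
    for {
      ref    <- Ref.make(Chunk.empty[Int])
      waiter <- awaitSize(ref, 3).fork
      _      <- ZIO.foreachDiscard(1 to 3)(i => ref.update(_ :+ i) *> ZIO.sleep(50.millis))
      _      <- waiter.join
      _      <- Console.printLine("ref reached the expected size")
    } yield ()
}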
@@ -72,8 +72,10 @@ final class PartitionStreamControl private (
queueInfoRef.get.map(_.deadlineExceeded(now))

/** To be invoked when the partition was lost. */
private[internal] def lost: UIO[Boolean] =
interruptionPromise.fail(new RuntimeException(s"Partition ${tp.toString} was lost"))
private[internal] def lost: UIO[Boolean] = {
val lostException = new RuntimeException(s"Partition ${tp.toString} was lost") with NoStackTrace
interruptionPromise.fail(lostException)
}

/** To be invoked when the stream is no longer processing. */
private[internal] def halt: UIO[Boolean] = {
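The new `lost` implementation above mixes `NoStackTrace` into the exception that fails the interruption promise. A small self-contained sketch of the pattern; the `PartitionLost` class is illustrative and not part of the PR.

import scala.util.control.NoStackTrace

// Failures used purely as control-flow signals (here: one per lost partition) do not need a
// stack trace. Mixing in NoStackTrace suppresses fillInStackTrace, so constructing the
// exception is cheap and the logs are not flooded with irrelevant frames.
final case class PartitionLost(partition: Int)
    extends RuntimeException(s"Partition $partition was lost")
    with NoStackTrace

object NoStackTraceDemo {
  def main(args: Array[String]): Unit = {
    val e = PartitionLost(3)
    println(e.getMessage)           // Partition 3 was lost
    println(e.getStackTrace.length) // 0, unless -Dscala.control.noTraceSuppression=true
  }
}

Suppressing the trace is appropriate here because the exception only signals that the partition stream should stop.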