Skip to content

Commit

Permalink
Use Promises to improve flaky tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AdrielC committed Nov 22, 2024
1 parent ee4bff8 commit 87e0a5e
Showing 1 changed file with 66 additions and 41 deletions.
107 changes: 66 additions & 41 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
}.tap { case (_, idx) => ZIO.logDebug(s"Consumed $idx") }
}
.runDrain
.tap(_ => ZIO.logDebug("Stream completed"))
.zipLeft(ZIO.logDebug("Stream completed"))
.provideSomeLayer[Kafka](
consumer(client, Some(group))
)
Expand Down Expand Up @@ -530,18 +530,29 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

// Consume messages
subscription = Subscription.topics(topic)
assignedPartitionsRef <- Ref.make(Set.empty[Int]) // Set of partition numbers
// Create a Promise to signal when consumer1 has processed half the partitions
consumer1Ready <- Promise.make[Nothing, Unit]
consumer1 <- Consumer
.partitionedStream(subscription, Serde.string, Serde.string)
.flatMapPar(nrPartitions) { case (tp, partition) =>
ZStream
.fromZIO(partition.runDrain)
.fromZIO(
consumer1Ready
.succeed(())
.whenZIO(
assignedPartitionsRef
.updateAndGet(_ + tp.partition())
.map(_.size >= (nrPartitions / 2))) *>
partition.runDrain
)
.as(tp)
}
.take(nrPartitions.toLong / 2)
.runDrain
.provideSomeLayer[Kafka](consumer(client1, Some(group)))
.fork
_ <- Live.live(ZIO.sleep(5.seconds))
_ <- consumer1Ready.await
consumer2 <- Consumer
.partitionedStream(subscription, Serde.string, Serde.string)
.take(nrPartitions.toLong / 2)
Expand Down Expand Up @@ -569,37 +580,44 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions))
_ <- ZIO.foreachDiscard(1 to nrMessages) { i =>
produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i"))
}
produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i"))
}

// Consume messages
subscription = Subscription.topics(topic)
consumer1Ready <- Promise.make[Nothing, Unit]
assignedPartitionsRef <- Ref.make(Set.empty[Int]) // Set of partition numbers
consumer1 <- Consumer
.partitionedStream(subscription, Serde.string, Serde.string)
.flatMapPar(nrPartitions) { case (tp, partition) =>
ZStream
.fromZIO(partition.runDrain)
.as(tp)
}
.take(nrPartitions.toLong / 2)
.runDrain
.provideSomeLayer[Kafka](
consumer(client1, Some(group), diagnostics = diagnostics)
)
.fork
.partitionedStream(subscription, Serde.string, Serde.string)
.flatMapPar(nrPartitions) { case (tp, partition) =>
ZStream
.fromZIO(
consumer1Ready
.succeed(())
.whenZIO(assignedPartitionsRef
.updateAndGet(_ + tp.partition())
.map(_.size >= (nrPartitions / 2))) *>
partition.runDrain)
.as(tp)
}
.take(nrPartitions.toLong / 2)
.runDrain
.provideSomeLayer[Kafka](
consumer(client1, Some(group), diagnostics = diagnostics)
)
.fork
diagnosticStream <- ZStream
.fromQueue(diagnostics.queue)
.collect { case rebalance: DiagnosticEvent.Rebalance => rebalance }
.runCollect
.fork
_ <- ZIO.sleep(5.seconds)
.fromQueue(diagnostics.queue)
.collect { case rebalance: DiagnosticEvent.Rebalance => rebalance }
.runCollect
.fork
_ <- consumer1Ready.await
consumer2 <- Consumer
.partitionedStream(subscription, Serde.string, Serde.string)
.take(nrPartitions.toLong / 2)
.runDrain
.provideSomeLayer[Kafka](consumer(client2, Some(group)))
.fork
_ <- consumer1.join
.partitionedStream(subscription, Serde.string, Serde.string)
.take(nrPartitions.toLong / 2)
.runDrain
.provideSomeLayer[Kafka](consumer(client2, Some(group)))
.fork
_ <- consumer1.join
_ <- consumer2.join
} yield diagnosticStream.join
Expand Down Expand Up @@ -1480,7 +1498,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
test(
"it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously"
) {
val numberOfMessages: Int = 100000
val numberOfMessages: Int = 100000
val messagesToConsumeBeforeStop = 1000 // Adjust this threshold as needed
val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
Expand All @@ -1490,22 +1509,28 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
settings <- consumerSettings(clientId = clientId)
consumer <- Consumer.make(settings, diagnostics = diagnostics)
_ <- produceMany(topic, kvs)
// Create a Ref to track messages consumed and a Promise to signal when to stop consumption
messagesConsumedRef <- Ref.make(0)
stopPromise <- Promise.make[Nothing, Unit]
// Starting a consumption session to start the Runloop.
fiber <-
consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.tap(_ => ZIO.sleep(1.millisecond)) // sleep to avoid consuming all messages in under 200 millis
.take(numberOfMessages.toLong)
.runCount
.forkScoped
_ <- ZIO.sleep(200.millis)
_ <- consumer.stopConsumption
fiber <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.mapZIO { _ =>
messagesConsumedRef.updateAndGet(_ + 1).flatMap { count =>
if (count >= messagesToConsumeBeforeStop) stopPromise.succeed(()).as(1L)
else ZIO.succeed(1L)
}
}
.take(numberOfMessages.toLong)
.runSum
.forkScoped

// Wait for the consumption to reach the desired threshold
_ <- stopPromise.await
_ <- consumer.stopConsumption
consumed0 <- fiber.join
_ <- ZIO.logDebug(s"consumed0: $consumed0")

_ <- ZIO.logDebug("About to sleep 5 seconds")
_ <- ZIO.sleep(5.seconds)
_ <- ZIO.logDebug("Slept 5 seconds")
consumed1 <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.take(numberOfMessages.toLong)
Expand Down

0 comments on commit 87e0a5e

Please sign in to comment.