Skip to content

Commit

Permalink
Fix test for stopConsumption with aggregateAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten committed Nov 29, 2023
1 parent 49833d0 commit b06f0b6
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
41 changes: 25 additions & 16 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -322,29 +322,38 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.provideSomeLayer[Kafka](consumer(client, Some(group)))
} yield assert(offset.map(_.offset))(isSome(equalTo(9L)))
},
test("process outstanding commits after a graceful shutdown with aggregateAsync") {
test("process outstanding commits after a graceful shutdown with aggregateAsync using `maxRebalanceDuration`") {
val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
val topic = "test-outstanding-commits"
for {
group <- randomGroup
client <- randomClient
_ <- produceMany(topic, kvs)
messagesReceived <- Ref.make[Int](0)
offset <- (Consumer
.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
.mapConcatZIO { record =>
for {
nr <- messagesReceived.updateAndGet(_ + 1)
_ <- Consumer.stopConsumption.when(nr == 10)
} yield if (nr < 10) Seq(record.offset) else Seq.empty
}
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit)
.runDrain *>
Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head))
.provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 5.seconds))
} yield assert(offset.map(_.offset))(isSome(equalTo(9L)))
} @@ TestAspect.nonFlaky(10),
offset <- (
Consumer
.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
.mapConcatZIO { record =>
for {
nr <- messagesReceived.updateAndGet(_ + 1)
_ <- Consumer.stopConsumption.when(nr == 10)
} yield if (nr < 10) Seq(record.offset) else Seq.empty
}
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit)
.runDrain *>
Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head)
)
.provideSomeLayer[Kafka](
consumer(
client, Some(group), commitTimeout = 4.seconds,
rebalanceSafeCommits = true, maxRebalanceDuration = 6.seconds
)
)
} yield {
assertTrue(offset.map(_.offset).contains(9L))
}
} @@ TestAspect.nonFlaky(5),
test("a consumer timeout interrupts the stream and shuts down the consumer") {
// Setup of this test:
// - Set the max poll interval very low: a couple of seconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ object KafkaTestUtils {
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
maxRebalanceDuration: Duration = 3.minutes,
maxPollInterval: Duration = 5.minutes,
`max.poll.records`: Int = 100, // setting this higher can cause concurrency bugs to go unnoticed
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
Expand All @@ -140,6 +141,7 @@ object KafkaTestUtils {
.withOffsetRetrieval(offsetRetrieval)
.withRestartStreamOnRebalancing(restartStreamOnRebalancing)
.withRebalanceSafeCommits(rebalanceSafeCommits)
.withMaxRebalanceDuration(maxRebalanceDuration)
.withProperties(properties)

val withClientInstanceId = clientInstanceId.fold(settings)(settings.withGroupInstanceId)
Expand Down Expand Up @@ -207,6 +209,7 @@ object KafkaTestUtils {
diagnostics: Diagnostics = Diagnostics.NoOp,
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
maxRebalanceDuration: Duration = 3.minutes,
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
properties: Map[String, String] = Map.empty
): ZLayer[Kafka, Throwable, Consumer] =
Expand All @@ -219,6 +222,7 @@ object KafkaTestUtils {
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
rebalanceSafeCommits = rebalanceSafeCommits,
maxRebalanceDuration = maxRebalanceDuration,
properties = properties,
commitTimeout = commitTimeout
)
Expand Down

0 comments on commit b06f0b6

Please sign in to comment.