From 8941a61f1ddd80ec1089179b94f83f6752ab4ef9 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Wed, 13 Nov 2024 11:46:40 +0100 Subject: [PATCH 1/5] Use same amount of messages for all consumer benchmarks --- .../scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala index 67a68b193..8640adf6d 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala @@ -21,7 +21,7 @@ trait ComparisonBenchmark extends ZioBenchmark[Env] { protected final val nrPartitions: Int = 6 protected final val topicPartitions: List[TopicPartition] = (0 until nrPartitions).map(TopicPartition(topic1, _)).toList - protected final val numberOfMessages: Int = 1000000 + protected final val numberOfMessages: Int = 50000 protected final val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key$i", s"msg$i")) private val javaKafkaConsumer: ZLayer[ConsumerSettings, Throwable, LowLevelKafka] = From dad8e34694db091f9a2bc0328347fe94306faa20 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Thu, 14 Nov 2024 18:05:37 +0100 Subject: [PATCH 2/5] Let's try 10000 messages --- .../src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala | 2 +- .../scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala index 2695b05cb..4d893295d 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala @@ -19,7 +19,7 @@ import java.util.concurrent.TimeUnit class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] { val topic1 = "topic1" val nrPartitions = 6 - val nrMessages = 50000 + val nrMessages = 100000 val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i")) override protected def bootstrap: ZLayer[Any, Nothing, Kafka with Producer] = diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala index 8640adf6d..682242cce 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala @@ -21,7 +21,7 @@ trait ComparisonBenchmark extends ZioBenchmark[Env] { protected final val nrPartitions: Int = 6 protected final val topicPartitions: List[TopicPartition] = (0 until nrPartitions).map(TopicPartition(topic1, _)).toList - protected final val numberOfMessages: Int = 50000 + protected final val numberOfMessages: Int = 100000 protected final val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key$i", s"msg$i")) private val javaKafkaConsumer: ZLayer[ConsumerSettings, Throwable, LowLevelKafka] = From 00f8855f6ae49505573c25f6ff17691bea8cdeb7 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 16 Nov 2024 11:30:45 +0100 Subject: [PATCH 3/5] Increase record size to 1KB Also: - extract common test parameters --- .../zio/kafka/bench/comparison => }/README.md | 12 +++++++++++- .../main/scala/zio/kafka/bench/ZioBenchmark.scala | 15 +++++++++++++++ ...mark.scala => ZioKafkaConsumerBenchmark.scala} | 12 ++++-------- ...mark.scala => ZioKafkaProducerBenchmark.scala} | 8 ++------ .../bench/comparison/ComparisonBenchmark.scala | 12 ++++-------- .../bench/comparison/KafkaClientBenchmarks.scala | 8 ++++---- .../bench/comparison/ZioKafkaBenchmarks.scala | 8 ++++---- 7 files changed, 44 insertions(+), 31 deletions(-) rename zio-kafka-bench/{src/main/scala/zio/kafka/bench/comparison => }/README.md (71%) rename zio-kafka-bench/src/main/scala/zio/kafka/bench/{ConsumerBenchmark.scala => ZioKafkaConsumerBenchmark.scala} (85%) rename zio-kafka-bench/src/main/scala/zio/kafka/bench/{ProducerBenchmark.scala => ZioKafkaProducerBenchmark.scala} (87%) diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/README.md b/zio-kafka-bench/README.md similarity index 71% rename from zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/README.md rename to zio-kafka-bench/README.md index d7d1baa15..23be96156 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/README.md +++ b/zio-kafka-bench/README.md @@ -1,6 +1,16 @@ # Comparison Benchmarks -## How to run them +## Results + +The benchmark are run from a GitHub action on every commit. The results are published on https://zio.github.io/zio-kafka/dev/bench/. + +The results are automatically pruned by [a scala script](https://github.com/zio/zio-kafka/blob/gh-pages/scripts/prune-benchmark-history.sc) on the `gh-pages` branch. + +## Interpreting the benchmarks + +To do! + +## How to run the benchmarks To run these "comparison" benchmarks, in a sbt console, run: ```scala diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala index 68880018e..0bc57f4b1 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala @@ -5,6 +5,21 @@ import zio.{ ZLayer, _ } import java.util.UUID +trait ConsumerZioBenchmark[Environment] extends ZioBenchmark[Environment] { + protected val messageCount: Int = 100000 + protected def messageData(i: Int): String = i.toString + scala.util.Random.alphanumeric.take(1024).mkString + protected val kvs: Iterable[(String, String)] = Iterable.tabulate(messageCount)(i => (s"key$i", messageData(i))) + protected val topic1 = "topic1" + protected val partitionCount = 6 +} + +trait ProducerZioBenchmark[Environment] extends ZioBenchmark[Environment] { + protected val messageCount = 500 + protected val kvs: List[(String, String)] = List.tabulate(messageCount)(i => (s"key$i", s"msg$i")) + protected val topic1 = "topic1" + protected val partitionCount = 6 +} + trait ZioBenchmark[Environment] { var runtime: Runtime.Scoped[Environment] = _ diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala similarity index 85% rename from zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala rename to zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala index 4d893295d..d16848f92 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala @@ -16,18 +16,14 @@ import java.util.concurrent.TimeUnit @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) -class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] { - val topic1 = "topic1" - val nrPartitions = 6 - val nrMessages = 100000 - val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i")) +class ZioKafkaConsumerBenchmark extends ConsumerZioBenchmark[Kafka with Producer] { override protected def bootstrap: ZLayer[Any, Nothing, Kafka with Producer] = ZLayer.make[Kafka with Producer](Kafka.embedded, producer).orDie override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for { _ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore - _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions)) + _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount)) _ <- produceMany(topic1, kvs) } yield () @@ -47,7 +43,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] { _ <- Consumer .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray) .tap { _ => - counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == nrMessages)) + counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == messageCount)) } .runDrain .provideSome[Kafka](env) @@ -67,7 +63,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] { .tap(batch => counter.update(_ + batch.size)) .map(OffsetBatch.apply) .mapZIO(_.commit) - .takeUntilZIO(_ => counter.get.map(_ >= nrMessages)) + .takeUntilZIO(_ => counter.get.map(_ >= messageCount)) .runDrain .provideSome[Kafka](env) } diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ProducerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaProducerBenchmark.scala similarity index 87% rename from zio-kafka-bench/src/main/scala/zio/kafka/bench/ProducerBenchmark.scala rename to zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaProducerBenchmark.scala index a6c66e278..874ebf860 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ProducerBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaProducerBenchmark.scala @@ -14,11 +14,7 @@ import java.util.concurrent.TimeUnit @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) -class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] { - val topic1 = "topic1" - val nrPartitions = 6 - val nrMessages = 500 - val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i")) +class ZioKafkaProducerBenchmark extends ProducerZioBenchmark[Kafka with Producer] { val records: Chunk[ProducerRecord[String, String]] = Chunk.fromIterable(kvs.map { case (k, v) => new ProducerRecord(topic1, k, v) }) @@ -28,7 +24,7 @@ class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] { override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for { _ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore - _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions)) + _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount)) } yield () @Benchmark diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala index 682242cce..59e015767 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala @@ -4,7 +4,7 @@ import io.github.embeddedkafka.EmbeddedKafka import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.ByteArrayDeserializer import zio.kafka.admin.AdminClient.TopicPartition -import zio.kafka.bench.ZioBenchmark +import zio.kafka.bench.ConsumerZioBenchmark import zio.kafka.bench.ZioBenchmark.randomThing import zio.kafka.bench.comparison.ComparisonBenchmark._ import zio.kafka.consumer.{ Consumer, ConsumerSettings } @@ -15,14 +15,10 @@ import zio.{ ULayer, ZIO, ZLayer } import scala.jdk.CollectionConverters._ -trait ComparisonBenchmark extends ZioBenchmark[Env] { +trait ComparisonBenchmark extends ConsumerZioBenchmark[Env] { - protected final val topic1: String = "topic1" - protected final val nrPartitions: Int = 6 protected final val topicPartitions: List[TopicPartition] = - (0 until nrPartitions).map(TopicPartition(topic1, _)).toList - protected final val numberOfMessages: Int = 100000 - protected final val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key$i", s"msg$i")) + (0 until partitionCount).map(TopicPartition(topic1, _)).toList private val javaKafkaConsumer: ZLayer[ConsumerSettings, Throwable, LowLevelKafka] = ZLayer.scoped { @@ -63,7 +59,7 @@ trait ComparisonBenchmark extends ZioBenchmark[Env] { override final def initialize: ZIO[Env, Throwable, Any] = for { _ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore - _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions)) + _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount)) _ <- produceMany(topic1, kvs) } yield () diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala index 7365d05fc..f56ccb8f4 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala @@ -23,14 +23,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark { consumer.subscribe(java.util.Arrays.asList(topic1)) var count = 0L - while (count < numberOfMessages) { + while (count < messageCount) { val records = consumer.poll(settings.pollTimeout) count += records.count() } consumer.unsubscribe() count - }.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages")) + }.flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount")) } } } @@ -46,14 +46,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark { consumer.assign(topicPartitions.map(_.asJava).asJava) var count = 0L - while (count < numberOfMessages) { + while (count < messageCount) { val records = consumer.poll(settings.pollTimeout) count += records.count() } consumer.unsubscribe() count - }.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages")) + }.flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount")) } } } diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala index cff054455..466078a98 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala @@ -17,9 +17,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark { runZIO { Consumer .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray) - .take(numberOfMessages.toLong) + .take(messageCount.toLong) .runCount - .flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages")) + .flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount")) } @Benchmark @@ -32,9 +32,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark { Serde.byteArray, Serde.byteArray ) - .take(numberOfMessages.toLong) + .take(messageCount.toLong) .runCount - .flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages")) + .flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount")) } } From aaffb20fdfbde61116619272d276cb1a75d0d7a7 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 16 Nov 2024 12:37:53 +0100 Subject: [PATCH 4/5] Lower message count, slightly smaller messages Benches take very long with large messages. --- .../src/main/scala/zio/kafka/bench/ZioBenchmark.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala index 0bc57f4b1..a98712eae 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala @@ -6,8 +6,8 @@ import zio.{ ZLayer, _ } import java.util.UUID trait ConsumerZioBenchmark[Environment] extends ZioBenchmark[Environment] { - protected val messageCount: Int = 100000 - protected def messageData(i: Int): String = i.toString + scala.util.Random.alphanumeric.take(1024).mkString + protected val messageCount: Int = 50000 + protected def messageData(i: Int): String = i.toString + scala.util.Random.alphanumeric.take(512).mkString protected val kvs: Iterable[(String, String)] = Iterable.tabulate(messageCount)(i => (s"key$i", messageData(i))) protected val topic1 = "topic1" protected val partitionCount = 6 From b4d4e629564b4c3328732b0ebfacd979033f5a06 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Fri, 22 Nov 2024 19:56:04 +0100 Subject: [PATCH 5/5] Address review comment Also: use `record` i.s.o. `message` for consistency with all other zio-kafka and kafka documentation. --- .../main/scala/zio/kafka/bench/ZioBenchmark.scala | 12 +++++++----- .../zio/kafka/bench/ZioKafkaConsumerBenchmark.scala | 4 ++-- .../bench/comparison/KafkaClientBenchmarks.scala | 8 ++++---- .../kafka/bench/comparison/ZioKafkaBenchmarks.scala | 8 ++++---- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala index a98712eae..0df95b7d1 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala @@ -6,16 +6,18 @@ import zio.{ ZLayer, _ } import java.util.UUID trait ConsumerZioBenchmark[Environment] extends ZioBenchmark[Environment] { - protected val messageCount: Int = 50000 - protected def messageData(i: Int): String = i.toString + scala.util.Random.alphanumeric.take(512).mkString - protected val kvs: Iterable[(String, String)] = Iterable.tabulate(messageCount)(i => (s"key$i", messageData(i))) + private val recordDataSize = 512 + private def genString(i: Int): String = i.toString + scala.util.Random.alphanumeric.take(recordDataSize).mkString + + protected val recordCount: Int = 50000 + protected val kvs: Iterable[(String, String)] = Iterable.tabulate(recordCount)(i => (s"key$i", genString(i))) protected val topic1 = "topic1" protected val partitionCount = 6 } trait ProducerZioBenchmark[Environment] extends ZioBenchmark[Environment] { - protected val messageCount = 500 - protected val kvs: List[(String, String)] = List.tabulate(messageCount)(i => (s"key$i", s"msg$i")) + protected val recordCount = 500 + protected val kvs: List[(String, String)] = List.tabulate(recordCount)(i => (s"key$i", s"msg$i")) protected val topic1 = "topic1" protected val partitionCount = 6 } diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala index d16848f92..801801b49 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala @@ -43,7 +43,7 @@ class ZioKafkaConsumerBenchmark extends ConsumerZioBenchmark[Kafka with Producer _ <- Consumer .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray) .tap { _ => - counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == messageCount)) + counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == recordCount)) } .runDrain .provideSome[Kafka](env) @@ -63,7 +63,7 @@ class ZioKafkaConsumerBenchmark extends ConsumerZioBenchmark[Kafka with Producer .tap(batch => counter.update(_ + batch.size)) .map(OffsetBatch.apply) .mapZIO(_.commit) - .takeUntilZIO(_ => counter.get.map(_ >= messageCount)) + .takeUntilZIO(_ => counter.get.map(_ >= recordCount)) .runDrain .provideSome[Kafka](env) } diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala index f56ccb8f4..031f0be8b 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala @@ -23,14 +23,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark { consumer.subscribe(java.util.Arrays.asList(topic1)) var count = 0L - while (count < messageCount) { + while (count < recordCount) { val records = consumer.poll(settings.pollTimeout) count += records.count() } consumer.unsubscribe() count - }.flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount")) + }.flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount")) } } } @@ -46,14 +46,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark { consumer.assign(topicPartitions.map(_.asJava).asJava) var count = 0L - while (count < messageCount) { + while (count < recordCount) { val records = consumer.poll(settings.pollTimeout) count += records.count() } consumer.unsubscribe() count - }.flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount")) + }.flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount")) } } } diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala index 466078a98..facd1d07b 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala @@ -17,9 +17,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark { runZIO { Consumer .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray) - .take(messageCount.toLong) + .take(recordCount.toLong) .runCount - .flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount")) + .flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount")) } @Benchmark @@ -32,9 +32,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark { Serde.byteArray, Serde.byteArray ) - .take(messageCount.toLong) + .take(recordCount.toLong) .runCount - .flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount")) + .flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount")) } }