Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use same amount of messages for all consumer benchmarks #1382

Merged
merged 7 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
# Comparison Benchmarks

## How to run them
## Results

The benchmarks are run from a GitHub Action on every commit. The results are published on https://zio.github.io/zio-kafka/dev/bench/.

The results are automatically pruned by [a Scala script](https://github.com/zio/zio-kafka/blob/gh-pages/scripts/prune-benchmark-history.sc) on the `gh-pages` branch.

## Interpreting the benchmarks

To do!

## How to run the benchmarks

To run these "comparison" benchmarks, in an sbt console, run:
```scala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ import zio.{ ZLayer, _ }

import java.util.UUID

/**
 * Shared fixture for consumer benchmarks, so that every benchmark mixing in this trait works with the same number
 * of messages, the same topic name and the same partition count.
 *
 * @tparam Environment
 *   the ZIO environment type passed on to [[ZioBenchmark]]
 */
trait ConsumerZioBenchmark[Environment] extends ZioBenchmark[Environment] {

  /** Number of key/value pairs generated into `kvs`. */
  protected val messageCount: Int = 50000

  /**
   * Builds the value for message `i`: the index rendered as a string, followed by 512 random alphanumeric
   * characters. The random suffix differs on every call, so the data is not reproducible between runs.
   */
  protected def messageData(i: Int): String = i.toString + scala.util.Random.alphanumeric.take(512).mkString

  /**
   * `messageCount` pairs of `key<i>` and `messageData(i)`. This is a `val`, so the data is generated once, when
   * the trait is initialised, and not on each access.
   */
  protected val kvs: Iterable[(String, String)] = Iterable.tabulate(messageCount)(i => (s"key$i", messageData(i)))

  /** Name of the topic used by the benchmarks. */
  protected val topic1: String = "topic1"

  /** Number of partitions for `topic1`. */
  protected val partitionCount: Int = 6
}

/**
 * Shared fixture for producer benchmarks: a small, fixed set of key/value pairs plus the topic name and partition
 * count the benchmarks use.
 *
 * @tparam Environment
 *   the ZIO environment type passed on to [[ZioBenchmark]]
 */
trait ProducerZioBenchmark[Environment] extends ZioBenchmark[Environment] {

  /** Number of key/value pairs generated into `kvs`. */
  protected val messageCount: Int = 500

  /** `messageCount` pairs of `key<i>` and `msg<i>`, generated once when the trait is initialised. */
  protected val kvs: List[(String, String)] = List.tabulate(messageCount)(i => (s"key$i", s"msg$i"))

  /** Name of the topic used by the benchmarks. */
  protected val topic1: String = "topic1"

  /** Number of partitions for `topic1`. */
  protected val partitionCount: Int = 6
}

trait ZioBenchmark[Environment] {
var runtime: Runtime.Scoped[Environment] = _

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@ import java.util.concurrent.TimeUnit

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
val topic1 = "topic1"
val nrPartitions = 6
val nrMessages = 50000
val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i"))
class ZioKafkaConsumerBenchmark extends ConsumerZioBenchmark[Kafka with Producer] {

override protected def bootstrap: ZLayer[Any, Nothing, Kafka with Producer] =
ZLayer.make[Kafka with Producer](Kafka.embedded, producer).orDie

override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for {
_ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions))
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount))
_ <- produceMany(topic1, kvs)
} yield ()

Expand All @@ -47,7 +43,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
_ <- Consumer
.plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
.tap { _ =>
counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == nrMessages))
counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == messageCount))
}
.runDrain
.provideSome[Kafka](env)
Expand All @@ -67,7 +63,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
.tap(batch => counter.update(_ + batch.size))
.map(OffsetBatch.apply)
.mapZIO(_.commit)
.takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
.takeUntilZIO(_ => counter.get.map(_ >= messageCount))
.runDrain
.provideSome[Kafka](env)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@ import java.util.concurrent.TimeUnit

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] {
val topic1 = "topic1"
val nrPartitions = 6
val nrMessages = 500
val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i"))
class ZioKafkaProducerBenchmark extends ProducerZioBenchmark[Kafka with Producer] {
val records: Chunk[ProducerRecord[String, String]] = Chunk.fromIterable(kvs.map { case (k, v) =>
new ProducerRecord(topic1, k, v)
})
Expand All @@ -28,7 +24,7 @@ class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] {

override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for {
_ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions))
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount))
} yield ()

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import io.github.embeddedkafka.EmbeddedKafka
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import zio.kafka.admin.AdminClient.TopicPartition
import zio.kafka.bench.ZioBenchmark
import zio.kafka.bench.ConsumerZioBenchmark
import zio.kafka.bench.ZioBenchmark.randomThing
import zio.kafka.bench.comparison.ComparisonBenchmark._
import zio.kafka.consumer.{ Consumer, ConsumerSettings }
Expand All @@ -15,14 +15,10 @@ import zio.{ ULayer, ZIO, ZLayer }

import scala.jdk.CollectionConverters._

trait ComparisonBenchmark extends ZioBenchmark[Env] {
trait ComparisonBenchmark extends ConsumerZioBenchmark[Env] {

protected final val topic1: String = "topic1"
protected final val nrPartitions: Int = 6
protected final val topicPartitions: List[TopicPartition] =
(0 until nrPartitions).map(TopicPartition(topic1, _)).toList
protected final val numberOfMessages: Int = 1000000
protected final val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key$i", s"msg$i"))
(0 until partitionCount).map(TopicPartition(topic1, _)).toList

private val javaKafkaConsumer: ZLayer[ConsumerSettings, Throwable, LowLevelKafka] =
ZLayer.scoped {
Expand Down Expand Up @@ -63,7 +59,7 @@ trait ComparisonBenchmark extends ZioBenchmark[Env] {
override final def initialize: ZIO[Env, Throwable, Any] =
for {
_ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions))
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount))
_ <- produceMany(topic1, kvs)
} yield ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark {
consumer.subscribe(java.util.Arrays.asList(topic1))

var count = 0L
while (count < numberOfMessages) {
while (count < messageCount) {
val records = consumer.poll(settings.pollTimeout)
count += records.count()
}

consumer.unsubscribe()
count
}.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
}.flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount"))
}
}
}
Expand All @@ -46,14 +46,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark {
consumer.assign(topicPartitions.map(_.asJava).asJava)

var count = 0L
while (count < numberOfMessages) {
while (count < messageCount) {
val records = consumer.poll(settings.pollTimeout)
count += records.count()
}

consumer.unsubscribe()
count
}.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
}.flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount"))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark {
runZIO {
Consumer
.plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
.take(numberOfMessages.toLong)
.take(messageCount.toLong)
.runCount
.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
.flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount"))
}

@Benchmark
Expand All @@ -32,9 +32,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark {
Serde.byteArray,
Serde.byteArray
)
.take(numberOfMessages.toLong)
.take(messageCount.toLong)
.runCount
.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
.flatMap(r => zAssert(r == messageCount, s"Consumed $r messages instead of $messageCount"))
}

}
Loading