
Commit
Merge branch 'master' into badge
erikvanoosten authored Jan 17, 2024
2 parents 8f6e6df + ed2c8a4 commit 1929966
Showing 4 changed files with 24 additions and 4 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -151,4 +151,4 @@ This library is heavily inspired and made possible by the research and implement

[License](LICENSE)

-Copyright 2021-2023 Itamar Ravid and the zio-kafka contributors.
+Copyright 2021-2024 Itamar Ravid and the zio-kafka contributors.
4 changes: 2 additions & 2 deletions build.sbt
@@ -164,11 +164,11 @@ lazy val zioKafkaExample =
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.0.20",
"dev.zio" %% "zio-kafka" % "2.7.1",
"dev.zio" %% "zio-kafka" % "2.7.2",
"dev.zio" %% "zio-logging-slf4j2" % "2.1.16",
"io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion,
logback,
"dev.zio" %% "zio-kafka-testkit" % "2.7.1" % Test,
"dev.zio" %% "zio-kafka-testkit" % "2.7.2" % Test,
"dev.zio" %% "zio-test" % "2.0.20" % Test
),
// Scala 3 compiling fails with:
6 changes: 6 additions & 0 deletions zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
@@ -266,6 +266,12 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
metrics <- Producer.metrics
} yield assertTrue(metrics.nonEmpty)
},
test("partitionsFor") {
for {
topic <- randomTopic
info <- Producer.partitionsFor(topic).debug
} yield assertTrue(info.headOption.map(_.topic()) == Some(topic))
},
suite("transactions")(
test("a simple transaction") {
import Subscription._
16 changes: 15 additions & 1 deletion zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -2,7 +2,7 @@ package zio.kafka.producer

import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.kafka.common.{ Metric, MetricName }
+import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
import zio._
import zio.kafka.serde.Serializer
import zio.kafka.utils.SslHelper
@@ -169,6 +169,11 @@ trait Producer {
records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
): UIO[UIO[Chunk[Either[Throwable, RecordMetadata]]]]

+  /**
+   * Get the partition metadata for the given topic
+   */
+  def partitionsFor(topic: String): Task[Chunk[PartitionInfo]]
+
/**
* Flushes the producer's internal buffer. This will guarantee that all records currently buffered will be transmitted
* to the broker.
@@ -337,6 +342,12 @@ object Producer {
): RIO[R & Producer, Chunk[RecordMetadata]] =
ZIO.serviceWithZIO[Producer](_.produceChunk(records, keySerializer, valueSerializer))

+  /**
+   * Accessor method
+   */
+  def partitionsFor(topic: String): RIO[Producer, Chunk[PartitionInfo]] =
+    ZIO.serviceWithZIO(_.partitionsFor(topic))
+
/**
* Accessor method
*/
@@ -440,6 +451,9 @@ private[producer] final class ProducerLive(
} yield done.await
}

+  override def partitionsFor(topic: String): Task[Chunk[PartitionInfo]] =
+    ZIO.attemptBlocking(Chunk.fromJavaIterable(p.partitionsFor(topic)))
+
override def flush: Task[Unit] = ZIO.attemptBlocking(p.flush())

override def metrics: Task[Map[MetricName, Metric]] = ZIO.attemptBlocking(p.metrics().asScala.toMap)
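For illustration, a minimal usage sketch of the new partitionsFor API (not part of this commit; the broker address "localhost:9092" and topic "my-topic" are placeholder values). The call delegates to the Java producer's partitionsFor, wrapped in ZIO.attemptBlocking since it may block on a metadata fetch:

    import org.apache.kafka.common.PartitionInfo
    import zio._
    import zio.kafka.producer.{ Producer, ProducerSettings }

    // Sketch only: prints the partition metadata for a placeholder topic.
    object PartitionsForExample extends ZIOAppDefault {

      // Build a Producer layer against a placeholder broker address.
      private val producerLayer: TaskLayer[Producer] =
        ZLayer.scoped(Producer.make(ProducerSettings(List("localhost:9092"))))

      override val run: Task[Unit] =
        Producer
          .partitionsFor("my-topic") // RIO[Producer, Chunk[PartitionInfo]]
          .flatMap { infos =>
            ZIO.foreachDiscard(infos) { info =>
              Console.printLine(s"partition ${info.partition()}, leader: ${info.leader()}")
            }
          }
          .provide(producerLayer)
    }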
