Skip to content

Commit

Permalink
Reproducer for #852 copied from @erikvanoosten Gist: https://gist.git…
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Jun 2, 2023
1 parent cfd8415 commit 0b3d946
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 53 deletions.
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,6 @@ lazy val zioKafkaExample =
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.0.13",
"dev.zio" %% "zio-kafka" % "2.3.1",
"dev.zio" %% "zio-kafka-testkit" % "2.3.1" % Test,
"dev.zio" %% "zio-test" % "2.0.13" % Test,
"ch.qos.logback" % "logback-classic" % "1.4.6",
"dev.zio" %% "zio-logging-slf4j2" % "2.1.13",
Expand All @@ -175,6 +173,7 @@ lazy val zioKafkaExample =
// [error] org.scala-lang.modules:scala-collection-compat _3, _2.13
crossScalaVersions -= scala3.value
)
.dependsOn(zioKafka, zioKafkaTestkit)

addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")
Expand Down
66 changes: 43 additions & 23 deletions zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package zio.kafka.example

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
import zio.kafka.consumer.Consumer.AutoOffsetStrategy
import zio.kafka.consumer.{ Consumer, ConsumerSettings, OffsetBatch, Subscription }
import zio.kafka.producer.{ Producer, ProducerSettings }
import zio.kafka.serde.Serde
import zio.logging.backend.SLF4J

Expand Down Expand Up @@ -41,32 +43,50 @@ object Main extends ZIOAppDefault {

private val topic = "test-topic"

private def consumerLayer(kafka: MyKafka): ZLayer[Any, Throwable, Consumer] = {
val consumerSettings =
ConsumerSettings(kafka.bootstrapServers)
.withPollTimeout(500.millis)
.withGroupId("test")
private val consumerLayer: ZLayer[MyKafka, Throwable, Consumer] =
ZLayer.scoped {
ZIO.serviceWithZIO[MyKafka] { kafka =>
val consumerSettings =
ConsumerSettings(kafka.bootstrapServers)
.withGroupId("group1")
.withOffsetRetrieval(Consumer.OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
Consumer.make(consumerSettings)
}
}

ZLayer.make[Consumer](
ZLayer.succeed(consumerSettings),
ZLayer.succeed(Diagnostics.NoOp),
Consumer.live
)
}
private val producerLayer: ZLayer[MyKafka, Throwable, Producer] =
ZLayer.scoped {
ZIO.serviceWithZIO[MyKafka] { kafka =>
val producerSettings = ProducerSettings(kafka.bootstrapServers)
Producer.make(producerSettings)
}
}

override def run: ZIO[ZIOAppArgs with Scope, Any, Any] =
ZIO.addFinalizer(ZIO.logInfo("Stopping app")) *>
(
for {
_ <- ZIO.logInfo(s"Starting app")
kafka <- ZIO.service[MyKafka]
stream = Consumer
.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
.provideLayer(consumerLayer(kafka))
_ <- ZIO.logInfo(s"Consuming messages...")
consumed <- stream.take(1000).tap(r => ZIO.logInfo(s"Consumed record $r")).runCount
_ <- ZIO.logInfo(s"Consumed $consumed records")
_ <- ZIO.logInfo(s"Starting app")
_ <- Producer.produceChunk(
Chunk.fromIterable(1 to 1000).map(n => new ProducerRecord(topic, n, n.toString)),
Serde.int,
Serde.string
)
_ <- Consumer
.plainStream(Subscription.topics(topic), Serde.int, Serde.string)
.take(100)
.groupedWithin(10, 100.millis)
.mapZIOPar(2)(c => ZIO.debug(c.size) as c.map(_.offset))
.map(OffsetBatch.apply)
.debug("Offset")
.mapZIO(_.commit)
.debug("Commit")
.runDrain
_ <- ZIO.logInfo("Ready!")
} yield ()
).provideSomeLayer[ZIOAppArgs with Scope](MyKafka.embedded)

).provide(
MyKafka.embedded,
consumerLayer,
producerLayer
)
}
40 changes: 14 additions & 26 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.stream._

import java.util
import scala.jdk.CollectionConverters._

//noinspection SimplifyWhenInspection
Expand Down Expand Up @@ -107,38 +106,27 @@ private[consumer] final class Runloop private (
private val commit: Map[TopicPartition, Long] => Task[Unit] =
offsets =>
for {
_ <- ZIO.debug(s"Committing offsets: $offsets")
p <- Promise.make[Throwable, Unit]
_ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit
_ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
_ <- p.await
} yield ()

private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = {
val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
val cont = (e: Exit[Throwable, Unit]) => cmd.cont.done(e).asInstanceOf[UIO[Unit]]
val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsets))
val onFailure: Throwable => UIO[Unit] = {
case _: RebalanceInProgressException =>
ZIO.logDebug(s"Rebalance in progress, retrying commit for offsets $offsets") *>
commandQueue.offer(cmd).unit
case err =>
cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsets, err))
}
val callback =
new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit =
Unsafe.unsafe { implicit u =>
runtime.unsafe.run(if (exception eq null) onSuccess else onFailure(exception)).getOrThrowFiberFailure()
}
}

// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
consumer.runloopAccess { c =>
ZIO
.attempt(c.commitAsync(offsets.asJava, callback))
.catchAll(onFailure)
}
val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }

consumer
.runloopAccess(c => ZIO.attempt(c.commitSync(offsets.asJava)))
.foldZIO(
{
case _: RebalanceInProgressException =>
ZIO.logDebug(s"Rebalance in progress, retrying commit for offsets $offsets") *>
commandQueue.offer(cmd).unit
case err => cmd.fail(err) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsets, err))
},
_ => cmd.succeed <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsets))
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ object RunloopCommand {
case object StopAllStreams extends StreamCommand

final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends StreamCommand {
@inline def isDone: UIO[Boolean] = cont.isDone
@inline def isPending: UIO[Boolean] = isDone.negate
@inline def succeed: UIO[Unit] = cont.succeed(()).unit
@inline def fail(e: Throwable): UIO[Unit] = cont.fail(e).unit
@inline def isDone: UIO[Boolean] = cont.isDone
@inline def isPending: UIO[Boolean] = isDone.negate
}

/** Used by a stream to request more records. */
Expand Down

0 comments on commit 0b3d946

Please sign in to comment.