Fix commit design
guizmaii committed Jun 24, 2023
1 parent 7558941 commit 62efd7c
Showing 14 changed files with 530 additions and 545 deletions.
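The change moves commit handling off the per-record `Offset` and `OffsetBatch.commit` values and onto the `Consumer` itself: call sites now build an `OffsetBatch` directly from chunks of `CommittableRecord`s and pass it to `Consumer.commit` (or `Consumer.commitOrRetry`), and the old `Consumer.offsetBatches` sink over `Offset` becomes `Consumer.offsetBatchesSink` over `OffsetBatch`. A minimal sketch of the new flow, based on the call sites updated below and not part of the diff itself; the topic name, logging, and schedule are placeholders:

import zio._
import zio.kafka.consumer.{ Consumer, OffsetBatch, Subscription }
import zio.kafka.serde.Serde

// Sketch only: consume, emit one OffsetBatch per chunk, merge the batches,
// then commit through the Consumer. Topic and schedule are placeholders.
val consumeAndCommit: RIO[Consumer, Unit] =
  Consumer
    .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
    .mapChunksZIO { records =>
      ZIO.logDebug(s"processing ${records.size} records").as(Chunk.single(OffsetBatch(records)))
    }
    .aggregateAsyncWithin(Consumer.offsetBatchesSink, Schedule.fixed(100.millis))
    .mapZIO(Consumer.commit)
    .runDrain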
@@ -5,13 +5,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
import org.openjdk.jmh.annotations._
import zio.kafka.bench.ZioBenchmark.randomThing
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.{ Consumer, Offset, OffsetBatch, Subscription }
import zio.kafka.consumer.{ Consumer, OffsetBatch, Subscription }
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, produceMany, producer }
import zio.stream.ZSink
import zio.{ durationInt, Ref, Schedule, ZIO, ZLayer }
import zio.{ durationInt, Chunk, Ref, Schedule, ZIO, ZLayer }

import java.util.concurrent.TimeUnit

@@ -63,11 +62,9 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
_ <- ZIO.logAnnotate("consumer", "1") {
Consumer
.plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
.map(_.offset)
.aggregateAsyncWithin(ZSink.collectAll[Offset], Schedule.fixed(100.millis))
.tap(batch => counter.update(_ + batch.size))
.map(OffsetBatch.apply)
.mapZIO(_.commit)
.mapChunksZIO(records => counter.update(_ + records.size).as(Chunk.single(OffsetBatch(records))))
.aggregateAsyncWithin(Consumer.offsetBatchesSink, Schedule.fixed(100.millis))
.mapZIO(Consumer.commit)
.takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
.runDrain
.provideSome[Kafka](env)
10 changes: 5 additions & 5 deletions zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
@@ -231,9 +231,9 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.transduce(ZSink.collectAllN[CommittableRecord[String, String]](20))
.mapConcatZIO { committableRecords =>
val records = committableRecords.map(_.record)
val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
val offsetBatch = OffsetBatch(committableRecords)

offsetBatch.commit.as(records)
Consumer.commit(offsetBatch).as(records)
}
.runCollect
.provideSomeLayer[Kafka](consumer("adminspec-topic10", Some(consumerGroupID)))
@@ -301,7 +301,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Consumer
.plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.take(count)
.foreach(_.offset.commit)
.runForeachChunk(records => Consumer.commit(OffsetBatch(records)))
.provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

KafkaTestUtils.withAdmin { client =>
@@ -344,7 +344,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Consumer
.plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.take(count)
.foreach(_.offset.commit)
.runForeachChunk(records => Consumer.commit(OffsetBatch(records)))
.provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

KafkaTestUtils.withAdmin { client =>
@@ -645,7 +645,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
groupInstanceId: Option[String] = None
): ZIO[Kafka, Throwable, Unit] = Consumer
.plainStream(Subscription.topics(topicName), Serde.string, Serde.string)
.foreach(_.offset.commit)
.runForeachChunk(records => Consumer.commit(OffsetBatch(records)))
.provideSomeLayer(consumer(clientId, Some(groupId), groupInstanceId))

private def getStableConsumerGroupDescription(
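The AdminSpec helpers above switch from committing every record's offset to a single commit per pulled chunk. The same pattern in isolation, as a sketch; topic, count, and serdes are placeholders:

// Per-chunk commit, following the updated helpers above (sketch only).
def consumeN(topic: String, count: Long): RIO[Consumer, Unit] =
  Consumer
    .plainStream(Subscription.topics(topic), Serde.string, Serde.string)
    .take(count)
    .runForeachChunk(records => Consumer.commit(OffsetBatch(records)))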
604 changes: 308 additions & 296 deletions zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala

Large diffs are not rendered by default.

49 changes: 25 additions & 24 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -194,9 +194,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.transduce(ZSink.collectAllN[CommittableRecord[String, String]](5))
.mapConcatZIO { committableRecords =>
val records = committableRecords.map(_.record)
val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
val offsetBatch = OffsetBatch(committableRecords)

offsetBatch.commit.as(records)
Consumer.commit(offsetBatch).as(records)
}
.runCollect
.provideSomeLayer[Kafka](
@@ -212,9 +212,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.transduce(ZSink.collectAllN[CommittableRecord[String, String]](20))
.mapConcatZIO { committableRecords =>
val records = committableRecords.map(_.record)
val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
val offsetBatch = OffsetBatch(committableRecords)

offsetBatch.commit.as(records)
Consumer.commit(offsetBatch).as(records)
}
.runCollect
.provideSomeLayer[Kafka](
@@ -287,7 +287,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.zipWithIndex
.tap { case (record, idx) =>
(Consumer.stopConsumption <* ZIO.logDebug("Stopped consumption")).when(idx == 3) *>
record.offset.commit <* ZIO.logDebug(s"Committed $idx")
Consumer.commit(OffsetBatch(record)) <* ZIO.logDebug(s"Committed $idx")
}
.tap { case (_, idx) => ZIO.logDebug(s"Consumed $idx") }
.runDrain
@@ -308,14 +308,14 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messagesReceived <- Ref.make[Int](0)
offset <- (Consumer
.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
.mapConcatZIO { record =>
.mapConcatChunkZIO { record =>
for {
nr <- messagesReceived.updateAndGet(_ + 1)
_ <- Consumer.stopConsumption.when(nr == 10)
} yield if (nr < 10) Seq(record.offset) else Seq.empty
} yield if (nr < 10) Chunk(OffsetBatch(record)) else Chunk.empty
}
.transduce(Consumer.offsetBatches)
.mapZIO(_.commit)
.transduce(Consumer.offsetBatchesSink)
.mapZIO(Consumer.commit)
.runDrain *>
Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head))
.provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -339,11 +339,11 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
subscription = Subscription.topics(topic)
offsets <- (Consumer
.partitionedStream(subscription, Serde.string, Serde.string)
.flatMapPar(nrPartitions)(_._2.map(_.offset))
.flatMapPar(nrPartitions)(_._2.mapChunks(records => Chunk.single(OffsetBatch(records))))
.take(nrMessages.toLong)
.transduce(Consumer.offsetBatches)
.transduce(Consumer.offsetBatchesSink)
.take(1)
.mapZIO(_.commit)
.mapZIO(Consumer.commit)
.runDrain *>
Consumer.committed((0 until nrPartitions).map(new TopicPartition(topic, _)).toSet))
.provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -463,9 +463,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.transduce(ZSink.collectAllN[CommittableRecord[String, String]](5))
.mapConcatZIO { committableRecords =>
val records = committableRecords.map(_.record)
val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
val offsetBatch = OffsetBatch(committableRecords)

offsetBatch.commit.as(records)
Consumer.commit(offsetBatch).as(records)
}
.runCollect
.provideSomeLayer[Kafka](consumer(client1, Some(group)))
@@ -675,9 +675,10 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *>
partitionStream.mapChunksZIO { records =>
OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition)
.update(_ + records.size)
.as(records)
Consumer.commit(OffsetBatch(records)) *>
messagesReceived(tp.partition)
.update(_ + records.size)
.as(records)
}
}
.runDrain
@@ -812,9 +813,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.fromIterable(partitions.map(_._2))
.flatMapPar(Int.MaxValue)(s => s)
.mapZIO(record => messagesReceivedConsumer1.update(_ + 1).as(record))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(offsetBatch => offsetBatch.commit)
.mapChunks(records => Chunk.single(OffsetBatch(records)))
.aggregateAsync(Consumer.offsetBatchesSink)
.mapZIO(Consumer.commit)
.runDrain
}
.mapZIO(_ => drainCount.updateAndGet(_ + 1))
@@ -837,9 +838,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Consumer
.plainStream(subscription, Serde.string, Serde.string)
.mapZIO(record => messagesReceivedConsumer2.update(_ + 1).as(record))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(offsetBatch => offsetBatch.commit)
.mapChunks(records => Chunk.single(OffsetBatch(records)))
.aggregateAsync(Consumer.offsetBatchesSink)
.mapZIO(Consumer.commit)
.runDrain
.provideSomeLayer[Kafka](
customConsumer("consumer2", Some(group))
@@ -926,7 +927,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
records.map(r => new ProducerRecord(toTopic, r.key, r.value)),
Serde.string,
Serde.string,
OffsetBatch(records.map(_.offset))
OffsetBatch(records)
)
_ <- consumedMessagesCounter.update(_ + records.size)
} yield Chunk.empty
@@ -267,10 +267,10 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
.take(40)
.transduce(
Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&> ZSink
.collectAll[CommittableRecord[String, String]]
Consumer.offsetBatchesSink.contramap[CommittableRecord[String, String]](OffsetBatch.apply) <&>
ZSink.collectAll[CommittableRecord[String, String]]
)
.mapZIO { case (offsetBatch, records) => offsetBatch.commit.as(records) }
.mapZIO { case (offsetBatch, records) => Consumer.commit(offsetBatch).as(records) }
.flattenChunks
.runCollect
.tap(records => recordsConsumed.update(_ ++ records))
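Because the sink now consumes `OffsetBatch` values rather than `Offset`s, tests that want to commit while also keeping the consumed records contramap `Consumer.offsetBatchesSink` over `CommittableRecord` and zip it with a collect-all sink, as above. The composed sink on its own, as a sketch; the String/String type parameters are placeholders:

import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch }
import zio.stream.ZSink

// Folds each CommittableRecord into one OffsetBatch while also collecting the
// records themselves; the batch is then committed with Consumer.commit.
val commitAndKeepRecords =
  Consumer.offsetBatchesSink.contramap[CommittableRecord[String, String]](OffsetBatch.apply) <&>
    ZSink.collectAll[CommittableRecord[String, String]]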
36 changes: 10 additions & 26 deletions zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
@@ -1,14 +1,12 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.{ TopicPartition => JTopicPartition }
import zio.RIO
import zio.kafka.serde.Deserializer
import zio.{ RIO, Task }

final case class CommittableRecord[K, V](
record: ConsumerRecord[K, V],
private val commitHandle: Map[TopicPartition, Long] => Task[Unit],
private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
record: ConsumerRecord[K, V]
) {
def deserializeWith[R, K1, V1](
keyDeserializer: Deserializer[R, K1],
@@ -17,7 +15,7 @@ final case class CommittableRecord[K, V](
for {
key <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key())
value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value())
} yield copy(
} yield new CommittableRecord[K1, V1](
record = new ConsumerRecord[K1, V1](
record.topic(),
record.partition(),
@@ -33,30 +31,16 @@ final case class CommittableRecord[K, V](
)
)

def key: K = record.key
def key: K = record.key()
def value: V = record.value()
def partition: Int = record.partition()
def timestamp: Long = record.timestamp()

def offset: Offset =
OffsetImpl(
topic = record.topic(),
partition = record.partition(),
offset = record.offset(),
commitHandle = commitHandle,
consumerGroupMetadata = consumerGroupMetadata
)
private[consumer] lazy val topicPartition: JTopicPartition = new JTopicPartition(record.topic(), record.partition())
private[consumer] def offset: Long = record.offset()
}

object CommittableRecord {
def apply[K, V](
record: ConsumerRecord[K, V],
commitHandle: Map[TopicPartition, Long] => Task[Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
): CommittableRecord[K, V] =
new CommittableRecord(
record = record,
commitHandle = commitHandle,
consumerGroupMetadata = consumerGroupMetadata
)
def apply[K, V](record: ConsumerRecord[K, V]): CommittableRecord[K, V] =
new CommittableRecord(record = record)
}
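After this change `CommittableRecord` is a plain wrapper around the Kafka `ConsumerRecord`: the commit handle and consumer-group metadata are gone, and committing happens separately through `Consumer.commit`. Constructing one, for example in a test, now only needs the record; a sketch with placeholder data:

import org.apache.kafka.clients.consumer.ConsumerRecord
import zio.kafka.consumer.CommittableRecord

// Sketch only: topic, partition, offset, key and value are placeholder test data.
val committable: CommittableRecord[String, String] =
  CommittableRecord(new ConsumerRecord[String, String]("topic", 0, 0L, "key", "value"))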
60 changes: 50 additions & 10 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -1,14 +1,14 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp }
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common._
import zio._
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess }
import zio.kafka.serde.{ Deserializer, Serde }
import zio.kafka.utils.SslHelper
import zio.stream._
import zio._

import scala.jdk.CollectionConverters._
import scala.util.control.NoStackTrace
@@ -152,14 +152,21 @@ trait Consumer {
* Expose internal consumer metrics
*/
def metrics: Task[Map[MetricName, Metric]]

def commit(offsets: OffsetBatch): Task[Unit]

def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsets: OffsetBatch): RIO[R, Unit]

private[kafka] def consumerGroupMetadata: Task[Option[ConsumerGroupMetadata]]
}

object Consumer {
case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace

private final class Live private[Consumer] (
consumer: ConsumerAccess,
runloopAccess: RunloopAccess
runloopAccess: RunloopAccess,
hasGroupId: Boolean
) extends Consumer {

override def assignment: Task[Set[TopicPartition]] =
@@ -286,20 +293,38 @@ object Consumer {
r <- ZIO.environment[R & R1]
_ <- partitionedStream(subscription, keyDeserializer, valueDeserializer)
.flatMapPar(Int.MaxValue) { case (_, partitionStream) =>
partitionStream.mapChunksZIO(_.mapZIO((c: CommittableRecord[K, V]) => f(c.record).as(c.offset)))
partitionStream.mapChunksZIO(
_.mapZIO(c => f(c.record).as(c)).map(cs => Chunk.single(OffsetBatch(cs)))
)
}
.provideEnvironment(r)
.aggregateAsync(offsetBatches)
.mapZIO(_.commitOrRetry(commitRetryPolicy))
.aggregateAsync(offsetBatchesSink)
.mapZIO(commitOrRetry(commitRetryPolicy, _))
.runDrain
} yield ()

override def metrics: Task[Map[MetricName, Metric]] =
consumer.withConsumer(_.metrics().asScala.toMap)
}

val offsetBatches: ZSink[Any, Nothing, Offset, Nothing, OffsetBatch] =
ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ add _)
override def commit(offsets: OffsetBatch): Task[Unit] = runloopAccess.commit(offsets)

override def commitOrRetry[R](
policy: Schedule[R, Throwable, Any],
offsets: OffsetBatch
): RIO[R, Unit] =
runloopAccess
.commit(offsets)
.retry(
Schedule.recurWhile[Throwable] {
case _: RetriableCommitFailedException => true
case _ => false
} && policy
)

override private[kafka] val consumerGroupMetadata: Task[Option[ConsumerGroupMetadata]] =
if (hasGroupId) consumer.withConsumer(_.groupMetadata()).map(Some(_))
else ZIO.none
}

def live: RLayer[ConsumerSettings & Diagnostics, Consumer] =
ZLayer.scoped {
@@ -319,7 +344,10 @@ object Consumer {
_ <- SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties)
consumerAccess <- ConsumerAccess.make(settings)
runloopAccess <- RunloopAccess.make(settings, diagnostics, consumerAccess, settings)
} yield new Live(consumerAccess, runloopAccess)
} yield new Live(consumerAccess, runloopAccess, settings.hasGroupId)

val offsetBatchesSink: ZSink[Any, Nothing, OffsetBatch, Nothing, OffsetBatch] =
ZSink.foldLeft[OffsetBatch, OffsetBatch](OffsetBatch.empty)(_ merge _)

/**
* Accessor method for [[Consumer.assignment]]
@@ -508,6 +536,18 @@ object Consumer {
def metrics: RIO[Consumer, Map[MetricName, Metric]] =
ZIO.serviceWithZIO(_.metrics)

/**
* Accessor method for [[Consumer.commit]]
*/
def commit(offsets: OffsetBatch): RIO[Consumer, Unit] =
ZIO.serviceWithZIO[Consumer](_.commit(offsets))

/**
* Accessor method for [[Consumer.commitOrRetry]]
*/
def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsets: OffsetBatch): RIO[Consumer & R, Unit] =
ZIO.serviceWithZIO[Consumer](_.commitOrRetry(policy, offsets))

sealed trait OffsetRetrieval
object OffsetRetrieval {
final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest) extends OffsetRetrieval
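With `commit` and `commitOrRetry` now defined on the `Consumer` trait (plus the accessors above), the retriable-failure check lives inside the consumer and callers only supply the extra policy. A sketch of the accessor, assuming the `OffsetBatch` comes from a stream and using a placeholder schedule:

import zio._
import zio.kafka.consumer.{ Consumer, OffsetBatch }

// Sketch only: the retry schedule is a placeholder. The consumer retries only
// when the failure is a RetriableCommitFailedException and this policy allows
// another attempt.
def commitWithRetry(offsets: OffsetBatch): RIO[Consumer, Unit] =
  Consumer.commitOrRetry(Schedule.exponential(100.millis) && Schedule.recurs(5), offsets)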
