From 219727e567d84ff1162636e6992a4102e8a873f9 Mon Sep 17 00:00:00 2001 From: Vladimir Klyushnikov <72238+vladimirkl@users.noreply.github.com> Date: Sun, 16 Jul 2023 12:49:25 +0300 Subject: [PATCH] Implement default commit timeout (#982) --- .../zio/kafka/consumer/ConsumerSpec.scala | 24 +++++++++++++++++-- .../zio/kafka/testkit/KafkaTestUtils.scala | 6 ++++- .../scala/zio/kafka/consumer/Consumer.scala | 2 ++ .../zio/kafka/consumer/ConsumerSettings.scala | 5 ++++ .../zio/kafka/consumer/internal/Runloop.scala | 7 ++++-- .../consumer/internal/RunloopAccess.scala | 1 + 6 files changed, 40 insertions(+), 5 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index e9ccd9497..d68b93a8a 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -11,7 +11,7 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import zio._ import zio.kafka.ZIOSpecDefaultSlf4j -import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval } +import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, CommitTimeout, OffsetRetrieval } import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{ ConsumerFinalized, @@ -1192,7 +1192,27 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { ) } ) - ) + ), + test("commit timeout") { + val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i")) + for { + topic <- randomTopic + client <- randomClient + group <- randomGroup + + _ <- produceMany(topic, kvs) + + result <- Consumer + .plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string) + .take(11) + .map(_.offset) + .aggregateAsync(Consumer.offsetBatches) + .mapZIO(_.commit) // Hangs without timeout + .runDrain + .exit + .provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds)) + } yield assert(result)(equalTo(Exit.fail(CommitTimeout))) + } @@ TestAspect.flaky(10) @@ TestAspect.timeout(20.seconds) ) .provideSome[Scope & Kafka](producer) .provideSomeShared[Scope]( diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala index 73337c857..ee60eabe6 100644 --- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala +++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala @@ -104,6 +104,7 @@ object KafkaTestUtils { restartStreamOnRebalancing: Boolean = false, `max.poll.records`: Int = 100, // settings this higher can cause concurrency bugs to go unnoticed runloopTimeout: Duration = ConsumerSettings.defaultRunloopTimeout, + commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, properties: Map[String, String] = Map.empty ): URIO[Kafka, ConsumerSettings] = ZIO.serviceWith[Kafka] { (kafka: Kafka) => @@ -111,6 +112,7 @@ object KafkaTestUtils { .withClientId(clientId) .withCloseTimeout(5.seconds) .withPollTimeout(100.millis) + .withCommitTimeout(commitTimeout) .withRunloopTimeout(runloopTimeout) .withProperties( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", @@ -187,6 +189,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, diagnostics: Diagnostics = Diagnostics.NoOp, restartStreamOnRebalancing: Boolean = false, + commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, properties: Map[String, String] = Map.empty ): ZLayer[Kafka, Throwable, Consumer] = (ZLayer( @@ -197,7 +200,8 @@ object KafkaTestUtils { allowAutoCreateTopics = allowAutoCreateTopics, offsetRetrieval = offsetRetrieval, restartStreamOnRebalancing = restartStreamOnRebalancing, - properties = properties + properties = properties, + commitTimeout = commitTimeout ) ) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index baf34b1c8..d77051b74 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -157,6 +157,8 @@ trait Consumer { object Consumer { case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace + case object CommitTimeout extends RuntimeException("Commit timeout") with NoStackTrace + private final class Live private[Consumer] ( consumer: ConsumerAccess, runloopAccess: RunloopAccess diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index 6e026fd27..2ed1ef608 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -25,6 +25,7 @@ final case class ConsumerSettings( properties: Map[String, AnyRef] = Map.empty, closeTimeout: Duration = 30.seconds, pollTimeout: Duration = 50.millis, + commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), rebalanceListener: RebalanceListener = RebalanceListener.noop, restartStreamOnRebalancing: Boolean = false, @@ -48,6 +49,9 @@ final case class ConsumerSettings( def withCloseTimeout(timeout: Duration): ConsumerSettings = copy(closeTimeout = timeout) + def withCommitTimeout(timeout: Duration): ConsumerSettings = + copy(commitTimeout = timeout) + def withClientId(clientId: String): ConsumerSettings = withProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) @@ -127,4 +131,5 @@ final case class ConsumerSettings( object ConsumerSettings { val defaultRunloopTimeout: Duration = 4.minutes + val defaultCommitTimeout: Duration = 15.seconds } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 4b0624e11..7fbee349b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -4,7 +4,7 @@ import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.RebalanceInProgressException import zio._ -import zio.kafka.consumer.Consumer.{ OffsetRetrieval, RunloopTimeout } +import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval, RunloopTimeout } import zio.kafka.consumer._ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } @@ -22,6 +22,7 @@ private[consumer] final class Runloop private ( hasGroupId: Boolean, consumer: ConsumerAccess, pollTimeout: Duration, + commitTimeout: Duration, runloopTimeout: Duration, commandQueue: Queue[RunloopCommand], lastRebalanceEvent: Ref.Synchronized[Option[Runloop.RebalanceEvent]], @@ -120,7 +121,7 @@ private[consumer] final class Runloop private ( p <- Promise.make[Throwable, Unit] _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) - _ <- p.await + _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) } yield () private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = { @@ -560,6 +561,7 @@ private[consumer] object Runloop { hasGroupId: Boolean, consumer: ConsumerAccess, pollTimeout: Duration, + commitTimeout: Duration, diagnostics: Diagnostics, offsetRetrieval: OffsetRetrieval, userRebalanceListener: RebalanceListener, @@ -580,6 +582,7 @@ private[consumer] object Runloop { hasGroupId = hasGroupId, consumer = consumer, pollTimeout = pollTimeout, + commitTimeout = commitTimeout, runloopTimeout = runloopTimeout, commandQueue = commandQueue, lastRebalanceEvent = lastRebalanceEvent, diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 6e62bf14f..938029890 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -88,6 +88,7 @@ private[consumer] object RunloopAccess { hasGroupId = settings.hasGroupId, consumer = consumerAccess, pollTimeout = settings.pollTimeout, + commitTimeout = settings.commitTimeout, diagnostics = diagnostics, offsetRetrieval = settings.offsetRetrieval, userRebalanceListener = settings.rebalanceListener,