Skip to content

Commit

Permalink
Implement default commit timeout (#982)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimirkl authored Jul 16, 2023
1 parent d7ae709 commit 219727e
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, CommitTimeout, OffsetRetrieval }
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{
ConsumerFinalized,
Expand Down Expand Up @@ -1192,7 +1192,27 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
)
}
)
)
),
test("commit timeout") {
val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
for {
topic <- randomTopic
client <- randomClient
group <- randomGroup

_ <- produceMany(topic, kvs)

result <- Consumer
.plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.take(11)
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit) // Hangs without timeout
.runDrain
.exit
.provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds))
} yield assert(result)(equalTo(Exit.fail(CommitTimeout)))
} @@ TestAspect.flaky(10) @@ TestAspect.timeout(20.seconds)
)
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,15 @@ object KafkaTestUtils {
restartStreamOnRebalancing: Boolean = false,
`max.poll.records`: Int = 100, // settings this higher can cause concurrency bugs to go unnoticed
runloopTimeout: Duration = ConsumerSettings.defaultRunloopTimeout,
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
properties: Map[String, String] = Map.empty
): URIO[Kafka, ConsumerSettings] =
ZIO.serviceWith[Kafka] { (kafka: Kafka) =>
val settings = ConsumerSettings(kafka.bootstrapServers)
.withClientId(clientId)
.withCloseTimeout(5.seconds)
.withPollTimeout(100.millis)
.withCommitTimeout(commitTimeout)
.withRunloopTimeout(runloopTimeout)
.withProperties(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
Expand Down Expand Up @@ -187,6 +189,7 @@ object KafkaTestUtils {
allowAutoCreateTopics: Boolean = true,
diagnostics: Diagnostics = Diagnostics.NoOp,
restartStreamOnRebalancing: Boolean = false,
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
properties: Map[String, String] = Map.empty
): ZLayer[Kafka, Throwable, Consumer] =
(ZLayer(
Expand All @@ -197,7 +200,8 @@ object KafkaTestUtils {
allowAutoCreateTopics = allowAutoCreateTopics,
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
properties = properties
properties = properties,
commitTimeout = commitTimeout
)
) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live

Expand Down
2 changes: 2 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ trait Consumer {
object Consumer {
case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace

case object CommitTimeout extends RuntimeException("Commit timeout") with NoStackTrace

private final class Live private[Consumer] (
consumer: ConsumerAccess,
runloopAccess: RunloopAccess
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ final case class ConsumerSettings(
properties: Map[String, AnyRef] = Map.empty,
closeTimeout: Duration = 30.seconds,
pollTimeout: Duration = 50.millis,
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
rebalanceListener: RebalanceListener = RebalanceListener.noop,
restartStreamOnRebalancing: Boolean = false,
Expand All @@ -48,6 +49,9 @@ final case class ConsumerSettings(
def withCloseTimeout(timeout: Duration): ConsumerSettings =
copy(closeTimeout = timeout)

def withCommitTimeout(timeout: Duration): ConsumerSettings =
copy(commitTimeout = timeout)

def withClientId(clientId: String): ConsumerSettings =
withProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)

Expand Down Expand Up @@ -127,4 +131,5 @@ final case class ConsumerSettings(

object ConsumerSettings {
val defaultRunloopTimeout: Duration = 4.minutes
val defaultCommitTimeout: Duration = 15.seconds
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RebalanceInProgressException
import zio._
import zio.kafka.consumer.Consumer.{ OffsetRetrieval, RunloopTimeout }
import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval, RunloopTimeout }
import zio.kafka.consumer._
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
Expand All @@ -22,6 +22,7 @@ private[consumer] final class Runloop private (
hasGroupId: Boolean,
consumer: ConsumerAccess,
pollTimeout: Duration,
commitTimeout: Duration,
runloopTimeout: Duration,
commandQueue: Queue[RunloopCommand],
lastRebalanceEvent: Ref.Synchronized[Option[Runloop.RebalanceEvent]],
Expand Down Expand Up @@ -120,7 +121,7 @@ private[consumer] final class Runloop private (
p <- Promise.make[Throwable, Unit]
_ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit
_ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
_ <- p.await
_ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
} yield ()

private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = {
Expand Down Expand Up @@ -560,6 +561,7 @@ private[consumer] object Runloop {
hasGroupId: Boolean,
consumer: ConsumerAccess,
pollTimeout: Duration,
commitTimeout: Duration,
diagnostics: Diagnostics,
offsetRetrieval: OffsetRetrieval,
userRebalanceListener: RebalanceListener,
Expand All @@ -580,6 +582,7 @@ private[consumer] object Runloop {
hasGroupId = hasGroupId,
consumer = consumer,
pollTimeout = pollTimeout,
commitTimeout = commitTimeout,
runloopTimeout = runloopTimeout,
commandQueue = commandQueue,
lastRebalanceEvent = lastRebalanceEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private[consumer] object RunloopAccess {
hasGroupId = settings.hasGroupId,
consumer = consumerAccess,
pollTimeout = settings.pollTimeout,
commitTimeout = settings.commitTimeout,
diagnostics = diagnostics,
offsetRetrieval = settings.offsetRetrieval,
userRebalanceListener = settings.rebalanceListener,
Expand Down

0 comments on commit 219727e

Please sign in to comment.