Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement default commit timeout #982

Merged
merged 1 commit into from
Jul 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, CommitTimeout, OffsetRetrieval }
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{
ConsumerFinalized,
Expand Down Expand Up @@ -1192,7 +1192,27 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
)
}
)
)
),
// Checks that committing offsets fails with `CommitTimeout` when the consumer is configured with a
// 2 second commit timeout. Per the inline note below, the commit in this scenario hangs when no
// timeout is applied. The test is marked flaky (10) and is itself bounded by a 20 second timeout.
test("commit timeout") {
val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
for {
topic <- randomTopic
client <- randomClient
group <- randomGroup

_ <- produceMany(topic, kvs)

// Take 11 records, batch their offsets and commit each batch. The stream's outcome is captured
// as an Exit (via `.exit`) so that the expected failure can be asserted on instead of failing
// the test effect.
result <- Consumer
.plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.take(11)
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit) // Hangs without timeout
.runDrain
.exit
.provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds))
} yield assert(result)(equalTo(Exit.fail(CommitTimeout)))
} @@ TestAspect.flaky(10) @@ TestAspect.timeout(20.seconds)
)
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,15 @@ object KafkaTestUtils {
restartStreamOnRebalancing: Boolean = false,
`max.poll.records`: Int = 100, // setting this higher can cause concurrency bugs to go unnoticed
runloopTimeout: Duration = ConsumerSettings.defaultRunloopTimeout,
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
properties: Map[String, String] = Map.empty
): URIO[Kafka, ConsumerSettings] =
ZIO.serviceWith[Kafka] { (kafka: Kafka) =>
val settings = ConsumerSettings(kafka.bootstrapServers)
.withClientId(clientId)
.withCloseTimeout(5.seconds)
.withPollTimeout(100.millis)
.withCommitTimeout(commitTimeout)
.withRunloopTimeout(runloopTimeout)
.withProperties(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
Expand Down Expand Up @@ -187,6 +189,7 @@ object KafkaTestUtils {
allowAutoCreateTopics: Boolean = true,
diagnostics: Diagnostics = Diagnostics.NoOp,
restartStreamOnRebalancing: Boolean = false,
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
properties: Map[String, String] = Map.empty
): ZLayer[Kafka, Throwable, Consumer] =
(ZLayer(
Expand All @@ -197,7 +200,8 @@ object KafkaTestUtils {
allowAutoCreateTopics = allowAutoCreateTopics,
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
properties = properties
properties = properties,
commitTimeout = commitTimeout
)
) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live

Expand Down
2 changes: 2 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ trait Consumer {
object Consumer {
/** Singleton exception with the message "Timeout in Runloop"; it carries no stack trace (`NoStackTrace`). */
case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace

/**
 * Singleton exception with the message "Commit timeout"; it carries no stack trace (`NoStackTrace`).
 *
 * The runloop fails a commit with this value when the commit's result is not available within the
 * configured `commitTimeout`.
 */
case object CommitTimeout extends RuntimeException("Commit timeout") with NoStackTrace

private final class Live private[Consumer] (
consumer: ConsumerAccess,
runloopAccess: RunloopAccess
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ final case class ConsumerSettings(
properties: Map[String, AnyRef] = Map.empty,
closeTimeout: Duration = 30.seconds,
pollTimeout: Duration = 50.millis,
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
rebalanceListener: RebalanceListener = RebalanceListener.noop,
restartStreamOnRebalancing: Boolean = false,
Expand All @@ -48,6 +49,9 @@ final case class ConsumerSettings(
/**
 * @param timeout
 *   the new value for `closeTimeout`
 * @return
 *   a copy of these settings with `closeTimeout` replaced by `timeout`
 */
def withCloseTimeout(timeout: Duration): ConsumerSettings =
copy(closeTimeout = timeout)

/**
 * @param timeout
 *   the new value for `commitTimeout`; when not set explicitly, the settings use
 *   `ConsumerSettings.defaultCommitTimeout`
 * @return
 *   a copy of these settings with `commitTimeout` replaced by `timeout`
 */
def withCommitTimeout(timeout: Duration): ConsumerSettings =
copy(commitTimeout = timeout)

/**
 * @param clientId
 *   the value to store under the `ConsumerConfig.CLIENT_ID_CONFIG` property
 * @return
 *   the settings produced by `withProperty` for that key and value
 */
def withClientId(clientId: String): ConsumerSettings =
withProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)

Expand Down Expand Up @@ -127,4 +131,5 @@ final case class ConsumerSettings(

/** Default values shared by `ConsumerSettings` and by code that builds consumer settings. */
object ConsumerSettings {
// Default runloop timeout: 4 minutes.
val defaultRunloopTimeout: Duration = 4.minutes
// Default for the `commitTimeout` setting: 15 seconds.
val defaultCommitTimeout: Duration = 15.seconds
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RebalanceInProgressException
import zio._
import zio.kafka.consumer.Consumer.{ OffsetRetrieval, RunloopTimeout }
import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval, RunloopTimeout }
import zio.kafka.consumer._
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
Expand All @@ -22,6 +22,7 @@ private[consumer] final class Runloop private (
hasGroupId: Boolean,
consumer: ConsumerAccess,
pollTimeout: Duration,
commitTimeout: Duration,
runloopTimeout: Duration,
commandQueue: Queue[RunloopCommand],
lastRebalanceEvent: Ref.Synchronized[Option[Runloop.RebalanceEvent]],
Expand Down Expand Up @@ -120,7 +121,7 @@ private[consumer] final class Runloop private (
p <- Promise.make[Throwable, Unit]
_ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit
_ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
_ <- p.await
_ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
} yield ()

private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = {
Expand Down Expand Up @@ -560,6 +561,7 @@ private[consumer] object Runloop {
hasGroupId: Boolean,
consumer: ConsumerAccess,
pollTimeout: Duration,
commitTimeout: Duration,
diagnostics: Diagnostics,
offsetRetrieval: OffsetRetrieval,
userRebalanceListener: RebalanceListener,
Expand All @@ -580,6 +582,7 @@ private[consumer] object Runloop {
hasGroupId = hasGroupId,
consumer = consumer,
pollTimeout = pollTimeout,
commitTimeout = commitTimeout,
runloopTimeout = runloopTimeout,
commandQueue = commandQueue,
lastRebalanceEvent = lastRebalanceEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private[consumer] object RunloopAccess {
hasGroupId = settings.hasGroupId,
consumer = consumerAccess,
pollTimeout = settings.pollTimeout,
commitTimeout = settings.commitTimeout,
diagnostics = diagnostics,
offsetRetrieval = settings.offsetRetrieval,
userRebalanceListener = settings.rebalanceListener,
Expand Down