Skip to content

Commit

Permalink
Allow committed offsets refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
rtimush committed May 4, 2018
1 parent 631fc65 commit 05e0d8e
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 44 deletions.
5 changes: 5 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ akka.kafka.consumer {
# After exceeding maximum wakeups the consumer will stop and the stage will fail.
max-wakeups = 10

# If enabled the consumer will re-send last committed offsets periodically
# for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
commit-refresh-enabled = false
commit-refresh-interval = 1m

# If enabled log stack traces before waking up the KafkaConsumer to give
# some indication why the KafkaConsumer is not honouring the poll-timeout
wakeup-debug = true
Expand Down
17 changes: 14 additions & 3 deletions core/src/main/scala/akka/kafka/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,14 @@ object ConsumerSettings {
val commitTimeWarning = config.getDuration("commit-time-warning", TimeUnit.MILLISECONDS).millis
val wakeupTimeout = config.getDuration("wakeup-timeout", TimeUnit.MILLISECONDS).millis
val maxWakeups = config.getInt("max-wakeups")
// Interval for re-sending the last committed offsets; `None` when the feature
// is switched off via `commit-refresh-enabled`. The duration is read in
// milliseconds so that it agrees with the `.millis` conversion applied to it
// (and with how the other durations in this factory are read).
val commitRefreshInterval =
  if (config.getBoolean("commit-refresh-enabled"))
    Some(config.getDuration("commit-refresh-interval", TimeUnit.MILLISECONDS).millis)
  else None
val dispatcher = config.getString("use-dispatcher")
val wakeupDebug = config.getBoolean("wakeup-debug")
new ConsumerSettings[K, V](properties, keyDeserializer, valueDeserializer,
pollInterval, pollTimeout, stopTimeout, closeTimeout, commitTimeout, wakeupTimeout, maxWakeups, dispatcher,
commitTimeWarning, wakeupDebug)
pollInterval, pollTimeout, stopTimeout, closeTimeout, commitTimeout, wakeupTimeout, maxWakeups, commitRefreshInterval,
dispatcher, commitTimeWarning, wakeupDebug)
}

/**
Expand Down Expand Up @@ -291,6 +294,7 @@ class ConsumerSettings[K, V](
val commitTimeout: FiniteDuration,
val wakeupTimeout: FiniteDuration,
val maxWakeups: Int,
val commitRefreshInterval: Option[FiniteDuration],
val dispatcher: String,
val commitTimeWarning: FiniteDuration = 1.second,
val wakeupDebug: Boolean = true
Expand Down Expand Up @@ -365,6 +369,12 @@ class ConsumerSettings[K, V](
/**
 * Returns a copy of these settings with `maxWakeups` replaced by the given value.
 * Corresponds to the `max-wakeups` configuration entry.
 */
def withMaxWakeups(maxWakeups: Int): ConsumerSettings[K, V] =
copy(maxWakeups = maxWakeups)

/**
 * Returns a copy of these settings with committed-offset refresh enabled,
 * using the given interval between refreshes.
 */
def withCommitRefreshInterval(commitRefreshInterval: FiniteDuration): ConsumerSettings[K, V] = {
  val enabledInterval: Option[FiniteDuration] = Some(commitRefreshInterval)
  copy(commitRefreshInterval = enabledInterval)
}

/**
 * Returns a copy of these settings with committed-offset refresh disabled
 * (the refresh interval is cleared).
 */
def withoutCommitRefresh(): ConsumerSettings[K, V] = {
  val disabled: Option[FiniteDuration] = None
  copy(commitRefreshInterval = disabled)
}

/**
 * Returns a copy of these settings with `wakeupDebug` replaced by the given value.
 * Corresponds to the `wakeup-debug` configuration entry.
 */
def withWakeupDebug(wakeupDebug: Boolean): ConsumerSettings[K, V] =
copy(wakeupDebug = wakeupDebug)

Expand All @@ -380,12 +390,13 @@ class ConsumerSettings[K, V](
commitTimeWarning: FiniteDuration = commitTimeWarning,
wakeupTimeout: FiniteDuration = wakeupTimeout,
maxWakeups: Int = maxWakeups,
commitRefreshInterval: Option[FiniteDuration] = commitRefreshInterval,
dispatcher: String = dispatcher,
wakeupDebug: Boolean = wakeupDebug
): ConsumerSettings[K, V] =
new ConsumerSettings[K, V](properties, keyDeserializer, valueDeserializer,
pollInterval, pollTimeout, stopTimeout, closeTimeout, commitTimeout, wakeupTimeout,
maxWakeups, dispatcher, commitTimeWarning, wakeupDebug)
maxWakeups, commitRefreshInterval, dispatcher, commitTimeWarning, wakeupDebug)

/**
* Create a `KafkaConsumer` instance from the settings.
Expand Down
122 changes: 86 additions & 36 deletions core/src/main/scala/akka/kafka/KafkaConsumerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,19 @@ import java.io.{PrintWriter, StringWriter}
import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.LockSupport
import java.util.regex.Pattern

import akka.Done
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, DeadLetterSuppression, NoSerializationVerificationNeeded, Props, Status, Terminated}
import akka.event.LoggingReceive
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

import java.util.concurrent.locks.LockSupport

import akka.Done

import scala.util.control.{NoStackTrace, NonFatal}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.util.control.{NoStackTrace, NonFatal}

object KafkaConsumerActor {
case class StoppingException() extends RuntimeException("Kafka consumer is stopping")
Expand Down Expand Up @@ -58,6 +56,12 @@ object KafkaConsumerActor {
private[KafkaConsumerActor] final case class Poll[K, V](
target: KafkaConsumerActor[K, V], periodic: Boolean
) extends DeadLetterSuppression with NoSerializationVerificationNeeded
// Internal notification that `partition` has been assigned to the consumer.
// `offset` is the position the consumer starts from for that partition: either
// the consumer's current position or the offset that was explicitly sought.
// The actor uses it to seed its requested/committed offset bookkeeping.
private[KafkaConsumerActor] final case class PartitionAssigned(
partition: TopicPartition, offset: Long
) extends DeadLetterSuppression with NoSerializationVerificationNeeded
// Internal notification that `partition` has been revoked from the consumer;
// the actor drops its requested/committed offset bookkeeping for it.
private[KafkaConsumerActor] final case class PartitionRevoked(
partition: TopicPartition
) extends DeadLetterSuppression with NoSerializationVerificationNeeded
private val number = new AtomicInteger()
def nextNumber(): Int = {
number.incrementAndGet()
Expand All @@ -70,14 +74,21 @@ object KafkaConsumerActor {
/**
 * Bundles the given assignment and revocation callbacks into a `ListenerCallbacks`.
 *
 * @param onAssign invoked with the set of newly assigned partitions
 * @param onRevoke invoked with the set of revoked partitions
 */
private[kafka] def rebalanceListener(onAssign: Set[TopicPartition] => Unit, onRevoke: Set[TopicPartition] => Unit): ListenerCallbacks =
ListenerCallbacks(onAssign, onRevoke)

private class WrappedAutoPausedListener(client: Consumer[_, _], listener: ListenerCallbacks) extends ConsumerRebalanceListener with NoSerializationVerificationNeeded {
/**
 * Rebalance listener that pauses newly assigned partitions, forwards the
 * rebalance events to the user supplied `listener`, and additionally reports
 * every assigned / revoked partition to `caller` as `PartitionAssigned` /
 * `PartitionRevoked` messages.
 *
 * @param client   consumer whose partitions are paused and whose positions are read
 * @param caller   actor that receives the per-partition notifications
 * @param listener user callbacks invoked with the affected partition sets
 */
private class WrappedAutoPausedListener(client: Consumer[_, _], caller: ActorRef, listener: ListenerCallbacks)
  extends ConsumerRebalanceListener
  with NoSerializationVerificationNeeded {

  import KafkaConsumerActor.Internal._

  override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
    // Pause first, then notify the actor, and only afterwards run the user callback.
    client.pause(partitions)
    val assigned = partitions.asScala
    for (partition <- assigned) {
      val position = client.position(partition)
      caller ! PartitionAssigned(partition, position)
    }
    listener.onAssign(assigned.toSet)
  }

  override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
    // The user callback runs before the actor is told about the revocation.
    val revoked = partitions.asScala
    listener.onRevoke(revoked.toSet)
    for (partition <- revoked) {
      caller ! PartitionRevoked(partition)
    }
  }
}
}
Expand All @@ -99,6 +110,9 @@ private[kafka] class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V])
var consumer: Consumer[K, V] = _
var subscriptions = Set.empty[SubscriptionRequest]
var commitsInProgress = 0
var commitRequestedOffsets = Map.empty[TopicPartition, Long]
var committedOffsets = Map.empty[TopicPartition, Long]
var commitRefreshDeadline: Option[Deadline] = None
var wakeups = 0
var stopInProgress = false
var delayedPollInFlight = false
Expand All @@ -109,13 +123,18 @@ private[kafka] class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V])
checkOverlappingRequests("Assign", sender(), tps)
val previousAssigned = consumer.assignment()
consumer.assign((tps.toSeq ++ previousAssigned.asScala).asJava)
tps.foreach { tp =>
self ! PartitionAssigned(tp, consumer.position(tp))
}
case AssignWithOffset(tps) =>
scheduleFirstPollTask()
checkOverlappingRequests("AssignWithOffset", sender(), tps.keySet)
val previousAssigned = consumer.assignment()
consumer.assign((tps.keys.toSeq ++ previousAssigned.asScala).asJava)
tps.foreach {
case (tp, offset) => consumer.seek(tp, offset)
case (tp, offset) =>
consumer.seek(tp, offset)
self ! PartitionAssigned(tp, offset)
}
case AssignOffsetsForTimes(timestampsToSearch) =>
scheduleFirstPollTask()
Expand All @@ -129,34 +148,12 @@ private[kafka] class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V])
val ts = oat.timestamp()
log.debug("Get offset {} from topic {} with timestamp {}", offset, tp, ts)
consumer.seek(tp, offset)
self ! PartitionAssigned(tp, offset)
}

case Commit(offsets) =>
val commitMap = offsets.mapValues(new OffsetAndMetadata(_))
val reply = sender()
commitsInProgress += 1
val startTime = System.nanoTime()
consumer.commitAsync(commitMap.asJava, new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
// this is invoked on the thread calling consumer.poll which will always be the actor, so it is safe
val duration = FiniteDuration(System.nanoTime() - startTime, NANOSECONDS)
if (duration > settings.commitTimeWarning) {
log.warning("Kafka commit took longer than `commit-time-warning`: {} ms", duration.toMillis)
}
commitsInProgress -= 1
if (exception != null) reply ! Status.Failure(exception)
else reply ! Committed(offsets.asScala.toMap)
}
})
// When many requestors, e.g. many partitions with committablePartitionedSource the
// performance is much by collecting more requests/commits before performing the poll.
// That is done by sending a message to self, and thereby collect pending messages in mailbox.
if (requestors.size == 1)
poll()
else if (!delayedPollInFlight) {
delayedPollInFlight = true
self ! delayedPollMsg
}
commitRequestedOffsets ++= offsets
commit(offsets, sender())

case s: SubscriptionRequest =>
subscriptions = subscriptions + s
Expand Down Expand Up @@ -184,6 +181,18 @@ private[kafka] class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V])
self ! delayedPollMsg
}

case PartitionAssigned(partition, offset) =>
commitRequestedOffsets += partition -> commitRequestedOffsets.getOrElse(partition, offset)
committedOffsets += partition -> committedOffsets.getOrElse(partition, offset)
commitRefreshDeadline = settings.commitRefreshInterval.map(_.fromNow)

case PartitionRevoked(partition) =>
commitRequestedOffsets -= partition
committedOffsets -= partition

case Committed(offsets) =>
committedOffsets ++= offsets.mapValues(_.offset())

case Stop =>
if (commitsInProgress == 0) {
context.stop(self)
Expand All @@ -207,9 +216,9 @@ private[kafka] class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V])

subscription match {
case Subscribe(topics, listener) =>
consumer.subscribe(topics.toList.asJava, new WrappedAutoPausedListener(consumer, listener))
consumer.subscribe(topics.toList.asJava, new WrappedAutoPausedListener(consumer, self, listener))
case SubscribePattern(pattern, listener) =>
consumer.subscribe(Pattern.compile(pattern), new WrappedAutoPausedListener(consumer, listener))
consumer.subscribe(Pattern.compile(pattern), new WrappedAutoPausedListener(consumer, self, listener))
}
}

Expand Down Expand Up @@ -264,6 +273,14 @@ private[kafka] class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V])

private def receivePoll(p: Poll[_, _]): Unit = {
if (p.target == this) {
if (commitRefreshDeadline.exists(_.isOverdue())) {
val refreshOffsets = committedOffsets.filter {
case (tp, offset) =>
commitRequestedOffsets.get(tp).contains(offset)
}
log.debug("Refreshing committed offsets: {}", refreshOffsets)
commit(refreshOffsets, context.system.deadLetters)
}
poll()
if (p.periodic)
currentPollTask = schedulePollTask()
Expand All @@ -278,7 +295,7 @@ private[kafka] class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V])

def poll(): Unit = {
val wakeupTask = context.system.scheduler.scheduleOnce(settings.wakeupTimeout) {
log.warning("KafkaConsumer poll has exceeded wake up timeout ({}ms). Waking up consumer to avoid thread starvation.", settings.wakeupTimeout.toMillis)
log.warning("KafkaConsumer poll has exceeded wake up timeout ({}ms). Waking up consumer to avoid thread starvation.", settings.wakeupTimeout.toMillis)
if (settings.wakeupDebug) {
val stacks = Thread.getAllStackTraces.asScala.map { case (k, v) => s"$k\n ${v.mkString("\n")}" }.mkString("\n\n")
log.warning("Wake up has been triggered. Dumping stacks: {}", stacks)
Expand Down Expand Up @@ -410,6 +427,39 @@ private[kafka] class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V])
}
}

/**
 * Asynchronously commits `offsets` to Kafka and reports the outcome to `reply`.
 *
 * Every call pushes the commit refresh deadline forward by
 * `settings.commitRefreshInterval` (when one is configured). On success the
 * resulting `Committed` message is sent to this actor as well as to `reply`;
 * on failure `reply` receives a `Status.Failure` carrying the exception.
 *
 * @param offsets offsets to commit, keyed by partition
 * @param reply   actor that is notified about the commit result; it is used as
 *                given and must not be replaced by `sender()`, because refresh
 *                commits are not triggered by the current sender
 */
private def commit(offsets: Map[TopicPartition, Long], reply: ActorRef): Unit = {
  commitRefreshDeadline = settings.commitRefreshInterval.map(_.fromNow)
  val commitMap = offsets.mapValues(new OffsetAndMetadata(_))
  commitsInProgress += 1
  val startTime = System.nanoTime()
  consumer.commitAsync(commitMap.asJava, new OffsetCommitCallback {
    override def onComplete(result: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // this is invoked on the thread calling consumer.poll which will always be the actor, so it is safe
      val duration = FiniteDuration(System.nanoTime() - startTime, NANOSECONDS)
      if (duration > settings.commitTimeWarning) {
        log.warning("Kafka commit took longer than `commit-time-warning`: {} ms", duration.toMillis)
      }
      commitsInProgress -= 1
      if (exception != null) reply ! Status.Failure(exception)
      else {
        val committed = Committed(result.asScala.toMap)
        // Tell ourselves too, so the actor can record what has actually been committed.
        self ! committed
        reply ! committed
      }
    }
  })
  // When there are many requestors, e.g. many partitions with committablePartitionedSource, the
  // performance is much better when more requests/commits are collected before performing the poll.
  // That is done by sending a message to self, and thereby collecting pending messages in the mailbox.
  if (requestors.size == 1)
    poll()
  else if (!delayedPollInFlight) {
    delayedPollInFlight = true
    self ! delayedPollMsg
  }
}

private def processResult(partitionsToFetch: Set[TopicPartition], rawResult: ConsumerRecords[K, V]): Unit = {
if (!rawResult.isEmpty) {
//check that we got only requested partitions and did not drop any messages
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/akka/kafka/internal/ConsumerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class ConsumerTest(_system: ActorSystem)

def testSource(mock: ConsumerMock[K, V], groupId: String = "group1", topics: Set[String] = Set("topic")): Source[CommittableMessage[K, V], Control] = {
val settings = new ConsumerSettings(Map(ConsumerConfig.GROUP_ID_CONFIG -> groupId), Some(new StringDeserializer), Some(new StringDeserializer),
1.milli, 1.milli, 1.second, closeTimeout, 1.second, 5.seconds, 3, "akka.kafka.default-dispatcher", 1.second, true) {
1.milli, 1.milli, 1.second, closeTimeout, 1.second, 5.seconds, 3, None, "akka.kafka.default-dispatcher", 1.second, true) {
override def createKafkaConsumer(): KafkaConsumer[K, V] = {
mock.mock
}
Expand Down
Loading

0 comments on commit 05e0d8e

Please sign in to comment.