Skip to content

Commit

Permalink
Add default to manual offset retrieval
Browse files Browse the repository at this point in the history
According to the javadoc of `seek`, when a given offset is invalid, it falls back to the `auto.offset.reset` configuration. In this change we allow the user to set that configuration when `Manual` offset retrieval is used.

In addition, add documentation on how to configure offset retrieval.

Note: this change is source compatible, but not binary compatible.
  • Loading branch information
erikvanoosten committed Dec 13, 2023
1 parent 4a963ac commit d0123ea
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
9 changes: 7 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,14 @@ object Consumer {
def metrics: RIO[Consumer, Map[MetricName, Metric]] =
ZIO.serviceWithZIO(_.metrics)

/** See [[ConsumerSettings.withOffsetRetrieval()]]. */
sealed trait OffsetRetrieval
object OffsetRetrieval {
final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest) extends OffsetRetrieval
final case class Manual(getOffsets: Set[TopicPartition] => Task[Map[TopicPartition, Long]]) extends OffsetRetrieval
final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest) extends OffsetRetrieval
final case class Manual(
getOffsets: Set[TopicPartition] => Task[Map[TopicPartition, Long]],
defaultStrategy: AutoOffsetStrategy = AutoOffsetStrategy.Latest
) extends OffsetRetrieval
}

sealed trait AutoOffsetStrategy { self =>
Expand All @@ -415,6 +419,7 @@ object Consumer {
}
}

/** See [[ConsumerSettings.withOffsetRetrieval()]]. */
object AutoOffsetStrategy {
case object Earliest extends AutoOffsetStrategy
case object Latest extends AutoOffsetStrategy
Expand Down
37 changes: 34 additions & 3 deletions zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ final case class ConsumerSettings(
maxRebalanceDuration: Option[Duration] = None,
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy()
) {
private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match {
case OffsetRetrieval.Auto(reset) => Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> reset.toConfig)
case OffsetRetrieval.Manual(_) => Map.empty
private[this] def autoOffsetResetConfig: Map[String, String] = {
val resetStrategy = offsetRetrieval match {
case OffsetRetrieval.Auto(reset) => reset
case OffsetRetrieval.Manual(_, defaultStrategy) => defaultStrategy
}
Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> resetStrategy.toConfig)
}

/**
Expand Down Expand Up @@ -99,6 +102,34 @@ final case class ConsumerSettings(
def withGroupInstanceId(groupInstanceId: String): ConsumerSettings =
withProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)

/**
* Which offset to start consuming from for new partitions.
*
* The options are:
* {{{
* import zio.kafka.consumer.Consumer._
* OffsetRetrieval.Auto(AutoOffsetStrategy.Latest) // the default
* OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)
* OffsetRetrieval.Auto(AutoOffsetStrategy.None)
* OffsetRetrieval.Manual(getOffsets, defaultStrategy)
* }}}
*
* The `Auto` options make consuming start from the latest committed offset. When no committed offset is available,
* the given offset strategy is used and consuming starts from the `Latest` offset (the default), the `Earliest`
* offset, or results in an error for `None`.
*
* The `Manual` option allows fine grained control over which offset to consume from. The provided `getOffsets`
* function needs to return an offset for each topic-partition that is being assigned. When the returned offset is
* smaller than the log start offset or larger than the log end offset, the `defaultStrategy` is used and consuming
* starts from the `Latest` offset (the default), the `Earliest` offset, or results in an error for `None`.
*
* TODO: describe what happens when a topic-partition is missing from the result of calling `getOffsets`???
*
* This configuration applies for both subscribed and assigned partitions.
*
* This method sets the `auto.offset.reset` Kafka configuration. See
* https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset for more information.
*/
def withOffsetRetrieval(retrieval: OffsetRetrieval): ConsumerSettings =
copy(offsetRetrieval = retrieval)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ private[consumer] final class Runloop private (
private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] =
offsetRetrieval match {
case OffsetRetrieval.Auto(_) => ZIO.succeed(Set.empty)
case OffsetRetrieval.Manual(getOffsets) =>
case OffsetRetrieval.Manual(getOffsets, _) =>
if (tps.isEmpty) ZIO.succeed(Set.empty)
else
getOffsets(tps)
Expand Down

0 comments on commit d0123ea

Please sign in to comment.