Skip to content

Commit

Permalink
Make settings properties uniform
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten committed Dec 17, 2023
1 parent 87e30f7 commit ca64b04
Showing 1 changed file with 8 additions and 11 deletions.
19 changes: 8 additions & 11 deletions zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,6 @@ final case class ConsumerSettings(
maxRebalanceDuration: Option[Duration] = None,
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy()
) {
private[this] def autoOffsetResetConfig: Map[String, String] = {
val resetStrategy = offsetRetrieval match {
case OffsetRetrieval.Auto(reset) => reset
case OffsetRetrieval.Manual(_, defaultStrategy) => defaultStrategy
}
Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> resetStrategy.toConfig)
}

/**
* Tunes the consumer for high throughput.
Expand Down Expand Up @@ -77,9 +70,7 @@ final case class ConsumerSettings(
.withFetchStrategy(QueueSizeBasedFetchStrategy(partitionPreFetchBufferLimit = 512))

def driverSettings: Map[String, AnyRef] =
Map(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
) ++ autoOffsetResetConfig ++ properties
Map(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false") ++ properties

def withBootstrapServers(servers: List[String]): ConsumerSettings =
withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers.mkString(","))
Expand Down Expand Up @@ -132,8 +123,14 @@ final case class ConsumerSettings(
* This method sets the `auto.offset.reset` Kafka configuration. See
* https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset for more information.
*/
def withOffsetRetrieval(retrieval: OffsetRetrieval): ConsumerSettings =
def withOffsetRetrieval(retrieval: OffsetRetrieval): ConsumerSettings = {
val resetStrategy = offsetRetrieval match {
case OffsetRetrieval.Auto(reset) => reset
case OffsetRetrieval.Manual(_, defaultStrategy) => defaultStrategy
}
copy(offsetRetrieval = retrieval)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, resetStrategy.toConfig)
}

/**
* The maximum time to block while polling the Kafka consumer. The Kafka consumer will return earlier when the maximum
Expand Down

0 comments on commit ca64b04

Please sign in to comment.