Skip to content

Commit

Permalink
Enable possibility to compact state topic when Tamer creates it
Browse files Browse the repository at this point in the history
  • Loading branch information
sirocchj committed Feb 1, 2024
1 parent baf078b commit 20e631f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
13 changes: 10 additions & 3 deletions core/src/main/scala/tamer/Tamer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import log.effect.zio.ZioLogWriter.log4sFromName
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.config.{TopicConfig => KTopicConfig}
import zio._
import zio.kafka.admin._
import zio.kafka.admin.AdminClient.{DescribeTopicsOptions, ListTopicsOptions, NewTopic, TopicDescription}
Expand Down Expand Up @@ -251,7 +252,13 @@ object Tamer {
.mapBoth(_ => topicConfig.maybeTopicOptions, (_, topicConfig.maybeTopicOptions))
.foldZIO(
{
case Some(TopicOptions(partitions, replicas)) =>
case Some(TopicOptions(partitions, replicas, true)) =>
log.info(
s"topic ${topicConfig.topicName} does not exist in Kafka. Given auto_create is set to true, creating it with $partitions partitions, $replicas replicas and compaction enabled"
) *> adminClient.createTopic(
NewTopic(topicConfig.topicName, partitions, replicas, Map(KTopicConfig.CLEANUP_POLICY_CONFIG -> KTopicConfig.CLEANUP_POLICY_COMPACT))
)
case Some(TopicOptions(partitions, replicas, false)) =>
log.info(
s"topic ${topicConfig.topicName} does not exist in Kafka. Given auto_create is set to true, creating it with $partitions partitions and $replicas replicas"
) *> adminClient.createTopic(NewTopic(topicConfig.topicName, partitions, replicas))
Expand All @@ -261,7 +268,7 @@ object Tamer {
)
},
{
case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas)))
case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas, _)))
if tpInfo.size == partitions && tpInfo.forall(_.replicas.size == replicas) && expectedACL.subsetOf(acls) =>
log
.info(
Expand All @@ -274,7 +281,7 @@ object Tamer {
s"verified topic ${topicConfig.topicName} successfully. Kafka informs us that it has ${tpInfo.size} partitions and ${tpInfo.head.replicas.size} replicas, but it satisfies all expected ACLs (${expectedACL
.mkString(", ")}), proceeding"
)
case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas))) if expectedACL.subsetOf(acls) =>
case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas, _))) if expectedACL.subsetOf(acls) =>
log
.warn(
s"inconsistencies in topic ${topicConfig.topicName}. Kafka informs us that it has ${tpInfo.size} partitions and ${tpInfo.head.replicas.size} replicas, expecting $partitions partitions and ${tpInfo.head.replicas.size} replicas, but it satisfies all expected ACLs (${expectedACL
Expand Down
22 changes: 12 additions & 10 deletions core/src/main/scala/tamer/config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ object RegistryConfig {
}.optional
}

final case class TopicOptions(partitions: Int, replicas: Short)
final case class TopicOptions(partitions: Int, replicas: Short, compaction: Boolean)
object TopicOptions {
val config: Config[Option[TopicOptions]] =
def config(compactionDefault: Boolean): Config[Option[TopicOptions]] =
(Config.boolean("auto_create").withDefault(false) ++
Config.int("partitions").withDefault(1) ++
Config.int("replicas").map(_.toShort).withDefault(1.toShort)).map {
case (true, partitions, replicas) => Some(TopicOptions(partitions, replicas))
case _ => None
Config.int("replicas").map(_.toShort).withDefault(1.toShort) ++
Config.boolean("compaction").withDefault(compactionDefault)).map {
case (true, partitions, replicas, compaction) => Some(TopicOptions(partitions, replicas, compaction))
case _ => None
}
}

Expand All @@ -66,9 +67,10 @@ object TopicConfig {
topicName = topicName,
maybeTopicOptions = None
)
val config: Config[TopicConfig] = (Config.string("topic") ++ TopicOptions.config).map { case (topicName, maybeTopicOptions) =>
TopicConfig(topicName, maybeTopicOptions)
}
def config(compactionDefault: Boolean): Config[TopicConfig] =
(Config.string("topic") ++ TopicOptions.config(compactionDefault)).map { case (topicName, maybeTopicOptions) =>
TopicConfig(topicName, maybeTopicOptions)
}
}

final case class KafkaConfig(
Expand Down Expand Up @@ -112,8 +114,8 @@ object KafkaConfig {
RegistryConfig.config.nested("schema_registry") ++
Config.duration("close_timeout") ++
Config.int("buffer_size") ++
TopicConfig.config.nested("sink") ++
TopicConfig.config.nested("state") ++
TopicConfig.config(compactionDefault = false).nested("sink") ++
TopicConfig.config(compactionDefault = true).nested("state") ++
Config.string("group_id") ++
Config.string("client_id") ++
Config.string("transactional_id")
Expand Down

0 comments on commit 20e631f

Please sign in to comment.