From 20e631faf9dbb46750a6ed408c9818f15082ea6a Mon Sep 17 00:00:00 2001 From: Julien Jean Paul Sirocchi Date: Thu, 1 Feb 2024 12:54:21 -0800 Subject: [PATCH] Enable possibility to compact state topic when Tamer creates it --- core/src/main/scala/tamer/Tamer.scala | 13 ++++++++++--- core/src/main/scala/tamer/config.scala | 22 ++++++++++++---------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/tamer/Tamer.scala b/core/src/main/scala/tamer/Tamer.scala index 6a533cc9..6549e869 100644 --- a/core/src/main/scala/tamer/Tamer.scala +++ b/core/src/main/scala/tamer/Tamer.scala @@ -5,6 +5,7 @@ import log.effect.zio.ZioLogWriter.log4sFromName import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.common.config.{TopicConfig => KTopicConfig} import zio._ import zio.kafka.admin._ import zio.kafka.admin.AdminClient.{DescribeTopicsOptions, ListTopicsOptions, NewTopic, TopicDescription} @@ -251,7 +252,13 @@ object Tamer { .mapBoth(_ => topicConfig.maybeTopicOptions, (_, topicConfig.maybeTopicOptions)) .foldZIO( { - case Some(TopicOptions(partitions, replicas)) => + case Some(TopicOptions(partitions, replicas, true)) => + log.info( + s"topic ${topicConfig.topicName} does not exist in Kafka. Given auto_create is set to true, creating it with $partitions partitions, $replicas replicas and compaction enabled" + ) *> adminClient.createTopic( + NewTopic(topicConfig.topicName, partitions, replicas, Map(KTopicConfig.CLEANUP_POLICY_CONFIG -> KTopicConfig.CLEANUP_POLICY_COMPACT)) + ) + case Some(TopicOptions(partitions, replicas, false)) => log.info( s"topic ${topicConfig.topicName} does not exist in Kafka. Given auto_create is set to true, creating it with $partitions partitions and $replicas replicas" ) *> adminClient.createTopic(NewTopic(topicConfig.topicName, partitions, replicas)) @@ -261,7 +268,7 @@ object Tamer { ) }, { - case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas))) + case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas, _))) if tpInfo.size == partitions && tpInfo.forall(_.replicas.size == replicas) && expectedACL.subsetOf(acls) => log .info( @@ -274,7 +281,7 @@ object Tamer { s"verified topic ${topicConfig.topicName} successfully. Kafka informs us that it has ${tpInfo.size} partitions and ${tpInfo.head.replicas.size} replicas, but it satisfies all expected ACLs (${expectedACL .mkString(", ")}), proceeding" ) - case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas))) if expectedACL.subsetOf(acls) => + case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas, _))) if expectedACL.subsetOf(acls) => log .warn( s"inconsistencies in topic ${topicConfig.topicName}. Kafka informs us that it has ${tpInfo.size} partitions and ${tpInfo.head.replicas.size} replicas, expecting $partitions partitions and ${tpInfo.head.replicas.size} replicas, but it satisfies all expected ACLs (${expectedACL diff --git a/core/src/main/scala/tamer/config.scala b/core/src/main/scala/tamer/config.scala index 07583ec4..433530e0 100644 --- a/core/src/main/scala/tamer/config.scala +++ b/core/src/main/scala/tamer/config.scala @@ -49,14 +49,15 @@ object RegistryConfig { }.optional } -final case class TopicOptions(partitions: Int, replicas: Short) +final case class TopicOptions(partitions: Int, replicas: Short, compaction: Boolean) object TopicOptions { - val config: Config[Option[TopicOptions]] = + def config(compactionDefault: Boolean): Config[Option[TopicOptions]] = (Config.boolean("auto_create").withDefault(false) ++ Config.int("partitions").withDefault(1) ++ - Config.int("replicas").map(_.toShort).withDefault(1.toShort)).map { - case (true, partitions, replicas) => Some(TopicOptions(partitions, replicas)) - case _ => None + Config.int("replicas").map(_.toShort).withDefault(1.toShort) ++ + Config.boolean("compaction").withDefault(compactionDefault)).map { + case (true, partitions, replicas, compaction) => Some(TopicOptions(partitions, replicas, compaction)) + case _ => None } } @@ -66,9 +67,10 @@ object TopicConfig { topicName = topicName, maybeTopicOptions = None ) - val config: Config[TopicConfig] = (Config.string("topic") ++ TopicOptions.config).map { case (topicName, maybeTopicOptions) => - TopicConfig(topicName, maybeTopicOptions) - } + def config(compactionDefault: Boolean): Config[TopicConfig] = + (Config.string("topic") ++ TopicOptions.config(compactionDefault)).map { case (topicName, maybeTopicOptions) => + TopicConfig(topicName, maybeTopicOptions) + } } final case class KafkaConfig( @@ -112,8 +114,8 @@ object KafkaConfig { RegistryConfig.config.nested("schema_registry") ++ Config.duration("close_timeout") ++ Config.int("buffer_size") ++ - TopicConfig.config.nested("sink") ++ - TopicConfig.config.nested("state") ++ + TopicConfig.config(compactionDefault = false).nested("sink") ++ + TopicConfig.config(compactionDefault = true).nested("state") ++ Config.string("group_id") ++ Config.string("client_id") ++ Config.string("transactional_id")