Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable possibility to compact state topic when Tamer creates it #1390

Merged
merged 2 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions core/src/main/scala/tamer/Tamer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import log.effect.zio.ZioLogWriter.log4sFromName
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.config.{TopicConfig => KTopicConfig}
import zio._
import zio.kafka.admin._
import zio.kafka.admin.AdminClient.{DescribeTopicsOptions, ListTopicsOptions, NewTopic, TopicDescription}
Expand Down Expand Up @@ -251,7 +252,13 @@ object Tamer {
.mapBoth(_ => topicConfig.maybeTopicOptions, (_, topicConfig.maybeTopicOptions))
.foldZIO(
{
case Some(TopicOptions(partitions, replicas)) =>
case Some(TopicOptions(partitions, replicas, true)) =>
log.info(
s"topic ${topicConfig.topicName} does not exist in Kafka. Given auto_create is set to true, creating it with $partitions partitions, $replicas replicas and compaction enabled"
) *> adminClient.createTopic(
NewTopic(topicConfig.topicName, partitions, replicas, Map(KTopicConfig.CLEANUP_POLICY_CONFIG -> KTopicConfig.CLEANUP_POLICY_COMPACT))
)
case Some(TopicOptions(partitions, replicas, false)) =>
log.info(
s"topic ${topicConfig.topicName} does not exist in Kafka. Given auto_create is set to true, creating it with $partitions partitions and $replicas replicas"
) *> adminClient.createTopic(NewTopic(topicConfig.topicName, partitions, replicas))
Expand All @@ -261,7 +268,7 @@ object Tamer {
)
},
{
case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas)))
case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas, _)))
if tpInfo.size == partitions && tpInfo.forall(_.replicas.size == replicas) && expectedACL.subsetOf(acls) =>
log
.info(
Expand All @@ -274,7 +281,7 @@ object Tamer {
s"verified topic ${topicConfig.topicName} successfully. Kafka informs us that it has ${tpInfo.size} partitions and ${tpInfo.head.replicas.size} replicas, but it satisfies all expected ACLs (${expectedACL
.mkString(", ")}), proceeding"
)
case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas))) if expectedACL.subsetOf(acls) =>
case (TopicDescription(_, _, tpInfo, Some(acls)), Some(TopicOptions(partitions, replicas, _))) if expectedACL.subsetOf(acls) =>
log
.warn(
s"inconsistencies in topic ${topicConfig.topicName}. Kafka informs us that it has ${tpInfo.size} partitions and ${tpInfo.head.replicas.size} replicas, expecting $partitions partitions and $replicas replicas, but it satisfies all expected ACLs (${expectedACL
Expand Down
22 changes: 12 additions & 10 deletions core/src/main/scala/tamer/config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ object RegistryConfig {
}.optional
}

final case class TopicOptions(partitions: Int, replicas: Short)
final case class TopicOptions(partitions: Int, replicas: Short, compaction: Boolean)
object TopicOptions {
val config: Config[Option[TopicOptions]] =
def config(compactionDefault: Boolean): Config[Option[TopicOptions]] =
(Config.boolean("auto_create").withDefault(false) ++
Config.int("partitions").withDefault(1) ++
Config.int("replicas").map(_.toShort).withDefault(1.toShort)).map {
case (true, partitions, replicas) => Some(TopicOptions(partitions, replicas))
case _ => None
Config.int("replicas").map(_.toShort).withDefault(1.toShort) ++
Config.boolean("compaction").withDefault(compactionDefault)).map {
case (true, partitions, replicas, compaction) => Some(TopicOptions(partitions, replicas, compaction))
case _ => None
}
}

Expand All @@ -66,9 +67,10 @@ object TopicConfig {
topicName = topicName,
maybeTopicOptions = None
)
val config: Config[TopicConfig] = (Config.string("topic") ++ TopicOptions.config).map { case (topicName, maybeTopicOptions) =>
TopicConfig(topicName, maybeTopicOptions)
}
def config(compactionDefault: Boolean): Config[TopicConfig] =
(Config.string("topic") ++ TopicOptions.config(compactionDefault)).map { case (topicName, maybeTopicOptions) =>
TopicConfig(topicName, maybeTopicOptions)
}
}

final case class KafkaConfig(
Expand Down Expand Up @@ -112,8 +114,8 @@ object KafkaConfig {
RegistryConfig.config.nested("schema_registry") ++
Config.duration("close_timeout") ++
Config.int("buffer_size") ++
TopicConfig.config.nested("sink") ++
TopicConfig.config.nested("state") ++
TopicConfig.config(compactionDefault = false).nested("sink") ++
TopicConfig.config(compactionDefault = true).nested("state") ++
Config.string("group_id") ++
Config.string("client_id") ++
Config.string("transactional_id")
Expand Down
1 change: 1 addition & 0 deletions db/local/runDatabaseSimple.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export KAFKA_STATE_TOPIC=state
export KAFKA_STATE_AUTO_CREATE=on
export KAFKA_STATE_PARTITIONS=1
export KAFKA_STATE_REPLICAS=1
export KAFKA_STATE_COMPACTION=on
export KAFKA_GROUP_ID=groupid
export KAFKA_CLIENT_ID=clientid
export KAFKA_TRANSACTIONAL_ID=transactionid
Expand Down
4 changes: 2 additions & 2 deletions example/src/main/scala/tamer/s3/S3Generalized.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ object S3Generalized extends ZIOAppDefault {
Some(RegistryConfig("http://localhost:8081")),
10.seconds,
50,
TopicConfig("sink", Some(TopicOptions(1, 1))),
TopicConfig("state", Some(TopicOptions(1, 1))),
TopicConfig("sink", Some(TopicOptions(1, 1, false))),
TopicConfig("state", Some(TopicOptions(1, 1, true))),
"groupid",
"clientid",
"s3-generalized-id"
Expand Down
1 change: 1 addition & 0 deletions rest/local/runRestBasicAuth.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export KAFKA_STATE_TOPIC=state
export KAFKA_STATE_AUTO_CREATE=on
export KAFKA_STATE_PARTITIONS=1
export KAFKA_STATE_REPLICAS=1
export KAFKA_STATE_COMPACTION=on
export KAFKA_GROUP_ID=groupid
export KAFKA_CLIENT_ID=clientid
export KAFKA_TRANSACTIONAL_ID=transactionid
Expand Down
1 change: 1 addition & 0 deletions rest/local/runRestCustomAuth.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export KAFKA_STATE_TOPIC=state
export KAFKA_STATE_AUTO_CREATE=on
export KAFKA_STATE_PARTITIONS=1
export KAFKA_STATE_REPLICAS=1
export KAFKA_STATE_COMPACTION=on
export KAFKA_GROUP_ID=groupid
export KAFKA_CLIENT_ID=clientid
export KAFKA_TRANSACTIONAL_ID=transactionid
Expand Down
1 change: 1 addition & 0 deletions rest/local/runRestDynamicData.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export KAFKA_STATE_TOPIC=state
export KAFKA_STATE_AUTO_CREATE=on
export KAFKA_STATE_PARTITIONS=1
export KAFKA_STATE_REPLICAS=1
export KAFKA_STATE_COMPACTION=on
export KAFKA_GROUP_ID=groupid
export KAFKA_CLIENT_ID=clientid
export KAFKA_TRANSACTIONAL_ID=transactionid
Expand Down
1 change: 1 addition & 0 deletions rest/local/runRestSimple.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export KAFKA_STATE_TOPIC=state
export KAFKA_STATE_AUTO_CREATE=on
export KAFKA_STATE_PARTITIONS=1
export KAFKA_STATE_REPLICAS=1
export KAFKA_STATE_COMPACTION=on
export KAFKA_GROUP_ID=groupid
export KAFKA_CLIENT_ID=clientid
export KAFKA_TRANSACTIONAL_ID=transactionid
Expand Down
1 change: 1 addition & 0 deletions s3/local/runS3Generalized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export KAFKA_STATE_TOPIC=state
export KAFKA_STATE_AUTO_CREATE=on
export KAFKA_STATE_PARTITIONS=1
export KAFKA_STATE_REPLICAS=1
export KAFKA_STATE_COMPACTION=on
export KAFKA_GROUP_ID=groupid
export KAFKA_CLIENT_ID=clientid
export KAFKA_TRANSACTIONAL_ID=transactionid
Expand Down
1 change: 1 addition & 0 deletions s3/local/runS3Simple.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export KAFKA_STATE_TOPIC=state
export KAFKA_STATE_AUTO_CREATE=on
export KAFKA_STATE_PARTITIONS=1
export KAFKA_STATE_REPLICAS=1
export KAFKA_STATE_COMPACTION=on
export KAFKA_GROUP_ID=groupid
export KAFKA_CLIENT_ID=clientid
export KAFKA_TRANSACTIONAL_ID=transactionid
Expand Down
Loading