Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Allow whitelisting Kafka topics #65

Merged
merged 3 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object Dependencies {
private val slf4jExclusionRule = ExclusionRule("org.slf4j")

val LightbendConfig = "com.typesafe" % "config" % "1.3.2"
val Kafka = "org.apache.kafka" %% "kafka" % "2.2.1" excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Kafka = "org.apache.kafka" %% "kafka" % "2.3.0" excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val AkkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val Logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
Expand All @@ -31,10 +31,10 @@ object Dependencies {
val Fabric8Model = "io.fabric8" % "kubernetes-model" % Version.Fabric8
val Fabric8Client = "io.fabric8" % "kubernetes-client" % Version.Fabric8
val ScalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"

val ScalaTest = "org.scalatest" %% "scalatest" % "3.0.5" % Test
val AkkaTypedTestKit = "com.typesafe.akka" %% "akka-actor-testkit-typed" % Version.Akka % Test
val MockitoScala = "org.mockito" %% "mockito-scala" % "1.0.8" % Test
val AlpakkaKafkaTestKit = "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "1.0.1" % Test excludeAll(jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.8" % Test
}
}
1 change: 1 addition & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ kafka-lag-exporter {
strimzi = ${?KAFKA_LAG_EXPORTER_STRIMZI}
}
metric-whitelist = [".*"]
topic-whitelist = [".*"]
}

akka {
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ object AppConfig {
).toMap
}.getOrElse(Map.empty[String, String])

val topicWhitelist = c.getStringList("topic-whitelist").asScala.toList

KafkaCluster(
clusterConfig.getString("name"),
clusterConfig.getString("bootstrap-brokers"),
topicWhitelist,
consumerProperties,
adminClientProperties,
labels
Expand Down Expand Up @@ -83,14 +85,15 @@ object AppConfig {
}
}

final case class KafkaCluster(name: String, bootstrapBrokers: String,
final case class KafkaCluster(name: String, bootstrapBrokers: String, topicWhitelist: List[String] = List(".*"),
consumerProperties: Map[String, String] = Map.empty,
adminClientProperties: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty) {
override def toString(): String = {
s"""
| Cluster name: $name
| Cluster Kafka bootstrap brokers: $bootstrapBrokers
| Topic whitelist: [${topicWhitelist.mkString(", ")}]
""".stripMargin
}
}
Expand Down Expand Up @@ -121,4 +124,3 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
}.toMap
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
private[kafkalagexporter] def groupTopicPartitions(groupId: String, desc: ConsumerGroupDescription): List[Domain.GroupTopicPartition] = {
val groupTopicPartitions = for {
member <- desc.members().asScala
ktp <- member.assignment().topicPartitions().asScala
ktp <- member.assignment().topicPartitions().asScala if cluster.topicWhitelist.exists(r => ktp.topic().matches(r))
} yield Domain.GroupTopicPartition(
groupId,
member.clientId(),
Expand Down