Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Allow whitelisting Kafka topics #65

Merged
merged 3 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@

## Introduction

Kafka Lag Exporter makes it easy to view the latency (residence time) of your [Apache Kafka](https://kafka.apache.org/)
consumer groups. It can run anywhere, but it provides features to run easily on [Kubernetes](https://kubernetes.io/)
clusters against [Strimzi](https://strimzi.io/) Kafka clusters using the [Prometheus](https://prometheus.io/) and [Grafana](https://grafana.com/)
monitoring stack. Kafka Lag Exporter is an [Akka Typed](https://doc.akka.io/docs/akka/current/typed/index.html)
Kafka Lag Exporter makes it easy to view the latency (residence time) of your [Apache Kafka](https://kafka.apache.org/)
consumer groups. It can run anywhere, but it provides features to run easily on [Kubernetes](https://kubernetes.io/)
clusters against [Strimzi](https://strimzi.io/) Kafka clusters using the [Prometheus](https://prometheus.io/) and [Grafana](https://grafana.com/)
monitoring stack. Kafka Lag Exporter is an [Akka Typed](https://doc.akka.io/docs/akka/current/typed/index.html)
application written in [Scala](https://www.scala-lang.org/).

For more information about Kafka Lag Exporter's features see Lightbend's blog post:
For more information about Kafka Lag Exporter's features see Lightbend's blog post:
[Monitor Kafka Consumer Group Latency with Kafka Lag Exporter](https://www.lightbend.com/blog/monitor-kafka-consumer-group-latency-with-kafka-lag-exporter).

**Project Status:** *beta*
Expand All @@ -54,9 +54,9 @@ For more information about Kafka Lag Exporter's features see Lightbend's blog po

## Metrics

[Prometheus](https://prometheus.io/) is a standard way to represent metrics in a modern cross-platform manner. Kafka Lag
Exporter exposes several metrics as an HTTP endpoint that can be readily scraped by Prometheus. When installed using
Helm and when enabling the Kubernetes pod self-discovery features within Prometheus server, Prometheus server will
[Prometheus](https://prometheus.io/) is a standard way to represent metrics in a modern cross-platform manner. Kafka Lag
Exporter exposes several metrics as an HTTP endpoint that can be readily scraped by Prometheus. When installed using
Helm and when enabling the Kubernetes pod self-discovery features within Prometheus server, Prometheus server will
automatically detect the HTTP endpoint and scrape its data.

**`kafka_consumergroup_group_offset`**
Expand Down Expand Up @@ -185,7 +185,7 @@ kubectl logs {POD_ID} --namespace myproject -f
## Run Standalone

To run the project in standalone mode you must first define a configuration `application.conf`. This configuration must
contain at least connection info to your Kafka cluster (`kafka-lag-exporter.clusters`). All other configuration has
contain at least connection info to your Kafka cluster (`kafka-lag-exporter.clusters`). All other configuration has
defaults defined in the project itself. See [`reference.conf`](./src/main/resources/reference.conf) for defaults.
### Configuration

Expand All @@ -201,6 +201,7 @@ General Configuration (`kafka-lag-exporter{}`)
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
| `watchers` | `{}` | Settings for Kafka cluster "watchers" used for auto-discovery. |
| `metric-whitelist` | `[".*"]` | Regex of metrics to be exposed via Prometheus endpoint. Eg. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |
| `topic-whitelist` | `[".*"]` | Regex of topics monitored, e.g. `["input-.+", "output-.+"]` |

Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)

Expand Down Expand Up @@ -228,7 +229,7 @@ kafka-lag-exporter {
lookup-table-size = 120
clusters = [
{
name = "a-cluster"
name = "a-cluster"
bootstrap-brokers = "a-1.cluster-a.xyzcorp.com:9092,a-2.cluster-a.xyzcorp.com:9092,a-3.cluster-a.xyzcorp.com:9092"
consumer-properties = {
client.id = "consumer-client-id"
Expand All @@ -249,7 +250,7 @@ kafka-lag-exporter {

Define an `application.conf` and optionally a `logback.xml` with your configuration.

Run the Docker image. Expose metrics endpoint on the host port `8000`. Mount a config dir with your `application.conf`
Run the Docker image. Expose metrics endpoint on the host port `8000`. Mount a config dir with your `application.conf`
`logback.xml` into the container.

Ex)
Expand All @@ -267,7 +268,7 @@ See full example in [`./examples/standalone`](./examples/standalone).

## Estimate Consumer Group Time Lag

One of Kafka Lag Exporter’s more unique features is its ability to estimate the length of time that a consumer group is behind the last produced value for a particular partition, time lag (wait time). Offset lag is useful to indicate that the consumer group is lagging, but it doesn’t provide a sense of the actual latency of the consuming application.
One of Kafka Lag Exporter’s more unique features is its ability to estimate the length of time that a consumer group is behind the last produced value for a particular partition, time lag (wait time). Offset lag is useful to indicate that the consumer group is lagging, but it doesn’t provide a sense of the actual latency of the consuming application.

For example, a topic with two consumer groups may have different lag characteristics. Application A is a consumer which performs CPU intensive (and slow) business logic on each message it receives. It’s distributed across many consumer group members to handle the high load, but since its processing throughput is slower it takes longer to process each message per partition. Meanwhile Application B is a consumer which performs a simple ETL operation to land streaming data in another system, such as an HDFS data lake. It may have similar offset lag to Application A, but because it has a higher processing throughput its lag in time may be significantly less.

Expand Down Expand Up @@ -325,10 +326,10 @@ This dashboard has 4 rows that are described below.
* Consumer Group Max Offset Lag
* Consumer Group Offset Lag Top Partitions
![Consumer Group Max Time Lag](./grafana/consumer_group_max_time_lag.png)
2. **Max Consumer Group Time Lag Over Offset Lag** - One panel for each consumer group that shows the max lag
2. **Max Consumer Group Time Lag Over Offset Lag** - One panel for each consumer group that shows the max lag
in time on the left Y axis and max lag in offsets on the right Y axis. Ex)
![Max Consumer Group Time Lag Over Offset Lag Example](./grafana/max_consumer_group_time_lag_over_offset_lag.png)
3. **Max Consumer Group Time Lag Over Summed Offsets** - One panel for each consumer group that shows the max lag in time on the left Y
3. **Max Consumer Group Time Lag Over Summed Offsets** - One panel for each consumer group that shows the max lag in time on the left Y
axis. The right Y axis has the sum of latest and last consumed offsets for all group partitions. Ex)
![Max Consumer Group Time Lag Over Summed Offsets](./grafana/max_consumer_group_time_lag_over_summed_offsets.png)
4. **Kafka Lag Exporter JVM Metrics** - JVM metrics for the Kafka Lag Exporter itself.
Expand Down Expand Up @@ -394,7 +395,7 @@ docker-compose up

### Building your own Helm Chart

If you want to build your own Helm Chart and accompanying docker images you can override the Docker repository and
If you want to build your own Helm Chart and accompanying docker images you can override the Docker repository and
username with environment variables.

`DOCKER_REPOSITORY` - A custom Docker repository, such as a private company's docker repository (defaults to DockerHub)
Expand Down Expand Up @@ -426,7 +427,7 @@ Update values.yaml docker repository to docker.xyzcorp.com/foobar/kafka-lag-expo
[info] Successfully tagged docker.xyzcorp.com/foobar/kafka-lag-exporter:0.4.0-SNAPSHOT
[info] Built image docker.xyzcorp.com/foobar/kafka-lag-exporter with tags [0.4.0-SNAPSHOT]
[success] Total time: 17 s, completed 1-May-2019 2:37:28 PM
```
```

Deploy the local chart to K8s:

Expand All @@ -443,12 +444,12 @@ helm install ./charts/kafka-lag-exporter \

### Pre-requisites

The release process is orchestrated by the [`sbt-release`](https://github.com/sbt/sbt-release). Privileged access is
The release process is orchestrated by the [`sbt-release`](https://github.com/sbt/sbt-release). Privileged access is
required. Before running a release make sure the following pre-req's are met.

* Authenticated with Docker Hub with the `docker` command.
* Authenticated with GitHub
* `~/.netrc` file setup with GitHub credentials/token
* `~/.netrc` file setup with GitHub credentials/token

### Release steps

Expand Down Expand Up @@ -529,4 +530,3 @@ group partition
0.1.0

* Initial release

9 changes: 7 additions & 2 deletions charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ data:
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]

topic-whitelist = [
{{- $lastIndex := sub (len .Values.topicWhitelist) 1}}
{{- range $i, $whitelist := .Values.topicWhitelist }}
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
}

akka {
Expand All @@ -70,4 +75,4 @@ data:
<root level="${ROOT_LOG_LEVEL}">
<appender-ref ref="STDOUT" />
</root>
</configuration>
</configuration>
13 changes: 13 additions & 0 deletions charts/kafka-lag-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ watchers:
metricWhitelist:
- .*

## You can use regex to control the set of topics monitored.
## For example, if you only wish to expose only certain topics, use either:
## topicWhitelist:
## - ^topic.+
##
## Or
##
## topicWhitelist:
## - topic1
## - topic2
topicWhitelist:
- .*

## The log level of the ROOT logger
rootLogLevel: INFO
## The log level of Kafka Lag Exporter
Expand Down
6 changes: 3 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object Dependencies {
private val slf4jExclusionRule = ExclusionRule("org.slf4j")

val LightbendConfig = "com.typesafe" % "config" % "1.3.2"
val Kafka = "org.apache.kafka" %% "kafka" % "2.2.1" excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Kafka = "org.apache.kafka" %% "kafka" % "2.3.0" excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val AkkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val Logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
Expand All @@ -31,10 +31,10 @@ object Dependencies {
val Fabric8Model = "io.fabric8" % "kubernetes-model" % Version.Fabric8
val Fabric8Client = "io.fabric8" % "kubernetes-client" % Version.Fabric8
val ScalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"

val ScalaTest = "org.scalatest" %% "scalatest" % "3.0.5" % Test
val AkkaTypedTestKit = "com.typesafe.akka" %% "akka-actor-testkit-typed" % Version.Akka % Test
val MockitoScala = "org.mockito" %% "mockito-scala" % "1.0.8" % Test
val AlpakkaKafkaTestKit = "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "1.0.1" % Test excludeAll(jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.8" % Test
}
}
1 change: 1 addition & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ kafka-lag-exporter {
strimzi = ${?KAFKA_LAG_EXPORTER_STRIMZI}
}
metric-whitelist = [".*"]
topic-whitelist = [".*"]
}

akka {
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ object AppConfig {
).toMap
}.getOrElse(Map.empty[String, String])

val topicWhitelist = c.getStringList("topic-whitelist").asScala.toList

KafkaCluster(
clusterConfig.getString("name"),
clusterConfig.getString("bootstrap-brokers"),
topicWhitelist,
consumerProperties,
adminClientProperties,
labels
Expand Down Expand Up @@ -83,14 +85,15 @@ object AppConfig {
}
}

final case class KafkaCluster(name: String, bootstrapBrokers: String,
final case class KafkaCluster(name: String, bootstrapBrokers: String, topicWhitelist: List[String] = List(".*"),
consumerProperties: Map[String, String] = Map.empty,
adminClientProperties: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty) {
override def toString(): String = {
s"""
| Cluster name: $name
| Cluster Kafka bootstrap brokers: $bootstrapBrokers
| Topic whitelist: [${topicWhitelist.mkString(", ")}]
""".stripMargin
}
}
Expand Down Expand Up @@ -121,4 +124,3 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
}.toMap
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
private[kafkalagexporter] def groupTopicPartitions(groupId: String, desc: ConsumerGroupDescription): List[Domain.GroupTopicPartition] = {
val groupTopicPartitions = for {
member <- desc.members().asScala
ktp <- member.assignment().topicPartitions().asScala
ktp <- member.assignment().topicPartitions().asScala if cluster.topicWhitelist.exists(r => ktp.topic().matches(r))
} yield Domain.GroupTopicPartition(
groupId,
member.clientId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import java.util.Optional

import com.lightbend.kafkalagexporter.Domain.GroupOffsets
import com.lightbend.kafkalagexporter.KafkaClient.KafkaTopicPartitionOps

import org.apache.kafka.clients.admin.{ ConsumerGroupDescription, MemberAssignment, MemberDescription }
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{ ConsumerGroupState, TopicPartition => TP }

import org.mockito.MockitoSugar

import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FreeSpec, Matchers}

Expand Down Expand Up @@ -104,5 +109,52 @@ class KafkaClientSpec extends FreeSpec with Matchers with TestData with MockitoS
groupOffsets.size shouldEqual 3
groupOffsets(gtp2) shouldEqual None
}

"groupTopicPartition will default to fetching all topics" in {
val tmpHost = "brokers"
val client = new KafkaClient(cluster, groupId, FiniteDuration(0, "ms"))
val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava
val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava

val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node)

client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gtp0, gtp1, gtp2, gt2p0).map(gtp => gtp.copy(host = tmpHost))
}

"groupTopicPartition will only fetch whitelisted topic when whitelist contains a single topic" in {
val tmpHost = "brokers"
val tmpCluster = cluster.copy(topicWhitelist = List(topic2))
val client = new KafkaClient(tmpCluster, groupId, FiniteDuration(0, "ms"))
val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava
val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava

val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node)

client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gt2p0).map(gtp => gtp.copy(host = tmpHost))
}

"groupTopicPartition will only fetch whitelisted topics when whitelist is a regex against multiple topics" in {
val tmpHost = "brokers"
val tmpCluster = cluster.copy(topicWhitelist = List("test.+"))
val client = new KafkaClient(tmpCluster, groupId, FiniteDuration(0, "ms"))
val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava
val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava

val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node)

client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gtp0, gtp1, gtp2, gt2p0).map(gtp => gtp.copy(host = tmpHost))
}

"groupTopicPartition will not return any topics if the whitelist is empty" in {
val tmpHost = "brokers"
val tmpCluster = cluster.copy(topicWhitelist = List.empty)
val client = new KafkaClient(tmpCluster, groupId, FiniteDuration(0, "ms"))
val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava
val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava

val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node)

client.groupTopicPartitions(groupId, description) shouldBe empty
}
}
}
6 changes: 6 additions & 0 deletions src/test/scala/com/lightbend/kafkalagexporter/TestData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,25 @@ package com.lightbend.kafkalagexporter

import com.lightbend.kafkalagexporter.Domain._

import org.apache.kafka.common.Node

trait TestData {
val cluster = KafkaCluster("default", "brokers:9092")
val node = new Node(1001, "brokers", 9092)
val groupId = "testGroupId"
val clientId = "testClientId"
val consumerId = "testConsumerId"
val topic = "test-topic"
val topic2 = "test-topic-2"
val topicPartition0 = TopicPartition(topic, 0)
val topicPartition1 = TopicPartition(topic, 1)
val topicPartition2 = TopicPartition(topic, 2)
val topic2Partition0 = TopicPartition(topic2, 0)
val gtpSingleMember = GroupTopicPartition(groupId, clientId, consumerId, "/127.0.0.1", topicPartition0.topic, topicPartition0.partition)
val gtp0 = GroupTopicPartition(groupId, clientId, consumerId, "/127.0.0.1", topicPartition0.topic, topicPartition0.partition)
val gtp1 = GroupTopicPartition(groupId, clientId, consumerId, "/127.0.0.2", topicPartition1.topic, topicPartition1.partition)
val gtp2 = GroupTopicPartition(groupId, clientId, consumerId, "/127.0.0.3", topicPartition2.topic, topicPartition2.partition)
val gt2p0 = GroupTopicPartition(groupId, clientId, consumerId, "/127.0.0.4", topic2Partition0.topic, topic2Partition0.partition)
val lookupTableOnePoint = LookupTable.Table(20)
lookupTableOnePoint.addPoint(LookupTable.Point(100, 100))
}