Skip to content

Commit

Permalink
Use our canonical naming convention for KafkaMetrics tags
Browse files Browse the repository at this point in the history
This also makes the `client.id` tag the same as it was in `KafkaConsumerMetrics`. Notably, `kafka-version` tag has also been changed to `kafka.version`. With some meter registries, this will effectively be no change (e.g. Prometheus), but for others it may be a breaking change to the tag name, but the goal is to correct the oversight of not aligning KafkaMetrics with KafkaConsumerMetrics initially. Behavior can be restored with a MeterFilter.

Resolves micrometer-metrics#2256
  • Loading branch information
shakuzen committed Oct 29, 2020
1 parent b444dd1 commit 4576774
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
static final String VERSION_METRIC_NAME = "version";
static final String START_TIME_METRIC_NAME = "start-time-ms";
static final Duration DEFAULT_REFRESH_INTERVAL = Duration.ofSeconds(60);
static final String KAFKA_VERSION_TAG_NAME = "kafka-version";
static final String KAFKA_VERSION_TAG_NAME = "kafka.version";
static final String DEFAULT_VALUE = "unknown";

private final Supplier<Map<MetricName, ? extends Metric>> metricsSupplier;
Expand Down Expand Up @@ -221,7 +221,7 @@ private static ToDoubleFunction<Metric> toMetricValue() {

private List<Tag> meterTags(Metric metric, boolean includeCommonTags) {
List<Tag> tags = new ArrayList<>();
metric.metricName().tags().forEach((key, value) -> tags.add(Tag.of(key, value)));
metric.metricName().tags().forEach((key, value) -> tags.add(Tag.of(key.replaceAll("-", "."), value)));
tags.add(Tag.of(KAFKA_VERSION_TAG_NAME, kafkaVersion));
extraTags.forEach(tags::add);
if (includeCommonTags) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void shouldManageProducerAndConsumerMetrics() {
int producerMetrics = registry.getMeters().size();
assertThat(registry.getMeters()).hasSizeGreaterThan(0);
assertThat(registry.getMeters())
.extracting(m -> m.getId().getTag("kafka-version"))
.extracting(m -> m.getId().getTag("kafka.version"))
.allMatch(v -> !v.isEmpty());

Properties consumerConfigs = new Properties();
Expand All @@ -83,7 +83,7 @@ void shouldManageProducerAndConsumerMetrics() {
int producerAndConsumerMetrics = registry.getMeters().size();
assertThat(registry.getMeters()).hasSizeGreaterThan(producerMetrics);
assertThat(registry.getMeters())
.extracting(m -> m.getId().getTag("kafka-version"))
.extracting(m -> m.getId().getTag("kafka.version"))
.allMatch(v -> !v.isEmpty());

String topic = "test";
Expand All @@ -99,7 +99,7 @@ void shouldManageProducerAndConsumerMetrics() {
int producerAndConsumerMetricsAfterSend = registry.getMeters().size();
assertThat(registry.getMeters()).hasSizeGreaterThan(producerAndConsumerMetrics);
assertThat(registry.getMeters())
.extracting(m -> m.getId().getTag("kafka-version"))
.extracting(m -> m.getId().getTag("kafka.version"))
.allMatch(v -> !v.isEmpty());

consumer.subscribe(Collections.singletonList(topic));
Expand All @@ -114,7 +114,7 @@ void shouldManageProducerAndConsumerMetrics() {

assertThat(registry.getMeters()).hasSizeGreaterThan(producerAndConsumerMetricsAfterSend);
assertThat(registry.getMeters())
.extracting(m -> m.getId().getTag("kafka-version"))
.extracting(m -> m.getId().getTag("kafka.version"))
.allMatch(v -> !v.isEmpty());

//Printing out for discovery purposes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,12 @@ void shouldRemoveOlderMeterWithLessTagsWhenCommonTagsConfigured() {

kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
assertThat(registry.getMeters().get(0).getId().getTags()).containsExactlyInAnyOrder(Tag.of("kafka-version", "unknown"), Tag.of("common", "value")); // only version
assertThat(registry.getMeters().get(0).getId().getTags()).containsExactlyInAnyOrder(Tag.of("kafka.version", "unknown"), Tag.of("common", "value")); // only version

tags.put("key0", "value0");
kafkaMetrics.checkAndBindMetrics(registry);
assertThat(registry.getMeters()).hasSize(1);
assertThat(registry.getMeters().get(0).getId().getTags()).containsExactlyInAnyOrder(Tag.of("kafka-version", "unknown"), Tag.of("key0", "value0"), Tag.of("common", "value"));
assertThat(registry.getMeters().get(0).getId().getTags()).containsExactlyInAnyOrder(Tag.of("kafka.version", "unknown"), Tag.of("key0", "value0"), Tag.of("common", "value"));
}

@Issue("#2212")
Expand Down Expand Up @@ -288,6 +288,6 @@ void shouldRemoveMeterWithLessTagsWithMultipleClients() {
// Then
assertThat(registry.getMeters()).hasSize(2);
registry.getMeters().forEach(meter -> assertThat(meter.getId().getTags())
.extracting(Tag::getKey).containsOnly("key0", "key1", "client-id", "kafka-version"));
.extracting(Tag::getKey).containsOnly("key0", "key1", "client.id", "kafka.version"));
}
}

0 comments on commit 4576774

Please sign in to comment.