Skip to content

Commit

Permalink
Testing if Kafka Metrics are indirectly updated not every time when a…
Browse files Browse the repository at this point in the history
… meter is polled.

related: gh-2801
  • Loading branch information
jonatan-ivanov committed Oct 14, 2021
1 parent 63e489c commit 0caaccb
Showing 1 changed file with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ void shouldRemoveMeterWithLessTagsWithMultipleClients() {

@Issue("#2726")
@Test
void shouldAlwaysUseMetricFromSupplierIfInstanceChanges() {
void shouldUseMetricFromSupplierIfInstanceChanges() {
MetricName metricName = new MetricName("a0", "b0", "c0", new LinkedHashMap<>());
Value oldValue = new Value();
oldValue.record(new MetricConfig(), 1.0, System.currentTimeMillis());
Expand Down Expand Up @@ -345,4 +345,61 @@ void shouldAlwaysUseMetricFromSupplierIfInstanceChanges() {
.isEqualTo(2.0)
);
}

@Issue("#2801")
@Test
void shouldUseMetricFromSupplierIndirectly() {
AtomicReference<Map<MetricName, KafkaMetric>> metricsReference = new AtomicReference<>(new HashMap<>());

MetricName oldMetricName = new MetricName("a0", "b0", "c0", new LinkedHashMap<>());
Value oldValue = new Value();
oldValue.record(new MetricConfig(), 1.0, System.currentTimeMillis());
KafkaMetric oldMetricInstance = new KafkaMetric(this, oldMetricName, oldValue, new MetricConfig(), Time.SYSTEM);

metricsReference.get().put(oldMetricName, oldMetricInstance);

kafkaMetrics = new KafkaMetrics(metricsReference::get);
MeterRegistry registry = new SimpleMeterRegistry();

kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);

assertThat(registry.getMeters())
.singleElement()
.extracting(Meter::measure)
.satisfies(measurements ->
assertThat(measurements)
.singleElement()
.extracting(Measurement::getValue)
.isEqualTo(1.0)
);

metricsReference.set(new HashMap<>());
MetricName newMetricName = new MetricName("a0", "b0", "c0", new LinkedHashMap<>());
Value newValue = new Value();
newValue.record(new MetricConfig(), 2.0, System.currentTimeMillis());
KafkaMetric newMetricInstance = new KafkaMetric(this, newMetricName, newValue, new MetricConfig(), Time.SYSTEM);
metricsReference.get().put(newMetricName, newMetricInstance);

assertThat(registry.getMeters())
.singleElement()
.extracting(Meter::measure)
.satisfies(measurements ->
assertThat(measurements)
.singleElement()
.extracting(Measurement::getValue)
.isEqualTo(1.0)
); // still referencing the old value since the map is only updated in checkAndBindMetrics

kafkaMetrics.checkAndBindMetrics(registry);
assertThat(registry.getMeters())
.singleElement()
.extracting(Meter::measure)
.satisfies(measurements ->
assertThat(measurements)
.singleElement()
.extracting(Measurement::getValue)
.isEqualTo(2.0)
); // referencing the new value since the map was updated in checkAndBindMetrics
}
}

0 comments on commit 0caaccb

Please sign in to comment.