From f23fa858433c30016f85fe636b5c279177a100ec Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Fri, 15 Dec 2017 13:50:36 -0800 Subject: [PATCH] Add more test cases for KsqlEngineMetrics --- .../ksql/metrics/MetricCollectors.java | 9 +- .../ksql/internal/KsqlEngineMetrics.java | 7 +- .../ksql/internal/KsqlEngineMetricsTest.java | 119 +++++++++++++++++- 3 files changed, 125 insertions(+), 10 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java b/ksql-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java index 7a773425723c..cad0615c2f1c 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java +++ b/ksql-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java @@ -49,8 +49,8 @@ public class MetricCollectors { // visible for testing. // We need to call this from the MetricCollectorsTest because otherwise tests clobber each - // others metric data - static void initialize() { + // others metric data. We also need it from the KsqlEngineMetricsTest + public static void initialize() { MetricConfig metricConfig = new MetricConfig().samples(100).timeWindow(1000, TimeUnit.MILLISECONDS); List reporters = new ArrayList<>(); reporters.add(new JmxReporter("io.confluent.ksql.metrics")); @@ -59,8 +59,9 @@ static void initialize() { // visible for testing. // needs to be called from the tear down method of MetricCollectorsTest so that the tests don't - // clobber each other. - static void cleanUp() { + // clobber each other. We also need to call it from the KsqlEngineMetrics test for the same + // reason. + public static void cleanUp() { if (metrics != null) { metrics.close(); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java index 2fd4331e7167..1eefacf29796 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java @@ -47,19 +47,18 @@ public class KsqlEngineMetrics implements Closeable { private final KsqlEngine ksqlEngine; public KsqlEngineMetrics(String metricGroupPrefix, KsqlEngine ksqlEngine) { - Metrics metrics = MetricCollectors.getMetrics(); - this.ksqlEngine = ksqlEngine; - this.sensors = new ArrayList<>(); this.metricGroupName = metricGroupPrefix + "-query-stats"; + + Metrics metrics = MetricCollectors.getMetrics(); + this.numActiveQueries = configureNumActiveQueries(metrics); this.messagesIn = configureMessagesIn(metrics); this.messagesOut = configureMessagesOut(metrics); this.numIdleQueries = configureIdleQueriesSensor(metrics); this.messageConsumptionByQuery = configureMessageConsumptionByQuerySensor(metrics); this.errorRate = configureErrorRate(metrics); - } @Override diff --git a/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java b/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java index b0108037c569..9bec8cd8cfc3 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java @@ -16,21 +16,54 @@ package io.confluent.ksql.internal; +import com.google.common.collect.ImmutableMap; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.record.TimestampType; import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import io.confluent.ksql.KsqlEngine; +import io.confluent.ksql.metrics.ConsumerCollector; import io.confluent.ksql.metrics.MetricCollectors; +import io.confluent.ksql.metrics.ProducerCollector; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class KsqlEngineMetricsTest { + private static final String METRIC_GROUP = "testGroup"; + private KsqlEngine ksqlEngine; + private KsqlEngineMetrics engineMetrics; + + @Before + public void setUp() { + MetricCollectors.initialize(); + ksqlEngine = EasyMock.niceMock(KsqlEngine.class); + engineMetrics = new KsqlEngineMetrics(METRIC_GROUP, ksqlEngine); + } + + @After + public void tearDown() { + engineMetrics.close(); + MetricCollectors.cleanUp(); + } @Test public void shouldRemoveAllSensorsOnClose() { - KsqlEngine ksqlEngine = EasyMock.niceMock(KsqlEngine.class); - KsqlEngineMetrics engineMetrics = new KsqlEngineMetrics("testGroup", ksqlEngine); assertTrue(engineMetrics.registeredSensors().size() > 0); engineMetrics.close(); @@ -39,6 +72,88 @@ public void shouldRemoveAllSensorsOnClose() { engineMetrics.registeredSensors().forEach(sensor -> { assertTrue(metrics.getSensor(sensor.name()) == null); }); + } + + @Test + public void shouldRecordNumberOfActiveQueries() { + EasyMock.expect(ksqlEngine.numberOfLiveQueries()).andReturn(3L); + EasyMock.replay(ksqlEngine); + Metrics metrics = MetricCollectors.getMetrics(); + double value = getMetricValue(metrics, "num-active-queries"); + assertEquals(3.0, value, 0.0); + } + + + @Test + public void shouldRecordNumberOfPersistentQueries() { + EasyMock.expect(ksqlEngine.numberOfPersistentQueries()).andReturn(3L); + EasyMock.replay(ksqlEngine); + Metrics metrics = MetricCollectors.getMetrics(); + double value = getMetricValue(metrics, "num-persistent-queries"); + assertEquals(3.0, value, 0.0); + } + + + @Test + public void shouldRecordMessagesConsumed() { + int numMessagesConsumed = 500; + consumeMessages(numMessagesConsumed, "group1"); + Metrics metrics = MetricCollectors.getMetrics(); + engineMetrics.updateMetrics(); + double value = getMetricValue(metrics, "messages-consumed-per-sec"); + assertEquals(numMessagesConsumed / 100, Math.floor(value), 0.01); + } + + + @Test + public void shouldRecordMessagesProduced() { + int numMessagesProduced = 500; + produceMessages(numMessagesProduced); + Metrics metrics = MetricCollectors.getMetrics(); + engineMetrics.updateMetrics(); + double value = getMetricValue(metrics, "messages-produced-per-sec"); + assertEquals(numMessagesProduced / 100, Math.floor(value), 0.01); + } + + + @Test + public void shouldRecordMessagesConsumedByQuery() { + int numMessagesConsumed = 500; + consumeMessages(numMessagesConsumed, "group1"); + consumeMessages(numMessagesConsumed * 100, "group2"); + Metrics metrics = MetricCollectors.getMetrics(); + engineMetrics.updateMetrics(); + double maxValue = getMetricValue(metrics, "messages-consumed-max"); + assertEquals(numMessagesConsumed, Math.floor(maxValue), 5.0); + double minValue = getMetricValue(metrics, "messages-consumed-min"); + assertEquals(numMessagesConsumed / 100, Math.floor(minValue), 0.01); + } + + private double getMetricValue(Metrics metrics, String metricName) { + return Double.valueOf( + metrics.metric(metrics.metricName(metricName, METRIC_GROUP + "-query-stats")) + .metricValue().toString()); + } + + private void consumeMessages(int numMessages, String groupId) { + ConsumerCollector collector1 = new ConsumerCollector(); + collector1.configure(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, groupId)); + Map>> records = new HashMap<>(); + List> recordList = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + recordList.add(new ConsumerRecord<>("foo", 1, 1, 1l, TimestampType + .CREATE_TIME, 1l, 10, 10, "key", "1234567890")); + } + records.put(new TopicPartition("foo", 1), recordList); + ConsumerRecords consumerRecords = new ConsumerRecords<>(records); + collector1.onConsume(consumerRecords); + } + private void produceMessages(int numMessages) { + ProducerCollector collector1 = new ProducerCollector(); + collector1.configure(ImmutableMap.of(ProducerConfig.CLIENT_ID_CONFIG, "client1")); + for (int i = 0; i < numMessages; i++) { + collector1.onSend(new ProducerRecord<>("foo", "key", Integer.toString(i))); + } } }