Skip to content

Commit

Permalink
Add more test cases for KsqlEngineMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Apurva Mehta committed Dec 15, 2017
1 parent 4dd3a42 commit f23fa85
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public class MetricCollectors {

// visible for testing.
// We need to call this from the MetricCollectorsTest because otherwise tests clobber each
// others metric data
static void initialize() {
// others metric data. We also need it from the KsqlEngineMetricsTest
public static void initialize() {
MetricConfig metricConfig = new MetricConfig().samples(100).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter("io.confluent.ksql.metrics"));
Expand All @@ -59,8 +59,9 @@ static void initialize() {

// visible for testing.
// needs to be called from the tear down method of MetricCollectorsTest so that the tests don't
// clobber each other.
static void cleanUp() {
// clobber each other. We also need to call it from the KsqlEngineMetrics test for the same
// reason.
public static void cleanUp() {
if (metrics != null) {
metrics.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,18 @@ public class KsqlEngineMetrics implements Closeable {
private final KsqlEngine ksqlEngine;

public KsqlEngineMetrics(String metricGroupPrefix, KsqlEngine ksqlEngine) {
Metrics metrics = MetricCollectors.getMetrics();

this.ksqlEngine = ksqlEngine;

this.sensors = new ArrayList<>();
this.metricGroupName = metricGroupPrefix + "-query-stats";

Metrics metrics = MetricCollectors.getMetrics();

this.numActiveQueries = configureNumActiveQueries(metrics);
this.messagesIn = configureMessagesIn(metrics);
this.messagesOut = configureMessagesOut(metrics);
this.numIdleQueries = configureIdleQueriesSensor(metrics);
this.messageConsumptionByQuery = configureMessageConsumptionByQuerySensor(metrics);
this.errorRate = configureErrorRate(metrics);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,54 @@
package io.confluent.ksql.internal;


import com.google.common.collect.ImmutableMap;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.metrics.ConsumerCollector;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.metrics.ProducerCollector;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class KsqlEngineMetricsTest {
private static final String METRIC_GROUP = "testGroup";
private KsqlEngine ksqlEngine;
private KsqlEngineMetrics engineMetrics;

@Before
public void setUp() {
MetricCollectors.initialize();
ksqlEngine = EasyMock.niceMock(KsqlEngine.class);
engineMetrics = new KsqlEngineMetrics(METRIC_GROUP, ksqlEngine);
}

@After
public void tearDown() {
engineMetrics.close();
MetricCollectors.cleanUp();
}

@Test
public void shouldRemoveAllSensorsOnClose() {
KsqlEngine ksqlEngine = EasyMock.niceMock(KsqlEngine.class);
KsqlEngineMetrics engineMetrics = new KsqlEngineMetrics("testGroup", ksqlEngine);
assertTrue(engineMetrics.registeredSensors().size() > 0);

engineMetrics.close();
Expand All @@ -39,6 +72,88 @@ public void shouldRemoveAllSensorsOnClose() {
engineMetrics.registeredSensors().forEach(sensor -> {
assertTrue(metrics.getSensor(sensor.name()) == null);
});
}

@Test
public void shouldRecordNumberOfActiveQueries() {
EasyMock.expect(ksqlEngine.numberOfLiveQueries()).andReturn(3L);
EasyMock.replay(ksqlEngine);
Metrics metrics = MetricCollectors.getMetrics();
double value = getMetricValue(metrics, "num-active-queries");
assertEquals(3.0, value, 0.0);
}


@Test
public void shouldRecordNumberOfPersistentQueries() {
EasyMock.expect(ksqlEngine.numberOfPersistentQueries()).andReturn(3L);
EasyMock.replay(ksqlEngine);
Metrics metrics = MetricCollectors.getMetrics();
double value = getMetricValue(metrics, "num-persistent-queries");
assertEquals(3.0, value, 0.0);
}


@Test
public void shouldRecordMessagesConsumed() {
int numMessagesConsumed = 500;
consumeMessages(numMessagesConsumed, "group1");
Metrics metrics = MetricCollectors.getMetrics();
engineMetrics.updateMetrics();
double value = getMetricValue(metrics, "messages-consumed-per-sec");
assertEquals(numMessagesConsumed / 100, Math.floor(value), 0.01);
}


@Test
public void shouldRecordMessagesProduced() {
int numMessagesProduced = 500;
produceMessages(numMessagesProduced);
Metrics metrics = MetricCollectors.getMetrics();
engineMetrics.updateMetrics();
double value = getMetricValue(metrics, "messages-produced-per-sec");
assertEquals(numMessagesProduced / 100, Math.floor(value), 0.01);
}


@Test
public void shouldRecordMessagesConsumedByQuery() {
int numMessagesConsumed = 500;
consumeMessages(numMessagesConsumed, "group1");
consumeMessages(numMessagesConsumed * 100, "group2");
Metrics metrics = MetricCollectors.getMetrics();
engineMetrics.updateMetrics();
double maxValue = getMetricValue(metrics, "messages-consumed-max");
assertEquals(numMessagesConsumed, Math.floor(maxValue), 5.0);
double minValue = getMetricValue(metrics, "messages-consumed-min");
assertEquals(numMessagesConsumed / 100, Math.floor(minValue), 0.01);
}

private double getMetricValue(Metrics metrics, String metricName) {
return Double.valueOf(
metrics.metric(metrics.metricName(metricName, METRIC_GROUP + "-query-stats"))
.metricValue().toString());
}

private void consumeMessages(int numMessages, String groupId) {
ConsumerCollector collector1 = new ConsumerCollector();
collector1.configure(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, groupId));
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> records = new HashMap<>();
List<ConsumerRecord<Object, Object>> recordList = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
recordList.add(new ConsumerRecord<>("foo", 1, 1, 1l, TimestampType
.CREATE_TIME, 1l, 10, 10, "key", "1234567890"));
}
records.put(new TopicPartition("foo", 1), recordList);
ConsumerRecords<Object, Object> consumerRecords = new ConsumerRecords<>(records);
collector1.onConsume(consumerRecords);
}

private void produceMessages(int numMessages) {
ProducerCollector collector1 = new ProducerCollector();
collector1.configure(ImmutableMap.of(ProducerConfig.CLIENT_ID_CONFIG, "client1"));
for (int i = 0; i < numMessages; i++) {
collector1.onSend(new ProducerRecord<>("foo", "key", Integer.toString(i)));
}
}
}

0 comments on commit f23fa85

Please sign in to comment.