From 266d7407d35fc4e556ce612f94cc4b4027de3806 Mon Sep 17 00:00:00 2001 From: Ronak Date: Tue, 19 Oct 2021 16:10:04 +0530 Subject: [PATCH 1/8] feat: adds pull based metrics exporter --- hypertrace-metrics-exporter/build.gradle.kts | 3 + .../build.gradle.kts | 62 +++++++++ .../exporter/MetricsExporterService.java | 56 ++++++++ .../consumer/MetricsKafkaConsumer.java | 124 +++++++++++++++++ .../producer/InMemoryMetricsProducer.java | 59 ++++++++ .../exporter/utils/OtlpToObjectConverter.java | 127 ++++++++++++++++++ .../resources/configs/common/application.conf | 23 ++++ .../src/main/resources/log4j2.properties | 23 ++++ settings.gradle.kts | 1 + 9 files changed, 478 insertions(+) create mode 100644 hypertrace-metrics-exporter/build.gradle.kts create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpToObjectConverter.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties diff --git a/hypertrace-metrics-exporter/build.gradle.kts b/hypertrace-metrics-exporter/build.gradle.kts new file mode 100644 index 000000000..612342977 --- /dev/null +++ b/hypertrace-metrics-exporter/build.gradle.kts @@ -0,0 +1,3 @@ +subprojects { + group = "org.hypertrace.metrics.exporter" +} \ No 
newline at end of file diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts new file mode 100644 index 000000000..5177800dc --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts @@ -0,0 +1,62 @@ +plugins { + java + application + jacoco + id("org.hypertrace.docker-java-application-plugin") + id("org.hypertrace.docker-publish-plugin") + id("org.hypertrace.jacoco-report-plugin") +} + +application { + mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher") +} + +hypertraceDocker { + defaultImage { + javaApplication { + serviceName.set("${project.name}") + adminPort.set(8099) + } + } +} + +tasks.test { + useJUnitPlatform() +} + +dependencies { + // internal projects deps + implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) + + // common and framework + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") + + // open telemetry + implementation("io.opentelemetry:opentelemetry-api:1.8.0-SNAPSHOT") + implementation("io.opentelemetry:opentelemetry-api-metrics:1.8.0-alpha-SNAPSHOT") + implementation("io.opentelemetry:opentelemetry-sdk:1.8.0-SNAPSHOT") + implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.8.0-SNAPSHOT") + implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.8.0-alpah-SNAPSHOT") + implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.8.0-alpha-SNAPSHOT") + implementation("io.opentelemetry:opentelemetry-exporter-prometheus:1.8.0-alpha-SNAPSHOT") + + // open telemetry proto + implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha") + + // jetty server + 
implementation("org.eclipse.jetty:jetty-server:9.4.42.v20210604") + implementation("org.eclipse.jetty:jetty-servlet:9.4.42.v20210604") + + // prometheus metrics servelet + implementation("io.prometheus:simpleclient_servlet:0.12.0") + + // kafka + implementation("org.apache.kafka:kafka-clients:2.6.0") + + // test + testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") + testImplementation("org.mockito:mockito-core:3.8.0") + testImplementation("com.google.code.gson:gson:2.8.7") +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java new file mode 100644 index 000000000..18f198cee --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java @@ -0,0 +1,56 @@ +package org.hypertrace.metrics.exporter; + +import com.typesafe.config.Config; +import io.opentelemetry.exporter.prometheus.PrometheusCollector; +import org.hypertrace.core.serviceframework.PlatformService; +import org.hypertrace.core.serviceframework.config.ConfigClient; +import org.hypertrace.metrics.exporter.consumer.MetricsKafkaConsumer; +import org.hypertrace.metrics.exporter.producer.InMemoryMetricsProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsExporterService extends PlatformService { + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporterService.class); + + private static final String BUFFER_CONFIG_KEY = "buffer.config"; + private static final String MAX_QUEUE_SIZE = "max.queue.size"; + + private MetricsKafkaConsumer metricsKafkaConsumer; + private Config config; + private InMemoryMetricsProducer inMemoryMetricsProducer; + + public MetricsExporterService(ConfigClient configClient, Config config) { + super(configClient); + 
this.config = config; + } + + @Override + public void doInit() { + config = (config != null) ? config : getAppConfig(); + int maxQueueSize = config.getConfig(BUFFER_CONFIG_KEY).getInt(MAX_QUEUE_SIZE); + inMemoryMetricsProducer = new InMemoryMetricsProducer(maxQueueSize); + metricsKafkaConsumer = new MetricsKafkaConsumer(config, inMemoryMetricsProducer); + PrometheusCollector.create().apply(inMemoryMetricsProducer); + } + + @Override + public void doStart() { + Thread metricsConsumerThread = new Thread(metricsKafkaConsumer); + metricsConsumerThread.start(); + try { + metricsConsumerThread.join(); + } catch (InterruptedException e) { + LOGGER.error("Exception in starting the thread:", e); + } + } + + @Override + public void doStop() { + metricsKafkaConsumer.close(); + } + + @Override + public boolean healthCheck() { + return true; + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java new file mode 100644 index 000000000..0e215593d --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java @@ -0,0 +1,124 @@ +package org.hypertrace.metrics.exporter.consumer; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.typesafe.config.Config; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import 
org.apache.kafka.clients.consumer.KafkaConsumer; +import org.hypertrace.metrics.exporter.producer.InMemoryMetricsProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsKafkaConsumer implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsKafkaConsumer.class); + private static final int CONSUMER_POLL_TIMEOUT_MS = 100; + + private static final String KAFKA_CONFIG_KEY = "kafka.config"; + private static final String INPUT_TOPIC_KEY = "input.topic"; + + private final KafkaConsumer consumer; + private final InMemoryMetricsProducer inMemoryMetricsProducer; + private final AtomicBoolean running = new AtomicBoolean(false); + + public MetricsKafkaConsumer(Config config, InMemoryMetricsProducer inMemoryMetricsProducer) { + Properties props = new Properties(); + props.putAll( + mergeProperties(getBaseProperties(), getFlatMapConfig(config.getConfig(KAFKA_CONFIG_KEY)))); + consumer = new KafkaConsumer(props); + consumer.subscribe(Collections.singletonList(config.getString(INPUT_TOPIC_KEY))); + this.inMemoryMetricsProducer = inMemoryMetricsProducer; + } + + public void run() { + running.set(true); + while (running.get()) { + // check if any message to commit + if (inMemoryMetricsProducer.isCommitOffset()) { + // consumer.commitSync(); + inMemoryMetricsProducer.clearCommitOffset(); + } + + // read new data + List resourceMetrics = consume(); + if (!resourceMetrics.isEmpty()) { + inMemoryMetricsProducer.addMetricData(resourceMetrics); + } + waitForSec((long) (1000L * 0.1)); + } + } + + public void stop() { + running.set(false); + } + + public List consume() { + List resourceMetrics = new ArrayList<>(); + + ConsumerRecords records = + consumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS)); + records.forEach( + record -> { + try { + resourceMetrics.add(ResourceMetrics.parseFrom(record.value())); + } catch (InvalidProtocolBufferException e) { + LOGGER.error("Invalid record with exception", e); + } + }); + + 
return resourceMetrics; + } + + public void close() { + consumer.close(); + } + + private Map getBaseProperties() { + Map baseProperties = new HashMap<>(); + baseProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "hypertrace-metrics-exporter"); + baseProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + baseProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + baseProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); + baseProperties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + baseProperties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + return baseProperties; + } + + private Map getFlatMapConfig(Config config) { + Map propertiesMap = new HashMap(); + config.entrySet().stream() + .forEach( + (entry) -> { + propertiesMap.put((String) entry.getKey(), config.getString((String) entry.getKey())); + }); + return propertiesMap; + } + + private Map mergeProperties( + Map baseProps, Map props) { + Objects.requireNonNull(baseProps); + props.forEach(baseProps::put); + return baseProps; + } + + private void waitForSec(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + LOGGER.debug("waiting for pushing next records were intruppted"); + } + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java new file mode 100644 index 000000000..d14072dc4 --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java @@ -0,0 +1,59 @@ +package org.hypertrace.metrics.exporter.producer; + +import 
io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricProducer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.hypertrace.metrics.exporter.utils.OtlpToObjectConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InMemoryMetricsProducer implements MetricProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryMetricsProducer.class); + private BlockingQueue metricDataQueue; + private final AtomicBoolean commitOffset = new AtomicBoolean(false); + + public InMemoryMetricsProducer(int maxQueueSize) { + this.metricDataQueue = new ArrayBlockingQueue(maxQueueSize); + } + + public void addMetricData(List resourceMetrics) { + try { + for (ResourceMetrics rm : resourceMetrics) { + List metricData = OtlpToObjectConverter.toMetricData(rm); + for (MetricData md : metricData) { + this.metricDataQueue.put(md); + } + } + } catch (InterruptedException exception) { + LOGGER.info("This thread was intruppted, so we might loose copying some metrics "); + } + } + + public Collection collectAllMetrics() { + List metricDataList = new ArrayList<>(); + int max = 0; + while (max < 100 && this.metricDataQueue.peek() != null) { + metricDataList.add(this.metricDataQueue.poll()); + max++; + } + return metricDataList; + } + + public void setCommitOffset() { + commitOffset.set(true); + } + + public void clearCommitOffset() { + commitOffset.set(false); + } + + public boolean isCommitOffset() { + return commitOffset.get(); + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpToObjectConverter.java 
b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpToObjectConverter.java new file mode 100644 index 000000000..2536b4307 --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpToObjectConverter.java @@ -0,0 +1,127 @@ +package org.hypertrace.metrics.exporter.utils; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.proto.common.v1.InstrumentationLibrary; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.Metric.DataCase; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.Sum; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoubleGaugeData; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.DoubleSumData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class OtlpToObjectConverter { + + public static Resource toResource(io.opentelemetry.proto.resource.v1.Resource otlpResource) { + return Resource.create(toAttributes(otlpResource.getAttributesList())); + } + + public static InstrumentationLibraryInfo toInstrumentationLibraryInfo( + InstrumentationLibrary otlpInstrumentationLibraryInfo) { + return InstrumentationLibraryInfo.create( + otlpInstrumentationLibraryInfo.getName(), otlpInstrumentationLibraryInfo.getVersion()); + } + + public static Attributes toAttributes(List keyValues) { + AttributesBuilder attributesBuilder = 
Attributes.builder(); + keyValues.forEach( + keyValue -> { + attributesBuilder.put(keyValue.getKey(), keyValue.getValue().getStringValue()); + }); + return attributesBuilder.build(); + } + + public static List toDoublePointData(List numberDataPoints) { + return numberDataPoints.stream() + .map( + numberDataPoint -> + DoublePointData.create( + numberDataPoint.getStartTimeUnixNano(), + numberDataPoint.getTimeUnixNano(), + toAttributes(numberDataPoint.getAttributesList()), + numberDataPoint.getAsInt())) + .collect(Collectors.toList()); + } + + public static List toMetricData(ResourceMetrics resourceMetrics) { + List metricData = new ArrayList<>(); + Resource resource = toResource(resourceMetrics.getResource()); + resourceMetrics + .getInstrumentationLibraryMetricsList() + .forEach( + instrumentationLibraryMetrics -> { + InstrumentationLibraryInfo instrumentationLibraryInfo = + toInstrumentationLibraryInfo( + instrumentationLibraryMetrics.getInstrumentationLibrary()); + instrumentationLibraryMetrics + .getMetricsList() + .forEach( + metric -> { + // get type : for now only support gauge + if (metric.getDataCase().equals(DataCase.GAUGE)) { + Gauge gaugeMetric = metric.getGauge(); + String name = metric.getName(); + String description = metric.getDescription(); + String unit = metric.getUnit(); + DoubleGaugeData data = + DoubleGaugeData.create( + toDoublePointData(gaugeMetric.getDataPointsList())); + MetricData md = + MetricData.createDoubleGauge( + resource, + instrumentationLibraryInfo, + name, + description, + unit, + data); + metricData.add(md); + } else if (metric.getDataCase().equals(DataCase.SUM)) { + Sum sumMetric = metric.getSum(); + boolean isMonotonic = sumMetric.getIsMonotonic(); + AggregationTemporality temporality; + if (sumMetric + .getAggregationTemporality() + .equals( + io.opentelemetry.proto.metrics.v1.AggregationTemporality + .AGGREGATION_TEMPORALITY_CUMULATIVE)) { + temporality = AggregationTemporality.CUMULATIVE; + } else if (sumMetric + 
.getAggregationTemporality() + .equals( + io.opentelemetry.proto.metrics.v1.AggregationTemporality + .AGGREGATION_TEMPORALITY_DELTA)) { + temporality = AggregationTemporality.DELTA; + } else { + temporality = AggregationTemporality.CUMULATIVE; + } + + DoubleSumData doubleSumData = + DoubleSumData.create( + isMonotonic, + temporality, + toDoublePointData(sumMetric.getDataPointsList())); + MetricData md = + MetricData.createDoubleSum( + resource, + instrumentationLibraryInfo, + metric.getName(), + metric.getDescription(), + metric.getUnit(), + doubleSumData); + metricData.add(md); + } + }); + }); + return metricData; + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf new file mode 100644 index 000000000..1ab91e7e4 --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf @@ -0,0 +1,23 @@ +service.name = hypertrace-metrics-exporter +service.admin.port = 8099 + +main.class = org.hypertrace.metrics.exporter.MetricsExporterService + +input.topic = "enriched-otlp-metrics" + +buffer.config { + max.queue.size = 5000 +} + +kafka.config = { + application.id = hypertrace-metrics-exporter-job + bootstrap.servers = "localhost:9092" + bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-generator" + +metrics.reporter.prefix = org.hypertrace.metrics.exporter.MetricsExporterService +metrics.reporter.names = ["prometheus"] +metrics.reportInterval = 60 \ No newline at end of file diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties new file mode 100644 index 000000000..d91bc7bfe --- /dev/null +++ 
b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties @@ -0,0 +1,23 @@ +status=error +name=PropertiesConfig +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.type=RollingFile +appender.rolling.name=ROLLING_FILE +appender.rolling.fileName=${sys:service.name:-service}.log +appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz +appender.rolling.layout.type=PatternLayout +appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.policies.type=Policies +appender.rolling.policies.time.type=TimeBasedTriggeringPolicy +appender.rolling.policies.time.interval=3600 +appender.rolling.policies.time.modulate=true +appender.rolling.policies.size.type=SizeBasedTriggeringPolicy +appender.rolling.policies.size.size=20MB +appender.rolling.strategy.type=DefaultRolloverStrategy +appender.rolling.strategy.max=5 +rootLogger.level=INFO +rootLogger.appenderRef.stdout.ref=STDOUT +rootLogger.appenderRef.rolling.ref=ROLLING_FILE diff --git a/settings.gradle.kts b/settings.gradle.kts index 571660d40..2e19348e2 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -36,6 +36,7 @@ include("span-normalizer:span-normalizer-constants") // metrics pipeline include("hypertrace-metrics-processor:hypertrace-metrics-processor") +include("hypertrace-metrics-exporter:hypertrace-metrics-exporter") // utils include("semantic-convention-utils") From 391d144086edc8eb9243d96cdf713851f73f02c3 Mon Sep 17 00:00:00 2001 From: Ronak Date: Tue, 19 Oct 2021 16:34:19 +0530 Subject: [PATCH 2/8] feat: adds hypertrace ingester hook for metrics exporter --- hypertrace-ingester/build.gradle.kts | 17 ++++++++++---- .../ingester/HypertraceIngester.java | 22 +++++++++++++++++++ .../exporter/MetricsExporterService.java | 5 
++++- 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/hypertrace-ingester/build.gradle.kts b/hypertrace-ingester/build.gradle.kts index c587865b3..b98d6156e 100644 --- a/hypertrace-ingester/build.gradle.kts +++ b/hypertrace-ingester/build.gradle.kts @@ -26,8 +26,8 @@ hypertraceDocker { dependencies { implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") implementation("org.hypertrace.core.datamodel:data-model:0.1.19") implementation("org.hypertrace.core.viewgenerator:view-generator-framework:0.3.1") implementation("com.typesafe:config:1.4.1") @@ -41,6 +41,7 @@ dependencies { implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher")) implementation(project(":hypertrace-view-generator:hypertrace-view-generator")) implementation(project(":hypertrace-metrics-processor:hypertrace-metrics-processor")) + implementation(project(":hypertrace-metrics-exporter:hypertrace-metrics-exporter")) testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") testImplementation("org.mockito:mockito-core:3.8.0") @@ -81,7 +82,11 @@ tasks.register("copyServiceConfigs") { createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "main", - "common") + "common"), + createCopySpec("hypertrace-metrics-exporter", + "hypertrace-metrics-exporter", + "main", + "common") ).into("./build/resources/main/configs/") } @@ -137,7 +142,11 @@ tasks.register("copyServiceConfigsTest") { createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "test", - "hypertrace-metrics-processor") + "hypertrace-metrics-processor"), + 
createCopySpec("hypertrace-metrics-exporter", + "hypertrace-metrics-exporter", + "test", + "hypertrace-metrics-exporter") ).into("./build/resources/test/configs/") } diff --git a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java index 78e6afa58..3151c8979 100644 --- a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java +++ b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java @@ -17,6 +17,7 @@ import org.hypertrace.core.serviceframework.config.ConfigUtils; import org.hypertrace.core.spannormalizer.SpanNormalizer; import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher; +import org.hypertrace.metrics.exporter.MetricsExporterService; import org.hypertrace.metrics.processor.MetricsProcessor; import org.hypertrace.traceenricher.trace.enricher.TraceEnricher; import org.slf4j.Logger; @@ -30,9 +31,12 @@ public class HypertraceIngester extends KafkaStreamsApp { private static final String HYPERTRACE_INGESTER_JOB_CONFIG = "hypertrace-ingester-job-config"; private Map> jobNameToSubTopology = new HashMap<>(); + MetricsExporterService metricsExporterService; public HypertraceIngester(ConfigClient configClient) { super(configClient); + metricsExporterService = new MetricsExporterService(configClient); + metricsExporterService.setConfig(getSubJobConfig("hypertrace-metrics-exporter")); } private KafkaStreamsApp getSubTopologyInstance(String name) { @@ -118,6 +122,24 @@ public List getOutputTopics(Map properties) { return outputTopics.stream().collect(Collectors.toList()); } + @Override + protected void doInit() { + super.doInit(); + this.metricsExporterService.doInit(); + } + + @Override + protected void doStart() { + super.doStart(); + this.metricsExporterService.doStart(); + } + + @Override + protected void doStop() { + super.doStop(); + this.metricsExporterService.doStop(); + } + 
private List getSubTopologiesNames(Map properties) { return getJobConfig(properties).getStringList("sub.topology.names"); } diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java index 18f198cee..88877f3d8 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java @@ -19,8 +19,11 @@ public class MetricsExporterService extends PlatformService { private Config config; private InMemoryMetricsProducer inMemoryMetricsProducer; - public MetricsExporterService(ConfigClient configClient, Config config) { + public MetricsExporterService(ConfigClient configClient) { super(configClient); + } + + public void setConfig(Config config) { this.config = config; } From 8cad5b6152936a4a473a7196fc5dab76c46ef639 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 20 Oct 2021 17:02:10 +0530 Subject: [PATCH 3/8] refactor and cleaned the code --- .../exporter/MetricsExporterService.java | 8 +- .../consumer/MetricsKafkaConsumer.java | 101 ++++++------- .../producer/InMemoryMetricsProducer.java | 60 ++++---- .../utils/OtlpProtoToMetricDataConverter.java | 135 ++++++++++++++++++ .../exporter/utils/OtlpToObjectConverter.java | 127 ---------------- .../resources/configs/common/application.conf | 1 + 6 files changed, 204 insertions(+), 228 deletions(-) create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverter.java delete mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpToObjectConverter.java 
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java index 88877f3d8..0ea7bb0a3 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java @@ -12,11 +12,8 @@ public class MetricsExporterService extends PlatformService { private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporterService.class); - private static final String BUFFER_CONFIG_KEY = "buffer.config"; - private static final String MAX_QUEUE_SIZE = "max.queue.size"; - - private MetricsKafkaConsumer metricsKafkaConsumer; private Config config; + private MetricsKafkaConsumer metricsKafkaConsumer; private InMemoryMetricsProducer inMemoryMetricsProducer; public MetricsExporterService(ConfigClient configClient) { @@ -30,8 +27,7 @@ public void setConfig(Config config) { @Override public void doInit() { config = (config != null) ? 
config : getAppConfig(); - int maxQueueSize = config.getConfig(BUFFER_CONFIG_KEY).getInt(MAX_QUEUE_SIZE); - inMemoryMetricsProducer = new InMemoryMetricsProducer(maxQueueSize); + inMemoryMetricsProducer = new InMemoryMetricsProducer(config); metricsKafkaConsumer = new MetricsKafkaConsumer(config, inMemoryMetricsProducer); PrometheusCollector.create().apply(inMemoryMetricsProducer); } diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java index 0e215593d..a1950f78a 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java @@ -3,83 +3,83 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.typesafe.config.Config; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.sdk.metrics.data.MetricData; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.hypertrace.metrics.exporter.producer.InMemoryMetricsProducer; +import org.hypertrace.metrics.exporter.utils.OtlpProtoToMetricDataConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MetricsKafkaConsumer implements Runnable { private static final Logger LOGGER = 
LoggerFactory.getLogger(MetricsKafkaConsumer.class); + private static final int CONSUMER_POLL_TIMEOUT_MS = 100; + private static final int QUEUE_WAIT_TIME_MS = 500; private static final String KAFKA_CONFIG_KEY = "kafka.config"; private static final String INPUT_TOPIC_KEY = "input.topic"; private final KafkaConsumer consumer; private final InMemoryMetricsProducer inMemoryMetricsProducer; - private final AtomicBoolean running = new AtomicBoolean(false); public MetricsKafkaConsumer(Config config, InMemoryMetricsProducer inMemoryMetricsProducer) { - Properties props = new Properties(); - props.putAll( - mergeProperties(getBaseProperties(), getFlatMapConfig(config.getConfig(KAFKA_CONFIG_KEY)))); - consumer = new KafkaConsumer(props); + consumer = new KafkaConsumer<>(prepareProperties(config.getConfig(KAFKA_CONFIG_KEY))); consumer.subscribe(Collections.singletonList(config.getString(INPUT_TOPIC_KEY))); this.inMemoryMetricsProducer = inMemoryMetricsProducer; } public void run() { - running.set(true); - while (running.get()) { - // check if any message to commit - if (inMemoryMetricsProducer.isCommitOffset()) { - // consumer.commitSync(); - inMemoryMetricsProducer.clearCommitOffset(); - } - - // read new data - List resourceMetrics = consume(); - if (!resourceMetrics.isEmpty()) { - inMemoryMetricsProducer.addMetricData(resourceMetrics); - } - waitForSec((long) (1000L * 0.1)); + while (true) { + List resourceMetrics = new ArrayList<>(); + + ConsumerRecords records = + consumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS)); + + records.forEach( + record -> { + try { + resourceMetrics.add(ResourceMetrics.parseFrom(record.value())); + } catch (InvalidProtocolBufferException e) { + LOGGER.warn("Skipping record due to error", e); + } + }); + + resourceMetrics.forEach( + rm -> { + List metricData = OtlpProtoToMetricDataConverter.toMetricData(rm); + boolean result = false; + while (!result) { + result = inMemoryMetricsProducer.addMetricData(metricData); + 
waitForSec(QUEUE_WAIT_TIME_MS); + } + }); } } - public void stop() { - running.set(false); + public void close() { + consumer.close(); } - public List consume() { - List resourceMetrics = new ArrayList<>(); - - ConsumerRecords records = - consumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS)); - records.forEach( - record -> { - try { - resourceMetrics.add(ResourceMetrics.parseFrom(record.value())); - } catch (InvalidProtocolBufferException e) { - LOGGER.error("Invalid record with exception", e); - } - }); - - return resourceMetrics; - } + private Properties prepareProperties(Config config) { + Map overrideProperties = new HashMap(); + config.entrySet().stream() + .forEach(e -> overrideProperties.put(e.getKey(), config.getString(e.getKey()))); - public void close() { - consumer.close(); + Map baseProperties = getBaseProperties(); + overrideProperties.forEach(baseProperties::put); + + Properties properties = new Properties(); + properties.putAll(baseProperties); + return properties; } private Map getBaseProperties() { @@ -97,28 +97,11 @@ private Map getBaseProperties() { return baseProperties; } - private Map getFlatMapConfig(Config config) { - Map propertiesMap = new HashMap(); - config.entrySet().stream() - .forEach( - (entry) -> { - propertiesMap.put((String) entry.getKey(), config.getString((String) entry.getKey())); - }); - return propertiesMap; - } - - private Map mergeProperties( - Map baseProps, Map props) { - Objects.requireNonNull(baseProps); - props.forEach(baseProps::put); - return baseProps; - } - private void waitForSec(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { - LOGGER.debug("waiting for pushing next records were intruppted"); + LOGGER.debug("Interrupted while waiting for Queue to be empty"); } } } diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java 
b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java index d14072dc4..2eff4a951 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java @@ -1,6 +1,6 @@ package org.hypertrace.metrics.exporter.producer; -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import com.typesafe.config.Config; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricProducer; import java.util.ArrayList; @@ -8,52 +8,40 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import org.hypertrace.metrics.exporter.utils.OtlpToObjectConverter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class InMemoryMetricsProducer implements MetricProducer { - private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryMetricsProducer.class); + + private static final String BUFFER_CONFIG_KEY = "buffer.config"; + private static final String MAX_QUEUE_SIZE = "max.queue.size"; + private static final String MAX_BATCH_SIZE = "max.batch.size"; + private BlockingQueue metricDataQueue; - private final AtomicBoolean commitOffset = new AtomicBoolean(false); + private int maxQueueSize; + private int maxBatchSize; - public InMemoryMetricsProducer(int maxQueueSize) { - this.metricDataQueue = new ArrayBlockingQueue(maxQueueSize); + public InMemoryMetricsProducer(Config config) { + maxQueueSize = config.getConfig(BUFFER_CONFIG_KEY).getInt(MAX_QUEUE_SIZE); + maxBatchSize = config.getConfig(BUFFER_CONFIG_KEY).getInt(MAX_BATCH_SIZE); + this.metricDataQueue = new ArrayBlockingQueue<>(maxQueueSize); } - 
public void addMetricData(List resourceMetrics) { - try { - for (ResourceMetrics rm : resourceMetrics) { - List metricData = OtlpToObjectConverter.toMetricData(rm); - for (MetricData md : metricData) { - this.metricDataQueue.put(md); - } - } - } catch (InterruptedException exception) { - LOGGER.info("This thread was intruppted, so we might loose copying some metrics "); + public boolean addMetricData(List metricData) { + if (this.metricDataQueue.size() + metricData.size() >= maxQueueSize) { + return false; } - } - public Collection collectAllMetrics() { - List metricDataList = new ArrayList<>(); - int max = 0; - while (max < 100 && this.metricDataQueue.peek() != null) { - metricDataList.add(this.metricDataQueue.poll()); - max++; + for (MetricData md : metricData) { + this.metricDataQueue.offer(md); } - return metricDataList; - } - public void setCommitOffset() { - commitOffset.set(true); + return true; } - public void clearCommitOffset() { - commitOffset.set(false); - } - - public boolean isCommitOffset() { - return commitOffset.get(); + public Collection collectAllMetrics() { + List metricData = new ArrayList<>(); + while (metricData.size() < maxBatchSize && this.metricDataQueue.peek() != null) { + metricData.add(this.metricDataQueue.poll()); + } + return metricData; } } diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverter.java new file mode 100644 index 000000000..45961af3c --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverter.java @@ -0,0 +1,135 @@ +package org.hypertrace.metrics.exporter.utils; + +import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; +import static 
io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.proto.common.v1.InstrumentationLibrary; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.Sum; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoubleGaugeData; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.DoubleSumData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class OtlpProtoToMetricDataConverter { + + private static Resource toResource(io.opentelemetry.proto.resource.v1.Resource otlpResource) { + return Resource.create(toAttributes(otlpResource.getAttributesList())); + } + + private static InstrumentationLibraryInfo toInstrumentationLibraryInfo( + InstrumentationLibrary otlpInstrumentationLibraryInfo) { + return InstrumentationLibraryInfo.create( + otlpInstrumentationLibraryInfo.getName(), otlpInstrumentationLibraryInfo.getVersion()); + } + + private static Attributes toAttributes(List keyValues) { + AttributesBuilder attributesBuilder = Attributes.builder(); + keyValues.forEach( + keyValue -> { + attributesBuilder.put(keyValue.getKey(), keyValue.getValue().getStringValue()); + }); + return attributesBuilder.build(); + } + + private static List toDoublePointData(List numberDataPoints) { + return numberDataPoints.stream() + .map( + numberDataPoint -> + 
DoublePointData.create( + numberDataPoint.getStartTimeUnixNano(), + numberDataPoint.getTimeUnixNano(), + toAttributes(numberDataPoint.getAttributesList()), + numberDataPoint.getAsInt())) + .collect(Collectors.toList()); + } + + private static AggregationTemporality toAggregationTemporality( + io.opentelemetry.proto.metrics.v1.AggregationTemporality aggregationTemporality) { + switch (aggregationTemporality) { + case AGGREGATION_TEMPORALITY_CUMULATIVE: + return AggregationTemporality.CUMULATIVE; + case AGGREGATION_TEMPORALITY_DELTA: + return AggregationTemporality.DELTA; + default: + return AggregationTemporality.CUMULATIVE; + } + } + + private static MetricData toGaugeMetricData( + Resource resource, InstrumentationLibraryInfo instrumentationLibraryInfo, Metric metric) { + Gauge gaugeMetric = metric.getGauge(); + + DoubleGaugeData data = + DoubleGaugeData.create(toDoublePointData(gaugeMetric.getDataPointsList())); + + return MetricData.createDoubleGauge( + resource, + instrumentationLibraryInfo, + metric.getName(), + metric.getDescription(), + metric.getUnit(), + data); + } + + private static MetricData toSumMetricData( + Resource resource, InstrumentationLibraryInfo instrumentationLibraryInfo, Metric metric) { + Sum sumMetric = metric.getSum(); + + DoubleSumData doubleSumData = + DoubleSumData.create( + sumMetric.getIsMonotonic(), + toAggregationTemporality(sumMetric.getAggregationTemporality()), + toDoublePointData(sumMetric.getDataPointsList())); + + return MetricData.createDoubleSum( + resource, + instrumentationLibraryInfo, + metric.getName(), + metric.getDescription(), + metric.getUnit(), + doubleSumData); + } + + private static MetricData toMetricData( + Resource resource, InstrumentationLibraryInfo instrumentationLibraryInfo, Metric metric) { + switch (metric.getDataCase()) { + case GAUGE: + return toGaugeMetricData(resource, instrumentationLibraryInfo, metric); + case SUM: + return toSumMetricData(resource, instrumentationLibraryInfo, metric); + default: + 
throw new UnsupportedOperationException( + String.format("Unsupported metric type: %s", metric.getDataCase())); + } + } + + public static List toMetricData(ResourceMetrics resourceMetrics) { + List metricData = new ArrayList<>(); + Resource resource = toResource(resourceMetrics.getResource()); + resourceMetrics + .getInstrumentationLibraryMetricsList() + .forEach( + instrumentationLibraryMetrics -> { + InstrumentationLibraryInfo instrumentationLibraryInfo = + toInstrumentationLibraryInfo( + instrumentationLibraryMetrics.getInstrumentationLibrary()); + instrumentationLibraryMetrics + .getMetricsList() + .forEach(metric -> toMetricData(resource, instrumentationLibraryInfo, metric)); + }); + return metricData; + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpToObjectConverter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpToObjectConverter.java deleted file mode 100644 index 2536b4307..000000000 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpToObjectConverter.java +++ /dev/null @@ -1,127 +0,0 @@ -package org.hypertrace.metrics.exporter.utils; - -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.proto.common.v1.InstrumentationLibrary; -import io.opentelemetry.proto.common.v1.KeyValue; -import io.opentelemetry.proto.metrics.v1.Gauge; -import io.opentelemetry.proto.metrics.v1.Metric.DataCase; -import io.opentelemetry.proto.metrics.v1.NumberDataPoint; -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; -import io.opentelemetry.proto.metrics.v1.Sum; -import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; -import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.data.DoubleGaugeData; -import 
io.opentelemetry.sdk.metrics.data.DoublePointData; -import io.opentelemetry.sdk.metrics.data.DoubleSumData; -import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.resources.Resource; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -public class OtlpToObjectConverter { - - public static Resource toResource(io.opentelemetry.proto.resource.v1.Resource otlpResource) { - return Resource.create(toAttributes(otlpResource.getAttributesList())); - } - - public static InstrumentationLibraryInfo toInstrumentationLibraryInfo( - InstrumentationLibrary otlpInstrumentationLibraryInfo) { - return InstrumentationLibraryInfo.create( - otlpInstrumentationLibraryInfo.getName(), otlpInstrumentationLibraryInfo.getVersion()); - } - - public static Attributes toAttributes(List keyValues) { - AttributesBuilder attributesBuilder = Attributes.builder(); - keyValues.forEach( - keyValue -> { - attributesBuilder.put(keyValue.getKey(), keyValue.getValue().getStringValue()); - }); - return attributesBuilder.build(); - } - - public static List toDoublePointData(List numberDataPoints) { - return numberDataPoints.stream() - .map( - numberDataPoint -> - DoublePointData.create( - numberDataPoint.getStartTimeUnixNano(), - numberDataPoint.getTimeUnixNano(), - toAttributes(numberDataPoint.getAttributesList()), - numberDataPoint.getAsInt())) - .collect(Collectors.toList()); - } - - public static List toMetricData(ResourceMetrics resourceMetrics) { - List metricData = new ArrayList<>(); - Resource resource = toResource(resourceMetrics.getResource()); - resourceMetrics - .getInstrumentationLibraryMetricsList() - .forEach( - instrumentationLibraryMetrics -> { - InstrumentationLibraryInfo instrumentationLibraryInfo = - toInstrumentationLibraryInfo( - instrumentationLibraryMetrics.getInstrumentationLibrary()); - instrumentationLibraryMetrics - .getMetricsList() - .forEach( - metric -> { - // get type : for now only support gauge - if 
(metric.getDataCase().equals(DataCase.GAUGE)) { - Gauge gaugeMetric = metric.getGauge(); - String name = metric.getName(); - String description = metric.getDescription(); - String unit = metric.getUnit(); - DoubleGaugeData data = - DoubleGaugeData.create( - toDoublePointData(gaugeMetric.getDataPointsList())); - MetricData md = - MetricData.createDoubleGauge( - resource, - instrumentationLibraryInfo, - name, - description, - unit, - data); - metricData.add(md); - } else if (metric.getDataCase().equals(DataCase.SUM)) { - Sum sumMetric = metric.getSum(); - boolean isMonotonic = sumMetric.getIsMonotonic(); - AggregationTemporality temporality; - if (sumMetric - .getAggregationTemporality() - .equals( - io.opentelemetry.proto.metrics.v1.AggregationTemporality - .AGGREGATION_TEMPORALITY_CUMULATIVE)) { - temporality = AggregationTemporality.CUMULATIVE; - } else if (sumMetric - .getAggregationTemporality() - .equals( - io.opentelemetry.proto.metrics.v1.AggregationTemporality - .AGGREGATION_TEMPORALITY_DELTA)) { - temporality = AggregationTemporality.DELTA; - } else { - temporality = AggregationTemporality.CUMULATIVE; - } - - DoubleSumData doubleSumData = - DoubleSumData.create( - isMonotonic, - temporality, - toDoublePointData(sumMetric.getDataPointsList())); - MetricData md = - MetricData.createDoubleSum( - resource, - instrumentationLibraryInfo, - metric.getName(), - metric.getDescription(), - metric.getUnit(), - doubleSumData); - metricData.add(md); - } - }); - }); - return metricData; - } -} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf index 1ab91e7e4..16a968d23 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf +++ 
b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf @@ -7,6 +7,7 @@ input.topic = "enriched-otlp-metrics" buffer.config { max.queue.size = 5000 + max.batch.size = 1000 } kafka.config = { From 844a632ccd03ac70ce6ee22f6187cbdb303b9049 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 20 Oct 2021 21:17:26 +0530 Subject: [PATCH 4/8] handles the runtime and interrupt exception --- .../consumer/MetricsKafkaConsumer.java | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java index a1950f78a..a6395498a 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java @@ -24,6 +24,7 @@ public class MetricsKafkaConsumer implements Runnable { private static final int CONSUMER_POLL_TIMEOUT_MS = 100; private static final int QUEUE_WAIT_TIME_MS = 500; + private static final int WAIT_TIME_MS = 100; private static final String KAFKA_CONFIG_KEY = "kafka.config"; private static final String INPUT_TOPIC_KEY = "input.topic"; @@ -38,7 +39,7 @@ public MetricsKafkaConsumer(Config config, InMemoryMetricsProducer inMemoryMetri } public void run() { - while (true) { + while (true && !Thread.currentThread().isInterrupted()) { List resourceMetrics = new ArrayList<>(); ConsumerRecords records = @@ -55,13 +56,19 @@ record -> { resourceMetrics.forEach( rm -> { - List metricData = OtlpProtoToMetricDataConverter.toMetricData(rm); - boolean result = false; - while (!result) { - result = 
inMemoryMetricsProducer.addMetricData(metricData); - waitForSec(QUEUE_WAIT_TIME_MS); + try { + List metricData = OtlpProtoToMetricDataConverter.toMetricData(rm); + boolean result = false; + while (!result) { + result = inMemoryMetricsProducer.addMetricData(metricData); + waitForSec(QUEUE_WAIT_TIME_MS); + } + } catch (Exception e) { + LOGGER.debug("skipping the resource metrics due to error: {}", e); } }); + + waitForSec(WAIT_TIME_MS); } } @@ -101,7 +108,8 @@ private void waitForSec(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { - LOGGER.debug("Interrupted while waiting for Queue to be empty"); + Thread.currentThread().interrupt(); + LOGGER.debug("While waiting, the consumer thread has interrupted"); } } } From 552a77bebd8e45081ae6cd0fe50820e3e532c445 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 20 Oct 2021 21:42:57 +0530 Subject: [PATCH 5/8] clean up the deps and updated with newer deps --- hypertrace-ingester/build.gradle.kts | 4 +-- .../build.gradle.kts | 28 ++++++------------- .../exporter/MetricsExporterService.java | 4 +++ .../build.gradle.kts | 4 +-- .../build.gradle.kts | 4 +-- .../raw-spans-grouper/build.gradle.kts | 4 +-- .../span-normalizer/build.gradle.kts | 4 +-- 7 files changed, 22 insertions(+), 30 deletions(-) diff --git a/hypertrace-ingester/build.gradle.kts b/hypertrace-ingester/build.gradle.kts index b98d6156e..e9711609b 100644 --- a/hypertrace-ingester/build.gradle.kts +++ b/hypertrace-ingester/build.gradle.kts @@ -26,8 +26,8 @@ hypertraceDocker { dependencies { implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30") 
implementation("org.hypertrace.core.datamodel:data-model:0.1.19") implementation("org.hypertrace.core.viewgenerator:view-generator-framework:0.3.1") implementation("com.typesafe:config:1.4.1") diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts index 5177800dc..7ff01cc1a 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts @@ -25,33 +25,21 @@ tasks.test { } dependencies { - // internal projects deps - implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) - // common and framework - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") - implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30") // open telemetry - implementation("io.opentelemetry:opentelemetry-api:1.8.0-SNAPSHOT") - implementation("io.opentelemetry:opentelemetry-api-metrics:1.8.0-alpha-SNAPSHOT") - implementation("io.opentelemetry:opentelemetry-sdk:1.8.0-SNAPSHOT") - implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.8.0-SNAPSHOT") - implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.8.0-alpah-SNAPSHOT") - implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.8.0-alpha-SNAPSHOT") - implementation("io.opentelemetry:opentelemetry-exporter-prometheus:1.8.0-alpha-SNAPSHOT") + implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.7.0-alpha") + // TODO: Upgrade opentelemetry-exporter-prometheus to 1.8.0 release when available + // to include time stamp related changes + //
https://github.com/open-telemetry/opentelemetry-java/pull/3700 + // For now, the exported time stamp will be the current time stamp. + implementation("io.opentelemetry:opentelemetry-exporter-prometheus:1.7.0-alpha") // open telemetry proto implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha") - // jetty server - implementation("org.eclipse.jetty:jetty-server:9.4.42.v20210604") - implementation("org.eclipse.jetty:jetty-servlet:9.4.42.v20210604") - - // prometheus metrics servelet - implementation("io.prometheus:simpleclient_servlet:0.12.0") - // kafka implementation("org.apache.kafka:kafka-clients:2.6.0") diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java index 0ea7bb0a3..86628c10d 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java @@ -29,6 +29,10 @@ public void doInit() { config = (config != null) ? config : getAppConfig(); inMemoryMetricsProducer = new InMemoryMetricsProducer(config); metricsKafkaConsumer = new MetricsKafkaConsumer(config, inMemoryMetricsProducer); + // TODO: Upgrade opentelemetry-exporter-prometheus to 1.8.0 release when available + // to include time stamp related changes + // https://github.com/open-telemetry/opentelemetry-java/pull/3700 + // For now, the exported time stamp will be the current time stamp. 
PrometheusCollector.create().apply(inMemoryMetricsProducer); } diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts index 0d318f4d0..60b1e068e 100644 --- a/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts @@ -29,8 +29,8 @@ dependencies { implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) // frameworks - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") // open telemetry proto diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts b/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts index 3585a30f1..ba7755c19 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts @@ -36,8 +36,8 @@ tasks.test { dependencies { implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher-impl")) implementation("org.hypertrace.core.datamodel:data-model:0.1.19") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30") implementation("org.hypertrace.entity.service:entity-service-client:0.8.5") 
implementation("com.typesafe:config:1.4.1") diff --git a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts index 6332c209f..7293d48fa 100644 --- a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts +++ b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts @@ -39,8 +39,8 @@ dependencies { } implementation(project(":span-normalizer:span-normalizer-api")) implementation("org.hypertrace.core.datamodel:data-model:0.1.19") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") implementation("com.typesafe:config:1.4.1") diff --git a/span-normalizer/span-normalizer/build.gradle.kts b/span-normalizer/span-normalizer/build.gradle.kts index 0f02e9fd3..d7dff757a 100644 --- a/span-normalizer/span-normalizer/build.gradle.kts +++ b/span-normalizer/span-normalizer/build.gradle.kts @@ -35,8 +35,8 @@ dependencies { implementation(project(":semantic-convention-utils")) implementation("org.hypertrace.core.datamodel:data-model:0.1.19") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") // Required for the GRPC clients. 
From 8f467d5124b07f48acf4be65ffc52913a7f0e732 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 20 Oct 2021 21:47:42 +0530 Subject: [PATCH 6/8] handling closing conditions --- .../hypertrace/metrics/exporter/MetricsExporterService.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java index 86628c10d..511686bfe 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java @@ -14,6 +14,7 @@ public class MetricsExporterService extends PlatformService { private Config config; private MetricsKafkaConsumer metricsKafkaConsumer; + private Thread metricsConsumerThread; private InMemoryMetricsProducer inMemoryMetricsProducer; public MetricsExporterService(ConfigClient configClient) { @@ -29,6 +30,7 @@ public void doInit() { config = (config != null) ? 
config : getAppConfig(); inMemoryMetricsProducer = new InMemoryMetricsProducer(config); metricsKafkaConsumer = new MetricsKafkaConsumer(config, inMemoryMetricsProducer); + metricsConsumerThread = new Thread(metricsKafkaConsumer); // TODO: Upgrade opentelemetry-exporter-prometheus to 1.8.0 release when available // to include time stamp related changes // https://github.com/open-telemetry/opentelemetry-java/pull/3700 @@ -38,7 +40,7 @@ public void doInit() { @Override public void doStart() { - Thread metricsConsumerThread = new Thread(metricsKafkaConsumer); + metricsConsumerThread = new Thread(metricsKafkaConsumer); metricsConsumerThread.start(); try { metricsConsumerThread.join(); @@ -49,6 +51,7 @@ public void doStart() { @Override public void doStop() { + metricsConsumerThread.interrupt(); metricsKafkaConsumer.close(); } From 8d4024690afdf609c35fa8bab06550fcb631f4f0 Mon Sep 17 00:00:00 2001 From: Ronak Date: Thu, 21 Oct 2021 16:00:03 +0530 Subject: [PATCH 7/8] test: adds test for otlp proto to metric data conversion --- .../utils/OtlpProtoToMetricDataConverter.java | 12 +- .../OtlpProtoToMetricDataConverterTest.java | 217 ++++++++++++++++++ 2 files changed, 224 insertions(+), 5 deletions(-) create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverterTest.java diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverter.java index 45961af3c..7f7d6b499 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverter.java +++ 
b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverter.java @@ -1,6 +1,5 @@ package org.hypertrace.metrics.exporter.utils; -import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE; import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA; import io.opentelemetry.api.common.Attributes; @@ -52,15 +51,15 @@ private static List toDoublePointData(List num numberDataPoint.getStartTimeUnixNano(), numberDataPoint.getTimeUnixNano(), toAttributes(numberDataPoint.getAttributesList()), - numberDataPoint.getAsInt())) + numberDataPoint.hasAsInt() + ? numberDataPoint.getAsInt() + : numberDataPoint.getAsDouble())) .collect(Collectors.toList()); } private static AggregationTemporality toAggregationTemporality( io.opentelemetry.proto.metrics.v1.AggregationTemporality aggregationTemporality) { switch (aggregationTemporality) { - case AGGREGATION_TEMPORALITY_CUMULATIVE: - return AggregationTemporality.CUMULATIVE; case AGGREGATION_TEMPORALITY_DELTA: return AggregationTemporality.DELTA; default: @@ -128,7 +127,10 @@ public static List toMetricData(ResourceMetrics resourceMetrics) { instrumentationLibraryMetrics.getInstrumentationLibrary()); instrumentationLibraryMetrics .getMetricsList() - .forEach(metric -> toMetricData(resource, instrumentationLibraryInfo, metric)); + .forEach( + metric -> + metricData.add( + toMetricData(resource, instrumentationLibraryInfo, metric))); }); return metricData; } diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverterTest.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverterTest.java new file mode 100644 index 000000000..2ab985e13 --- /dev/null +++ 
b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverterTest.java @@ -0,0 +1,217 @@ +package org.hypertrace.metrics.exporter.utils; + +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationLibrary; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.Sum; +import io.opentelemetry.proto.resource.v1.Resource; +import io.opentelemetry.sdk.metrics.data.MetricData; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class OtlpProtoToMetricDataConverterTest { + + @Test + public void testGuageMetricData() { + // test for int values + ResourceMetrics resourceMetrics = + prepareMetric("int_num_calls", "number of calls", 10, "Gauge"); + + List underTestMetricData = + OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics); + + Assertions.assertEquals(1, underTestMetricData.size()); + Assertions.assertEquals("int_num_calls", underTestMetricData.get(0).getName()); + Assertions.assertEquals(1, underTestMetricData.get(0).getDoubleGaugeData().getPoints().size()); + underTestMetricData + .get(0) + .getDoubleGaugeData() + .getPoints() + .forEach( + dpd -> { + Assertions.assertEquals(10.0, dpd.getValue()); + }); + + // test for double values + ResourceMetrics resourceMetrics1 = + prepareMetric("double_num_calls", "number 
of calls", 5.5, "Gauge"); + + List underTestMetricData1 = + underTestMetricData = OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics1); + + Assertions.assertEquals(1, underTestMetricData1.size()); + Assertions.assertEquals("double_num_calls", underTestMetricData1.get(0).getName()); + Assertions.assertEquals(1, underTestMetricData1.get(0).getDoubleGaugeData().getPoints().size()); + underTestMetricData1 + .get(0) + .getDoubleGaugeData() + .getPoints() + .forEach( + dpd -> { + Assertions.assertEquals(5.5, dpd.getValue()); + }); + } + + @Test + public void testSumMetricData() { + // test for int values + ResourceMetrics resourceMetrics = prepareMetric("int_sum_calls", "number of calls", 10, "Sum"); + + List underTestMetricData = + OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics); + + Assertions.assertEquals(1, underTestMetricData.size()); + Assertions.assertEquals("int_sum_calls", underTestMetricData.get(0).getName()); + Assertions.assertEquals(1, underTestMetricData.get(0).getDoubleSumData().getPoints().size()); + underTestMetricData + .get(0) + .getDoubleSumData() + .getPoints() + .forEach( + dpd -> { + Assertions.assertEquals(10.0, dpd.getValue()); + }); + Assertions.assertEquals(false, underTestMetricData.get(0).getDoubleSumData().isMonotonic()); + Assertions.assertEquals( + DELTA, underTestMetricData.get(0).getDoubleSumData().getAggregationTemporality()); + + // test for double values + ResourceMetrics resourceMetrics1 = + prepareMetric("double_sum_calls", "number of calls", 10.5, "Sum"); + + List underTestMetricData1 = + OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics1); + + Assertions.assertEquals(1, underTestMetricData1.size()); + Assertions.assertEquals("double_sum_calls", underTestMetricData1.get(0).getName()); + Assertions.assertEquals(1, underTestMetricData1.get(0).getDoubleSumData().getPoints().size()); + underTestMetricData1 + .get(0) + .getDoubleSumData() + .getPoints() + .forEach( + dpd -> { + 
Assertions.assertEquals(10.5, dpd.getValue()); + }); + Assertions.assertEquals(false, underTestMetricData1.get(0).getDoubleSumData().isMonotonic()); + Assertions.assertEquals( + DELTA, underTestMetricData1.get(0).getDoubleSumData().getAggregationTemporality()); + } + + private Resource prepareResource() { + return Resource.newBuilder() + .addAttributes( + io.opentelemetry.proto.common.v1.KeyValue.newBuilder() + .setKey("Service") + .setValue( + AnyValue.newBuilder().setStringValue("hypertrace-metrics-exporter").build()) + .build()) + .build(); + } + + private NumberDataPoint prepareNumberDataPoint(Number value) { + List attributes = + toAttributes( + Map.of( + "tenant_id", "__default", + "service_id", "1234", + "api_id", "4567")); + + NumberDataPoint.Builder numberDataPointBuilder = NumberDataPoint.newBuilder(); + numberDataPointBuilder.addAllAttributes(attributes); + numberDataPointBuilder.setTimeUnixNano( + TimeUnit.NANOSECONDS.convert( + 1634119810000L /*2021-10-13:10-10-10 GMT*/, TimeUnit.MILLISECONDS)); + + if (value instanceof Integer) { + numberDataPointBuilder.setAsInt(value.intValue()); + } else { + numberDataPointBuilder.setAsDouble(value.doubleValue()); + } + + return numberDataPointBuilder.build(); + } + + private Metric prepareGaugeMetric(String metricName, String metricDesc, Number value) { + io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder = + io.opentelemetry.proto.metrics.v1.Metric.newBuilder(); + metricBuilder.setName(metricName); + metricBuilder.setDescription(metricDesc); + metricBuilder.setUnit("1"); + + NumberDataPoint numberDataPoint = prepareNumberDataPoint(value); + + Gauge.Builder gaugeBuilder = Gauge.newBuilder(); + gaugeBuilder.addDataPoints(numberDataPoint); + metricBuilder.setGauge(gaugeBuilder.build()); + return metricBuilder.build(); + } + + private Metric prepareSumMetric(String metricName, String metricDesc, Number value) { + io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder = + 
io.opentelemetry.proto.metrics.v1.Metric.newBuilder(); + metricBuilder.setName(metricName); + metricBuilder.setDescription(metricDesc); + metricBuilder.setUnit("1"); + + NumberDataPoint numberDataPoint = prepareNumberDataPoint(value); + + Sum.Builder sumBuilder = Sum.newBuilder(); + sumBuilder.addDataPoints(numberDataPoint); + sumBuilder.setAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA); + sumBuilder.setIsMonotonic(false); + metricBuilder.setSum(sumBuilder.build()); + return metricBuilder.build(); + } + + private ResourceMetrics prepareMetric( + String metricName, String metricDesc, Number value, String type) { + + ResourceMetrics.Builder resourceMetricsBuilder = ResourceMetrics.newBuilder(); + resourceMetricsBuilder.setResource(prepareResource()); + + Metric metric; + if (type.equals("Gauge")) { + metric = prepareGaugeMetric(metricName, metricDesc, value); + } else { + metric = prepareSumMetric(metricName, metricDesc, value); + } + + resourceMetricsBuilder.addInstrumentationLibraryMetrics( + InstrumentationLibraryMetrics.newBuilder() + .addMetrics(metric) + .setInstrumentationLibrary( + InstrumentationLibrary.newBuilder().setName("Generated").build()) + .build()); + + return resourceMetricsBuilder.build(); + } + + private List toAttributes(Map labels) { + List attributes = + labels.entrySet().stream() + .map( + k -> { + io.opentelemetry.proto.common.v1.KeyValue.Builder keyValueBuilder = + io.opentelemetry.proto.common.v1.KeyValue.newBuilder(); + keyValueBuilder.setKey(k.getKey()); + String value = k.getValue() != null ? 
k.getValue() : ""; + keyValueBuilder.setValue(AnyValue.newBuilder().setStringValue(value).build()); + return keyValueBuilder.build(); + }) + .collect(Collectors.toList()); + return attributes; + } +} From b944b9ea055a877426bc8ab5cf8d35c00273b379 Mon Sep 17 00:00:00 2001 From: Ronak Date: Thu, 21 Oct 2021 16:40:19 +0530 Subject: [PATCH 8/8] test: add unit test for in-memory metrics producer --- .../producer/InMemoryMetricsProducer.java | 4 +- .../producer/InMemoryMetricsProducerTest.java | 72 ++++++++++ .../OtlpProtoToMetricDataConverterTest.java | 119 +---------------- .../exporter/utils/ResourceMetricsUtils.java | 125 ++++++++++++++++++ .../application.conf | 24 ++++ 5 files changed, 224 insertions(+), 120 deletions(-) create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducerTest.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/ResourceMetricsUtils.java create mode 100644 hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/resources/configs/hypertrace-metrics-exporter/application.conf diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java index 2eff4a951..34c6aedd9 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java @@ -13,7 +13,7 @@ public class InMemoryMetricsProducer implements MetricProducer { private static final String BUFFER_CONFIG_KEY = "buffer.config"; private static final String 
MAX_QUEUE_SIZE = "max.queue.size"; - private static final String MAX_BATCH_SIZE = "max.queue.size"; + private static final String MAX_BATCH_SIZE = "max.batch.size"; private BlockingQueue metricDataQueue; private int maxQueueSize; @@ -26,7 +26,7 @@ public InMemoryMetricsProducer(Config config) { } public boolean addMetricData(List metricData) { - if (this.metricDataQueue.size() + metricData.size() >= maxQueueSize) { + if (this.metricDataQueue.size() + metricData.size() > maxQueueSize) { return false; } diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducerTest.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducerTest.java new file mode 100644 index 000000000..52d081e4c --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducerTest.java @@ -0,0 +1,72 @@ +package org.hypertrace.metrics.exporter.producer; + +import static org.hypertrace.metrics.exporter.utils.ResourceMetricsUtils.prepareMetric; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.sdk.metrics.data.MetricData; +import java.util.List; +import org.hypertrace.metrics.exporter.utils.OtlpProtoToMetricDataConverter; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class InMemoryMetricsProducerTest { + + private InMemoryMetricsProducer underTest; + + @BeforeEach + public void setUp() { + Config config = + ConfigFactory.parseURL( + getClass() + .getClassLoader() + .getResource("configs/hypertrace-metrics-exporter/application.conf")); + + underTest = new InMemoryMetricsProducer(config); + } + + @Test + public void testAddMetricDataAndCollectData() { + 
// insert 1 data + ResourceMetrics resourceMetrics = prepareMetric("int_num_calls", "number of calls", 1, "Gauge"); + List inMetricData = OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics); + + Assertions.assertTrue(underTest.addMetricData(inMetricData)); + + // insert 2nd data + ResourceMetrics resourceMetrics1 = + prepareMetric("double_num_calls", "number of calls", 2.5, "Gauge"); + List inMetricData1 = OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics1); + + // assert that can't add + Assertions.assertTrue(underTest.addMetricData(inMetricData1)); + + // insert 3nd data + ResourceMetrics resourceMetrics2 = + prepareMetric("double_num_calls", "number of calls", 3.5, "Gauge"); + List inMetricData2 = OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics2); + + // assert that can't add + Assertions.assertFalse(underTest.addMetricData(inMetricData2)); + + // Now read data + List outData = (List) underTest.collectAllMetrics(); + Assertions.assertEquals(1, outData.size()); + Assertions.assertEquals(inMetricData.get(0), outData.get(0)); + + outData = (List) underTest.collectAllMetrics(); + Assertions.assertEquals(1, outData.size()); + Assertions.assertEquals(inMetricData1.get(0), outData.get(0)); + + outData = (List) underTest.collectAllMetrics(); + Assertions.assertEquals(0, outData.size()); + + // reinsert 3rd data point + Assertions.assertTrue(underTest.addMetricData(inMetricData2)); + outData = (List) underTest.collectAllMetrics(); + Assertions.assertEquals(1, outData.size()); + Assertions.assertEquals(inMetricData2.get(0), outData.get(0)); + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverterTest.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverterTest.java index 2ab985e13..fc1c74c1a 100644 --- 
a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverterTest.java +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverterTest.java @@ -1,23 +1,11 @@ package org.hypertrace.metrics.exporter.utils; import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; +import static org.hypertrace.metrics.exporter.utils.ResourceMetricsUtils.prepareMetric; -import io.opentelemetry.proto.common.v1.AnyValue; -import io.opentelemetry.proto.common.v1.InstrumentationLibrary; -import io.opentelemetry.proto.common.v1.KeyValue; -import io.opentelemetry.proto.metrics.v1.AggregationTemporality; -import io.opentelemetry.proto.metrics.v1.Gauge; -import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; -import io.opentelemetry.proto.metrics.v1.Metric; -import io.opentelemetry.proto.metrics.v1.NumberDataPoint; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; -import io.opentelemetry.proto.metrics.v1.Sum; -import io.opentelemetry.proto.resource.v1.Resource; import io.opentelemetry.sdk.metrics.data.MetricData; import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -109,109 +97,4 @@ public void testSumMetricData() { Assertions.assertEquals( DELTA, underTestMetricData1.get(0).getDoubleSumData().getAggregationTemporality()); } - - private Resource prepareResource() { - return Resource.newBuilder() - .addAttributes( - io.opentelemetry.proto.common.v1.KeyValue.newBuilder() - .setKey("Service") - .setValue( - AnyValue.newBuilder().setStringValue("hypertrace-metrics-exporter").build()) - .build()) - .build(); - } - - private NumberDataPoint prepareNumberDataPoint(Number value) { - List attributes = - toAttributes( - Map.of( - "tenant_id", 
"__default", - "service_id", "1234", - "api_id", "4567")); - - NumberDataPoint.Builder numberDataPointBuilder = NumberDataPoint.newBuilder(); - numberDataPointBuilder.addAllAttributes(attributes); - numberDataPointBuilder.setTimeUnixNano( - TimeUnit.NANOSECONDS.convert( - 1634119810000L /*2021-10-13:10-10-10 GMT*/, TimeUnit.MILLISECONDS)); - - if (value instanceof Integer) { - numberDataPointBuilder.setAsInt(value.intValue()); - } else { - numberDataPointBuilder.setAsDouble(value.doubleValue()); - } - - return numberDataPointBuilder.build(); - } - - private Metric prepareGaugeMetric(String metricName, String metricDesc, Number value) { - io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder = - io.opentelemetry.proto.metrics.v1.Metric.newBuilder(); - metricBuilder.setName(metricName); - metricBuilder.setDescription(metricDesc); - metricBuilder.setUnit("1"); - - NumberDataPoint numberDataPoint = prepareNumberDataPoint(value); - - Gauge.Builder gaugeBuilder = Gauge.newBuilder(); - gaugeBuilder.addDataPoints(numberDataPoint); - metricBuilder.setGauge(gaugeBuilder.build()); - return metricBuilder.build(); - } - - private Metric prepareSumMetric(String metricName, String metricDesc, Number value) { - io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder = - io.opentelemetry.proto.metrics.v1.Metric.newBuilder(); - metricBuilder.setName(metricName); - metricBuilder.setDescription(metricDesc); - metricBuilder.setUnit("1"); - - NumberDataPoint numberDataPoint = prepareNumberDataPoint(value); - - Sum.Builder sumBuilder = Sum.newBuilder(); - sumBuilder.addDataPoints(numberDataPoint); - sumBuilder.setAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA); - sumBuilder.setIsMonotonic(false); - metricBuilder.setSum(sumBuilder.build()); - return metricBuilder.build(); - } - - private ResourceMetrics prepareMetric( - String metricName, String metricDesc, Number value, String type) { - - ResourceMetrics.Builder resourceMetricsBuilder = 
ResourceMetrics.newBuilder(); - resourceMetricsBuilder.setResource(prepareResource()); - - Metric metric; - if (type.equals("Gauge")) { - metric = prepareGaugeMetric(metricName, metricDesc, value); - } else { - metric = prepareSumMetric(metricName, metricDesc, value); - } - - resourceMetricsBuilder.addInstrumentationLibraryMetrics( - InstrumentationLibraryMetrics.newBuilder() - .addMetrics(metric) - .setInstrumentationLibrary( - InstrumentationLibrary.newBuilder().setName("Generated").build()) - .build()); - - return resourceMetricsBuilder.build(); - } - - private List toAttributes(Map labels) { - List attributes = - labels.entrySet().stream() - .map( - k -> { - io.opentelemetry.proto.common.v1.KeyValue.Builder keyValueBuilder = - io.opentelemetry.proto.common.v1.KeyValue.newBuilder(); - keyValueBuilder.setKey(k.getKey()); - String value = k.getValue() != null ? k.getValue() : ""; - keyValueBuilder.setValue(AnyValue.newBuilder().setStringValue(value).build()); - return keyValueBuilder.build(); - }) - .collect(Collectors.toList()); - return attributes; - } } diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/ResourceMetricsUtils.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/ResourceMetricsUtils.java new file mode 100644 index 000000000..6ce0b9fea --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/ResourceMetricsUtils.java @@ -0,0 +1,125 @@ +package org.hypertrace.metrics.exporter.utils; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationLibrary; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; +import 
io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.Sum; +import io.opentelemetry.proto.resource.v1.Resource; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ResourceMetricsUtils { + private static Resource prepareResource() { + return Resource.newBuilder() + .addAttributes( + io.opentelemetry.proto.common.v1.KeyValue.newBuilder() + .setKey("Service") + .setValue( + AnyValue.newBuilder().setStringValue("hypertrace-metrics-exporter").build()) + .build()) + .build(); + } + + private static NumberDataPoint prepareNumberDataPoint(Number value) { + List attributes = + toAttributes( + Map.of( + "tenant_id", "__default", + "service_id", "1234", + "api_id", "4567")); + + NumberDataPoint.Builder numberDataPointBuilder = NumberDataPoint.newBuilder(); + numberDataPointBuilder.addAllAttributes(attributes); + numberDataPointBuilder.setTimeUnixNano( + TimeUnit.NANOSECONDS.convert( + 1634119810000L /*2021-10-13:10-10-10 GMT*/, TimeUnit.MILLISECONDS)); + + if (value instanceof Integer) { + numberDataPointBuilder.setAsInt(value.intValue()); + } else { + numberDataPointBuilder.setAsDouble(value.doubleValue()); + } + + return numberDataPointBuilder.build(); + } + + private static List toAttributes( + Map labels) { + List attributes = + labels.entrySet().stream() + .map( + k -> { + io.opentelemetry.proto.common.v1.KeyValue.Builder keyValueBuilder = + io.opentelemetry.proto.common.v1.KeyValue.newBuilder(); + keyValueBuilder.setKey(k.getKey()); + String value = k.getValue() != null ? 
k.getValue() : ""; + keyValueBuilder.setValue(AnyValue.newBuilder().setStringValue(value).build()); + return keyValueBuilder.build(); + }) + .collect(Collectors.toList()); + return attributes; + } + + private static Metric prepareGaugeMetric(String metricName, String metricDesc, Number value) { + io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder = + io.opentelemetry.proto.metrics.v1.Metric.newBuilder(); + metricBuilder.setName(metricName); + metricBuilder.setDescription(metricDesc); + metricBuilder.setUnit("1"); + + NumberDataPoint numberDataPoint = prepareNumberDataPoint(value); + + Gauge.Builder gaugeBuilder = Gauge.newBuilder(); + gaugeBuilder.addDataPoints(numberDataPoint); + metricBuilder.setGauge(gaugeBuilder.build()); + return metricBuilder.build(); + } + + private static Metric prepareSumMetric(String metricName, String metricDesc, Number value) { + io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder = + io.opentelemetry.proto.metrics.v1.Metric.newBuilder(); + metricBuilder.setName(metricName); + metricBuilder.setDescription(metricDesc); + metricBuilder.setUnit("1"); + + NumberDataPoint numberDataPoint = prepareNumberDataPoint(value); + + Sum.Builder sumBuilder = Sum.newBuilder(); + sumBuilder.addDataPoints(numberDataPoint); + sumBuilder.setAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA); + sumBuilder.setIsMonotonic(false); + metricBuilder.setSum(sumBuilder.build()); + return metricBuilder.build(); + } + + public static ResourceMetrics prepareMetric( + String metricName, String metricDesc, Number value, String type) { + + ResourceMetrics.Builder resourceMetricsBuilder = ResourceMetrics.newBuilder(); + resourceMetricsBuilder.setResource(prepareResource()); + + Metric metric; + if (type.equals("Gauge")) { + metric = prepareGaugeMetric(metricName, metricDesc, value); + } else { + metric = prepareSumMetric(metricName, metricDesc, value); + } + + resourceMetricsBuilder.addInstrumentationLibraryMetrics( + 
InstrumentationLibraryMetrics.newBuilder() + .addMetrics(metric) + .setInstrumentationLibrary( + InstrumentationLibrary.newBuilder().setName("Generated").build()) + .build()); + + return resourceMetricsBuilder.build(); + } +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/resources/configs/hypertrace-metrics-exporter/application.conf b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/resources/configs/hypertrace-metrics-exporter/application.conf new file mode 100644 index 000000000..227d90013 --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/resources/configs/hypertrace-metrics-exporter/application.conf @@ -0,0 +1,24 @@ +service.name = hypertrace-metrics-exporter +service.admin.port = 8099 + +main.class = org.hypertrace.metrics.exporter.MetricsExporterService + +input.topic = "enriched-otlp-metrics" + +buffer.config { + max.queue.size = 2 + max.batch.size = 1 +} + +kafka.config = { + application.id = hypertrace-metrics-exporter-job + bootstrap.servers = "localhost:9092" + bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-generator" + +metrics.reporter.prefix = org.hypertrace.metrics.exporter.MetricsExporterService +metrics.reporter.names = ["prometheus"] +metrics.reportInterval = 60 \ No newline at end of file