Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metrics generator for num_calls #286

Merged
merged 17 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions hypertrace-metrics-generator/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Root build script for the hypertrace-metrics-generator module tree:
// applies a common Maven group id to every subproject (api, service, ...).
subprojects {
group = "org.hypertrace.metrics.generator"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import com.google.protobuf.gradle.protobuf
import com.google.protobuf.gradle.protoc

// Build script for the metrics-generator API module: compiles the proto3
// definitions in this module and publishes the generated Java classes.
plugins {
`java-library`
id("com.google.protobuf") version "0.8.15"
}

// Use a pinned protoc binary so generated code is reproducible across machines.
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.17.3"
}
}

// Add the protoc output directory to the main source set so the generated
// classes compile together with the hand-written serdes.
sourceSets {
main {
java {
srcDirs("src/main/java", "build/generated/source/proto/main/java")
}
}
}

dependencies {
// protobuf-java runtime must match the protoc version above (3.17.3).
implementation("com.google.protobuf:protobuf-java:3.17.3")
implementation("org.apache.kafka:kafka-clients:6.0.1-ccs")
implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.hypertrace.metrics.generator.api.v1.serde;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.hypertrace.metrics.generator.api.v1.MetricIdentity;

/**
 * Kafka {@link Serde} for {@link MetricIdentity} protobuf messages.
 *
 * <p>Uses the protobuf binary wire format. Per the Kafka serde contract, a {@code null}
 * payload maps to {@code null} in both directions (e.g. tombstone records).
 */
public class MetricIdentitySerde implements Serde<MetricIdentity> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public void close() {}

  @Override
  public Serializer<MetricIdentity> serializer() {
    return new MetricIdentitySerializer();
  }

  @Override
  public Deserializer<MetricIdentity> deserializer() {
    return new MetricIdentityDeserializer();
  }

  private static class MetricIdentitySerializer implements Serializer<MetricIdentity> {
    @Override
    public byte[] serialize(String topic, MetricIdentity data) {
      // Kafka contract: a null record value serializes to null (tombstone).
      // protobuf's toByteArray() does not throw, so the previous catch-and-return-null
      // (which silently dropped records) is unnecessary and has been removed.
      return data == null ? null : data.toByteArray();
    }
  }

  private static class MetricIdentityDeserializer implements Deserializer<MetricIdentity> {
    @Override
    public MetricIdentity deserialize(String topic, byte[] data) {
      // Kafka contract: a null payload deserializes to null (tombstone).
      if (data == null) {
        return null;
      }
      try {
        return MetricIdentity.parseFrom(data);
      } catch (InvalidProtocolBufferException e) {
        // Surface corrupt payloads with context instead of an opaque failure.
        throw new RuntimeException(
            "Failed to deserialize MetricIdentity from topic " + topic, e);
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.hypertrace.metrics.generator.api.v1.serde;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.hypertrace.metrics.generator.api.v1.Metric;

public class MetricSerde implements Serde<Metric> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this PR, I have added custom protobuf serde for Metric, MetricIdentity, and OtlpMetrics. I am facing some issues while using kafka stream protobuf serde - had logged the ticket - hypertrace/hypertrace#312

Working on this as a follow-up. This will remove these custom objects

@Override
public void configure(Map<String, ?> configs, boolean isKey) {}

@Override
public void close() {}

@Override
public Serializer<Metric> serializer() {
return new MetricSerde.MetricSerializer();
}

@Override
public Deserializer<Metric> deserializer() {
return new MetricSerde.MetricDeserializer();
}

private static class MetricSerializer implements Serializer<Metric> {
@Override
public byte[] serialize(String topic, Metric data) {
try {
return data.toByteArray();
} catch (Exception e) {
// ignore error
}
return null;
}
}

private static class MetricDeserializer implements Deserializer<Metric> {
@Override
public Metric deserialize(String topic, byte[] data) {
try {
return Metric.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.hypertrace.metrics.generator.api.v1.serde;

import com.google.protobuf.InvalidProtocolBufferException;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka {@link Serde} for OTLP {@link ResourceMetrics} protobuf messages.
 *
 * <p>Uses the protobuf binary wire format. Per the Kafka serde contract, a {@code null}
 * payload maps to {@code null} in both directions (e.g. tombstone records).
 */
public class OtlpMetricsSerde implements Serde<ResourceMetrics> {
  private static final Logger LOGGER = LoggerFactory.getLogger(OtlpMetricsSerde.class);

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public void close() {}

  @Override
  public Serializer<ResourceMetrics> serializer() {
    return new OtlpMetricsSerializer();
  }

  @Override
  public Deserializer<ResourceMetrics> deserializer() {
    return new OtlpMetricsDeserializer();
  }

  private static class OtlpMetricsSerializer implements Serializer<ResourceMetrics> {
    @Override
    public byte[] serialize(String topic, ResourceMetrics data) {
      // Kafka contract: a null record value serializes to null (tombstone).
      // protobuf's toByteArray() does not throw, so the previous catch-and-return-null
      // (which silently dropped records after logging) is unnecessary and was removed.
      return data == null ? null : data.toByteArray();
    }
  }

  private static class OtlpMetricsDeserializer implements Deserializer<ResourceMetrics> {
    @Override
    public ResourceMetrics deserialize(String topic, byte[] data) {
      // Kafka contract: a null payload deserializes to null (tombstone).
      if (data == null) {
        return null;
      }
      try {
        // Removed the per-record INFO log: it fired on every message and only
        // printed the class name, flooding logs on the consume hot path.
        return ResourceMetrics.parseFrom(data);
      } catch (InvalidProtocolBufferException e) {
        LOGGER.error("Failed to deserialize ResourceMetrics from topic {}", topic, e);
        throw new RuntimeException(e);
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

option java_multiple_files = true;

package org.hypertrace.metrics.generator.api.v1;


// Key that identifies one metric data point in the generator's state stores:
// the point's timestamp plus a key string for the series.
// NOTE(review): metric_key looks like a precomputed series key (name+attributes);
// confirm against the code that produces it.
message MetricIdentity {
sfixed64 timestamp_millis = 1;
string metric_key = 2;
}

// Metadata describing a metric series (identity/metadata only, no values —
// the aggregated value is stored separately, keyed by MetricIdentity).
message Metric {
string name = 1;
map<string, string> attributes = 2;
string description = 3;
string unit = 4;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
plugins {
java
application
jacoco
id("org.hypertrace.docker-java-application-plugin")
id("org.hypertrace.docker-publish-plugin")
id("org.hypertrace.jacoco-report-plugin")
}

// The service boots through the shared platform launcher rather than its own main().
application {
mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher")
}

hypertraceDocker {
defaultImage {
javaApplication {
serviceName.set("${project.name}")
adminPort.set(8099)
}
}
}

tasks.test {
useJUnitPlatform()
}

dependencies {
// common and framework
implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator-api"))
implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.31")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.31")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")

// open telemetry proto
implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")

// all constraints
constraints {
// Pin transitive jersey-common to a patched version (security advisory below).
implementation("org.glassfish.jersey.core:jersey-common:2.34") {
because("https://snyk.io/vuln/SNYK-JAVA-ORGGLASSFISHJERSEYCORE-1255637")
}
}

// test
testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
testImplementation("org.mockito:mockito-core:3.8.0")
testImplementation("org.junit-pioneer:junit-pioneer:1.3.8")
testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package org.hypertrace.metrics.generator;

import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.metrics.generator.api.v1.Metric;
import org.hypertrace.metrics.generator.api.v1.MetricIdentity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * One-shot {@link Punctuator} responsible for a single buffered metric: when it fires, it
 * reads the aggregated value and metric metadata for {@code key} from the two state stores,
 * deletes both entries, converts them into an OTLP gauge {@link ResourceMetrics} record, and
 * forwards that record downstream. It always cancels itself on the first invocation.
 */
public class MetricEmitPunctuator implements Punctuator {
  private static final Logger LOGGER = LoggerFactory.getLogger(MetricEmitPunctuator.class);

  private static final String RESOURCE_KEY_SERVICE = "service";
  private static final String RESOURCE_KEY_SERVICE_VALUE = "metrics-generator";
  private static final String INSTRUMENTATION_LIB_NAME = "Generated-From-View";

  // Identity of the single metric this punctuator emits.
  private final MetricIdentity key;
  private final ProcessorContext context;
  // Aggregated numeric value, keyed by metric identity.
  private final KeyValueStore<MetricIdentity, Long> metricIdentityStore;
  // Metric metadata (name/description/unit/attributes), keyed by metric identity.
  private final KeyValueStore<MetricIdentity, Metric> metricStore;
  // Handle for this punctuator's schedule; set via setCancellable() after scheduling,
  // so it cannot be final.
  private Cancellable cancellable;
  private final To outputTopicProducer;

  public MetricEmitPunctuator(
      MetricIdentity key,
      ProcessorContext context,
      KeyValueStore<MetricIdentity, Long> metricIdentityStore,
      KeyValueStore<MetricIdentity, Metric> metricStore,
      To outputTopicProducer) {
    this.key = key;
    this.context = context;
    this.metricIdentityStore = metricIdentityStore;
    this.metricStore = metricStore;
    this.outputTopicProducer = outputTopicProducer;
  }

  /** Must be called with the handle returned when this punctuator was scheduled. */
  public void setCancellable(Cancellable cancellable) {
    this.cancellable = cancellable;
  }

  @Override
  public void punctuate(long timestamp) {
    // Always cancel the punctuator, else it will get re-scheduled automatically.
    cancellable.cancel();

    Long value = metricIdentityStore.get(this.key);
    if (value != null) {
      long diff = timestamp - this.key.getTimestampMillis();
      LOGGER.debug("Metrics with key:{} is emitted after duration {}", this.key, diff);

      Metric metric = metricStore.get(this.key);
      // Remove both store entries before forwarding so the metric is emitted at most once.
      metricIdentityStore.delete(this.key);
      metricStore.delete(this.key);
      ResourceMetrics resourceMetrics = convertToResourceMetric(this.key, value, metric);
      // Record key is intentionally null; only the OTLP payload is forwarded.
      context.forward(null, resourceMetrics, outputTopicProducer);
    } else {
      LOGGER.debug("The value for metrics with key:{} is null", this.key);
    }
  }

  /**
   * Builds an OTLP {@link ResourceMetrics} carrying a single gauge data point: the metric's
   * metadata comes from {@code metric}, the point's value from {@code value}, and the point's
   * timestamp (converted to nanos) from {@code metricIdentity}.
   */
  private ResourceMetrics convertToResourceMetric(
      MetricIdentity metricIdentity, Long value, Metric metric) {
    ResourceMetrics.Builder resourceMetricsBuilder = ResourceMetrics.newBuilder();
    resourceMetricsBuilder.setResource(
        Resource.newBuilder()
            .addAttributes(
                KeyValue.newBuilder()
                    .setKey(RESOURCE_KEY_SERVICE)
                    .setValue(
                        AnyValue.newBuilder().setStringValue(RESOURCE_KEY_SERVICE_VALUE).build())
                    .build()));

    io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder =
        io.opentelemetry.proto.metrics.v1.Metric.newBuilder();
    metricBuilder.setName(metric.getName());
    metricBuilder.setDescription(metric.getDescription());
    metricBuilder.setUnit(metric.getUnit());

    NumberDataPoint.Builder numberDataPointBuilder = NumberDataPoint.newBuilder();
    numberDataPointBuilder.addAllAttributes(toAttributes(metric.getAttributesMap()));
    numberDataPointBuilder.setTimeUnixNano(
        TimeUnit.NANOSECONDS.convert(metricIdentity.getTimestampMillis(), TimeUnit.MILLISECONDS));
    numberDataPointBuilder.setAsInt(value);

    Gauge.Builder gaugeBuilder = Gauge.newBuilder();
    gaugeBuilder.addDataPoints(numberDataPointBuilder.build());
    metricBuilder.setGauge(gaugeBuilder.build());

    resourceMetricsBuilder.addInstrumentationLibraryMetrics(
        InstrumentationLibraryMetrics.newBuilder()
            .addMetrics(metricBuilder.build())
            .setInstrumentationLibrary(
                InstrumentationLibrary.newBuilder().setName(INSTRUMENTATION_LIB_NAME).build())
            .build());

    return resourceMetricsBuilder.build();
  }

  /** Converts a string label map into OTLP {@link KeyValue} attributes; null values become "". */
  private List<KeyValue> toAttributes(Map<String, String> labels) {
    return labels.entrySet().stream()
        .map(
            entry -> {
              KeyValue.Builder keyValueBuilder = KeyValue.newBuilder();
              keyValueBuilder.setKey(entry.getKey());
              String attributeValue = entry.getValue() != null ? entry.getValue() : "";
              keyValueBuilder.setValue(
                  AnyValue.newBuilder().setStringValue(attributeValue).build());
              return keyValueBuilder.build();
            })
        .collect(Collectors.toList());
  }
}
Loading