Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metrics generator for num_calls #286

Merged
merged 17 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions hypertrace-metrics-generator/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
subprojects {
group = "org.hypertrace.metrics.generator"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
plugins {
`java-library`
id("org.hypertrace.publish-plugin")
id("org.hypertrace.avro-plugin")
}

dependencies {
api("org.apache.avro:avro:1.10.2")
}

tasks.named<org.hypertrace.gradle.avro.CheckAvroCompatibility>("avroCompatibilityCheck") {
enabled = false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
@namespace("org.hypertrace.metrics.generator.api")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we still adding avro messages? I thought we had fully switched to proto for new things, even across kafka

Spacing also looks off.

Copy link
Contributor Author

@kotharironak kotharironak Nov 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we still adding Avro messages?

Are you referring to all the new pipeline - alerting and metric or something else?

I will check with @laxman-traceable If I can directly go with java based serialization with StateStore and get away with avro messages here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding - could be wrong - was that we initially used avro over kafka because that's what it had support for, and proto everywhere else. Newer versions have equal levels of proto support, so rather than using two different serialization formats, unifying on just proto for all new messages would be more desirable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked with @laxman-traceable. And as per him, avro goes better with kafka overall, and as you said the newer versions have support for schema registry for proto too. But, as you said, in this metrics pipeline, proto is the main transport layer, so he was also suggesting to go with Proto for consistency. Changing it to proto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

protocol MetricIdentityProtocol {
record MetricIdentity {
long timestamp_millis = 0;
union {null, string} metric_key = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have much context, but what is a metric key? where's the tenant id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tenant_id is part of metric attribute -

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metric_key is UUID from metric_name and set of metric's attribute name:value pair. This is for reducing serde cost of updating the state store on every input message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still trying to understand - would this mean there are no actual attributes available for querying, and that everything needs to be queried by ID only? I couldn't find metrics that all have attribute A for example. How would things like filtering work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Such global queries filtering is possible in PromQL language.
{} refers to filtering expression - https://prometheus.io/docs/prometheus/latest/querying/basics/#instant-vector-selectors

As an example, below query returns all metrics having attribute (or label)
pod with value raw-spans-grouper : {pod="raw-spans-grouper}

However, the filtering expression with a specific metric_name will be able to filter within a group of those series - metric_name{<lable filtering>}.
So, if we are looking for num_calls (is a metric) for a given service (is an attribute), we can express as sum(num_calls{service="1234"})

I had listed out a few mappings here - hypertrace/hypertrace#295 (comment)

Can you give one more example that you are trying to refer to?

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
@namespace("org.hypertrace.metrics.generator.api")
protocol MetricProtocol {
record Metric {
union {null, string} name = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this different than the key? If we're emitting these for every timestamp, wouldn't we want to put as little in the value message as possible - metadata like name description and unit could be stored elsewhere.

Are attributes holding the numbers? If so, why are they typed string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this different than the key?
From this <metric_name> and are used form a key.

So, for a given timestamp window, I just store Metric once, and when we emit the metric, that's the time it is used. Reaming, time for each incoming message, to save serde cost, I used metric_key for aggregating value in a different store.

Are attributes holding the numbers? If so, why are they typed string?
This represents the Prometheus metric name and its labels - https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels

And, they are strings.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, for a given timestamp window, I just store Metric once, and when we emit the metric, that's the time it is used. Reaming, time for each incoming message, to save serde cost, I used metric_key for aggregating value in a different store.

Here I meant, that we're storing the attributes and name in every stored metric timewindow, as I understand it. Is that to support querying by attribute and name, not just the id as I was asking about in the other comment thread?

This represents the Prometheus metric name and its labels - https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels

OK that makes sense, that's what I expected - but where is the actual metric value itself in this message? or it elsewhere? The only numeric-typed thing I see is the timestamp.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of this PR, I am aggregating num_calls metrics to give a time window (say 15s). During this aggregation, I am storing the intermediate result in the state store <MetricIdentity, Value> while punctuating, I am emitting this value after converting to otlp metrics.

map<string> attributes = {};
union {null, string} description = null;
union {null, string} unit = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
plugins {
java
application
jacoco
id("org.hypertrace.docker-java-application-plugin")
id("org.hypertrace.docker-publish-plugin")
id("org.hypertrace.jacoco-report-plugin")
}

application {
mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher")
}

hypertraceDocker {
defaultImage {
javaApplication {
serviceName.set("${project.name}")
adminPort.set(8099)
}
}
}

tasks.test {
useJUnitPlatform()
}

dependencies {
// common and framework
implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator-api"))
implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.31")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.31")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")

// open telemetry proto
implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")

// test
testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
testImplementation("org.mockito:mockito-core:3.8.0")
testImplementation("org.junit-pioneer:junit-pioneer:1.3.8")
testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package org.hypertrace.metrics.generator;

import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.metrics.generator.api.Metric;
import org.hypertrace.metrics.generator.api.MetricIdentity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricEmitPunctuator implements Punctuator {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricEmitPunctuator.class);

private static final String RESOURCE_KEY_SERVICE = "service";
private static final String RESOURCE_KEY_SERVICE_VALUE = "metrics-generator";
private static final String INSTRUMENTATION_LIB_NAME = "Generated-From-View";

private MetricIdentity key;
private ProcessorContext context;
private KeyValueStore<MetricIdentity, Long> metricIdentityStore;
private KeyValueStore<MetricIdentity, Metric> metricStore;
private Cancellable cancellable;
private To outputTopicProducer;

public MetricEmitPunctuator(
MetricIdentity key,
ProcessorContext context,
KeyValueStore<MetricIdentity, Long> metricIdentityStore,
KeyValueStore<MetricIdentity, Metric> metricStore,
To outputTopicProducer) {
this.key = key;
this.context = context;
this.metricIdentityStore = metricIdentityStore;
this.metricStore = metricStore;
this.outputTopicProducer = outputTopicProducer;
}

public void setCancellable(Cancellable cancellable) {
this.cancellable = cancellable;
}

@Override
public void punctuate(long timestamp) {
// always cancel the punctuator else it will get re-scheduled automatically
cancellable.cancel();

// read the value from a key
Long value = metricIdentityStore.get(this.key);
if (value != null) {
long diff = timestamp - this.key.getTimestampMillis();
LOGGER.debug("Metrics with key:{} is emitted after duration {}", this.key, diff);

Metric metric = metricStore.get(this.key);
metricIdentityStore.delete(this.key);
metricStore.delete(this.key);
// convert to Resource Metrics
ResourceMetrics resourceMetrics = convertToResourceMetric(this.key, value, metric);
context.forward(null, resourceMetrics, outputTopicProducer);
} else {
LOGGER.debug("The value for metrics with key:{} is null", this.key);
}
}

private ResourceMetrics convertToResourceMetric(
MetricIdentity metricIdentity, Long value, Metric metric) {
ResourceMetrics.Builder resourceMetricsBuilder = ResourceMetrics.newBuilder();
resourceMetricsBuilder.setResource(
Resource.newBuilder()
.addAttributes(
io.opentelemetry.proto.common.v1.KeyValue.newBuilder()
.setKey(RESOURCE_KEY_SERVICE)
.setValue(
AnyValue.newBuilder().setStringValue(RESOURCE_KEY_SERVICE_VALUE).build())
.build()));

io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder =
io.opentelemetry.proto.metrics.v1.Metric.newBuilder();
metricBuilder.setName(metric.getName());
metricBuilder.setDescription(metric.getDescription());
metricBuilder.setUnit(metric.getUnit());

NumberDataPoint.Builder numberDataPointBuilder = NumberDataPoint.newBuilder();
List<KeyValue> attributes = toAttributes(metric.getAttributes());
numberDataPointBuilder.addAllAttributes(attributes);
numberDataPointBuilder.setTimeUnixNano(
TimeUnit.NANOSECONDS.convert(metricIdentity.getTimestampMillis(), TimeUnit.MILLISECONDS));
numberDataPointBuilder.setAsInt(value);

Gauge.Builder gaugeBuilder = Gauge.newBuilder();
gaugeBuilder.addDataPoints(numberDataPointBuilder.build());
metricBuilder.setGauge(gaugeBuilder.build());

resourceMetricsBuilder.addInstrumentationLibraryMetrics(
InstrumentationLibraryMetrics.newBuilder()
.addMetrics(metricBuilder.build())
.setInstrumentationLibrary(
InstrumentationLibrary.newBuilder().setName(INSTRUMENTATION_LIB_NAME).build())
.build());

return resourceMetricsBuilder.build();
}

private List<io.opentelemetry.proto.common.v1.KeyValue> toAttributes(Map<String, String> labels) {
List<io.opentelemetry.proto.common.v1.KeyValue> attributes =
labels.entrySet().stream()
.map(
k -> {
io.opentelemetry.proto.common.v1.KeyValue.Builder keyValueBuilder =
io.opentelemetry.proto.common.v1.KeyValue.newBuilder();
keyValueBuilder.setKey(k.getKey());
String value = k.getValue() != null ? k.getValue() : "";
keyValueBuilder.setValue(AnyValue.newBuilder().setStringValue(value).build());
return keyValueBuilder.build();
})
.collect(Collectors.toList());
return attributes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package org.hypertrace.metrics.generator;

import com.typesafe.config.Config;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.LongSerde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.hypertrace.metrics.generator.api.Metric;
import org.hypertrace.metrics.generator.api.MetricIdentity;
import org.hypertrace.viewgenerator.api.RawServiceView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricsGenerator extends KafkaStreamsApp {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricsGenerator.class);
public static final String INPUT_TOPIC_CONFIG_KEY = "input.topic";
public static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic";
public static final String METRICS_GENERATOR_JOB_CONFIG = "metrics-generator-job-config";
public static final String METRICS_IDENTITY_STORE = "metric-identity-store";
public static final String METRICS_IDENTITY_VALUE_STORE = "metric-identity-value-Store";
public static final String OUTPUT_TOPIC_PRODUCER = "output-topic-producer";

public MetricsGenerator(ConfigClient configClient) {
super(configClient);
}

@Override
public StreamsBuilder buildTopology(
Map<String, Object> streamsProperties,
StreamsBuilder streamsBuilder,
Map<String, KStream<?, ?>> inputStreams) {

Config jobConfig = getJobConfig(streamsProperties);
String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);

KStream<String, RawServiceView> inputStream =
(KStream<String, RawServiceView>) inputStreams.get(inputTopic);
if (inputStream == null) {
inputStream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), (Serde) null));
inputStreams.put(inputTopic, inputStream);
}

// Retrieve the default value serde defined in config and use it
Serde valueSerde = defaultValueSerde(streamsProperties);
Serde keySerde = defaultKeySerde(streamsProperties);

StoreBuilder<KeyValueStore<MetricIdentity, Long>> metricIdentityStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(METRICS_IDENTITY_STORE), keySerde, new LongSerde())
.withCachingEnabled();

StoreBuilder<KeyValueStore<MetricIdentity, Metric>> metricIdentityToValueStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(METRICS_IDENTITY_VALUE_STORE), keySerde, valueSerde)
.withCachingEnabled();

streamsBuilder.addStateStore(metricIdentityStoreBuilder);
streamsBuilder.addStateStore(metricIdentityToValueStoreBuilder);

Produced<byte[], ResourceMetrics> outputTopicProducer =
Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde());
outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER);

inputStream
.transform(
MetricsProcessor::new,
Named.as(MetricsProcessor.class.getSimpleName()),
METRICS_IDENTITY_STORE,
METRICS_IDENTITY_VALUE_STORE)
.to(outputTopic, outputTopicProducer);

return streamsBuilder;
}

@Override
public String getJobConfigKey() {
return METRICS_GENERATOR_JOB_CONFIG;
}

@Override
public Logger getLogger() {
return LOGGER;
}

@Override
public List<String> getInputTopics(Map<String, Object> properties) {
Config jobConfig = getJobConfig(properties);
return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY));
}

@Override
public List<String> getOutputTopics(Map<String, Object> properties) {
Config jobConfig = getJobConfig(properties);
return List.of(jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY));
}

private Config getJobConfig(Map<String, Object> properties) {
return (Config) properties.get(getJobConfigKey());
}

private Serde defaultValueSerde(Map<String, Object> properties) {
StreamsConfig config = new StreamsConfig(properties);
return config.defaultValueSerde();
}

private Serde defaultKeySerde(Map<String, Object> properties) {
StreamsConfig config = new StreamsConfig(properties);
return config.defaultKeySerde();
}
}
Loading