feat: add metrics generator for num_calls #286
@@ -0,0 +1,3 @@
subprojects {
  group = "org.hypertrace.metrics.generator"
}
@@ -0,0 +1,13 @@
plugins {
  `java-library`
  id("org.hypertrace.publish-plugin")
  id("org.hypertrace.avro-plugin")
}

dependencies {
  api("org.apache.avro:avro:1.10.2")
}

tasks.named<org.hypertrace.gradle.avro.CheckAvroCompatibility>("avroCompatibilityCheck") {
  enabled = false
}
@@ -0,0 +1,7 @@
@namespace("org.hypertrace.metrics.generator.api")
protocol MetricIdentityProtocol {
  record MetricIdentity {
    long timestamp_millis = 0;
    union {null, string} metric_key = null;
  }
}

Review thread on metric_key:

Reviewer: I don't have much context, but what is a metric key? Where's the tenant_id?

Author: tenant_id is part of the metric attributes - see Line 77 in 9f6ce53.

Author: metric_key is a UUID from ….

Reviewer: Still trying to understand - would this mean there are no actual attributes available for querying, and that everything needs to be queried by ID only? I couldn't find, for example, all metrics that have attribute A. How would things like filtering work?

Author: Such global filtering queries are possible in the PromQL language; a label-only query returns all metrics having a given attribute (or label). However, a filtering expression with a specific metric_name will only filter within the group of those series - I listed out a few mappings in hypertrace/hypertrace#295 (comment). Can you give one more example of what you are referring to?
@@ -0,0 +1,9 @@
@namespace("org.hypertrace.metrics.generator.api")
protocol MetricProtocol {
  record Metric {
    union {null, string} name = null;
    map<string> attributes = {};
    union {null, string} description = null;
    union {null, string} unit = null;
  }
}

Review thread on the Metric record:

Reviewer: Is this different than the key? If we're emitting these for every timestamp, wouldn't we want to put as little in the value message as possible - metadata like name, description, and unit could be stored elsewhere. Are attributes holding the numbers? If so, why are they typed string?

Author: So, for a given timestamp window, I just store …. And they are strings.

Reviewer: Here I meant that we're storing the attributes and name in every stored metric time window, as I understand it. Is that to support querying by attribute and name, not just the id, as I was asking about in the other comment thread?

Reviewer: OK, that makes sense - that's what I expected. But where is the actual metric value itself in this message? Or is it elsewhere? The only numeric-typed thing I see is the timestamp.

Author: As part of this PR, I am aggregating ….
@@ -0,0 +1,43 @@
plugins {
  java
  application
  jacoco
  id("org.hypertrace.docker-java-application-plugin")
  id("org.hypertrace.docker-publish-plugin")
  id("org.hypertrace.jacoco-report-plugin")
}

application {
  mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher")
}

hypertraceDocker {
  defaultImage {
    javaApplication {
      serviceName.set("${project.name}")
      adminPort.set(8099)
    }
  }
}

tasks.test {
  useJUnitPlatform()
}

dependencies {
  // common and framework
  implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator-api"))
  implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.31")
  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.31")
  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")

  // open telemetry proto
  implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")

  // test
  testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
  testImplementation("org.mockito:mockito-core:3.8.0")
  testImplementation("org.junit-pioneer:junit-pioneer:1.3.8")
  testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs")
}
@@ -0,0 +1,132 @@
package org.hypertrace.metrics.generator;

import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.metrics.generator.api.Metric;
import org.hypertrace.metrics.generator.api.MetricIdentity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricEmitPunctuator implements Punctuator {
  private static final Logger LOGGER = LoggerFactory.getLogger(MetricEmitPunctuator.class);

  private static final String RESOURCE_KEY_SERVICE = "service";
  private static final String RESOURCE_KEY_SERVICE_VALUE = "metrics-generator";
  private static final String INSTRUMENTATION_LIB_NAME = "Generated-From-View";

  private MetricIdentity key;
  private ProcessorContext context;
  private KeyValueStore<MetricIdentity, Long> metricIdentityStore;
  private KeyValueStore<MetricIdentity, Metric> metricStore;
  private Cancellable cancellable;
  private To outputTopicProducer;

  public MetricEmitPunctuator(
      MetricIdentity key,
      ProcessorContext context,
      KeyValueStore<MetricIdentity, Long> metricIdentityStore,
      KeyValueStore<MetricIdentity, Metric> metricStore,
      To outputTopicProducer) {
    this.key = key;
    this.context = context;
    this.metricIdentityStore = metricIdentityStore;
    this.metricStore = metricStore;
    this.outputTopicProducer = outputTopicProducer;
  }

  public void setCancellable(Cancellable cancellable) {
    this.cancellable = cancellable;
  }

  @Override
  public void punctuate(long timestamp) {
    // always cancel the punctuator else it will get re-scheduled automatically
    cancellable.cancel();

    // read the value from a key
    Long value = metricIdentityStore.get(this.key);
    if (value != null) {
      long diff = timestamp - this.key.getTimestampMillis();
      LOGGER.debug("Metrics with key:{} is emitted after duration {}", this.key, diff);

      Metric metric = metricStore.get(this.key);
      metricIdentityStore.delete(this.key);
      metricStore.delete(this.key);
      // convert to Resource Metrics
      ResourceMetrics resourceMetrics = convertToResourceMetric(this.key, value, metric);
      context.forward(null, resourceMetrics, outputTopicProducer);
    } else {
      LOGGER.debug("The value for metrics with key:{} is null", this.key);
    }
  }

  private ResourceMetrics convertToResourceMetric(
      MetricIdentity metricIdentity, Long value, Metric metric) {
    ResourceMetrics.Builder resourceMetricsBuilder = ResourceMetrics.newBuilder();
    resourceMetricsBuilder.setResource(
        Resource.newBuilder()
            .addAttributes(
                io.opentelemetry.proto.common.v1.KeyValue.newBuilder()
                    .setKey(RESOURCE_KEY_SERVICE)
                    .setValue(
                        AnyValue.newBuilder().setStringValue(RESOURCE_KEY_SERVICE_VALUE).build())
                    .build()));

    io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder =
        io.opentelemetry.proto.metrics.v1.Metric.newBuilder();
    metricBuilder.setName(metric.getName());
    metricBuilder.setDescription(metric.getDescription());
    metricBuilder.setUnit(metric.getUnit());

    NumberDataPoint.Builder numberDataPointBuilder = NumberDataPoint.newBuilder();
    List<KeyValue> attributes = toAttributes(metric.getAttributes());
    numberDataPointBuilder.addAllAttributes(attributes);
    numberDataPointBuilder.setTimeUnixNano(
        TimeUnit.NANOSECONDS.convert(metricIdentity.getTimestampMillis(), TimeUnit.MILLISECONDS));
    numberDataPointBuilder.setAsInt(value);

    Gauge.Builder gaugeBuilder = Gauge.newBuilder();
    gaugeBuilder.addDataPoints(numberDataPointBuilder.build());
    metricBuilder.setGauge(gaugeBuilder.build());

    resourceMetricsBuilder.addInstrumentationLibraryMetrics(
        InstrumentationLibraryMetrics.newBuilder()
            .addMetrics(metricBuilder.build())
            .setInstrumentationLibrary(
                InstrumentationLibrary.newBuilder().setName(INSTRUMENTATION_LIB_NAME).build())
            .build());

    return resourceMetricsBuilder.build();
  }

  private List<io.opentelemetry.proto.common.v1.KeyValue> toAttributes(Map<String, String> labels) {
    List<io.opentelemetry.proto.common.v1.KeyValue> attributes =
        labels.entrySet().stream()
            .map(
                k -> {
                  io.opentelemetry.proto.common.v1.KeyValue.Builder keyValueBuilder =
                      io.opentelemetry.proto.common.v1.KeyValue.newBuilder();
                  keyValueBuilder.setKey(k.getKey());
                  String value = k.getValue() != null ? k.getValue() : "";
                  keyValueBuilder.setValue(AnyValue.newBuilder().setStringValue(value).build());
                  return keyValueBuilder.build();
                })
            .collect(Collectors.toList());
    return attributes;
  }
}
@@ -0,0 +1,124 @@
package org.hypertrace.metrics.generator;

import com.typesafe.config.Config;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.LongSerde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.hypertrace.metrics.generator.api.Metric;
import org.hypertrace.metrics.generator.api.MetricIdentity;
import org.hypertrace.viewgenerator.api.RawServiceView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricsGenerator extends KafkaStreamsApp {
  private static final Logger LOGGER = LoggerFactory.getLogger(MetricsGenerator.class);
  public static final String INPUT_TOPIC_CONFIG_KEY = "input.topic";
  public static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic";
  public static final String METRICS_GENERATOR_JOB_CONFIG = "metrics-generator-job-config";
  public static final String METRICS_IDENTITY_STORE = "metric-identity-store";
  public static final String METRICS_IDENTITY_VALUE_STORE = "metric-identity-value-Store";
  public static final String OUTPUT_TOPIC_PRODUCER = "output-topic-producer";

  public MetricsGenerator(ConfigClient configClient) {
    super(configClient);
  }

  @Override
  public StreamsBuilder buildTopology(
      Map<String, Object> streamsProperties,
      StreamsBuilder streamsBuilder,
      Map<String, KStream<?, ?>> inputStreams) {

    Config jobConfig = getJobConfig(streamsProperties);
    String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
    String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);

    KStream<String, RawServiceView> inputStream =
        (KStream<String, RawServiceView>) inputStreams.get(inputTopic);
    if (inputStream == null) {
      inputStream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), (Serde) null));
      inputStreams.put(inputTopic, inputStream);
    }

    // Retrieve the default value serde defined in config and use it
    Serde valueSerde = defaultValueSerde(streamsProperties);
    Serde keySerde = defaultKeySerde(streamsProperties);

    StoreBuilder<KeyValueStore<MetricIdentity, Long>> metricIdentityStoreBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(METRICS_IDENTITY_STORE), keySerde, new LongSerde())
            .withCachingEnabled();

    StoreBuilder<KeyValueStore<MetricIdentity, Metric>> metricIdentityToValueStoreBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(METRICS_IDENTITY_VALUE_STORE), keySerde, valueSerde)
            .withCachingEnabled();

    streamsBuilder.addStateStore(metricIdentityStoreBuilder);
    streamsBuilder.addStateStore(metricIdentityToValueStoreBuilder);

    Produced<byte[], ResourceMetrics> outputTopicProducer =
        Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde());
    outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER);

    inputStream
        .transform(
            MetricsProcessor::new,
            Named.as(MetricsProcessor.class.getSimpleName()),
            METRICS_IDENTITY_STORE,
            METRICS_IDENTITY_VALUE_STORE)
        .to(outputTopic, outputTopicProducer);

    return streamsBuilder;
  }

  @Override
  public String getJobConfigKey() {
    return METRICS_GENERATOR_JOB_CONFIG;
  }

  @Override
  public Logger getLogger() {
    return LOGGER;
  }

  @Override
  public List<String> getInputTopics(Map<String, Object> properties) {
    Config jobConfig = getJobConfig(properties);
    return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY));
  }

  @Override
  public List<String> getOutputTopics(Map<String, Object> properties) {
    Config jobConfig = getJobConfig(properties);
    return List.of(jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY));
  }

  private Config getJobConfig(Map<String, Object> properties) {
    return (Config) properties.get(getJobConfigKey());
  }

  private Serde defaultValueSerde(Map<String, Object> properties) {
    StreamsConfig config = new StreamsConfig(properties);
    return config.defaultValueSerde();
  }

  private Serde defaultKeySerde(Map<String, Object> properties) {
    StreamsConfig config = new StreamsConfig(properties);
    return config.defaultKeySerde();
  }
}
Review thread on the serialization format:

Reviewer: Why are we still adding avro messages? I thought we had fully switched to proto for new things, even across Kafka. Spacing also looks off.

Author: Are you referring to the whole new pipeline - alerting and metrics - or something else? I will check with @laxman-traceable whether I can go directly with Java-based serialization for the StateStore and get away from avro messages here.

Reviewer: My understanding - could be wrong - was that we initially used avro over Kafka because that's what it had support for, and proto everywhere else. Newer versions have equal levels of proto support, so rather than using two different serialization formats, unifying on just proto for all new messages would be more desirable.

Author: Talked with @laxman-traceable. As per him, avro goes better with Kafka overall, though, as you said, the newer versions have schema-registry support for proto too. And since proto is the main transport layer in this metrics pipeline, he also suggested going with proto for consistency. Changing it to proto.

Author: Done