Merge pull request #1170 from newrelic/NEWRELIC-57520-kafka-streams
Newrelic-57520 kafka streams
obenkenobi authored Mar 8, 2023
2 parents 258ab8d + bf59176 commit e7fd205
Showing 56 changed files with 2,282 additions and 7 deletions.
@@ -41,7 +41,7 @@
@InstrumentationTestConfig(includePrefixes = "org.apache.kafka")
public class Kafka3MessageTest {
    @Rule
-    public KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.0"));
+    public KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));

    private final String TOPIC = "life-universe-everything";
    private final String ANOTHER_TOPIC = "vogon-poetry";
@@ -84,7 +84,7 @@ private boolean readMessages() {
        consumer.subscribe(Collections.singleton(TOPIC));

        // setting a timeout so this does not drag forever if something goes wrong.
-        long waitUntil = System.currentTimeMillis() + 5000L;
+        long waitUntil = System.currentTimeMillis() + 15000L;
        while (waitUntil > System.currentTimeMillis()) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            messagesRead += records.count();
@@ -7,18 +7,17 @@

package org.apache.kafka.clients.producer;

-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.Future;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.agent.bridge.NoOpDistributedTracePayload;
import com.newrelic.agent.bridge.Transaction;
import com.newrelic.api.agent.DistributedTracePayload;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.Transaction;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;

+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;

@Weave(originalName = "org.apache.kafka.clients.producer.KafkaProducer")
public class KafkaProducer_Instrumentation<K, V> {

22 changes: 22 additions & 0 deletions instrumentation/kafka-streams-metrics-1.0.0/build.gradle
@@ -0,0 +1,22 @@
jar {
    manifest {
        attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-streams-metrics-1.0.0',
                'Implementation-Title-Alias': 'kafka-streams-metrics'
    }
}

dependencies {
    implementation(project(":agent-bridge"))
    implementation("org.apache.kafka:kafka-streams:1.0.0")

    testImplementation("org.testcontainers:kafka:1.16.3")
}

verifyInstrumentation {
    passesOnly 'org.apache.kafka:kafka-streams:[1.0.0,)'
}

site {
    title 'Kafka'
    type 'Messaging'
}
@@ -0,0 +1,134 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package com.nr.instrumentation.kafka.streams;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

public class NewRelicMetricsReporter implements MetricsReporter {

    private static final boolean KAFKA_METRICS_DEBUG = NewRelic.getAgent().getConfig().getValue("kafka.metrics.debug.enabled", false);

    private static final boolean METRICS_AS_EVENTS = NewRelic.getAgent().getConfig().getValue("kafka.metrics.as_events.enabled", false);

    private static final long REPORTING_INTERVAL_IN_SECONDS = NewRelic.getAgent().getConfig().getValue("kafka.metrics.interval", 30);

    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, buildThreadFactory("com.nr.instrumentation.kafka.streams.NewRelicMetricsReporter-%d"));

    private final Map<String, KafkaMetric> metrics = new ConcurrentHashMap<>();

    @Override
    public void init(final List<KafkaMetric> initMetrics) {
        for (KafkaMetric kafkaMetric : initMetrics) {
            String metricGroupAndName = getMetricGroupAndName(kafkaMetric);
            if (KAFKA_METRICS_DEBUG) {
                AgentBridge.getAgent().getLogger().log(Level.FINEST, "init(): {0} = {1}", metricGroupAndName, kafkaMetric.metricName());
            }
            metrics.put(metricGroupAndName, kafkaMetric);
        }

        final String metricPrefix = "Kafka/Streams/";
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Map<String, Object> eventData = new HashMap<>();
                    for (final Map.Entry<String, KafkaMetric> metric : metrics.entrySet()) {
                        Object metricValue = metric.getValue().metricValue();
                        if (metricValue instanceof Double) {
                            final float value = ((Double) metricValue).floatValue();
                            if (KAFKA_METRICS_DEBUG) {
                                AgentBridge.getAgent().getLogger().log(Level.FINEST, "getMetric: {0} = {1}", metric.getKey(), value);
                            }
                            if (!Float.isNaN(value) && !Float.isInfinite(value)) {
                                if (METRICS_AS_EVENTS) {
                                    eventData.put(metric.getKey().replace('/', '.'), value);
                                } else {
                                    NewRelic.recordMetric(metricPrefix + metric.getKey(), value);
                                }
                            }
                        }
                    }

                    if (METRICS_AS_EVENTS) {
                        NewRelic.getAgent().getInsights().recordCustomEvent("KafkaStreamsMetrics", eventData);
                    }
                } catch (Exception e) {
                    AgentBridge.getAgent().getLogger().log(Level.FINE, e, "Unable to record kafka metrics");
                }
            }
        }, 0L, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
    }

    @Override
    public void metricChange(final KafkaMetric metric) {
        String metricGroupAndName = getMetricGroupAndName(metric);
        if (KAFKA_METRICS_DEBUG) {
            AgentBridge.getAgent().getLogger().log(Level.FINEST, "metricChange(): {0} = {1}", metricGroupAndName, metric.metricName());
        }
        metrics.put(metricGroupAndName, metric);
    }

    @Override
    public void metricRemoval(final KafkaMetric metric) {
        String metricGroupAndName = getMetricGroupAndName(metric);
        if (KAFKA_METRICS_DEBUG) {
            AgentBridge.getAgent().getLogger().log(Level.FINEST, "metricRemoval(): {0} = {1}", metricGroupAndName, metric.metricName());
        }
        metrics.remove(metricGroupAndName);
    }

    private String getMetricGroupAndName(final KafkaMetric metric) {
        if (metric.metricName().tags().containsKey("topic")) {
            // Special case for handling topic names in metrics
            return metric.metricName().group() + "/" + metric.metricName().tags().get("topic") + "/" + metric.metricName().name();
        }
        return metric.metricName().group() + "/" + metric.metricName().name();
    }

    @Override
    public void close() {
        executor.shutdown();
        metrics.clear();
    }

    @Override
    public void configure(final Map<String, ?> configs) {
    }

    private static ThreadFactory buildThreadFactory(final String nameFormat) {
        // fail fast if the format is invalid
        String.format(nameFormat, 0);

        final ThreadFactory factory = Executors.defaultThreadFactory();
        final AtomicInteger count = new AtomicInteger();

        return new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                final Thread thread = factory.newThread(runnable);
                thread.setName(String.format(nameFormat, count.incrementAndGet()));
                thread.setDaemon(true);
                return thread;
            }
        };
    }

}
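
The reporter above reads three agent config flags when the class initializes and, per `getMetricGroupAndName`, names each metric `Kafka/Streams/<group>/<name>` (or `Kafka/Streams/<group>/<topic>/<name>` when a topic tag is present). A minimal sketch of how those flags could be set, assuming the agent's usual nested layout in `newrelic.yml` (key names taken from the `getValue` calls above):

```yaml
common: &default_settings
  kafka:
    metrics:
      debug:
        enabled: true    # log each metric registration and report at FINEST (default: false)
      as_events:
        enabled: false   # true: emit KafkaStreamsMetrics custom events instead of timeslice metrics (default: false)
      interval: 30       # reporting period in seconds (default: 30)
```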
@@ -0,0 +1,30 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package org.apache.kafka.streams;

import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.kafka.streams.NewRelicMetricsReporter;
import org.apache.kafka.common.metrics.Metrics;

@Weave(originalName = "org.apache.kafka.streams.KafkaStreams")
public class KafkaStreams_Instrumentation {
    private final Metrics metrics = Weaver.callOriginal();

    @NewField
    private boolean nrMetricsInitialized;

    @WeaveAllConstructors
    public KafkaStreams_Instrumentation() {
        if (!nrMetricsInitialized) {
            metrics.addReporter(new NewRelicMetricsReporter());
            nrMetricsInitialized = true;
        }
    }
}
@@ -0,0 +1,87 @@
import com.newrelic.agent.introspec.InstrumentationTestConfig;
import com.newrelic.agent.introspec.InstrumentationTestRunner;
import com.newrelic.agent.introspec.MetricsHelper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.junit.Assert.assertTrue;

@RunWith(InstrumentationTestRunner.class)
@InstrumentationTestConfig(includePrefixes = {"org.apache.kafka.streams"})
public class KafkaStreams1MetricsTest {
    @Rule
    public KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));

    private final String TOPIC = "life-universe-everything";
    private final String OUTPUT_TOPIC = "vogon-poetry";

    @Before
    public void before() {
        kafkaContainer.start();
    }

    @After
    public void after() {
        kafkaContainer.stop();
    }

    @Test
    public void testStreams() throws ExecutionException, InterruptedException {
        sendMessages();
        runStreams();
        assertMetrics();
    }

    private void sendMessages() throws ExecutionException, InterruptedException {
        try (KafkaProducer<String, String> producer = KafkaStreamsHelper.newProducer(kafkaContainer)) {
            List<Future<RecordMetadata>> futures = Arrays.asList(
                    producer.send(new ProducerRecord<>(TOPIC, "Life, don't talk to me about life.")),
                    producer.send(new ProducerRecord<>(TOPIC, "Don't Panic")),
                    producer.send(new ProducerRecord<>(OUTPUT_TOPIC, "Oh freddled gruntbuggly"))
            );
            for (Future<RecordMetadata> future : futures) {
                future.get();
            }
        }
    }

    private void runStreams() throws InterruptedException {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        stream.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
        KafkaStreams kafkaStreams = KafkaStreamsHelper.newKafkaStreams(builder.build(), kafkaContainer);
        try {
            kafkaStreams.start();
            Thread.sleep(20000);
        } finally {
            kafkaStreams.close();
        }
    }

    private void assertMetrics() {
        int metricCount = MetricsHelper.getUnscopedMetricCount("Kafka/Streams/kafka-metrics-count/count");
        assertTrue("Metric count for \"Kafka/Streams/kafka-metrics-count/count\" is not greater than or equal to 1", metricCount >= 1);
    }
}
@@ -0,0 +1,43 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.testcontainers.containers.KafkaContainer;

import java.util.Properties;

public class KafkaStreamsHelper {
    public static final String APPLICATION_ID = "test-streams-app";
    public static final String CLIENT_ID = "test-client-id";

    public static KafkaProducer<String, String> newProducer(KafkaContainer kafkaContainer) {
        Properties props = getProps(kafkaContainer.getBootstrapServers(), true);
        return new KafkaProducer<>(props);
    }

    public static KafkaStreams newKafkaStreams(Topology topology, KafkaContainer kafkaContainer) {
        Properties props = getProps(kafkaContainer.getBootstrapServers(), false);
        return new KafkaStreams(topology, props);
    }

    public static Properties getProps(String bootstrapServers, boolean isClientProps) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        if (isClientProps) {
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("group.id", "test-consumer-group");
        } else {
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
            props.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID);
        }
        return props;
    }
}
16 changes: 16 additions & 0 deletions instrumentation/kafka-streams-spans-2.0.0/Readme.md
@@ -0,0 +1,16 @@
# Kafka Streams Spans instrumentation
Kafka Streams is a library that runs on top of the Kafka clients to stream and process data from Kafka.
This instrumentation module creates a transaction each time records are polled from Kafka and then processed.

## Troubleshooting

If you are using Kafka Streams and encounter a transaction named `Kafka/Streams/APPLICATION_ID_UNKNOWN`,
here are the possible causes:

1. You are running at least two Kafka Streams instances configured with the same `client.id` and have closed one of them.
One workaround is to give each instance a different `client.id` (see the sketch below); another is to run each instance in a separate app.
2. If the above does not apply and you are using the latest Kafka Streams instrumentation module, this may be a silent failure caused by a newer
Kafka Streams version, most likely because it changed how its threads are named. Under the hood, we name transactions by parsing the name of the
thread the transaction began on to extract the client id, and then use that client id to look up the `application.id` configured for your Kafka
Streams instance. If this happens, please report it, as it signals that we need a new instrumentation module for more up-to-date Kafka Streams
versions.
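
A minimal sketch of the first workaround, assuming two instances that share an `application.id` (the `StreamsConfig` constants are real; the broker address, id values, and helper name are hypothetical):

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class DistinctClientIds {
    // Each instance keeps the shared application.id but gets its own client.id,
    // so the agent can resolve the application.id from each stream thread's name.
    public static KafkaStreams newInstance(Topology topology, String clientId) {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker address
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // shared across instances
        props.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);                 // unique per instance, e.g. "worker-1"
        return new KafkaStreams(topology, props);
    }
}
```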
