Skip to content

Commit

Permalink
removed debug code & updated method/variable naming for Kafka Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
obenkenobi committed Mar 7, 2023
1 parent 2540ce9 commit bf59176
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@

public class NewRelicMetricsReporter implements MetricsReporter {

private static final boolean kafkaMetricsDebug = NewRelic.getAgent().getConfig().getValue("kafka.metrics.debug.enabled", false);
private static final boolean KAFKA_METRICS_DEBUG = NewRelic.getAgent().getConfig().getValue("kafka.metrics.debug.enabled", false);

private static final boolean metricsAsEvents = NewRelic.getAgent().getConfig().getValue("kafka.metrics.as_events.enabled", false);
private static final boolean METRICS_AS_EVENTS = NewRelic.getAgent().getConfig().getValue("kafka.metrics.as_events.enabled", false);

private static final long reportingIntervalInSeconds = NewRelic.getAgent().getConfig().getValue("kafka.metrics.interval", 30);
private static final long REPORTING_INTERVAL_IN_SECONDS = NewRelic.getAgent().getConfig().getValue("kafka.metrics.interval", 30);

private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, buildThreadFactory("com.nr.instrumentation.kafka.streams.NewRelicMetricsReporter-%d"));

Expand All @@ -38,7 +38,7 @@ public class NewRelicMetricsReporter implements MetricsReporter {
public void init(final List<KafkaMetric> initMetrics) {
for (KafkaMetric kafkaMetric : initMetrics) {
String metricGroupAndName = getMetricGroupAndName(kafkaMetric);
if (kafkaMetricsDebug) {
if (KAFKA_METRICS_DEBUG) {
AgentBridge.getAgent().getLogger().log(Level.FINEST, "init(): {0} = {1}", metricGroupAndName, kafkaMetric.metricName());
}
metrics.put(metricGroupAndName, kafkaMetric);
Expand All @@ -54,11 +54,11 @@ public void run() {
Object metricValue = metric.getValue().metricValue();
if (metricValue instanceof Double) {
final float value = ((Double) metricValue).floatValue();
if (kafkaMetricsDebug) {
if (KAFKA_METRICS_DEBUG) {
AgentBridge.getAgent().getLogger().log(Level.FINEST, "getMetric: {0} = {1}", metric.getKey(), value);
}
if (!Float.isNaN(value) && !Float.isInfinite(value)) {
if (metricsAsEvents) {
if (METRICS_AS_EVENTS) {
eventData.put(metric.getKey().replace('/', '.'), value);
} else {
NewRelic.recordMetric(metricPrefix + metric.getKey(), value);
Expand All @@ -67,20 +67,20 @@ public void run() {
}
}

if (metricsAsEvents) {
if (METRICS_AS_EVENTS) {
NewRelic.getAgent().getInsights().recordCustomEvent("KafkaStreamsMetrics", eventData);
}
} catch (Exception e) {
AgentBridge.getAgent().getLogger().log(Level.FINE, e, "Unable to record kafka metrics");
}
}
}, 0L, reportingIntervalInSeconds, TimeUnit.SECONDS);
}, 0L, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
}

@Override
public void metricChange(final KafkaMetric metric) {
String metricGroupAndName = getMetricGroupAndName(metric);
if (kafkaMetricsDebug) {
if (KAFKA_METRICS_DEBUG) {
AgentBridge.getAgent().getLogger().log(Level.FINEST, "metricChange(): {0} = {1}", metricGroupAndName, metric.metricName());
}
metrics.put(metricGroupAndName, metric);
Expand All @@ -89,7 +89,7 @@ public void metricChange(final KafkaMetric metric) {
@Override
public void metricRemoval(final KafkaMetric metric) {
String metricGroupAndName = getMetricGroupAndName(metric);
if (kafkaMetricsDebug) {
if (KAFKA_METRICS_DEBUG) {
AgentBridge.getAgent().getLogger().log(Level.FINEST, "metricRemoval(): {0} = {1}", metricGroupAndName, metric.metricName());
}
metrics.remove(metricGroupAndName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,9 @@ private void assertScopedMetricExists(String txnName, String ...metricNames) {
}

private void assertUnscopedMetricExists(String ... metricNames) {
int notFoundMetricCount = 0;
Set<String> existingMetrics= InstrumentationTestRunner.getIntrospector().getUnscopedMetrics().keySet();
for (String metricName : metricNames) {
Assert.assertTrue("metric not found: " + metricName, existingMetrics.contains(metricName));
}
System.out.println(notFoundMetricCount + " metrics not found");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,9 @@ private void assertScopedMetricExists(String txnName, String ...metricNames) {
}

private void assertUnscopedMetricExists(String ... metricNames) {
int notFoundMetricCount = 0;
Set<String> existingMetrics= InstrumentationTestRunner.getIntrospector().getUnscopedMetrics().keySet();
for (String metricName : metricNames) {
Assert.assertTrue("metric not found: " + metricName, existingMetrics.contains(metricName));
}
System.out.println(notFoundMetricCount + " metrics not found");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,9 @@ private void assertScopedMetricExists(String txnName, String ...metricNames) {
}

private void assertUnscopedMetricExists(String ... metricNames) {
int notFoundMetricCount = 0;
Set<String> existingMetrics= InstrumentationTestRunner.getIntrospector().getUnscopedMetrics().keySet();
for (String metricName : metricNames) {
Assert.assertTrue("metric not found: " + metricName, existingMetrics.contains(metricName));
}
System.out.println(notFoundMetricCount + " metrics not found");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static String getAppIdWithClientIdSuffix(StreamsConfig streamsConfig) {
return applicationId + "/" + clientId;
}

public static String getApplicationIdWithSuffix(String threadName) {
public static String getAppIdWithSuffix(String threadName) {
final String defaultAppId = "APPLICATION_ID_UNKNOWN";
String nrClientId = StreamsSpansUtil.parseClientId(threadName);
if (nrClientId == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public abstract class StreamThread_Instrumentation extends Thread {
@Trace(dispatcher = true)
void runOnce() {
if (this.nrApplicationIdWithSuffix == null) {
this.nrApplicationIdWithSuffix = StreamsSpansUtil.getApplicationIdWithSuffix(this.getName());
this.nrApplicationIdWithSuffix = StreamsSpansUtil.getAppIdWithSuffix(this.getName());
}
StreamsSpansUtil.initTransaction(this.nrApplicationIdWithSuffix);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,9 @@ private void assertScopedMetricExists(String txnName, String ...metricNames) {
}

private void assertUnscopedMetricExists(String ... metricNames) {
int notFoundMetricCount = 0;
Set<String> existingMetrics= InstrumentationTestRunner.getIntrospector().getUnscopedMetrics().keySet();
for (String metricName : metricNames) {
Assert.assertTrue("metric not found: " + metricName, existingMetrics.contains(metricName));
}
System.out.println(notFoundMetricCount + " metrics not found");
}
}

0 comments on commit bf59176

Please sign in to comment.