
Commit

fix: Accept inbound tracing headers in Kafka consume method instrumentation (#2488)

* Process inbound tracing headers on Kafka consumer side

* Assert on tracing supportability metrics in Kafka integration test

* Remove stray comment
nr-ahemsath authored May 20, 2024
1 parent d626535 commit 476378a
Showing 2 changed files with 19 additions and 8 deletions.
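The gist of the change: the consume-method wrapper previously called InsertDistributedTraceHeaders on the consumed message (creating outbound trace context), and now calls AcceptDistributedTraceHeaders so the consumer transaction is linked to the producer's trace. Below is a minimal standalone sketch of the getter-style header lookup the new code performs against Confluent.Kafka headers; ReadHeaderValues is a hypothetical helper used only for illustration, not the agent's actual method.

// Standalone sketch (assumptions: Confluent.Kafka is referenced; ReadHeaderValues is a
// hypothetical helper mirroring the DistributedTraceHeadersGetter added in this commit).
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;

static class TraceHeaderLookupSketch
{
    // Returns every value stored under the requested header name, decoded as UTF-8.
    // Header names are compared case-insensitively, as in the new getter.
    public static IEnumerable<string> ReadHeaderValues(Headers headers, string key)
    {
        if (headers == null)
            return null; // nothing to accept if the message carries no headers

        var values = new List<string>();
        foreach (var header in headers)
        {
            if (header.Key.Equals(key, StringComparison.OrdinalIgnoreCase))
                values.Add(Encoding.UTF8.GetString(header.GetValueBytes()));
        }
        return values;
    }

    static void Main()
    {
        var headers = new Headers
        {
            { "traceparent", Encoding.UTF8.GetBytes("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01") }
        };

        foreach (var value in ReadHeaderValues(headers, "TraceParent"))
            Console.WriteLine(value); // prints the traceparent value despite the case difference
    }
}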
First changed file: Kafka consume method instrumentation wrapper
@@ -3,6 +3,7 @@
 using System;
 using System.Collections.Concurrent;
+using System.Collections.Generic;
 using System.Text;
 using Confluent.Kafka;
 using NewRelic.Agent.Api;
@@ -65,15 +66,15 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
 segment.SetMessageBrokerDestination(topic);
 transaction.SetKafkaMessageBrokerTransactionName(MessageBrokerDestinationType.Topic, BrokerVendorName, topic);

-// get the Message.Headers property and add distributed trace headers
+// get the Message.Headers property and process distributed trace headers
 var messageAccessor = MessageAccessorDictionary.GetOrAdd(type, GetMessageAccessorFunc);
 var messageAsObject = messageAccessor(resultAsObject);

 var headersSize = 0L;
 if (messageAsObject is MessageMetadata messageMetaData)
 {
     headersSize = GetHeadersSize(messageMetaData.Headers);
-    transaction.InsertDistributedTraceHeaders(messageMetaData, DistributedTraceHeadersSetter);
+    transaction.AcceptDistributedTraceHeaders(messageMetaData, DistributedTraceHeadersGetter, TransportType.Kafka);
 }

 ReportSizeMetrics(agent, transaction, topic, headersSize, messageAsObject);
Expand Down Expand Up @@ -133,14 +134,22 @@ private static Func<object, object> GetKeyAccessorFunc(Type t) =>
private static Func<object, object> GetValueAccessorFunc(Type t) =>
VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(t, "Value");

private static void DistributedTraceHeadersSetter(MessageMetadata carrier, string key, string value)
private static IEnumerable<string> DistributedTraceHeadersGetter(MessageMetadata carrier, string key)
{
carrier.Headers ??= new Headers();
if (!string.IsNullOrEmpty(key))
if (carrier.Headers != null)
{
carrier.Headers.Remove(key);
carrier.Headers.Add(key, Encoding.ASCII.GetBytes(value));
var headerValues = new List<string>();
foreach (var item in carrier.Headers)
{
if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase))
{
var decodedHeaderValue = Encoding.UTF8.GetString(item.GetValueBytes());
headerValues.Add(decodedHeaderValue);
}
}
return headerValues;
}
return null;
}

private static long TryGetSize(object obj)
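For context, the same accept pattern is available to application code through the public agent API. This is a hedged sketch, assuming the public NewRelic.Api.Agent surface (GetAgent().CurrentTransaction, AcceptDistributedTraceHeaders, TransportType.Kafka), which mirrors the internal call made by the wrapper above; it is not part of this commit.

// Hedged sketch: manual consumer-side acceptance of distributed trace headers using
// the public New Relic .NET agent API; mirrors what the instrumentation now does.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Confluent.Kafka;
using NewRelic.Api.Agent;

public static class ManualAcceptSketch
{
    public static void AcceptTraceContext(ConsumeResult<string, string> result)
    {
        var transaction = NewRelic.Api.Agent.NewRelic.GetAgent().CurrentTransaction;

        // Getter delegate: return all values stored under a header name, UTF-8 decoded.
        static IEnumerable<string> Getter(Headers carrier, string key) =>
            carrier == null
                ? Enumerable.Empty<string>()
                : carrier.Where(h => h.Key.Equals(key, StringComparison.OrdinalIgnoreCase))
                         .Select(h => Encoding.UTF8.GetString(h.GetValueBytes()));

        transaction.AcceptDistributedTraceHeaders(result.Message.Headers, Getter, TransportType.Kafka);
    }
}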
Second changed file: Kafka integration test
@@ -83,6 +83,8 @@ public void Test()
     new Assertions.ExpectedMetric { metricName = consumeTransactionName, callCount = 2 },
     new Assertions.ExpectedMetric { metricName = messageBrokerConsume, callCount = 2 },
     new Assertions.ExpectedMetric { metricName = messageBrokerConsume, metricScope = consumeTransactionName, callCount = 2 },
+    new Assertions.ExpectedMetric { metricName = "Supportability/TraceContext/Create/Success", callCount = 2 },
+    new Assertions.ExpectedMetric { metricName = "Supportability/TraceContext/Accept/Success", callCount = 2 },
 };

 NrAssert.Multiple(
@@ -93,7 +95,7 @@ public void Test()
     () => Assert.True(consumeTxnSpan.UserAttributes.ContainsKey("kafka.consume.byteCount")),
     () => Assert.InRange((long)consumeTxnSpan.UserAttributes["kafka.consume.byteCount"], 450, 470), // includes headers
     () => Assert.True(consumeTxnSpan.IntrinsicAttributes.ContainsKey("traceId")),
-    () => Assert.False(consumeTxnSpan.IntrinsicAttributes.ContainsKey("parentId"))
+    () => Assert.True(consumeTxnSpan.IntrinsicAttributes.ContainsKey("parentId"))
 );
}

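The test changes follow directly from the instrumentation change: the producer side still creates trace context (Supportability/TraceContext/Create/Success), the consumer side now accepts it (Supportability/TraceContext/Accept/Success), and the consume transaction's span therefore gains a parentId. A hedged spot-check like the one below (assuming xUnit and the W3C "traceparent" header name; not part of this commit) illustrates the precondition the consumer relies on: the consumed message must actually carry the trace context headers written by the producer.

// Hedged spot-check sketch: verify a consumed Kafka message carries a W3C traceparent
// header, the raw material the consumer transaction accepts in this commit.
using System.Text;
using Confluent.Kafka;
using Xunit;

public static class TraceparentSpotCheck
{
    public static void AssertHasTraceparent(ConsumeResult<string, string> result)
    {
        // TryGetLastBytes returns the most recently added value for the header name.
        Assert.True(result.Message.Headers.TryGetLastBytes("traceparent", out var rawValue));

        // traceparent format: <version>-<trace-id>-<parent-id>-<flags>, e.g. 00-...-...-01
        var traceparent = Encoding.UTF8.GetString(rawValue);
        Assert.Equal(4, traceparent.Split('-').Length);
    }
}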
