diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs index 5991e07698..0077b9101a 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Text; using Confluent.Kafka; using NewRelic.Agent.Api; @@ -65,7 +66,7 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins segment.SetMessageBrokerDestination(topic); transaction.SetKafkaMessageBrokerTransactionName(MessageBrokerDestinationType.Topic, BrokerVendorName, topic); - // get the Message.Headers property and add distributed trace headers + // get the Message.Headers property and process distributed trace headers var messageAccessor = MessageAccessorDictionary.GetOrAdd(type, GetMessageAccessorFunc); var messageAsObject = messageAccessor(resultAsObject); @@ -73,7 +74,7 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins if (messageAsObject is MessageMetadata messageMetaData) { headersSize = GetHeadersSize(messageMetaData.Headers); - transaction.InsertDistributedTraceHeaders(messageMetaData, DistributedTraceHeadersSetter); + transaction.AcceptDistributedTraceHeaders(messageMetaData, DistributedTraceHeadersGetter, TransportType.Kafka); } ReportSizeMetrics(agent, transaction, topic, headersSize, messageAsObject); @@ -133,14 +134,22 @@ private static Func GetKeyAccessorFunc(Type t) => private static Func GetValueAccessorFunc(Type t) => VisibilityBypasser.Instance.GeneratePropertyAccessor(t, "Value"); - private static void DistributedTraceHeadersSetter(MessageMetadata carrier, string key, string value) + private static IEnumerable DistributedTraceHeadersGetter(MessageMetadata carrier, string key) { - carrier.Headers ??= new Headers(); - if (!string.IsNullOrEmpty(key)) + if (carrier.Headers != null) { - carrier.Headers.Remove(key); - carrier.Headers.Add(key, Encoding.ASCII.GetBytes(value)); + var headerValues = new List(); + foreach (var item in carrier.Headers) + { + if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase)) + { + var decodedHeaderValue = Encoding.UTF8.GetString(item.GetValueBytes()); + headerValues.Add(decodedHeaderValue); + } + } + return headerValues; } + return null; } private static long TryGetSize(object obj) diff --git a/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs b/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs index 3df7c75d01..3bb5c429f8 100644 --- a/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs +++ b/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs @@ -83,6 +83,8 @@ public void Test() new Assertions.ExpectedMetric { metricName = consumeTransactionName, callCount = 2 }, new Assertions.ExpectedMetric { metricName = messageBrokerConsume, callCount = 2 }, new Assertions.ExpectedMetric { metricName = messageBrokerConsume, metricScope = consumeTransactionName, callCount = 2 }, + new Assertions.ExpectedMetric { metricName = "Supportability/TraceContext/Create/Success", callCount = 2 }, + new Assertions.ExpectedMetric { metricName = "Supportability/TraceContext/Accept/Success", callCount = 2 }, }; NrAssert.Multiple( @@ -93,7 +95,7 @@ public void Test() () => Assert.True(consumeTxnSpan.UserAttributes.ContainsKey("kafka.consume.byteCount")), () => Assert.InRange((long)consumeTxnSpan.UserAttributes["kafka.consume.byteCount"], 450, 470), // includes headers () => Assert.True(consumeTxnSpan.IntrinsicAttributes.ContainsKey("traceId")), - () => Assert.False(consumeTxnSpan.IntrinsicAttributes.ContainsKey("parentId")) + () => Assert.True(consumeTxnSpan.IntrinsicAttributes.ContainsKey("parentId")) ); }