diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index 6afb9617e..c7a1e4e39 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -61,7 +61,6 @@ - diff --git a/projects/RabbitMQ.Client/client/impl/ModelBase.cs b/projects/RabbitMQ.Client/client/impl/ModelBase.cs index a54d9f69e..1524eb163 100644 --- a/projects/RabbitMQ.Client/client/impl/ModelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ModelBase.cs @@ -39,9 +39,6 @@ using System.Threading; using System.Threading.Tasks; -using OpenTelemetry; -using OpenTelemetry.Context.Propagation; - using RabbitMQ.Client.client.framing; using RabbitMQ.Client.client.impl; using RabbitMQ.Client.ConsumerDispatching; @@ -57,7 +54,6 @@ internal abstract class ModelBase : IModel, IRecoverable ///Only used to kick-start a connection open ///sequence. See internal BlockingCell m_connectionStartCell; - private static readonly TextMapPropagator Propagator = new CompositeTextMapPropagator(new TextMapPropagator[] { new TraceContextPropagator(), new BaggagePropagator() }); private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true); @@ -912,8 +908,13 @@ public BasicGetResult BasicGet(string queue, bool autoAck) public void BasicPublish(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { - void InjectTraceContextIntoBasicProperties(IBasicProperties props, string key, string value) + void InjectTraceContextIntoBasicProperties(object propsObj, string key, string value) { + if (propsObj is not IBasicProperties props) + { + return; + } + if (props.Headers == null) { props.Headers = new Dictionary(); @@ -961,7 +962,7 @@ void InjectTraceContextIntoBasicProperties(IBasicProperties props, string key, s } // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. - Propagator.Inject(new PropagationContext(sendActivity.Context, Baggage.Current), props, InjectTraceContextIntoBasicProperties); + DistributedContextPropagator.Current.Inject(sendActivity, props, InjectTraceContextIntoBasicProperties); ModelSend(in cmd, (BasicProperties)props, body); return; diff --git a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs index d4ba59951..ce44cbc12 100644 --- a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs +++ b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs @@ -7,9 +7,6 @@ using System.Threading; using System.Threading.Tasks; -using OpenTelemetry; -using OpenTelemetry.Context.Propagation; - using RabbitMQ.Client.Events; using RabbitMQ.Client.Framing.Impl; @@ -18,7 +15,6 @@ namespace RabbitMQ.Client internal class RabbitMQActivitySource { internal static ActivitySource source = new ActivitySource("RabbitMQ.Client", typeof(RabbitMQActivitySource).Assembly.GetCustomAttribute().InformationalVersion); - private static readonly TextMapPropagator Propagator = new CompositeTextMapPropagator(new TextMapPropagator[] { new TraceContextPropagator(), new BaggagePropagator() }); static RabbitMQActivitySource() { @@ -46,11 +42,20 @@ internal static Activity Receive(string routingKey, string exchange, ulong deliv if (source.HasListeners()) { // Extract the PropagationContext of the upstream parent from the message headers. - PropagationContext parentContext = Propagator.Extract(default, readOnlyBasicProperties, ExtractTraceContextFromBasicProperties); - Baggage.Current = parentContext.Baggage; - Activity activity = StartRabbitMQActivity($"{routingKey} receive", ActivityKind.Consumer, parentContext.ActivityContext); + DistributedContextPropagator.Current.ExtractTraceIdAndState(readOnlyBasicProperties, ExtractTraceIdAndState, out string traceId, out string traceState); + IEnumerable> baggage = DistributedContextPropagator.Current.ExtractBaggage(readOnlyBasicProperties, ExtractTraceIdAndState); + ActivityContext.TryParse(traceId, traceState, out ActivityContext parentContext); + Activity activity = StartRabbitMQActivity($"{routingKey} receive", ActivityKind.Consumer, parentContext); if (activity != null && activity.IsAllDataRequested) { + if (baggage != null) + { + foreach (var item in baggage) + { + Activity.Current?.SetBaggage(item.Key, item.Value); + } + } + PopulateMessagingTags("receive", routingKey, exchange, deliveryTag, readOnlyBasicProperties, bodySize, activity); } @@ -65,12 +70,20 @@ internal static Activity Process(BasicDeliverEventArgs deliverEventArgs) if (source.HasListeners()) { // Extract the PropagationContext of the upstream parent from the message headers. - PropagationContext parentContext = Propagator.Extract(default, deliverEventArgs.BasicProperties, ExtractTraceContextFromBasicProperties); - Baggage.Current = parentContext.Baggage; - - Activity activity = StartRabbitMQActivity($"{deliverEventArgs.RoutingKey} process", ActivityKind.Consumer, parentContext.ActivityContext); + DistributedContextPropagator.Current.ExtractTraceIdAndState(deliverEventArgs.BasicProperties, ExtractTraceIdAndState, out string traceId, out string traceState); + IEnumerable> baggage = DistributedContextPropagator.Current.ExtractBaggage(deliverEventArgs.BasicProperties, ExtractTraceIdAndState); + ActivityContext.TryParse(traceId, traceState, out ActivityContext parentContext); + Activity activity = StartRabbitMQActivity($"{deliverEventArgs.RoutingKey} process", ActivityKind.Consumer, parentContext); if (activity != null && activity.IsAllDataRequested) { + if (baggage != null) + { + foreach (var item in baggage) + { + Activity.Current?.SetBaggage(item.Key, item.Value); + } + } + PopulateMessagingTags("process", deliverEventArgs.RoutingKey, deliverEventArgs.Exchange, deliverEventArgs.DeliveryTag, deliverEventArgs.BasicProperties, deliverEventArgs.Body.Length, activity); } @@ -123,12 +136,18 @@ private static void PopulateMessagingTags(string operation, string routingKey, s } } } - - static IEnumerable ExtractTraceContextFromBasicProperties(T props, string key) where T : IReadOnlyBasicProperties + + private static void ExtractTraceIdAndState(object carrier, string name, out string value, out IEnumerable values) { - if (props.Headers.TryGetValue(key, out var value) && value is byte[] bytes) + if (carrier is IReadOnlyBasicProperties props && props.Headers is not null && props.Headers.TryGetValue(name, out object propsVal) && propsVal is byte[] bytes) + { + value = Encoding.UTF8.GetString(bytes); + values = default; + } + else { - yield return Encoding.UTF8.GetString(bytes); + value = default; + values = default; } } }