From e9bbce662acebf190b13c3e1133f32cdd75f8ce4 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 31 May 2021 15:59:35 +0200 Subject: [PATCH] [7.x] translate otel messaging.* to ecs (backport #5334) (#5375) * translate otel messaging.* to ecs (#5334) Co-authored-by: Andrew Wilkins (cherry picked from commit 16618f2a00400dcde109c00037be79e24f3938c5) # Conflicts: # changelogs/head.asciidoc * Delete head.asciidoc Co-authored-by: stuart nelson --- processor/otel/consumer.go | 40 ++++++++++++++++++++------ processor/otel/consumer_test.go | 51 +++++++++++++++++++++++++++++++-- 2 files changed, 80 insertions(+), 11 deletions(-) diff --git a/processor/otel/consumer.go b/processor/otel/consumer.go index 9208458617c..f1c21267b19 100644 --- a/processor/otel/consumer.go +++ b/processor/otel/consumer.go @@ -152,7 +152,12 @@ func (c *Consumer) convertSpan( var span *model.Span name := otelSpan.Name() - if root || otelSpan.Kind() == pdata.SpanKindSERVER { + // Message consumption results in either a transaction or a span based + // on whether the consumption is active or passive. Otel spans + // currently do not have the metadata to make this distinction. For + // now, we assume that the majority of consumption is passive, and + // therefore start a transaction whenever span kind == consumer. + if root || otelSpan.Kind() == pdata.SpanKindSERVER || otelSpan.Kind() == pdata.SpanKindCONSUMER { transaction = &model.Transaction{ Metadata: metadata, ID: spanID, @@ -313,10 +318,8 @@ func translateTransaction( case conventions.AttributeNetHostName: netHostName = stringval - // messaging - // - // TODO(axw) translate OpenTelemtry messaging conventions. - case "message_bus.destination": + // messaging.* + case "message_bus.destination", conventions.AttributeMessagingDestination: message.QueueName = stringval isMessaging = true @@ -425,6 +428,11 @@ func translateSpan(span pdata.Span, metadata model.Metadata, event *model.Span) httpScheme string = "http" ) + var ( + messageSystem string + messageOperation string + ) + var http model.HTTP var message model.Message var db model.DB @@ -517,12 +525,18 @@ func translateSpan(span pdata.Span, metadata model.Metadata, event *model.Span) netPeerName = stringval } - // messaging - // - // TODO(axw) translate OpenTelemtry messaging conventions. - case "message_bus.destination": + // messaging.* + case "message_bus.destination", conventions.AttributeMessagingDestination: message.QueueName = stringval isMessagingSpan = true + case conventions.AttributeMessagingOperation: + messageOperation = stringval + isMessagingSpan = true + case conventions.AttributeMessagingSystem: + messageSystem = stringval + destinationService.Resource = stringval + destinationService.Name = stringval + isMessagingSpan = true // rpc.* // @@ -650,6 +664,14 @@ func translateSpan(span pdata.Span, metadata model.Metadata, event *model.Span) event.DB = &db case isMessagingSpan: event.Type = "messaging" + event.Subtype = messageSystem + if messageOperation == "" && span.Kind() == pdata.SpanKindPRODUCER { + messageOperation = "send" + } + event.Action = messageOperation + if destinationService.Resource != "" && message.QueueName != "" { + destinationService.Resource += "/" + message.QueueName + } event.Message = &message case isRPCSpan: event.Type = "external" diff --git a/processor/otel/consumer_test.go b/processor/otel/consumer_test.go index e97eff81009..b7dfcdfc491 100644 --- a/processor/otel/consumer_test.go +++ b/processor/otel/consumer_test.go @@ -518,6 +518,47 @@ func TestRPCSpan(t *testing.T) { }, span.DestinationService) } +func TestMessagingTransaction(t *testing.T) { + tx := transformTransactionWithAttributes(t, map[string]pdata.AttributeValue{ + "messaging.destination": pdata.NewAttributeValueString("myQueue"), + }, func(s pdata.Span) { + s.SetKind(pdata.SpanKindCONSUMER) + // Set parentID to imply this isn't the root, but + // kind==CONSUMER should still force the span to be translated + // as a transaction. + s.SetParentSpanID(pdata.NewSpanID([8]byte{3})) + }) + assert.Equal(t, "messaging", tx.Type) + assert.Empty(t, tx.Labels) + assert.Equal(t, &model.Message{ + QueueName: "myQueue", + }, tx.Message) +} + +func TestMessagingSpan(t *testing.T) { + span := transformSpanWithAttributes(t, map[string]pdata.AttributeValue{ + "messaging.system": pdata.NewAttributeValueString("kafka"), + "messaging.destination": pdata.NewAttributeValueString("myTopic"), + "net.peer.ip": pdata.NewAttributeValueString("10.20.30.40"), + "net.peer.port": pdata.NewAttributeValueInt(123), + }, func(s pdata.Span) { + s.SetKind(pdata.SpanKindPRODUCER) + }) + assert.Equal(t, "messaging", span.Type) + assert.Equal(t, "kafka", span.Subtype) + assert.Equal(t, "send", span.Action) + assert.Empty(t, span.Labels) + assert.Equal(t, &model.Destination{ + Address: "10.20.30.40", + Port: 123, + }, span.Destination) + assert.Equal(t, &model.DestinationService{ + Type: "messaging", + Name: "kafka", + Resource: "kafka/myTopic", + }, span.DestinationService) +} + func TestArrayLabels(t *testing.T) { stringArray := pdata.NewAttributeValueArray() stringArray.ArrayVal().Append(pdata.NewAttributeValueString("string1")) @@ -1055,23 +1096,29 @@ func jaegerKeyValue(k string, v interface{}) jaegermodel.KeyValue { return kv } -func transformTransactionWithAttributes(t *testing.T, attrs map[string]pdata.AttributeValue) *model.Transaction { +func transformTransactionWithAttributes(t *testing.T, attrs map[string]pdata.AttributeValue, configFns ...func(pdata.Span)) *model.Transaction { traces, spans := newTracesSpans() otelSpan := pdata.NewSpan() otelSpan.SetTraceID(pdata.NewTraceID([16]byte{1})) otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2})) + for _, fn := range configFns { + fn(otelSpan) + } otelSpan.Attributes().InitFromMap(attrs) spans.Spans().Append(otelSpan) events := transformTraces(t, traces) return events.Transactions[0] } -func transformSpanWithAttributes(t *testing.T, attrs map[string]pdata.AttributeValue) *model.Span { +func transformSpanWithAttributes(t *testing.T, attrs map[string]pdata.AttributeValue, configFns ...func(pdata.Span)) *model.Span { traces, spans := newTracesSpans() otelSpan := pdata.NewSpan() otelSpan.SetTraceID(pdata.NewTraceID([16]byte{1})) otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2})) otelSpan.SetParentSpanID(pdata.NewSpanID([8]byte{3})) + for _, fn := range configFns { + fn(otelSpan) + } otelSpan.Attributes().InitFromMap(attrs) spans.Spans().Append(otelSpan) events := transformTraces(t, traces)