Skip to content

Commit

Permalink
translate otel messaging.* to ecs (elastic#5334)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Wilkins <axwalk@gmail.com>
  • Loading branch information
stuartnelson3 and axw committed May 31, 2021
1 parent 0c2562f commit f7bc183
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 11 deletions.
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
* Add metric_type and unit to field metadata of system metrics {pull}5230[5230]
* Upgrade Go to 1.15.12 {pull}[]
* Display apm-server url in fleet ui's apm-server integration {pull}4895[4895]
* Translate otel messaging.* semantic conventions to ECS {pull}5334[5334]
* Add support for dynamic histogram metrics {pull}5239[5239]
* Tail-sampling processor now resumes subscription from previous position after restart {pull}5350[5350]

Expand Down
40 changes: 31 additions & 9 deletions processor/otel/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ func (c *Consumer) convertSpan(
var span *model.Span

name := otelSpan.Name()
if root || otelSpan.Kind() == pdata.SpanKindSERVER {
// Message consumption results in either a transaction or a span based
// on whether the consumption is active or passive. Otel spans
// currently do not have the metadata to make this distinction. For
// now, we assume that the majority of consumption is passive, and
// therefore start a transaction whenever span kind == consumer.
if root || otelSpan.Kind() == pdata.SpanKindSERVER || otelSpan.Kind() == pdata.SpanKindCONSUMER {
transaction = &model.Transaction{
Metadata: metadata,
ID: spanID,
Expand Down Expand Up @@ -313,10 +318,8 @@ func translateTransaction(
case conventions.AttributeNetHostName:
netHostName = stringval

// messaging
//
// TODO(axw) translate OpenTelemtry messaging conventions.
case "message_bus.destination":
// messaging.*
case "message_bus.destination", conventions.AttributeMessagingDestination:
message.QueueName = stringval
isMessaging = true

Expand Down Expand Up @@ -425,6 +428,11 @@ func translateSpan(span pdata.Span, metadata model.Metadata, event *model.Span)
httpScheme string = "http"
)

var (
messageSystem string
messageOperation string
)

var http model.HTTP
var message model.Message
var db model.DB
Expand Down Expand Up @@ -517,12 +525,18 @@ func translateSpan(span pdata.Span, metadata model.Metadata, event *model.Span)
netPeerName = stringval
}

// messaging
//
// TODO(axw) translate OpenTelemtry messaging conventions.
case "message_bus.destination":
// messaging.*
case "message_bus.destination", conventions.AttributeMessagingDestination:
message.QueueName = stringval
isMessagingSpan = true
case conventions.AttributeMessagingOperation:
messageOperation = stringval
isMessagingSpan = true
case conventions.AttributeMessagingSystem:
messageSystem = stringval
destinationService.Resource = stringval
destinationService.Name = stringval
isMessagingSpan = true

// rpc.*
//
Expand Down Expand Up @@ -650,6 +664,14 @@ func translateSpan(span pdata.Span, metadata model.Metadata, event *model.Span)
event.DB = &db
case isMessagingSpan:
event.Type = "messaging"
event.Subtype = messageSystem
if messageOperation == "" && span.Kind() == pdata.SpanKindPRODUCER {
messageOperation = "send"
}
event.Action = messageOperation
if destinationService.Resource != "" && message.QueueName != "" {
destinationService.Resource += "/" + message.QueueName
}
event.Message = &message
case isRPCSpan:
event.Type = "external"
Expand Down
51 changes: 49 additions & 2 deletions processor/otel/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,47 @@ func TestRPCSpan(t *testing.T) {
}, span.DestinationService)
}

func TestMessagingTransaction(t *testing.T) {
tx := transformTransactionWithAttributes(t, map[string]pdata.AttributeValue{
"messaging.destination": pdata.NewAttributeValueString("myQueue"),
}, func(s pdata.Span) {
s.SetKind(pdata.SpanKindCONSUMER)
// Set parentID to imply this isn't the root, but
// kind==CONSUMER should still force the span to be translated
// as a transaction.
s.SetParentSpanID(pdata.NewSpanID([8]byte{3}))
})
assert.Equal(t, "messaging", tx.Type)
assert.Empty(t, tx.Labels)
assert.Equal(t, &model.Message{
QueueName: "myQueue",
}, tx.Message)
}

func TestMessagingSpan(t *testing.T) {
span := transformSpanWithAttributes(t, map[string]pdata.AttributeValue{
"messaging.system": pdata.NewAttributeValueString("kafka"),
"messaging.destination": pdata.NewAttributeValueString("myTopic"),
"net.peer.ip": pdata.NewAttributeValueString("10.20.30.40"),
"net.peer.port": pdata.NewAttributeValueInt(123),
}, func(s pdata.Span) {
s.SetKind(pdata.SpanKindPRODUCER)
})
assert.Equal(t, "messaging", span.Type)
assert.Equal(t, "kafka", span.Subtype)
assert.Equal(t, "send", span.Action)
assert.Empty(t, span.Labels)
assert.Equal(t, &model.Destination{
Address: "10.20.30.40",
Port: 123,
}, span.Destination)
assert.Equal(t, &model.DestinationService{
Type: "messaging",
Name: "kafka",
Resource: "kafka/myTopic",
}, span.DestinationService)
}

func TestArrayLabels(t *testing.T) {
stringArray := pdata.NewAttributeValueArray()
stringArray.ArrayVal().Append(pdata.NewAttributeValueString("string1"))
Expand Down Expand Up @@ -1055,23 +1096,29 @@ func jaegerKeyValue(k string, v interface{}) jaegermodel.KeyValue {
return kv
}

func transformTransactionWithAttributes(t *testing.T, attrs map[string]pdata.AttributeValue) *model.Transaction {
func transformTransactionWithAttributes(t *testing.T, attrs map[string]pdata.AttributeValue, configFns ...func(pdata.Span)) *model.Transaction {
traces, spans := newTracesSpans()
otelSpan := pdata.NewSpan()
otelSpan.SetTraceID(pdata.NewTraceID([16]byte{1}))
otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2}))
for _, fn := range configFns {
fn(otelSpan)
}
otelSpan.Attributes().InitFromMap(attrs)
spans.Spans().Append(otelSpan)
events := transformTraces(t, traces)
return events.Transactions[0]
}

func transformSpanWithAttributes(t *testing.T, attrs map[string]pdata.AttributeValue) *model.Span {
func transformSpanWithAttributes(t *testing.T, attrs map[string]pdata.AttributeValue, configFns ...func(pdata.Span)) *model.Span {
traces, spans := newTracesSpans()
otelSpan := pdata.NewSpan()
otelSpan.SetTraceID(pdata.NewTraceID([16]byte{1}))
otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2}))
otelSpan.SetParentSpanID(pdata.NewSpanID([8]byte{3}))
for _, fn := range configFns {
fn(otelSpan)
}
otelSpan.Attributes().InitFromMap(attrs)
spans.Spans().Append(otelSpan)
events := transformTraces(t, traces)
Expand Down

0 comments on commit f7bc183

Please sign in to comment.