Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

translate otel messaging.* to ecs #5334

Merged
merged 15 commits into from
May 31, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions model/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package model

import (
"fmt"
"net/http"
"strings"

"github.com/elastic/beats/v7/libbeat/common"
)
Expand All @@ -29,6 +31,27 @@ type Message struct {
Headers http.Header
AgeMillis *int
QueueName string
// TODO: Do we want to add all these keys to Message? Or should they be
// parsed as individual strings and added to the span in
// processor/otel/consumer.go?
RoutingKey string
System string
Operation string
stuartnelson3 marked this conversation as resolved.
Show resolved Hide resolved
}

// Name returns a Span name for a message following the transaction/span naming schema.
// https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-messaging.md#naming
func (m *Message) Name() string {
return fmt.Sprintf("%s %s %s %s",
m.System, strings.ToUpper(m.Operation), m.direction(), m.QueueName,
)
}

func (m *Message) direction() string {
if m.Operation == "send" {
return "to"
}
return "from"
stuartnelson3 marked this conversation as resolved.
Show resolved Hide resolved
}

// Fields returns a MapStr holding the transformed message information
Expand All @@ -47,5 +70,14 @@ func (m *Message) Fields() common.MapStr {
if len(m.Headers) > 0 {
fields.set("headers", m.Headers)
}
if m.RoutingKey != "" {
fields.set("routing_key", m.RoutingKey)
}
if m.System != "" {
fields.set("system", m.System)
}
if m.Operation != "" {
fields.set("operation", m.Operation)
}
return common.MapStr(fields)
}
2 changes: 2 additions & 0 deletions model/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Metadata struct {
UserAgent UserAgent
Client Client
Cloud Cloud
Message Message
stuartnelson3 marked this conversation as resolved.
Show resolved Hide resolved
Labels common.MapStr
}

Expand All @@ -43,5 +44,6 @@ func (m *Metadata) set(fields *mapStr, eventLabels common.MapStr) {
fields.maybeSetMapStr("container", m.System.Container.fields())
fields.maybeSetMapStr("kubernetes", m.System.Kubernetes.fields())
fields.maybeSetMapStr("cloud", m.Cloud.fields())
fields.maybeSetMapStr("message", m.Message.Fields())
maybeSetLabels(fields, m.Labels, eventLabels)
}
64 changes: 55 additions & 9 deletions processor/otel/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,31 @@ func translateTransaction(
case conventions.AttributeNetHostName:
netHostName = stringval

// messaging
//
// TODO(axw) translate OpenTelemtry messaging conventions.
case "message_bus.destination":
// messaging.*
case "message_bus.destination", conventions.AttributeMessagingDestination:
message.QueueName = stringval
isMessaging = true
isMessagingSpan = true
case "messaging.rabbitmq.routing_key":
message.RoutingKey = stringval
isMessagingSpan = true
case conventions.AttributeMessagingOperation:
if stringval == "" {
// TODO: Is this correct? Inferring that an empty string means "send", based on
// only `send`, `process`, and `receive` are allowed
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#operation-names
// `messaging.operation` can only have `process` and `receive` values
// cf. table in section https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#apache-kafka-example
// and various examples
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#apache-kafka-example
stringval = "send"
}
message.Operation = stringval
isMessagingSpan = true
case conventions.AttributeMessagingSystem:
message.System = stringval
destinationService.Resource = stringval
destinationService.Name = stringval
stuartnelson3 marked this conversation as resolved.
Show resolved Hide resolved
isMessagingSpan = true

// rpc.*
//
Expand Down Expand Up @@ -500,12 +519,31 @@ func translateSpan(span pdata.Span, metadata model.Metadata, event *model.Span)
netPeerName = stringval
}

// messaging
//
// TODO(axw) translate OpenTelemtry messaging conventions.
case "message_bus.destination":
// messaging.*
case "message_bus.destination", conventions.AttributeMessagingDestination:
message.QueueName = stringval
isMessagingSpan = true
case "messaging.rabbitmq.routing_key":
message.RoutingKey = stringval
isMessagingSpan = true
case conventions.AttributeMessagingOperation:
if stringval == "" {
// TODO: Is this correct? Inferring that an empty string means "send", based on
// only `send`, `process`, and `receive` are allowed
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#operation-names
// `messaging.operation` can only have `process` and `receive` values
// cf. table in section https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#apache-kafka-example
// and various examples
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#apache-kafka-example
stringval = "send"
}
message.Operation = stringval
isMessagingSpan = true
case conventions.AttributeMessagingSystem:
message.System = stringval
destinationService.Resource = stringval
destinationService.Name = stringval
isMessagingSpan = true

// rpc.*
//
Expand Down Expand Up @@ -633,6 +671,14 @@ func translateSpan(span pdata.Span, metadata model.Metadata, event *model.Span)
event.DB = &db
case isMessagingSpan:
event.Type = "messaging"
event.Subtype = message.System
event.Action = message.Operation
event.Name = message.Name()
stuartnelson3 marked this conversation as resolved.
Show resolved Hide resolved
// TODO: This is what we want to do? Following agent messaging spec
// https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-messaging.md
stuartnelson3 marked this conversation as resolved.
Show resolved Hide resolved
if destinationService.Resource != "" && message.QueueName != "" {
destinationService.Resource += "/" + message.QueueName
}
event.Message = &message
case isRPCSpan:
event.Type = "external"
Expand Down
14 changes: 14 additions & 0 deletions processor/otel/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ func translateResourceMetadata(resource pdata.Resource, out *model.Metadata) {
case "process.runtime.version":
out.Service.Runtime.Version = truncate(v.StringVal())

// messaging.*
case conventions.AttributeMessagingSystem:
out.Message.System = truncate(v.StringVal())
case conventions.AttributeMessagingDestination:
out.Message.QueueName = truncate(v.StringVal())
case conventions.AttributeMessagingOperation:
s := v.StringVal()
if s == "" {
s = "send"
}
out.Message.Operation = truncate(s)
case "messaging.rabbitmq.routing_key":
out.Message.RoutingKey = truncate(v.StringVal())
stuartnelson3 marked this conversation as resolved.
Show resolved Hide resolved

// os.*
case conventions.AttributeOSType:
out.System.Platform = strings.ToLower(truncate(v.StringVal()))
Expand Down
48 changes: 48 additions & 0 deletions processor/otel/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,54 @@ func TestResourceConventions(t *testing.T) {
},
},
},
"messaging_rabbit": {
attrs: map[string]pdata.AttributeValue{
"messaging.operation": pdata.NewAttributeValueString("receive"),
"messaging.system": pdata.NewAttributeValueString("rabbitmq"),
"messaging.destination": pdata.NewAttributeValueString("myQueue"),
"messaging.rabbitmq.routing_key": pdata.NewAttributeValueString("myKey"),
},
expected: model.Metadata{
Service: defaultService,
Message: model.Message{
Operation: "receive",
System: "rabbitmq",
RoutingKey: "myKey",
QueueName: "myQueue",
},
},
},
"messaging_send": {
attrs: map[string]pdata.AttributeValue{
"messaging.operation": pdata.NewAttributeValueString(""),
"messaging.system": pdata.NewAttributeValueString("kafka"),
"messaging.destination": pdata.NewAttributeValueString("myQueue"),
},
expected: model.Metadata{
Service: defaultService,
Message: model.Message{
Operation: "send",
System: "kafka",
QueueName: "myQueue",
},
},
},
"messaging_generic": {
attrs: map[string]pdata.AttributeValue{
"messaging.operation": pdata.NewAttributeValueString("receive"),
"messaging.system": pdata.NewAttributeValueString("kafka"),
"messaging.destination": pdata.NewAttributeValueString("myQueue"),
},
expected: model.Metadata{
Service: defaultService,
Message: model.Message{
Operation: "receive",
System: "kafka",
RoutingKey: "",
QueueName: "myQueue",
},
},
},
} {
t.Run(name, func(t *testing.T) {
meta := transformResourceMetadata(t, test.attrs)
Expand Down