Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce OpenTelemetry observability service #717

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .github/workflows/observability.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: Observability

on:
push:
branches: [ 'main', 'release-*' ]
pull_request:
branches: [ 'main', 'release-*' ]

jobs:

observability:
name: CloudEvents
strategy:
matrix:
go-version: [1.14.x, 1.15.x]
platform: [ubuntu-latest]

runs-on: ${{ matrix.platform }}

steps:

- name: Setup Go ${{ matrix.go-version }}
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
id: go

- name: Checkout code
uses: actions/checkout@v2

- name: Update git submodule
run: git submodule sync && git submodule update --init

- name: Build
run: ./hack/observability-test.sh
31 changes: 31 additions & 0 deletions hack/observability-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/env bash

# Copyright 2021 The CloudEvents Authors
# SPDX-License-Identifier: Apache-2.0

set -o errexit
set -o nounset
set -o pipefail

COVERAGE="`pwd`/coverage.txt"

# test/observability only
pushd ./test/observability

# Prepare coverage file only if not exists
if [ ! -f $COVERAGE ]; then
touch ./coverage.tmp
echo 'mode: atomic' > $COVERAGE
fi
COVERPKG="github.com/cloudevents/sdk-go/observability/opentelemetry/v2/..."
for gomodule in $(go list ./... | grep -v /cmd | grep -v /vendor)
do
go test -v -timeout 30s -race -covermode=atomic -coverprofile=coverage.tmp -coverpkg "$COVERPKG" "$gomodule" 2>&1 | sed 's/ of statements in.*//; /warning: no packages being tested depend on matches for pattern /d'
tail -n +2 coverage.tmp >> $COVERAGE
done
rm coverage.tmp

# Remove test only deps.
go mod tidy

popd
1 change: 1 addition & 0 deletions hack/tag-release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ do
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/protocol/ws/v2"
"github.com/cloudevents/sdk-go/observability/opencensus/v2"
"github.com/cloudevents/sdk-go/observability/opentelemetry/v2"
"github.com/cloudevents/sdk-go/sql/v2"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
"github.com/cloudevents/sdk-go/v2" # NOTE: this needs to be last.
Expand Down
183 changes: 183 additions & 0 deletions observability/opentelemetry/v2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# OpenTelemetry instrumentation for CloudEvents

This package contains the components necessary to instrument CloudEvents clients with OpenTelemetry. The main component is the `OTelObservabilityService` which implements the `ObservabilityService` interface from CloudEvents.

## Instrumented CloudEvents HTTP client

If you want to get a fully instrumented HTTP client, use the helper method in the ` github.com/cloudevents/sdk-go/observability/opentelemetry/v2` module:

```go
import (
"context"

otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

// you can pass the http/client options as usual
c, _ := otelObs.NewClientHTTP([]cehttp.Option{}, []client.Option{})
```

This will produce spans for all outgoing and incoming requests. By default, the spans will have the attributes as defined in [keys.go](https://github.com/cloudevents/sdk-go/blob/release-2.5/v2/observability/keys.go). For more advanced configuration, see the next section.

## Advanced configuration

### HTTP auto-instrumentation

In order to generate spans when sending and receiving events, it's necessary to configure the HTTP client from cloudevents with OpenTelemetry instrumentation. The client has two potentially interesting points for instrumentation:

- Outgoing requests
- Incoming requests (via StartReceiver)

To fulfil these, we can use the [HTTP auto-instrumentation package](https://github.com/open-telemetry/opentelemetry-go-contrib/tree/v0.23.0/instrumentation/net/http/otelhttp) from OpenTelemetry:

```go
p, err := cloudevents.NewHTTP(
cloudevents.WithRoundTripper(otelhttp.NewTransport(http.DefaultTransport)),
cloudevents.WithMiddleware(func(next http.Handler) http.Handler {
return otelhttp.NewHandler(next, "receive")
}),
)
```

The `otelhttp.NewTransport` will ensure that spans are generated for each outgoing request, and that the `traceparent` header is properly propagated. The `otelhttp.NewHandler` will take care of incoming requests, reading the `traceparent` header and continuing the trace with a new span.

This already gives some observability "out-of-the-box", but the spans generated only contain common HTTP headers as defined in the [HTTP semantic conventions](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.6.1/specification/trace/semantic_conventions/http.md). Another point is that if using another protocol, propagation will not work and spans will not be automatically generated, unless there is an auto-instrumentation library for it.

Because of this, CloudEvents offers the `ObservabilityService` interface which is used to generate spans, independently of the chosen protocol. See next how to configure the CloudEvents client to use it.

### Using the OTelObservabilityService

The most basic way to configure the CloudEvents client to use the `OTelObservabilityService` is:

```go
c, err := cloudevents.NewClient(p, client.WithObservabilityService(otelObs.NewOTelObservabilityService()))
```

With the above configuration, the spans generated by the `OTelObservabilityService` will have:

<table>
<tbody>
<tr>
<td style="font-weight:bold">Span name</td>
<td><code>cloudevents.client.[event type] [operation name]</code> where <code>[operation name]</code> follows the OpenTelemetry <a href="https://github.com/open-telemetry/opentelemetry-specification/blob/v1.6.1/specification/trace/semantic_conventions/messaging.md#operation-names">semantic conventions for messaging systems</a></td>
</tr>
<tr>
<td style="font-weight:bold">Span attributes</td>
<td>The attributes as defined in <a href="https://github.com/cloudevents/sdk-go/blob/release-2.5/v2/observability/keys.go">keys.go</a></td>
</tr>
</tbody>
</table>

If you require different span names or extra attributes, you can pass multiple `OTelObservabilityServiceOption` options when creating the observability service:

```go
nameFormatter := func(e *cloudevents.Event) string {
return "my.custom.name." + e.Context.GetType()
}

attributesGetter := func(*cloudevents.Event) []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String("my-attr", "some-value"),
}
}

// create the obs service with custom span names and attributes
os := otelObs.NewOTelObservabilityService(
otelObs.WithSpanNameFormatter(nameFormatter),
otelObs.WithSpanAttributesGetter(attributesGetter),
)

c, err := cloudevents.NewClient(p, client.WithObservabilityService(os))
```

>Note: The `nameFormatter` and `attributesGetter` functions will be called on each span creation. **Avoid** doing any heavy processing in them.

## Extra types

This package also contains extra types and helper functions that are useful in case you need to access/set the `tracecontext` in a more "low-level" way.

They allow to inject(write) and extract(read) `tracecontext` from the event. This is particularly useful when dealing with code that has no notion of a "request" nor a context. For example, long-running background processes polling from a queue.

>Note: To learn more about the propagation, take a look at the [Propagators API SPEC](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.6.1/specification/context/api-propagators.md).

## Manually extracting/injecting tracecontext from the event

When working with distributed systems, it can be difficult to achieve proper context propagation. For example, a long-running process listening to a topic does not have a "context" concept like a HTTP server receiving requests does.

>Note: The OpenTelemetry community is always creating new auto-instrumentation integrations for popular libraries and frameworks. The [OpenTelemetry registry](https://opentelemetry.io/registry/?s=kafka&component=&language=go) lists the integrations that are available. As support increases, new auto-instrumentation libraries can be integrated into the CloudEvents SDK.

For this case, it might be useful to `inject` the `tracecontext` inside the event before sending it to a queue. Later, the process can `extract` it and continue the trace normally. For that we can use the `InjectDistributedTracingExtension` and `ExtractDistributedTracingExtension` helper functions.

```go
func sendEventToQueue(ctx context.Context, event cloudevents.Event) {

// assuming this function is properly instrumented,
// the ctx contains the current span

// Before sending the event to the queue
// we can inject the tracecontext into the event as a DistributedTracingExtension
otelObs.InjectDistributedTracingExtension(ctx, event)
}
```

```go
func handleEvent(e cloudevents.Event) {
// here in our long-running process, we don't have a "context"

// if we have the tracecontext in the event, we can
// re-create the context with it and continue the trace:
ctx := otelObs.ExtractDistributedTracingExtension(context.Background(), e)

// ctx now has the tracecontext from the moment when the event was sent.

// All subsequent requests made with this context will be part of the trace.
c, _ := otelObs.NewClientHTTP([]cehttp.Option{}, []client.Option{})
ctx = cloudevents.ContextWithTarget(ctx, "my-other-cloudevents-app")
c.Send(ctx, e)
}
```

Because we used the `context` that was re-created from the event, the call to `my-other-cloudevents-app` will be correlated with the initial span. If `my-other-cloudevents-app` is also instrumented and itself make more calls, these will also be part of the trace.

Most use-cases are covered by using the `InjectDistributedTracingExtension` and `ExtractDistributedTracingExtension` helper functions.

### CloudEventCarrier

The `CloudEventCarrier` is an implementation of the OpenTelemetry [TextMapCarrier](https://github.com/open-telemetry/opentelemetry-go/blob/v1.0.0-RC3/propagation/propagation.go#L23). Its purpose is to carry the `tracecontext`, that is used by propagators later.

`CloudEventCarrier` exposes the `DistributedTracingExtension` which is populated by the propagator. It works similarly as the [HeaderCarrier](https://github.com/open-telemetry/opentelemetry-go/blob/v1.0.0-RC3/propagation/propagation.go#L44) which allows getting/setting the `traceparent` header.

It can be used to get access to the "raw" `tracecontext` values (`traceparent` and `tracestate`). One use case is to inject the `tracecontext` from the `context` into the carrier, to gain access to the populated `DistributedTracingExtension`:

```go
type MyEvent struct {
TraceParent string `json:"traceparent,omitempty"`
TraceState string `json:"tracestate,omitempty"`
}

func injectAndReadTraceParentAndState(ctx context.Context, e cloudevents.Event) {

me := MyEvent{}

// the propagator from OpenTelemetry
prop := propagation.TraceContext{}

carrier := otelObs.NewCloudEventCarrier()

// Injects (writes) the tracecontext into the NewCloudEventCarrier
// Doing so, will set the DistributedTracingExtension fields
prop.Inject(ctx, carrier)

// Here then we have the "raw" access to the tracecontext data
// https://www.w3.org/TR/trace-context/

// e.g. 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
me.TraceParent = carrier.Extension.TraceParent

// e.g. congo=t61rcWkgMzE
me.TraceState = carrier.Extension.TraceState
}
```
34 changes: 34 additions & 0 deletions observability/opentelemetry/v2/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package client

import (
obshttp "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/http"
"github.com/cloudevents/sdk-go/v2/client"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

// NewClientHTTP produces a new client instrumented with OpenTelemetry.
func NewClientHTTP(topt []cehttp.Option, copt []client.Option, obsOpts ...OTelObservabilityServiceOption) (client.Client, error) {
t, err := obshttp.NewObservedHTTP(topt...)
if err != nil {
return nil, err
}

copt = append(
copt,
client.WithTimeNow(),
client.WithUUIDs(),
client.WithObservabilityService(NewOTelObservabilityService(obsOpts...)),
)

c, err := client.New(t, copt...)
if err != nil {
return nil, err
}

return c, nil
}
86 changes: 86 additions & 0 deletions observability/opentelemetry/v2/client/cloudevents_carrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package client

import (
"context"

cloudevents "github.com/cloudevents/sdk-go/v2"
cecontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/cloudevents/sdk-go/v2/extensions"
"go.opentelemetry.io/otel/propagation"
)

// CloudEventCarrier wraps the distributed trace extension to satisfy the TextMapCarrier interface.
// https://github.com/open-telemetry/opentelemetry-go/blob/v1.0.0-RC3/propagation/propagation.go#L23
type CloudEventCarrier struct {
Extension *extensions.DistributedTracingExtension
}

// NewCloudEventCarrier creates a new CloudEventCarrier with an empty distributed tracing extension.
func NewCloudEventCarrier() CloudEventCarrier {
return CloudEventCarrier{Extension: &extensions.DistributedTracingExtension{}}
}

// NewCloudEventCarrierWithEvent creates a new CloudEventCarrier with a distributed tracing extension
// populated with the trace data from the event.
func NewCloudEventCarrierWithEvent(ctx context.Context, event cloudevents.Event) CloudEventCarrier {
var te, ok = extensions.GetDistributedTracingExtension(event)
if !ok {
cecontext.LoggerFrom(ctx).Warn("Could not get the distributed tracing extension from the event.")
return CloudEventCarrier{Extension: &extensions.DistributedTracingExtension{}}
}
return CloudEventCarrier{Extension: &te}
}

// Get returns the value associated with the passed key.
func (cec CloudEventCarrier) Get(key string) string {
switch key {
case extensions.TraceParentExtension:
return cec.Extension.TraceParent
case extensions.TraceStateExtension:
return cec.Extension.TraceState
default:
return ""
}
}

// Set stores the key-value pair.
func (cec CloudEventCarrier) Set(key string, value string) {
switch key {
case extensions.TraceParentExtension:
cec.Extension.TraceParent = value
case extensions.TraceStateExtension:
cec.Extension.TraceState = value
}
}

// Keys lists the keys stored in this carrier.
func (cec CloudEventCarrier) Keys() []string {
return []string{extensions.TraceParentExtension, extensions.TraceStateExtension}
}

// InjectDistributedTracingExtension injects the tracecontext from the context into the event as a DistributedTracingExtension
//
// If a DistributedTracingExtension is present in the provided event, its current value is replaced with the
// tracecontext obtained from the context.
func InjectDistributedTracingExtension(ctx context.Context, event cloudevents.Event) {
tc := propagation.TraceContext{}
carrier := NewCloudEventCarrier()
tc.Inject(ctx, carrier)
carrier.Extension.AddTracingAttributes(&event)
}

// ExtractDistributedTracingExtension extracts the tracecontext from the cloud event into the context.
//
// Calling this method will always replace the tracecontext in the context with the one extracted from the event.
// In case this is undesired, check first if the context has a recording span with: `trace.SpanFromContext(ctx)`
func ExtractDistributedTracingExtension(ctx context.Context, event cloudevents.Event) context.Context {
tc := propagation.TraceContext{}
carrier := NewCloudEventCarrierWithEvent(ctx, event)

return tc.Extract(ctx, carrier)
}
Loading