diff --git a/.github/workflows/observability.yaml b/.github/workflows/observability.yaml
new file mode 100644
index 000000000..63ff1c5cc
--- /dev/null
+++ b/.github/workflows/observability.yaml
@@ -0,0 +1,35 @@
+name: Observability
+
+on:
+ push:
+ branches: [ 'main', 'release-*' ]
+ pull_request:
+ branches: [ 'main', 'release-*' ]
+
+jobs:
+
+ observability:
+ name: CloudEvents
+ strategy:
+ matrix:
+ go-version: [1.14.x, 1.15.x]
+ platform: [ubuntu-latest]
+
+ runs-on: ${{ matrix.platform }}
+
+ steps:
+
+ - name: Setup Go ${{ matrix.go-version }}
+ uses: actions/setup-go@v2
+ with:
+ go-version: ${{ matrix.go-version }}
+ id: go
+
+ - name: Checkout code
+ uses: actions/checkout@v2
+
+ - name: Update git submodule
+ run: git submodule sync && git submodule update --init
+
+ - name: Build
+ run: ./hack/observability-test.sh
diff --git a/hack/observability-test.sh b/hack/observability-test.sh
new file mode 100755
index 000000000..bcac5c004
--- /dev/null
+++ b/hack/observability-test.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+
+# Copyright 2021 The CloudEvents Authors
+# SPDX-License-Identifier: Apache-2.0
+
+set -o errexit
+set -o nounset
+set -o pipefail
+
+COVERAGE="`pwd`/coverage.txt"
+
+# test/observability only
+pushd ./test/observability
+
+# Prepare coverage file only if not exists
+if [ ! -f $COVERAGE ]; then
+ touch ./coverage.tmp
+ echo 'mode: atomic' > $COVERAGE
+fi
+COVERPKG="github.com/cloudevents/sdk-go/observability/opentelemetry/v2/..."
+for gomodule in $(go list ./... | grep -v /cmd | grep -v /vendor)
+do
+ go test -v -timeout 30s -race -covermode=atomic -coverprofile=coverage.tmp -coverpkg "$COVERPKG" "$gomodule" 2>&1 | sed 's/ of statements in.*//; /warning: no packages being tested depend on matches for pattern /d'
+ tail -n +2 coverage.tmp >> $COVERAGE
+done
+rm coverage.tmp
+
+# Remove test only deps.
+go mod tidy
+
+popd
diff --git a/hack/tag-release.sh b/hack/tag-release.sh
index 908bbdea3..9a7058b8c 100755
--- a/hack/tag-release.sh
+++ b/hack/tag-release.sh
@@ -50,6 +50,7 @@ do
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/protocol/ws/v2"
"github.com/cloudevents/sdk-go/observability/opencensus/v2"
+ "github.com/cloudevents/sdk-go/observability/opentelemetry/v2"
"github.com/cloudevents/sdk-go/sql/v2"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
"github.com/cloudevents/sdk-go/v2" # NOTE: this needs to be last.
diff --git a/observability/opentelemetry/v2/README.md b/observability/opentelemetry/v2/README.md
new file mode 100644
index 000000000..4732799d7
--- /dev/null
+++ b/observability/opentelemetry/v2/README.md
@@ -0,0 +1,183 @@
+# OpenTelemetry instrumentation for CloudEvents
+
+This package contains the components necessary to instrument CloudEvents clients with OpenTelemetry. The main component is the `OTelObservabilityService` which implements the `ObservabilityService` interface from CloudEvents.
+
+## Instrumented CloudEvents HTTP client
+
+If you want to get a fully instrumented HTTP client, use the helper method in the ` github.com/cloudevents/sdk-go/observability/opentelemetry/v2` module:
+
+```go
+import (
+ "context"
+
+ otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "github.com/cloudevents/sdk-go/v2/client"
+ cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
+)
+
+// you can pass the http/client options as usual
+c, _ := otelObs.NewClientHTTP([]cehttp.Option{}, []client.Option{})
+```
+
+This will produce spans for all outgoing and incoming requests. By default, the spans will have the attributes as defined in [keys.go](https://github.com/cloudevents/sdk-go/blob/release-2.5/v2/observability/keys.go). For more advanced configuration, see the next section.
+
+## Advanced configuration
+
+### HTTP auto-instrumentation
+
+In order to generate spans when sending and receiving events, it's necessary to configure the HTTP client from cloudevents with OpenTelemetry instrumentation. The client has two potentially interesting points for instrumentation:
+
+- Outgoing requests
+- Incoming requests (via StartReceiver)
+
+To fulfil these, we can use the [HTTP auto-instrumentation package](https://github.com/open-telemetry/opentelemetry-go-contrib/tree/v0.23.0/instrumentation/net/http/otelhttp) from OpenTelemetry:
+
+```go
+p, err := cloudevents.NewHTTP(
+ cloudevents.WithRoundTripper(otelhttp.NewTransport(http.DefaultTransport)),
+ cloudevents.WithMiddleware(func(next http.Handler) http.Handler {
+ return otelhttp.NewHandler(next, "receive")
+ }),
+)
+```
+
+The `otelhttp.NewTransport` will ensure that spans are generated for each outgoing request, and that the `traceparent` header is properly propagated. The `otelhttp.NewHandler` will take care of incoming requests, reading the `traceparent` header and continuing the trace with a new span.
+
+This already gives some observability "out-of-the-box", but the spans generated only contain common HTTP headers as defined in the [HTTP semantic conventions](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.6.1/specification/trace/semantic_conventions/http.md). Another point is that if using another protocol, propagation will not work and spans will not be automatically generated, unless there is an auto-instrumentation library for it.
+
+Because of this, CloudEvents offers the `ObservabilityService` interface which is used to generate spans, independently of the chosen protocol. See next how to configure the CloudEvents client to use it.
+
+### Using the OTelObservabilityService
+
+The most basic way to configure the CloudEvents client to use the `OTelObservabilityService` is:
+
+```go
+c, err := cloudevents.NewClient(p, client.WithObservabilityService(otelObs.NewOTelObservabilityService()))
+```
+
+With the above configuration, the spans generated by the `OTelObservabilityService` will have:
+
+
+
+If you require different span names or extra attributes, you can pass multiple `OTelObservabilityServiceOption` options when creating the observability service:
+
+```go
+nameFormatter := func(e *cloudevents.Event) string {
+ return "my.custom.name." + e.Context.GetType()
+}
+
+attributesGetter := func(*cloudevents.Event) []attribute.KeyValue {
+ return []attribute.KeyValue{
+ attribute.String("my-attr", "some-value"),
+ }
+}
+
+// create the obs service with custom span names and attributes
+os := otelObs.NewOTelObservabilityService(
+ otelObs.WithSpanNameFormatter(nameFormatter),
+ otelObs.WithSpanAttributesGetter(attributesGetter),
+)
+
+c, err := cloudevents.NewClient(p, client.WithObservabilityService(os))
+```
+
+>Note: The `nameFormatter` and `attributesGetter` functions will be called on each span creation. **Avoid** doing any heavy processing in them.
+
+## Extra types
+
+This package also contains extra types and helper functions that are useful in case you need to access/set the `tracecontext` in a more "low-level" way.
+
+They allow to inject(write) and extract(read) `tracecontext` from the event. This is particularly useful when dealing with code that has no notion of a "request" nor a context. For example, long-running background processes polling from a queue.
+
+>Note: To learn more about the propagation, take a look at the [Propagators API SPEC](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.6.1/specification/context/api-propagators.md).
+
+## Manually extracting/injecting tracecontext from the event
+
+When working with distributed systems, it can be difficult to achieve proper context propagation. For example, a long-running process listening to a topic does not have a "context" concept like a HTTP server receiving requests does.
+
+>Note: The OpenTelemetry community is always creating new auto-instrumentation integrations for popular libraries and frameworks. The [OpenTelemetry registry](https://opentelemetry.io/registry/?s=kafka&component=&language=go) lists the integrations that are available. As support increases, new auto-instrumentation libraries can be integrated into the CloudEvents SDK.
+
+For this case, it might be useful to `inject` the `tracecontext` inside the event before sending it to a queue. Later, the process can `extract` it and continue the trace normally. For that we can use the `InjectDistributedTracingExtension` and `ExtractDistributedTracingExtension` helper functions.
+
+```go
+func sendEventToQueue(ctx context.Context, event cloudevents.Event) {
+
+ // assuming this function is properly instrumented,
+ // the ctx contains the current span
+
+ // Before sending the event to the queue
+ // we can inject the tracecontext into the event as a DistributedTracingExtension
+ otelObs.InjectDistributedTracingExtension(ctx, event)
+}
+```
+
+```go
+func handleEvent(e cloudevents.Event) {
+ // here in our long-running process, we don't have a "context"
+
+ // if we have the tracecontext in the event, we can
+ // re-create the context with it and continue the trace:
+ ctx := otelObs.ExtractDistributedTracingExtension(context.Background(), e)
+
+ // ctx now has the tracecontext from the moment when the event was sent.
+
+ // All subsequent requests made with this context will be part of the trace.
+ c, _ := otelObs.NewClientHTTP([]cehttp.Option{}, []client.Option{})
+ ctx = cloudevents.ContextWithTarget(ctx, "my-other-cloudevents-app")
+ c.Send(ctx, e)
+}
+```
+
+Because we used the `context` that was re-created from the event, the call to `my-other-cloudevents-app` will be correlated with the initial span. If `my-other-cloudevents-app` is also instrumented and itself make more calls, these will also be part of the trace.
+
+Most use-cases are covered by using the `InjectDistributedTracingExtension` and `ExtractDistributedTracingExtension` helper functions.
+
+### CloudEventCarrier
+
+The `CloudEventCarrier` is an implementation of the OpenTelemetry [TextMapCarrier](https://github.com/open-telemetry/opentelemetry-go/blob/v1.0.0-RC3/propagation/propagation.go#L23). Its purpose is to carry the `tracecontext`, that is used by propagators later.
+
+`CloudEventCarrier` exposes the `DistributedTracingExtension` which is populated by the propagator. It works similarly as the [HeaderCarrier](https://github.com/open-telemetry/opentelemetry-go/blob/v1.0.0-RC3/propagation/propagation.go#L44) which allows getting/setting the `traceparent` header.
+
+It can be used to get access to the "raw" `tracecontext` values (`traceparent` and `tracestate`). One use case is to inject the `tracecontext` from the `context` into the carrier, to gain access to the populated `DistributedTracingExtension`:
+
+```go
+type MyEvent struct {
+ TraceParent string `json:"traceparent,omitempty"`
+ TraceState string `json:"tracestate,omitempty"`
+}
+
+func injectAndReadTraceParentAndState(ctx context.Context, e cloudevents.Event) {
+
+ me := MyEvent{}
+
+ // the propagator from OpenTelemetry
+ prop := propagation.TraceContext{}
+
+ carrier := otelObs.NewCloudEventCarrier()
+
+ // Injects (writes) the tracecontext into the NewCloudEventCarrier
+ // Doing so, will set the DistributedTracingExtension fields
+ prop.Inject(ctx, carrier)
+
+ // Here then we have the "raw" access to the tracecontext data
+ // https://www.w3.org/TR/trace-context/
+
+ // e.g. 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
+ me.TraceParent = carrier.Extension.TraceParent
+
+ // e.g. congo=t61rcWkgMzE
+ me.TraceState = carrier.Extension.TraceState
+}
+```
\ No newline at end of file
diff --git a/observability/opentelemetry/v2/client/client.go b/observability/opentelemetry/v2/client/client.go
new file mode 100644
index 000000000..dab2bae51
--- /dev/null
+++ b/observability/opentelemetry/v2/client/client.go
@@ -0,0 +1,34 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+package client
+
+import (
+ obshttp "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/http"
+ "github.com/cloudevents/sdk-go/v2/client"
+ cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
+)
+
+// NewClientHTTP produces a new client instrumented with OpenTelemetry.
+func NewClientHTTP(topt []cehttp.Option, copt []client.Option, obsOpts ...OTelObservabilityServiceOption) (client.Client, error) {
+ t, err := obshttp.NewObservedHTTP(topt...)
+ if err != nil {
+ return nil, err
+ }
+
+ copt = append(
+ copt,
+ client.WithTimeNow(),
+ client.WithUUIDs(),
+ client.WithObservabilityService(NewOTelObservabilityService(obsOpts...)),
+ )
+
+ c, err := client.New(t, copt...)
+ if err != nil {
+ return nil, err
+ }
+
+ return c, nil
+}
diff --git a/observability/opentelemetry/v2/client/cloudevents_carrier.go b/observability/opentelemetry/v2/client/cloudevents_carrier.go
new file mode 100644
index 000000000..5b858bea4
--- /dev/null
+++ b/observability/opentelemetry/v2/client/cloudevents_carrier.go
@@ -0,0 +1,86 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+package client
+
+import (
+ "context"
+
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ cecontext "github.com/cloudevents/sdk-go/v2/context"
+ "github.com/cloudevents/sdk-go/v2/extensions"
+ "go.opentelemetry.io/otel/propagation"
+)
+
+// CloudEventCarrier wraps the distributed trace extension to satisfy the TextMapCarrier interface.
+// https://github.com/open-telemetry/opentelemetry-go/blob/v1.0.0-RC3/propagation/propagation.go#L23
+type CloudEventCarrier struct {
+ Extension *extensions.DistributedTracingExtension
+}
+
+// NewCloudEventCarrier creates a new CloudEventCarrier with an empty distributed tracing extension.
+func NewCloudEventCarrier() CloudEventCarrier {
+ return CloudEventCarrier{Extension: &extensions.DistributedTracingExtension{}}
+}
+
+// NewCloudEventCarrierWithEvent creates a new CloudEventCarrier with a distributed tracing extension
+// populated with the trace data from the event.
+func NewCloudEventCarrierWithEvent(ctx context.Context, event cloudevents.Event) CloudEventCarrier {
+ var te, ok = extensions.GetDistributedTracingExtension(event)
+ if !ok {
+ cecontext.LoggerFrom(ctx).Warn("Could not get the distributed tracing extension from the event.")
+ return CloudEventCarrier{Extension: &extensions.DistributedTracingExtension{}}
+ }
+ return CloudEventCarrier{Extension: &te}
+}
+
+// Get returns the value associated with the passed key.
+func (cec CloudEventCarrier) Get(key string) string {
+ switch key {
+ case extensions.TraceParentExtension:
+ return cec.Extension.TraceParent
+ case extensions.TraceStateExtension:
+ return cec.Extension.TraceState
+ default:
+ return ""
+ }
+}
+
+// Set stores the key-value pair.
+func (cec CloudEventCarrier) Set(key string, value string) {
+ switch key {
+ case extensions.TraceParentExtension:
+ cec.Extension.TraceParent = value
+ case extensions.TraceStateExtension:
+ cec.Extension.TraceState = value
+ }
+}
+
+// Keys lists the keys stored in this carrier.
+func (cec CloudEventCarrier) Keys() []string {
+ return []string{extensions.TraceParentExtension, extensions.TraceStateExtension}
+}
+
+// InjectDistributedTracingExtension injects the tracecontext from the context into the event as a DistributedTracingExtension
+//
+// If a DistributedTracingExtension is present in the provided event, its current value is replaced with the
+// tracecontext obtained from the context.
+func InjectDistributedTracingExtension(ctx context.Context, event cloudevents.Event) {
+ tc := propagation.TraceContext{}
+ carrier := NewCloudEventCarrier()
+ tc.Inject(ctx, carrier)
+ carrier.Extension.AddTracingAttributes(&event)
+}
+
+// ExtractDistributedTracingExtension extracts the tracecontext from the cloud event into the context.
+//
+// Calling this method will always replace the tracecontext in the context with the one extracted from the event.
+// In case this is undesired, check first if the context has a recording span with: `trace.SpanFromContext(ctx)`
+func ExtractDistributedTracingExtension(ctx context.Context, event cloudevents.Event) context.Context {
+ tc := propagation.TraceContext{}
+ carrier := NewCloudEventCarrierWithEvent(ctx, event)
+
+ return tc.Extract(ctx, carrier)
+}
diff --git a/observability/opentelemetry/v2/client/otel_observability_service.go b/observability/opentelemetry/v2/client/otel_observability_service.go
new file mode 100644
index 000000000..25ead8331
--- /dev/null
+++ b/observability/opentelemetry/v2/client/otel_observability_service.go
@@ -0,0 +1,217 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+package client
+
+import (
+ "context"
+ "runtime"
+ "strings"
+
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
+ "go.opentelemetry.io/otel/trace"
+
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "github.com/cloudevents/sdk-go/v2/binding"
+ "github.com/cloudevents/sdk-go/v2/observability"
+ "github.com/cloudevents/sdk-go/v2/protocol"
+ cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
+)
+
+// OTelObservabilityService implements the ObservabilityService interface from cloudevents
+type OTelObservabilityService struct {
+ tracer trace.Tracer
+ spanAttributesGetter func(cloudevents.Event) []attribute.KeyValue
+ spanNameFormatter func(cloudevents.Event) string
+}
+
+// NewOTelObservabilityService returns an OpenTelemetry-enabled observability service
+func NewOTelObservabilityService(opts ...OTelObservabilityServiceOption) *OTelObservabilityService {
+ tracerProvider := otel.GetTracerProvider()
+
+ o := &OTelObservabilityService{
+ tracer: tracerProvider.Tracer(
+ instrumentationName,
+ // TODO: Can we have the package version here?
+ // trace.WithInstrumentationVersion("1.0.0"),
+ ),
+ spanNameFormatter: defaultSpanNameFormatter,
+ }
+
+ // apply passed options
+ for _, opt := range opts {
+ opt(o)
+ }
+
+ return o
+}
+
+// InboundContextDecorators returns a decorator function that allows enriching the context with the incoming parent trace.
+// This method gets invoked automatically by passing the option 'WithObservabilityService' when creating the cloudevents HTTP client.
+func (o OTelObservabilityService) InboundContextDecorators() []func(context.Context, binding.Message) context.Context {
+ return []func(context.Context, binding.Message) context.Context{tracePropagatorContextDecorator}
+}
+
+// RecordReceivedMalformedEvent records the error from a malformed event in the span.
+func (o OTelObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, err error) {
+ spanName := observability.ClientSpanName + ".malformed receive"
+ _, span := o.tracer.Start(
+ ctx, spanName,
+ trace.WithSpanKind(trace.SpanKindConsumer),
+ trace.WithAttributes(attribute.String(string(semconv.CodeFunctionKey), getFuncName())))
+
+ recordSpanError(span, err)
+ span.End()
+}
+
+// RecordCallingInvoker starts a new span before calling the invoker upon a received event.
+// In case the operation fails, the error is recorded and the span is marked as failed.
+func (o OTelObservabilityService) RecordCallingInvoker(ctx context.Context, event *cloudevents.Event) (context.Context, func(errOrResult error)) {
+ spanName := o.getSpanName(event, "process")
+ ctx, span := o.tracer.Start(
+ ctx, spanName,
+ trace.WithSpanKind(trace.SpanKindConsumer),
+ trace.WithAttributes(GetDefaultSpanAttributes(event, getFuncName())...))
+
+ if span.IsRecording() && o.spanAttributesGetter != nil {
+ span.SetAttributes(o.spanAttributesGetter(*event)...)
+ }
+
+ return ctx, func(errOrResult error) {
+ recordSpanError(span, errOrResult)
+ span.End()
+ }
+}
+
+// RecordSendingEvent starts a new span before sending the event.
+// In case the operation fails, the error is recorded and the span is marked as failed.
+func (o OTelObservabilityService) RecordSendingEvent(ctx context.Context, event cloudevents.Event) (context.Context, func(errOrResult error)) {
+ spanName := o.getSpanName(&event, "send")
+
+ ctx, span := o.tracer.Start(
+ ctx, spanName,
+ trace.WithSpanKind(trace.SpanKindProducer),
+ trace.WithAttributes(GetDefaultSpanAttributes(&event, getFuncName())...))
+
+ if span.IsRecording() && o.spanAttributesGetter != nil {
+ span.SetAttributes(o.spanAttributesGetter(event)...)
+ }
+
+ return ctx, func(errOrResult error) {
+ recordSpanError(span, errOrResult)
+ span.End()
+ }
+}
+
+// RecordRequestEvent starts a new span before transmitting the given request.
+// In case the operation fails, the error is recorded and the span is marked as failed.
+func (o OTelObservabilityService) RecordRequestEvent(ctx context.Context, event cloudevents.Event) (context.Context, func(errOrResult error, event *cloudevents.Event)) {
+ spanName := o.getSpanName(&event, "send")
+
+ ctx, span := o.tracer.Start(
+ ctx, spanName,
+ trace.WithSpanKind(trace.SpanKindProducer),
+ trace.WithAttributes(GetDefaultSpanAttributes(&event, getFuncName())...))
+
+ if span.IsRecording() && o.spanAttributesGetter != nil {
+ span.SetAttributes(o.spanAttributesGetter(event)...)
+ }
+
+ return ctx, func(errOrResult error, event *cloudevents.Event) {
+ recordSpanError(span, errOrResult)
+ span.End()
+ }
+}
+
+// GetDefaultSpanAttributes returns the attributes that are always added to the spans
+// created by the OTelObservabilityService.
+func GetDefaultSpanAttributes(e *cloudevents.Event, method string) []attribute.KeyValue {
+ attr := []attribute.KeyValue{
+ attribute.String(string(semconv.CodeFunctionKey), method),
+ attribute.String(observability.SpecversionAttr, e.SpecVersion()),
+ attribute.String(observability.IdAttr, e.ID()),
+ attribute.String(observability.TypeAttr, e.Type()),
+ attribute.String(observability.SourceAttr, e.Source()),
+ }
+ if sub := e.Subject(); sub != "" {
+ attr = append(attr, attribute.String(observability.SubjectAttr, sub))
+ }
+ if dct := e.DataContentType(); dct != "" {
+ attr = append(attr, attribute.String(observability.DatacontenttypeAttr, dct))
+ }
+ return attr
+}
+
+// Extracts the traceparent from the msg and enriches the context to enable propagation
+func tracePropagatorContextDecorator(ctx context.Context, msg binding.Message) context.Context {
+ var messageCtx context.Context
+ if mctx, ok := msg.(binding.MessageContext); ok {
+ messageCtx = mctx.Context()
+ } else if mctx, ok := binding.UnwrapMessage(msg).(binding.MessageContext); ok {
+ messageCtx = mctx.Context()
+ }
+
+ if messageCtx == nil {
+ return ctx
+ }
+ span := trace.SpanFromContext(messageCtx)
+ if span == nil {
+ return ctx
+ }
+ return trace.ContextWithSpan(ctx, span)
+}
+
+func recordSpanError(span trace.Span, errOrResult error) {
+ if protocol.IsACK(errOrResult) || !span.IsRecording() {
+ return
+ }
+
+ var httpResult *cehttp.Result
+ if cloudevents.ResultAs(errOrResult, &httpResult) {
+ span.RecordError(httpResult)
+ if httpResult.StatusCode > 0 {
+ code, _ := semconv.SpanStatusFromHTTPStatusCode(httpResult.StatusCode)
+ span.SetStatus(code, httpResult.Error())
+ }
+ } else {
+ span.RecordError(errOrResult)
+ }
+}
+
+// getSpanName Returns the name of the span.
+//
+// When no spanNameFormatter is present in OTelObservabilityService,
+// the default name will be "cloudevents.client. prefix" e.g. cloudevents.client.get.customers send.
+//
+// The prefix is always added at the end of the span name. This follows the semantic conventions for
+// messasing systems as defined in https://github.com/open-telemetry/opentelemetry-specification/blob/v1.6.1/specification/trace/semantic_conventions/messaging.md#operation-names
+func (o OTelObservabilityService) getSpanName(e *cloudevents.Event, suffix string) string {
+ name := o.spanNameFormatter(*e)
+
+ // make sure the span name ends with the suffix from the semantic conventions (receive, send, process)
+ if !strings.HasSuffix(name, suffix) {
+ return name + " " + suffix
+ }
+
+ return name
+}
+
+func getFuncName() string {
+ pc := make([]uintptr, 1)
+ n := runtime.Callers(2, pc)
+ frames := runtime.CallersFrames(pc[:n])
+ frame, _ := frames.Next()
+
+ // frame.Function should be github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client.OTelObservabilityService.Func
+ parts := strings.Split(frame.Function, ".")
+
+ // we are interested in the function name
+ if len(parts) != 4 {
+ return ""
+ }
+ return parts[3]
+}
diff --git a/observability/opentelemetry/v2/client/otel_options.go b/observability/opentelemetry/v2/client/otel_options.go
new file mode 100644
index 000000000..053cd1558
--- /dev/null
+++ b/observability/opentelemetry/v2/client/otel_options.go
@@ -0,0 +1,42 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+package client
+
+import (
+ "go.opentelemetry.io/otel/attribute"
+
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "github.com/cloudevents/sdk-go/v2/observability"
+)
+
+const (
+ // The value for the `otel.library.name` span attribute
+ instrumentationName = "github.com/cloudevents/sdk-go/observability/opentelemetry/v2"
+)
+
+type OTelObservabilityServiceOption func(*OTelObservabilityService)
+
+// WithSpanAttributesGetter appends the returned attributes from the function to the span.
+func WithSpanAttributesGetter(attrGetter func(cloudevents.Event) []attribute.KeyValue) OTelObservabilityServiceOption {
+ return func(os *OTelObservabilityService) {
+ if attrGetter != nil {
+ os.spanAttributesGetter = attrGetter
+ }
+ }
+}
+
+// WithSpanNameFormatter replaces the default span name with the string returned from the function
+func WithSpanNameFormatter(nameFormatter func(cloudevents.Event) string) OTelObservabilityServiceOption {
+ return func(os *OTelObservabilityService) {
+ if nameFormatter != nil {
+ os.spanNameFormatter = nameFormatter
+ }
+ }
+}
+
+var defaultSpanNameFormatter func(cloudevents.Event) string = func(e cloudevents.Event) string {
+ return observability.ClientSpanName + "." + e.Context.GetType()
+}
diff --git a/observability/opentelemetry/v2/go.mod b/observability/opentelemetry/v2/go.mod
new file mode 100644
index 000000000..61196b52b
--- /dev/null
+++ b/observability/opentelemetry/v2/go.mod
@@ -0,0 +1,12 @@
+module github.com/cloudevents/sdk-go/observability/opentelemetry/v2
+
+go 1.14
+
+require (
+ github.com/cloudevents/sdk-go/v2 v2.5.0
+ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0
+ go.opentelemetry.io/otel v1.0.0
+ go.opentelemetry.io/otel/trace v1.0.0
+)
+
+replace github.com/cloudevents/sdk-go/v2 => ../../../v2
diff --git a/observability/opentelemetry/v2/go.sum b/observability/opentelemetry/v2/go.sum
new file mode 100644
index 000000000..f9630635d
--- /dev/null
+++ b/observability/opentelemetry/v2/go.sum
@@ -0,0 +1,65 @@
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/felixge/httpsnoop v1.0.2 h1:+nS9g82KMXccJ/wp0zyRW9ZBHFETmMGtkk+2CTTrW4o=
+github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
+github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
+github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
+github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
+github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
+github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
+go.opentelemetry.io/contrib v0.23.0 h1:MgRuo0JZZX8J9WLRjyd7OpTSbaLOdQXXJa6SnZvlWLM=
+go.opentelemetry.io/contrib v0.23.0/go.mod h1:EH4yDYeNoaTqn/8yCWQmfNB78VHfGX2Jt2bvnvzBlGM=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0 h1:hNSH6f4WUMDnRAvUCLItD0WKzQqAPoECvORj+ZChbnA=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0/go.mod h1:wLrbAf2Qb+kFsEjowrxOcuy2SE0dcY0VwFiiYCmUeFQ=
+go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ=
+go.opentelemetry.io/otel v1.0.0 h1:qTTn6x71GVBvoafHK/yaRUmFzI4LcONZD0/kXxl5PHI=
+go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg=
+go.opentelemetry.io/otel/internal/metric v0.23.0 h1:mPfzm9Iqhw7G2nDBmUAjFTfPqLZPbOW2k7QI57ITbaI=
+go.opentelemetry.io/otel/internal/metric v0.23.0/go.mod h1:z+RPiDJe30YnCrOhFGivwBS+DU1JU/PiLKkk4re2DNY=
+go.opentelemetry.io/otel/metric v0.23.0 h1:mYCcDxi60P4T27/0jchIDFa1WHEfQeU3zH9UEMpnj2c=
+go.opentelemetry.io/otel/metric v0.23.0/go.mod h1:G/Nn9InyNnIv7J6YVkQfpc0JCfKBNJaERBGw08nqmVQ=
+go.opentelemetry.io/otel/trace v1.0.0-RC3/go.mod h1:VUt2TUYd8S2/ZRX09ZDFZQwn2RqfMB5MzO17jBojGxo=
+go.opentelemetry.io/otel/trace v1.0.0 h1:TSBr8GTEtKevYMG/2d21M989r5WJYVimhTHBKVEZuh4=
+go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH814St6o6ajzIs=
+go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
+gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/observability/opentelemetry/v2/http/http.go b/observability/opentelemetry/v2/http/http.go
new file mode 100644
index 000000000..5a7d6670f
--- /dev/null
+++ b/observability/opentelemetry/v2/http/http.go
@@ -0,0 +1,29 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+package http
+
+import (
+ "net/http"
+
+ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
+
+ cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
+)
+
+// NewObservedHTTP creates an HTTP protocol with OTel trace propagating middleware.
+func NewObservedHTTP(opts ...cehttp.Option) (*cehttp.Protocol, error) {
+ // appends the OpenTelemetry Http transport + Middleware wrapper
+ // to properly trace outgoing and incoming requests from the client using this protocol
+ return cehttp.New(append(
+ []cehttp.Option{
+ cehttp.WithRoundTripper(otelhttp.NewTransport(http.DefaultTransport)),
+ cehttp.WithMiddleware(func(next http.Handler) http.Handler {
+ return otelhttp.NewHandler(next, "cloudevents.http.receiver")
+ }),
+ },
+ opts...,
+ )...)
+}
diff --git a/samples/http/go.mod b/samples/http/go.mod
index b4da920e5..a26801734 100644
--- a/samples/http/go.mod
+++ b/samples/http/go.mod
@@ -6,11 +6,17 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.1.0
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.5.0
github.com/cloudevents/sdk-go/observability/opencensus/v2 v2.5.0
+ github.com/cloudevents/sdk-go/observability/opentelemetry/v2 v2.5.0
github.com/cloudevents/sdk-go/v2 v2.5.0
github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.7.3
github.com/kelseyhightower/envconfig v1.4.0
go.opencensus.io v0.22.3
+ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0
+ go.opentelemetry.io/otel v1.0.0
+ go.opentelemetry.io/otel/exporters/jaeger v1.0.0
+ go.opentelemetry.io/otel/sdk v1.0.0
+ go.opentelemetry.io/otel/trace v1.0.0
google.golang.org/protobuf v1.26.0
)
@@ -19,3 +25,5 @@ replace github.com/cloudevents/sdk-go/v2 => ../../v2
replace github.com/cloudevents/sdk-go/binding/format/protobuf/v2 => ../../binding/format/protobuf/v2
replace github.com/cloudevents/sdk-go/observability/opencensus/v2 => ../../observability/opencensus/v2
+
+replace github.com/cloudevents/sdk-go/observability/opentelemetry/v2 => ../../observability/opentelemetry/v2
diff --git a/samples/http/go.sum b/samples/http/go.sum
index c04b474e0..f6e0204f0 100644
--- a/samples/http/go.sum
+++ b/samples/http/go.sum
@@ -9,6 +9,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/felixge/httpsnoop v1.0.2 h1:+nS9g82KMXccJ/wp0zyRW9ZBHFETmMGtkk+2CTTrW4o=
+github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
@@ -32,8 +34,9 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
+github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -82,17 +85,37 @@ github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jO
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
+github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
-github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
+go.opentelemetry.io/contrib v0.23.0 h1:MgRuo0JZZX8J9WLRjyd7OpTSbaLOdQXXJa6SnZvlWLM=
+go.opentelemetry.io/contrib v0.23.0/go.mod h1:EH4yDYeNoaTqn/8yCWQmfNB78VHfGX2Jt2bvnvzBlGM=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0 h1:hNSH6f4WUMDnRAvUCLItD0WKzQqAPoECvORj+ZChbnA=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0/go.mod h1:wLrbAf2Qb+kFsEjowrxOcuy2SE0dcY0VwFiiYCmUeFQ=
+go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ=
+go.opentelemetry.io/otel v1.0.0 h1:qTTn6x71GVBvoafHK/yaRUmFzI4LcONZD0/kXxl5PHI=
+go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg=
+go.opentelemetry.io/otel/exporters/jaeger v1.0.0 h1:cLhx8llHw02h5JTqGqaRbYn+QVKHmrzD9vEbKnSPk5U=
+go.opentelemetry.io/otel/exporters/jaeger v1.0.0/go.mod h1:q10N1AolE1JjqKrFJK2tYw0iZpmX+HBaXBtuCzRnBGQ=
+go.opentelemetry.io/otel/internal/metric v0.23.0 h1:mPfzm9Iqhw7G2nDBmUAjFTfPqLZPbOW2k7QI57ITbaI=
+go.opentelemetry.io/otel/internal/metric v0.23.0/go.mod h1:z+RPiDJe30YnCrOhFGivwBS+DU1JU/PiLKkk4re2DNY=
+go.opentelemetry.io/otel/metric v0.23.0 h1:mYCcDxi60P4T27/0jchIDFa1WHEfQeU3zH9UEMpnj2c=
+go.opentelemetry.io/otel/metric v0.23.0/go.mod h1:G/Nn9InyNnIv7J6YVkQfpc0JCfKBNJaERBGw08nqmVQ=
+go.opentelemetry.io/otel/sdk v1.0.0 h1:BNPMYUONPNbLneMttKSjQhOTlFLOD9U22HNG1KrIN2Y=
+go.opentelemetry.io/otel/sdk v1.0.0/go.mod h1:PCrDHlSy5x1kjezSdL37PhbFUMjrsLRshJ2zCzeXwbM=
+go.opentelemetry.io/otel/trace v1.0.0-RC3/go.mod h1:VUt2TUYd8S2/ZRX09ZDFZQwn2RqfMB5MzO17jBojGxo=
+go.opentelemetry.io/otel/trace v1.0.0 h1:TSBr8GTEtKevYMG/2d21M989r5WJYVimhTHBKVEZuh4=
+go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH814St6o6ajzIs=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
@@ -132,8 +155,9 @@ golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw=
+golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
@@ -172,4 +196,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/samples/http/otel-sender-receiver/README.md b/samples/http/otel-sender-receiver/README.md
new file mode 100644
index 000000000..df231ff30
--- /dev/null
+++ b/samples/http/otel-sender-receiver/README.md
@@ -0,0 +1,54 @@
+# CloudEvents with OpenTelemetry instrumentation
+
+This sample demonstrates both client and server apps using CloudEvents instrumented with OpenTelemetry.
+
+> If this is your first time hearing about OpenTelemetry, take a look at the official documentation: [What is OpenTelemetry](https://opentelemetry.io/docs/concepts/what-is-opentelemetry/).
+
+The client app sends an event using CloudEvents to a server. The server listens for events also using CloudEvents. For each event it receives, it makes then an external HTTP request to `cloudevents.io`, just to illustrate the context propagation working.
+
+Here's a diagram to examplify the communication between the components:
+
+![client > server > external service](./request-flow.png "Sample app request flow")
+
+Both apps are configured to send the spans to a locally running [Jaeger](https://www.jaegertracing.io/) instance. See instructions below on how to run everything.
+
+## Requirements to run
+
+- Docker/Compose
+- Go
+
+## Running the sample
+
+1. Download dependencies
+
+```shell
+$ cd sdk-go\samples\http
+$ go mod download
+$ cd otel-sender-receiver
+```
+
+2. Start the Jaeger container
+
+```shell
+$ docker-compose up
+```
+
+3. Run the server and the client
+
+```shell
+$ go run server/server.go
+```
+
+```shell
+$ go run client/client.go
+```
+
+4. Open the Jaeger UI (by default Jaeger exposes the UI at [http://localhost:16686](http://localhost:16686))
+
+5. On the `Service` dropdown select `cloudevents-client`. Next, click on `Find Traces`.
+
+![Selecting the service on Jaeger](./jaeger-find-traces.png "Finding our traces in Jaeger")
+
+6. On the right side you should see a trace with name `cloudevents-client: cloudevents.client.example.type send`. Click there to see the full trace:
+
+![The complete trace on Jaeger](./jaeger-example.png "The full trace")
diff --git a/samples/http/otel-sender-receiver/client/client.go b/samples/http/otel-sender-receiver/client/client.go
new file mode 100644
index 000000000..736da74cc
--- /dev/null
+++ b/samples/http/otel-sender-receiver/client/client.go
@@ -0,0 +1,43 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+package main
+
+import (
+ "context"
+ "log"
+
+ otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client"
+ "github.com/cloudevents/sdk-go/samples/http/otel-sender-receiver/instrumentation"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "github.com/cloudevents/sdk-go/v2/client"
+ cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
+)
+
+const (
+ serviceName = "cloudevents-client"
+)
+
+func main() {
+ shutdown := instrumentation.InitOTelSdk(serviceName)
+ defer shutdown()
+
+ // create the cloudevents client instrumented with OpenTelemetry
+ c, err := otelObs.NewClientHTTP([]cehttp.Option{}, []client.Option{})
+ if err != nil {
+ log.Fatalf("failed to create client, %v", err)
+ }
+
+ event := cloudevents.NewEvent()
+ event.SetSource("example/uri")
+ event.SetType("example.type")
+ event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"})
+
+ ctx := cloudevents.ContextWithTarget(context.Background(), "http://localhost:8080/")
+
+ if result := c.Send(ctx, event); cloudevents.IsUndelivered(result) {
+ log.Fatalf("failed to send, %v", result)
+ }
+}
diff --git a/samples/http/otel-sender-receiver/docker-compose.yaml b/samples/http/otel-sender-receiver/docker-compose.yaml
new file mode 100644
index 000000000..afb4c8a6c
--- /dev/null
+++ b/samples/http/otel-sender-receiver/docker-compose.yaml
@@ -0,0 +1,16 @@
+version: "2.4"
+services:
+ jaeger-otel-cloudevents:
+ image: jaegertracing/all-in-one:1.25
+ environment:
+ - COLLECTOR_ZIPKIN_HOST_PORT=9411
+ ports:
+ - "5775:5775/udp"
+ - "6831:6831/udp"
+ - "6832:6832/udp"
+ - "5778:5778"
+ - "16686:16686"
+ - "14268:14268"
+ - "14250:14250"
+ - "9411:9411"
+ command: --query.max-clock-skew-adjustment 1s
diff --git a/samples/http/otel-sender-receiver/instrumentation/sdkUtils.go b/samples/http/otel-sender-receiver/instrumentation/sdkUtils.go
new file mode 100644
index 000000000..9c5c99dbc
--- /dev/null
+++ b/samples/http/otel-sender-receiver/instrumentation/sdkUtils.go
@@ -0,0 +1,52 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+package instrumentation
+
+import (
+ "context"
+ "log"
+
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/exporters/jaeger"
+ "go.opentelemetry.io/otel/propagation"
+ "go.opentelemetry.io/otel/sdk/resource"
+ sdktrace "go.opentelemetry.io/otel/sdk/trace"
+ semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
+)
+
+const (
+ JaegerEndpoint = "http://localhost:14268/api/traces"
+)
+
+func InitOTelSdk(serviceName string) func() {
+ ctx := context.Background()
+
+ exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(JaegerEndpoint)))
+ if err != nil {
+ return func() { log.Printf("Failed to create the trace exporter: %v", err) }
+ }
+
+ tp := sdktrace.NewTracerProvider(
+ sdktrace.WithBatcher(exp),
+ sdktrace.WithSampler(sdktrace.AlwaysSample()),
+ sdktrace.WithResource(resource.NewWithAttributes(
+ semconv.SchemaURL,
+ semconv.ServiceNameKey.String(serviceName),
+ )),
+ )
+
+ otel.SetTracerProvider(tp)
+ otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
+
+ return func() {
+ if tp == nil {
+ return
+ }
+ if err := tp.Shutdown(ctx); err != nil {
+ log.Printf("Error shutting down the tracer provider: %v", err)
+ }
+ }
+}
diff --git a/samples/http/otel-sender-receiver/jaeger-example.png b/samples/http/otel-sender-receiver/jaeger-example.png
new file mode 100644
index 000000000..06bb1ce3c
Binary files /dev/null and b/samples/http/otel-sender-receiver/jaeger-example.png differ
diff --git a/samples/http/otel-sender-receiver/jaeger-find-traces.png b/samples/http/otel-sender-receiver/jaeger-find-traces.png
new file mode 100644
index 000000000..f96c78103
Binary files /dev/null and b/samples/http/otel-sender-receiver/jaeger-find-traces.png differ
diff --git a/samples/http/otel-sender-receiver/request-flow.png b/samples/http/otel-sender-receiver/request-flow.png
new file mode 100644
index 000000000..172b21429
Binary files /dev/null and b/samples/http/otel-sender-receiver/request-flow.png differ
diff --git a/samples/http/otel-sender-receiver/server/server.go b/samples/http/otel-sender-receiver/server/server.go
new file mode 100644
index 000000000..d2f56e3dd
--- /dev/null
+++ b/samples/http/otel-sender-receiver/server/server.go
@@ -0,0 +1,71 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+package main
+
+import (
+ "context"
+ "log"
+ "net/http"
+
+ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/trace"
+
+ otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client"
+ "github.com/cloudevents/sdk-go/samples/http/otel-sender-receiver/instrumentation"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ ceclient "github.com/cloudevents/sdk-go/v2/client"
+ "github.com/cloudevents/sdk-go/v2/protocol"
+ cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
+)
+
+var tracer trace.Tracer
+
+const (
+ serviceName = "cloudevents-server"
+)
+
+func main() {
+ shutdown := instrumentation.InitOTelSdk(serviceName)
+ tracer = otel.Tracer(serviceName + "-main")
+ defer shutdown()
+
+ // create the cloudevents client instrumented with OpenTelemetry
+ c, err := otelObs.NewClientHTTP([]cehttp.Option{}, []ceclient.Option{})
+ if err != nil {
+ log.Fatalf("failed to create client, %v", err)
+ }
+
+ log.Fatal(c.StartReceiver(context.Background(), handleReceivedEvent))
+}
+
+func handleReceivedEvent(ctx context.Context, event cloudevents.Event) protocol.Result {
+
+ // Showcase injecting the incoming tracecontext into the event as a DistributedTraceExtension
+ otelObs.InjectDistributedTracingExtension(ctx, event)
+
+ // Showcase extracting the tracecontext from the event into a context in order to continue the trace.
+ // This is useful for cases where events are read from a queue and no context is present.
+ ctx = otelObs.ExtractDistributedTracingExtension(ctx, event)
+
+ // manually start a span for this http request
+ ctx, childSpan := tracer.Start(ctx, "externalHttpCall", trace.WithAttributes(attribute.String("id", "123")))
+ defer childSpan.End()
+
+ // manually creating a http client instrumented with OpenTelemetry to make an external request
+ client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
+
+ req, _ := http.NewRequestWithContext(ctx, "GET", "https://cloudevents.io/", nil)
+
+ res, err := client.Do(req)
+ if err != nil {
+ panic(err)
+ }
+ _ = res.Body.Close()
+
+ return nil
+}
diff --git a/test/observability/go.mod b/test/observability/go.mod
new file mode 100644
index 000000000..4eb39d190
--- /dev/null
+++ b/test/observability/go.mod
@@ -0,0 +1,16 @@
+module github.com/cloudevents/sdk-go/test/observability
+
+go 1.14
+
+require (
+ github.com/cloudevents/sdk-go/observability/opentelemetry/v2 v2.5.0
+ github.com/cloudevents/sdk-go/v2 v2.5.0
+ github.com/stretchr/testify v1.7.0
+ go.opentelemetry.io/otel v1.0.0
+ go.opentelemetry.io/otel/sdk v1.0.0
+ go.opentelemetry.io/otel/trace v1.0.0
+)
+
+replace github.com/cloudevents/sdk-go/observability/opentelemetry/v2 => ../../observability/opentelemetry/v2
+
+replace github.com/cloudevents/sdk-go/v2 => ../../v2
diff --git a/test/observability/go.sum b/test/observability/go.sum
new file mode 100644
index 000000000..66f323cc4
--- /dev/null
+++ b/test/observability/go.sum
@@ -0,0 +1,69 @@
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/felixge/httpsnoop v1.0.2 h1:+nS9g82KMXccJ/wp0zyRW9ZBHFETmMGtkk+2CTTrW4o=
+github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
+github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
+github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
+github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
+github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
+github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
+go.opentelemetry.io/contrib v0.23.0 h1:MgRuo0JZZX8J9WLRjyd7OpTSbaLOdQXXJa6SnZvlWLM=
+go.opentelemetry.io/contrib v0.23.0/go.mod h1:EH4yDYeNoaTqn/8yCWQmfNB78VHfGX2Jt2bvnvzBlGM=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0 h1:hNSH6f4WUMDnRAvUCLItD0WKzQqAPoECvORj+ZChbnA=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0/go.mod h1:wLrbAf2Qb+kFsEjowrxOcuy2SE0dcY0VwFiiYCmUeFQ=
+go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ=
+go.opentelemetry.io/otel v1.0.0 h1:qTTn6x71GVBvoafHK/yaRUmFzI4LcONZD0/kXxl5PHI=
+go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg=
+go.opentelemetry.io/otel/internal/metric v0.23.0 h1:mPfzm9Iqhw7G2nDBmUAjFTfPqLZPbOW2k7QI57ITbaI=
+go.opentelemetry.io/otel/internal/metric v0.23.0/go.mod h1:z+RPiDJe30YnCrOhFGivwBS+DU1JU/PiLKkk4re2DNY=
+go.opentelemetry.io/otel/metric v0.23.0 h1:mYCcDxi60P4T27/0jchIDFa1WHEfQeU3zH9UEMpnj2c=
+go.opentelemetry.io/otel/metric v0.23.0/go.mod h1:G/Nn9InyNnIv7J6YVkQfpc0JCfKBNJaERBGw08nqmVQ=
+go.opentelemetry.io/otel/sdk v1.0.0 h1:BNPMYUONPNbLneMttKSjQhOTlFLOD9U22HNG1KrIN2Y=
+go.opentelemetry.io/otel/sdk v1.0.0/go.mod h1:PCrDHlSy5x1kjezSdL37PhbFUMjrsLRshJ2zCzeXwbM=
+go.opentelemetry.io/otel/trace v1.0.0-RC3/go.mod h1:VUt2TUYd8S2/ZRX09ZDFZQwn2RqfMB5MzO17jBojGxo=
+go.opentelemetry.io/otel/trace v1.0.0 h1:TSBr8GTEtKevYMG/2d21M989r5WJYVimhTHBKVEZuh4=
+go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH814St6o6ajzIs=
+go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw=
+golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
+gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/test/observability/opentelemetry/client_test.go b/test/observability/opentelemetry/client_test.go
new file mode 100644
index 000000000..a6ee3bd9b
--- /dev/null
+++ b/test/observability/opentelemetry/client_test.go
@@ -0,0 +1,259 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+package opentelemetry
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "go.opentelemetry.io/otel/codes"
+ "go.opentelemetry.io/otel/propagation"
+ "go.opentelemetry.io/otel/trace"
+
+ otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client"
+ obshttp "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/http"
+ "github.com/cloudevents/sdk-go/v2/client"
+ event "github.com/cloudevents/sdk-go/v2/event"
+ "github.com/cloudevents/sdk-go/v2/protocol"
+ cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
+ "github.com/cloudevents/sdk-go/v2/types"
+)
+
+func TestTracedClientReceive(t *testing.T) {
+ now := time.Now()
+
+ testCases := map[string]struct {
+ event event.Event
+ expectedResult protocol.Result
+ ack bool
+ }{
+ "simple binary v0.3": {
+ event: func() event.Event {
+ e := event.Event{
+ Context: event.EventContextV03{
+ Type: "unit.test.client",
+ Source: *types.ParseURIRef("/unit/test/client"),
+ Time: &types.Timestamp{Time: now},
+ ID: "AABBCCDDEE",
+ }.AsV03(),
+ }
+ _ = e.SetData(event.ApplicationJSON, &map[string]string{
+ "sq": "42",
+ "msg": "hello",
+ })
+ return e
+ }(),
+ ack: true,
+ },
+ "receive with error": {
+ event: func() event.Event {
+ e := event.Event{
+ Context: event.EventContextV03{
+ Type: "unit.test.client",
+ Source: *types.ParseURIRef("/unit/test/client"),
+ Time: &types.Timestamp{Time: now},
+ ID: "AABBCCDDEE",
+ }.AsV03(),
+ }
+ _ = e.SetData(event.ApplicationJSON, &map[string]string{
+ "sq": "42",
+ "msg": "hello",
+ })
+ return e
+ }(),
+ expectedResult: cehttp.NewResult(500, "some error happened within the receiver"),
+ ack: false,
+ },
+ }
+ for n, tc := range testCases {
+ t.Run(n, func(t *testing.T) {
+ sr, tracer := configureOtelTestSdk()
+
+ // creates and starts the receiver
+ p, err := obshttp.NewObservedHTTP(cehttp.WithPort(0))
+ require.NoError(t, err)
+ c, err := client.New(p, client.WithObservabilityService(otelObs.NewOTelObservabilityService()))
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(context.TODO())
+ go func() {
+ require.NoError(t, c.StartReceiver(ctx, func(ctx context.Context, e event.Event) protocol.Result {
+ return tc.expectedResult
+ }))
+ }()
+ time.Sleep(5 * time.Millisecond) // let the server start
+
+ target := fmt.Sprintf("http://localhost:%d", p.GetListeningPort())
+ sender, err := otelObs.NewClientHTTP([]cehttp.Option{cehttp.WithTarget(target)}, []client.Option{})
+ require.NoError(t, err)
+
+ // act
+ ctx, span := tracer.Start(ctx, "test-span")
+ result := sender.Send(ctx, tc.event)
+ span.End()
+
+ require.Equal(t, tc.ack, protocol.IsACK(result))
+
+ spans := sr.Ended()
+
+ // 1 span from the test
+ // 2 spans from sending the event (http client auto-instrumentation + obs service)
+ // 2 spans from receiving the event (http client middleware + obs service)
+ assert.Equal(t, 5, len(spans))
+
+ if !tc.ack {
+ // The span created by the observability service should have the error that came from the receiver fn
+ obsSpan := spans[0]
+ assert.Equal(t, codes.Error, obsSpan.Status().Code)
+ assert.Equal(t, "500: some error happened within the receiver", obsSpan.Status().Description)
+ }
+
+ // Now stop the client
+ cancel()
+ })
+ }
+}
+
+func TestTracingClientSend(t *testing.T) {
+ now := time.Now()
+
+ testCases := map[string]struct {
+ event event.Event
+ resp *http.Response
+ ack bool
+ }{
+ "send with ok response": {
+ event: func() event.Event {
+ e := event.Event{
+ Context: event.EventContextV1{
+ Type: "unit.test.client",
+ Source: *types.ParseURIRef("/unit/test/client"),
+ Time: &types.Timestamp{Time: now},
+ ID: "AABBCCDDEE",
+ }.AsV1(),
+ }
+ _ = e.SetData(event.ApplicationJSON, &map[string]interface{}{
+ "sq": 42,
+ "msg": "hello",
+ })
+ return e
+ }(),
+ resp: &http.Response{
+ StatusCode: http.StatusAccepted,
+ },
+ ack: true,
+ },
+ "send with error response": {
+ event: func() event.Event {
+ e := event.Event{
+ Context: event.EventContextV1{
+ Type: "unit.test.client",
+ Source: *types.ParseURIRef("/unit/test/client"),
+ Time: &types.Timestamp{Time: now},
+ ID: "AABBCCDDEE",
+ }.AsV1(),
+ }
+ _ = e.SetData(event.ApplicationJSON, &map[string]interface{}{
+ "sq": 42,
+ "msg": "hello",
+ })
+ return e
+ }(),
+ resp: &http.Response{
+ StatusCode: http.StatusBadRequest,
+ },
+ ack: false,
+ },
+ }
+ for n, tc := range testCases {
+ t.Run(n, func(t *testing.T) {
+ sr, tracer := configureOtelTestSdk()
+ handler := &fakeHandler{
+ t: t,
+ response: tc.resp,
+ requests: make([]*http.Request, 0),
+ }
+ server := httptest.NewServer(handler)
+ defer server.Close()
+
+ sender, err := otelObs.NewClientHTTP([]cehttp.Option{cehttp.WithTarget(server.URL)}, []client.Option{})
+ require.NoError(t, err)
+
+ // act
+ ctx, span := tracer.Start(context.Background(), "test-span")
+ result := sender.Send(ctx, tc.event)
+ span.End()
+
+ require.Equal(t, tc.ack, protocol.IsACK(result))
+
+ spans := sr.Ended()
+
+ // 1 span from the test
+ // 2 spans from sending the event (http client auto-instrumentation + obs service)
+ // 2 spans from receiving the event (http client middleware + obs service)
+ assert.Equal(t, 3, len(spans))
+
+ // get the traceparent header from the outgoing request
+ r := handler.popRequest(t)
+ if tp := r.Header.Get("traceparent"); tp == "" {
+ t.Fatal("missing traceparent header")
+ }
+
+ // The request should have been sent with the last spanID (from the auto-instrumentation lib)
+ ctx = prop.Extract(ctx, propagation.HeaderCarrier(r.Header))
+ spanCtx := trace.SpanContextFromContext(ctx)
+ assert.Equal(t, spans[0].SpanContext().TraceID(), spanCtx.TraceID())
+ assert.Equal(t, spans[0].SpanContext().SpanID(), spanCtx.SpanID())
+ })
+ }
+}
+
+type fakeHandler struct {
+ t *testing.T
+ response *http.Response
+ requests []*http.Request
+}
+
+func (f *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ defer r.Body.Close()
+
+ // Make a copy of the request.
+ f.requests = append(f.requests, r)
+
+ // Write the response.
+ if f.response != nil {
+ for h, vs := range f.response.Header {
+ for _, v := range vs {
+ w.Header().Add(h, v)
+ }
+ }
+ w.WriteHeader(f.response.StatusCode)
+ var buf bytes.Buffer
+ if f.response.ContentLength > 0 {
+ _, _ = buf.ReadFrom(f.response.Body)
+ _, _ = w.Write(buf.Bytes())
+ }
+ } else {
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte(""))
+ }
+}
+
+func (f *fakeHandler) popRequest(t *testing.T) *http.Request {
+ if len(f.requests) == 0 {
+ t.Error("Unable to pop request")
+ }
+ r := f.requests[0]
+ f.requests = f.requests[1:]
+ return r
+}
diff --git a/test/observability/opentelemetry/cloudevents_carrier_test.go b/test/observability/opentelemetry/cloudevents_carrier_test.go
new file mode 100644
index 000000000..34d2a2e5e
--- /dev/null
+++ b/test/observability/opentelemetry/cloudevents_carrier_test.go
@@ -0,0 +1,215 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+package opentelemetry
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/propagation"
+ sdkTrace "go.opentelemetry.io/otel/sdk/trace"
+ "go.opentelemetry.io/otel/sdk/trace/tracetest"
+ "go.opentelemetry.io/otel/trace"
+
+ otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "github.com/cloudevents/sdk-go/v2/extensions"
+)
+
+var (
+ traceparent = http.CanonicalHeaderKey("traceparent")
+ tracestate = http.CanonicalHeaderKey("tracestate")
+
+ prop = propagation.TraceContext{}
+ eventTraceID = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
+ eventSpanID = "bbbbbbbbbbbbbbbb"
+ distributedExt = extensions.DistributedTracingExtension{
+ TraceParent: fmt.Sprintf("00-%s-%s-00", eventTraceID, eventSpanID),
+ TraceState: "key1=value1,key2=value2",
+ }
+)
+
+func TestExtractContextWithTraceContext(t *testing.T) {
+ type testcase struct {
+ name string
+ event cloudevents.Event
+ header http.Header
+ want string
+ }
+
+ tests := []testcase{
+ {
+ name: "tracecontext in the context is overwritten by the one from the event",
+ event: createCloudEvent(distributedExt),
+ header: http.Header{
+ traceparent: []string{"00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00"},
+ },
+ want: "00-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb-bbbbbbbbbbbbbbbb-00",
+ },
+ {
+ name: "context with tracecontext and event with invalid tracecontext",
+ event: createCloudEventWithInvalidTraceParent(),
+ header: http.Header{
+ traceparent: []string{"00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00"},
+ },
+ want: "00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00",
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ // Simulates a case of an auto-instrumented client where the context
+ // has the incoming parent span
+ incomingCtx := prop.Extract(context.Background(), propagation.HeaderCarrier(tc.header))
+
+ // act
+ newCtx := otelObs.ExtractDistributedTracingExtension(incomingCtx, tc.event)
+
+ prop := propagation.TraceContext{}
+ carrier := otelObs.NewCloudEventCarrier()
+ prop.Inject(newCtx, carrier)
+
+ // the newCtx contains the expected traceparent
+ assert.Equal(t, tc.want, carrier.Extension.TraceParent)
+ })
+ }
+}
+
+func TestExtractContextWithoutTraceContext(t *testing.T) {
+ type testcase struct {
+ name string
+ event cloudevents.Event
+ header http.Header
+ }
+ _, _ = configureOtelTestSdk()
+ tests := []testcase{
+ {
+ name: "context without tracecontext",
+ event: createCloudEvent(distributedExt),
+ },
+ {
+ name: "context with invalid tracecontext and event with valid tracecontext",
+ event: createCloudEvent(distributedExt),
+ header: http.Header{
+ traceparent: []string{"00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-1-00"},
+ },
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ incomingCtx := context.Background()
+
+ if tc.header != nil {
+ incomingCtx = prop.Extract(incomingCtx, propagation.HeaderCarrier(tc.header))
+ }
+
+ // act
+ newCtx := otelObs.ExtractDistributedTracingExtension(incomingCtx, tc.event)
+ sc := trace.SpanContextFromContext(newCtx)
+
+ // the new context should be different since it was enriched with the tracecontext from the event
+ assert.NotEqual(t, trace.SpanContextFromContext(incomingCtx), sc)
+
+ // make sure the IDs are as expected
+ assert.Equal(t, eventTraceID, sc.TraceID().String())
+ assert.Equal(t, eventSpanID, sc.SpanID().String())
+ assert.Equal(t, distributedExt.TraceState, sc.TraceState().String())
+ })
+ }
+}
+
+func TestInjectDistributedTracingExtension(t *testing.T) {
+ type testcase struct {
+ name string
+ event cloudevents.Event
+ header http.Header
+ want extensions.DistributedTracingExtension
+ }
+ tests := []testcase{
+ {
+ name: "inject tracecontext into event",
+ event: createCloudEvent(extensions.DistributedTracingExtension{}),
+ header: http.Header{
+ traceparent: []string{"00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00"},
+ tracestate: []string{"key1=value1,key2=value2"},
+ },
+ want: extensions.DistributedTracingExtension{
+ TraceParent: "00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00",
+ TraceState: "key1=value1,key2=value2",
+ },
+ },
+ {
+ name: "overwrite tracecontext in the event from the context",
+ event: createCloudEvent(distributedExt),
+ header: http.Header{
+ traceparent: []string{"00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00"},
+ tracestate: []string{"key1=value1,key2=value2,key3=value3"},
+ },
+ want: extensions.DistributedTracingExtension{
+ TraceParent: "00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00",
+ TraceState: "key1=value1,key2=value2,key3=value3",
+ },
+ },
+ {
+ name: "context without tracecontext",
+ event: createCloudEvent(distributedExt),
+ want: distributedExt,
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ ctx := context.Background()
+ ctx = prop.Extract(ctx, propagation.HeaderCarrier(tc.header))
+
+ // act
+ otelObs.InjectDistributedTracingExtension(ctx, tc.event)
+
+ actual, ok := extensions.GetDistributedTracingExtension(tc.event)
+ assert.True(t, ok)
+ assert.Equal(t, tc.want, actual)
+ })
+ }
+
+}
+
+func createCloudEvent(distributedExt extensions.DistributedTracingExtension) cloudevents.Event {
+ event := cloudevents.NewEvent()
+ event.SetSource("example/uri")
+ event.SetType("example.type")
+ event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"})
+
+ if distributedExt.TraceParent != "" {
+ distributedExt.AddTracingAttributes(&event)
+ }
+
+ return event
+}
+
+func createCloudEventWithInvalidTraceParent() cloudevents.Event {
+ event := cloudevents.NewEvent()
+ event.SetSource("example/uri")
+ event.SetType("example.type")
+ event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"})
+
+ // set directly to force an invalid value
+ event.SetExtension(extensions.TraceParentExtension, 123)
+
+ return event
+}
+
+func configureOtelTestSdk() (*tracetest.SpanRecorder, trace.Tracer) {
+ sr := tracetest.NewSpanRecorder()
+ provider := sdkTrace.NewTracerProvider(sdkTrace.WithSpanProcessor(sr), sdkTrace.WithSampler(sdkTrace.AlwaysSample()))
+ otel.SetTracerProvider(provider)
+ otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
+ return sr, provider.Tracer("test-tracer")
+}
diff --git a/test/observability/opentelemetry/doc.go b/test/observability/opentelemetry/doc.go
new file mode 100644
index 000000000..8e9326165
--- /dev/null
+++ b/test/observability/opentelemetry/doc.go
@@ -0,0 +1,13 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+/*
+Package opentelemetry validates the carrier and observability service instrumentation with the default SDK.
+
+This package is in a separate module from the instrumentation it tests to
+isolate the dependency of the default OTel SDK and not impose this as a transitive
+dependency for users.
+*/
+package opentelemetry
diff --git a/test/observability/opentelemetry/otel_observability_service_test.go b/test/observability/opentelemetry/otel_observability_service_test.go
new file mode 100644
index 000000000..da55dd81f
--- /dev/null
+++ b/test/observability/opentelemetry/otel_observability_service_test.go
@@ -0,0 +1,422 @@
+/*
+ Copyright 2021 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
+package opentelemetry
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/codes"
+ semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
+ "go.opentelemetry.io/otel/trace"
+
+ otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ event "github.com/cloudevents/sdk-go/v2/event"
+ "github.com/cloudevents/sdk-go/v2/extensions"
+ "github.com/cloudevents/sdk-go/v2/protocol"
+ "github.com/cloudevents/sdk-go/v2/protocol/http"
+)
+
+var (
+ expectedEvent cloudevents.Event = createCloudEvent(extensions.DistributedTracingExtension{})
+)
+
+func TestRecordSendingEvent(t *testing.T) {
+ tests := []struct {
+ name string
+ expectedSpanName string
+ expectedStatus codes.Code
+ expectedAttrs []attribute.KeyValue
+ expectedResult protocol.Result
+ expectedSpanKind trace.SpanKind
+ nameFormatter func(cloudevents.Event) string
+ attributesGetter func(cloudevents.Event) []attribute.KeyValue
+ }{
+
+ {
+ name: "send with default options",
+ expectedSpanName: "cloudevents.client.example.type send",
+ expectedStatus: codes.Unset,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordSendingEvent"),
+ expectedSpanKind: trace.SpanKindProducer,
+ nameFormatter: nil,
+ },
+ {
+ name: "send with custom span name",
+ expectedSpanName: "test.example.type send",
+ expectedStatus: codes.Unset,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordSendingEvent"),
+ expectedSpanKind: trace.SpanKindProducer,
+ nameFormatter: func(e cloudevents.Event) string {
+ return "test." + e.Context.GetType()
+ },
+ },
+ {
+ name: "send with custom attributes",
+ expectedSpanName: "test.example.type send",
+ expectedStatus: codes.Unset,
+ expectedAttrs: append(otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordSendingEvent"), attribute.String("my-attr", "some-value")),
+ expectedSpanKind: trace.SpanKindProducer,
+ nameFormatter: func(e cloudevents.Event) string {
+ return "test." + e.Context.GetType()
+ },
+ attributesGetter: func(cloudevents.Event) []attribute.KeyValue {
+ return []attribute.KeyValue{
+ attribute.String("my-attr", "some-value"),
+ }
+ },
+ },
+ {
+ name: "send with error response",
+ expectedSpanName: "cloudevents.client.example.type send",
+ expectedStatus: codes.Unset,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordSendingEvent"),
+ expectedSpanKind: trace.SpanKindProducer,
+ expectedResult: protocol.NewReceipt(false, "some error here"),
+ },
+ {
+ name: "send with http error response",
+ expectedSpanName: "cloudevents.client.example.type send",
+ expectedStatus: codes.Error,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordSendingEvent"),
+ expectedSpanKind: trace.SpanKindProducer,
+ expectedResult: http.NewResult(500, "some server error"),
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ sr, _ := configureOtelTestSdk()
+ ctx := context.Background()
+
+ os := otelObs.NewOTelObservabilityService(
+ otelObs.WithSpanNameFormatter(tc.nameFormatter),
+ otelObs.WithSpanAttributesGetter(tc.attributesGetter))
+
+ // act
+ ctx, cb := os.RecordSendingEvent(ctx, expectedEvent)
+ cb(tc.expectedResult)
+
+ spans := sr.Ended()
+
+ // since the obs service started a span, the context should have the spancontext
+ assert.NotNil(t, trace.SpanContextFromContext(ctx))
+ assert.Equal(t, 1, len(spans))
+
+ span := spans[0]
+ assert.Equal(t, tc.expectedSpanName, span.Name())
+ assert.Equal(t, tc.expectedStatus, span.Status().Code)
+ assert.Equal(t, tc.expectedSpanKind, span.SpanKind())
+
+ if !reflect.DeepEqual(span.Attributes(), tc.expectedAttrs) {
+ t.Errorf("p = %v, want %v", span.Attributes(), tc.expectedAttrs)
+ }
+
+ if tc.expectedResult != nil {
+ assert.Equal(t, 1, len(span.Events()))
+ assert.Equal(t, semconv.ExceptionEventName, span.Events()[0].Name)
+
+ attrsMap := getSpanEventMap(span.Events()[0].Attributes)
+ assert.Equal(t, tc.expectedResult.Error(), attrsMap[string(semconv.ExceptionMessageKey)])
+ }
+ })
+ }
+}
+
+func TestRecordRequestEvent(t *testing.T) {
+ tests := []struct {
+ name string
+ expectedSpanName string
+ expectedStatus codes.Code
+ expectedAttrs []attribute.KeyValue
+ expectedResult protocol.Result
+ expectedSpanKind trace.SpanKind
+ nameFormatter func(cloudevents.Event) string
+ attributesGetter func(cloudevents.Event) []attribute.KeyValue
+ }{
+
+ {
+ name: "request with default options",
+ expectedSpanName: "cloudevents.client.example.type send",
+ expectedStatus: codes.Unset,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordRequestEvent"),
+ expectedSpanKind: trace.SpanKindProducer,
+ nameFormatter: nil,
+ },
+ {
+ name: "request with custom span name",
+ expectedSpanName: "test.example.type send",
+ expectedStatus: codes.Unset,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordRequestEvent"),
+ expectedSpanKind: trace.SpanKindProducer,
+ nameFormatter: func(e cloudevents.Event) string {
+ return "test." + e.Context.GetType()
+ },
+ },
+ {
+ name: "request with custom attributes",
+ expectedSpanName: "test.example.type send",
+ expectedStatus: codes.Unset,
+ expectedAttrs: append(otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordRequestEvent"), attribute.String("my-attr", "some-value")),
+ expectedSpanKind: trace.SpanKindProducer,
+ nameFormatter: func(e cloudevents.Event) string {
+ return "test." + e.Context.GetType()
+ },
+ attributesGetter: func(cloudevents.Event) []attribute.KeyValue {
+ return []attribute.KeyValue{
+ attribute.String("my-attr", "some-value"),
+ }
+ },
+ },
+ {
+ name: "send with error response",
+ expectedSpanName: "cloudevents.client.example.type send",
+ expectedStatus: codes.Unset,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordRequestEvent"),
+ expectedSpanKind: trace.SpanKindProducer,
+ expectedResult: protocol.NewReceipt(false, "some error here"),
+ },
+ {
+ name: "request with http error response",
+ expectedSpanName: "cloudevents.client.example.type send",
+ expectedStatus: codes.Error,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordRequestEvent"),
+ expectedSpanKind: trace.SpanKindProducer,
+ expectedResult: http.NewResult(500, "some server error"),
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ sr, _ := configureOtelTestSdk()
+ ctx := context.Background()
+
+ os := otelObs.NewOTelObservabilityService(
+ otelObs.WithSpanNameFormatter(tc.nameFormatter),
+ otelObs.WithSpanAttributesGetter(tc.attributesGetter))
+
+ // act
+ ctx, cb := os.RecordRequestEvent(ctx, expectedEvent)
+ cb(tc.expectedResult, &expectedEvent)
+
+ spans := sr.Ended()
+
+ // since the obs service started a span, the context should have the spancontext
+ assert.NotNil(t, trace.SpanContextFromContext(ctx))
+ assert.Equal(t, 1, len(spans))
+
+ span := spans[0]
+ assert.Equal(t, tc.expectedSpanName, span.Name())
+ assert.Equal(t, tc.expectedStatus, span.Status().Code)
+ assert.Equal(t, tc.expectedSpanKind, span.SpanKind())
+
+ if !reflect.DeepEqual(span.Attributes(), tc.expectedAttrs) {
+ t.Errorf("p = %v, want %v", span.Attributes(), tc.expectedAttrs)
+ }
+
+ if tc.expectedResult != nil {
+ assert.Equal(t, 1, len(span.Events()))
+ assert.Equal(t, semconv.ExceptionEventName, span.Events()[0].Name)
+
+ attrsMap := getSpanEventMap(span.Events()[0].Attributes)
+ assert.Equal(t, tc.expectedResult.Error(), attrsMap[string(semconv.ExceptionMessageKey)])
+ }
+ })
+ }
+}
+
+func TestRecordCallingInvoker(t *testing.T) {
+ tests := []struct {
+ name string
+ expectedSpanName string
+ expectedStatus codes.Code
+ expectedAttrs []attribute.KeyValue
+ expectedResult protocol.Result
+ expectedSpanKind trace.SpanKind
+ nameFormatter func(cloudevents.Event) string
+ attributesGetter func(cloudevents.Event) []attribute.KeyValue
+ }{
+
+ {
+ name: "invoker with default options",
+ expectedSpanName: "cloudevents.client.example.type process",
+ expectedStatus: codes.Unset,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordCallingInvoker"),
+ expectedSpanKind: trace.SpanKindConsumer,
+ nameFormatter: nil,
+ },
+ {
+ name: "invoker with custom span name",
+ expectedSpanName: "test.example.type process",
+ expectedStatus: codes.Unset,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordCallingInvoker"),
+ expectedSpanKind: trace.SpanKindConsumer,
+ nameFormatter: func(e cloudevents.Event) string {
+ return "test." + e.Context.GetType()
+ },
+ },
+ {
+ name: "invoker with custom attributes",
+ expectedSpanName: "test.example.type process",
+ expectedStatus: codes.Unset,
+ expectedAttrs: append(otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordCallingInvoker"), attribute.String("my-attr", "some-value")),
+ expectedSpanKind: trace.SpanKindConsumer,
+ nameFormatter: func(e cloudevents.Event) string {
+ return "test." + e.Context.GetType()
+ },
+ attributesGetter: func(cloudevents.Event) []attribute.KeyValue {
+ return []attribute.KeyValue{
+ attribute.String("my-attr", "some-value"),
+ }
+ },
+ },
+ {
+ name: "invoker with error response",
+ expectedSpanName: "cloudevents.client.example.type process",
+ expectedStatus: codes.Unset,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordCallingInvoker"),
+ expectedSpanKind: trace.SpanKindConsumer,
+ expectedResult: protocol.NewReceipt(false, "some error here"),
+ },
+ {
+ name: "invoker with http error response",
+ expectedSpanName: "cloudevents.client.example.type process",
+ expectedStatus: codes.Error,
+ expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordCallingInvoker"),
+ expectedSpanKind: trace.SpanKindConsumer,
+ expectedResult: http.NewResult(500, "some server error"),
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ sr, _ := configureOtelTestSdk()
+ ctx := context.Background()
+
+ os := otelObs.NewOTelObservabilityService(
+ otelObs.WithSpanNameFormatter(tc.nameFormatter),
+ otelObs.WithSpanAttributesGetter(tc.attributesGetter))
+
+ // act
+ ctx, cb := os.RecordCallingInvoker(ctx, &expectedEvent)
+ cb(tc.expectedResult)
+
+ spans := sr.Ended()
+
+ // since the obs service started a span, the context should have the spancontext
+ assert.NotNil(t, trace.SpanContextFromContext(ctx))
+ assert.Equal(t, 1, len(spans))
+
+ span := spans[0]
+ assert.Equal(t, tc.expectedSpanName, span.Name())
+ assert.Equal(t, tc.expectedStatus, span.Status().Code)
+ assert.Equal(t, tc.expectedSpanKind, span.SpanKind())
+
+ if !reflect.DeepEqual(span.Attributes(), tc.expectedAttrs) {
+ t.Errorf("p = %v, want %v", span.Attributes(), tc.expectedAttrs)
+ }
+
+ if tc.expectedResult != nil {
+ assert.Equal(t, 1, len(span.Events()))
+ assert.Equal(t, semconv.ExceptionEventName, span.Events()[0].Name)
+
+ attrsMap := getSpanEventMap(span.Events()[0].Attributes)
+ assert.Equal(t, tc.expectedResult.Error(), attrsMap[string(semconv.ExceptionMessageKey)])
+ }
+ })
+ }
+}
+
+func TestRecordReceivedMalformedEvent(t *testing.T) {
+ tests := []struct {
+ name string
+ expectedSpanName string
+ expectedStatus codes.Code
+ expectedAttrs []attribute.KeyValue
+ expectedResult protocol.Result
+ expectedSpanKind trace.SpanKind
+ }{
+
+ {
+ name: "received simple error",
+ expectedSpanName: "cloudevents.client.malformed receive",
+ expectedStatus: codes.Unset,
+ expectedAttrs: []attribute.KeyValue{
+ attribute.String(string(semconv.CodeFunctionKey), "RecordReceivedMalformedEvent"),
+ },
+ expectedSpanKind: trace.SpanKindConsumer,
+ expectedResult: fmt.Errorf("unrecognized event version 0.1.1"),
+ },
+ {
+ name: "received validation error",
+ expectedSpanName: "cloudevents.client.malformed receive",
+ expectedStatus: codes.Unset,
+ expectedAttrs: []attribute.KeyValue{
+ attribute.String(string(semconv.CodeFunctionKey), "RecordReceivedMalformedEvent"),
+ },
+ expectedSpanKind: trace.SpanKindConsumer,
+ expectedResult: event.ValidationError{"specversion": fmt.Errorf("missing Event.Context")},
+ },
+ {
+ name: "received http error",
+ expectedSpanName: "cloudevents.client.malformed receive",
+ expectedStatus: codes.Error,
+ expectedAttrs: []attribute.KeyValue{
+ attribute.String(string(semconv.CodeFunctionKey), "RecordReceivedMalformedEvent"),
+ },
+ expectedSpanKind: trace.SpanKindConsumer,
+ expectedResult: http.NewResult(400, "malformed event"),
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ sr, _ := configureOtelTestSdk()
+ ctx := context.Background()
+
+ os := otelObs.NewOTelObservabilityService()
+
+ // act
+ os.RecordReceivedMalformedEvent(ctx, tc.expectedResult)
+
+ spans := sr.Ended()
+
+ // since the obs service started a span, the context should have the spancontext
+ assert.NotNil(t, trace.SpanContextFromContext(ctx))
+ assert.Equal(t, 1, len(spans))
+
+ span := spans[0]
+ assert.Equal(t, tc.expectedSpanName, span.Name())
+ assert.Equal(t, tc.expectedStatus, span.Status().Code)
+ assert.Equal(t, tc.expectedSpanKind, span.SpanKind())
+
+ if !reflect.DeepEqual(span.Attributes(), tc.expectedAttrs) {
+ t.Errorf("p = %v, want %v", span.Attributes(), tc.expectedAttrs)
+ }
+
+ if tc.expectedResult != nil {
+ assert.Equal(t, 1, len(span.Events()))
+ assert.Equal(t, semconv.ExceptionEventName, span.Events()[0].Name)
+
+ attrsMap := getSpanEventMap(span.Events()[0].Attributes)
+ assert.Equal(t, tc.expectedResult.Error(), attrsMap[string(semconv.ExceptionMessageKey)])
+ }
+ })
+ }
+}
+
+func getSpanEventMap(evtAttrs []attribute.KeyValue) map[string]string {
+ attr := map[string]string{}
+ for _, v := range evtAttrs {
+ attr[string(v.Key)] = v.Value.AsString()
+ }
+ return attr
+}