diff --git a/.github/workflows/observability.yaml b/.github/workflows/observability.yaml new file mode 100644 index 000000000..63ff1c5cc --- /dev/null +++ b/.github/workflows/observability.yaml @@ -0,0 +1,35 @@ +name: Observability + +on: + push: + branches: [ 'main', 'release-*' ] + pull_request: + branches: [ 'main', 'release-*' ] + +jobs: + + observability: + name: CloudEvents + strategy: + matrix: + go-version: [1.14.x, 1.15.x] + platform: [ubuntu-latest] + + runs-on: ${{ matrix.platform }} + + steps: + + - name: Setup Go ${{ matrix.go-version }} + uses: actions/setup-go@v2 + with: + go-version: ${{ matrix.go-version }} + id: go + + - name: Checkout code + uses: actions/checkout@v2 + + - name: Update git submodule + run: git submodule sync && git submodule update --init + + - name: Build + run: ./hack/observability-test.sh diff --git a/hack/observability-test.sh b/hack/observability-test.sh new file mode 100755 index 000000000..bcac5c004 --- /dev/null +++ b/hack/observability-test.sh @@ -0,0 +1,31 @@ +#!/usr/bin/env bash + +# Copyright 2021 The CloudEvents Authors +# SPDX-License-Identifier: Apache-2.0 + +set -o errexit +set -o nounset +set -o pipefail + +COVERAGE="`pwd`/coverage.txt" + +# test/observability only +pushd ./test/observability + +# Prepare coverage file only if not exists +if [ ! -f $COVERAGE ]; then + touch ./coverage.tmp + echo 'mode: atomic' > $COVERAGE +fi +COVERPKG="github.com/cloudevents/sdk-go/observability/opentelemetry/v2/..." +for gomodule in $(go list ./... | grep -v /cmd | grep -v /vendor) +do + go test -v -timeout 30s -race -covermode=atomic -coverprofile=coverage.tmp -coverpkg "$COVERPKG" "$gomodule" 2>&1 | sed 's/ of statements in.*//; /warning: no packages being tested depend on matches for pattern /d' + tail -n +2 coverage.tmp >> $COVERAGE +done +rm coverage.tmp + +# Remove test only deps. +go mod tidy + +popd diff --git a/hack/tag-release.sh b/hack/tag-release.sh index 908bbdea3..9a7058b8c 100755 --- a/hack/tag-release.sh +++ b/hack/tag-release.sh @@ -50,6 +50,7 @@ do "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" "github.com/cloudevents/sdk-go/protocol/ws/v2" "github.com/cloudevents/sdk-go/observability/opencensus/v2" + "github.com/cloudevents/sdk-go/observability/opentelemetry/v2" "github.com/cloudevents/sdk-go/sql/v2" "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" "github.com/cloudevents/sdk-go/v2" # NOTE: this needs to be last. diff --git a/observability/opentelemetry/v2/README.md b/observability/opentelemetry/v2/README.md new file mode 100644 index 000000000..4732799d7 --- /dev/null +++ b/observability/opentelemetry/v2/README.md @@ -0,0 +1,183 @@ +# OpenTelemetry instrumentation for CloudEvents + +This package contains the components necessary to instrument CloudEvents clients with OpenTelemetry. The main component is the `OTelObservabilityService` which implements the `ObservabilityService` interface from CloudEvents. + +## Instrumented CloudEvents HTTP client + +If you want to get a fully instrumented HTTP client, use the helper method in the ` github.com/cloudevents/sdk-go/observability/opentelemetry/v2` module: + +```go +import ( + "context" + + otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/client" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +// you can pass the http/client options as usual +c, _ := otelObs.NewClientHTTP([]cehttp.Option{}, []client.Option{}) +``` + +This will produce spans for all outgoing and incoming requests. By default, the spans will have the attributes as defined in [keys.go](https://github.com/cloudevents/sdk-go/blob/release-2.5/v2/observability/keys.go). For more advanced configuration, see the next section. + +## Advanced configuration + +### HTTP auto-instrumentation + +In order to generate spans when sending and receiving events, it's necessary to configure the HTTP client from cloudevents with OpenTelemetry instrumentation. The client has two potentially interesting points for instrumentation: + +- Outgoing requests +- Incoming requests (via StartReceiver) + +To fulfil these, we can use the [HTTP auto-instrumentation package](https://github.com/open-telemetry/opentelemetry-go-contrib/tree/v0.23.0/instrumentation/net/http/otelhttp) from OpenTelemetry: + +```go +p, err := cloudevents.NewHTTP( + cloudevents.WithRoundTripper(otelhttp.NewTransport(http.DefaultTransport)), + cloudevents.WithMiddleware(func(next http.Handler) http.Handler { + return otelhttp.NewHandler(next, "receive") + }), +) +``` + +The `otelhttp.NewTransport` will ensure that spans are generated for each outgoing request, and that the `traceparent` header is properly propagated. The `otelhttp.NewHandler` will take care of incoming requests, reading the `traceparent` header and continuing the trace with a new span. + +This already gives some observability "out-of-the-box", but the spans generated only contain common HTTP headers as defined in the [HTTP semantic conventions](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.6.1/specification/trace/semantic_conventions/http.md). Another point is that if using another protocol, propagation will not work and spans will not be automatically generated, unless there is an auto-instrumentation library for it. + +Because of this, CloudEvents offers the `ObservabilityService` interface which is used to generate spans, independently of the chosen protocol. See next how to configure the CloudEvents client to use it. + +### Using the OTelObservabilityService + +The most basic way to configure the CloudEvents client to use the `OTelObservabilityService` is: + +```go +c, err := cloudevents.NewClient(p, client.WithObservabilityService(otelObs.NewOTelObservabilityService())) +``` + +With the above configuration, the spans generated by the `OTelObservabilityService` will have: + + + + + + + + + + + + +
Span namecloudevents.client.[event type] [operation name] where [operation name] follows the OpenTelemetry semantic conventions for messaging systems
Span attributesThe attributes as defined in keys.go
+ +If you require different span names or extra attributes, you can pass multiple `OTelObservabilityServiceOption` options when creating the observability service: + +```go +nameFormatter := func(e *cloudevents.Event) string { + return "my.custom.name." + e.Context.GetType() +} + +attributesGetter := func(*cloudevents.Event) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("my-attr", "some-value"), + } +} + +// create the obs service with custom span names and attributes +os := otelObs.NewOTelObservabilityService( + otelObs.WithSpanNameFormatter(nameFormatter), + otelObs.WithSpanAttributesGetter(attributesGetter), +) + +c, err := cloudevents.NewClient(p, client.WithObservabilityService(os)) +``` + +>Note: The `nameFormatter` and `attributesGetter` functions will be called on each span creation. **Avoid** doing any heavy processing in them. + +## Extra types + +This package also contains extra types and helper functions that are useful in case you need to access/set the `tracecontext` in a more "low-level" way. + +They allow to inject(write) and extract(read) `tracecontext` from the event. This is particularly useful when dealing with code that has no notion of a "request" nor a context. For example, long-running background processes polling from a queue. + +>Note: To learn more about the propagation, take a look at the [Propagators API SPEC](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.6.1/specification/context/api-propagators.md). + +## Manually extracting/injecting tracecontext from the event + +When working with distributed systems, it can be difficult to achieve proper context propagation. For example, a long-running process listening to a topic does not have a "context" concept like a HTTP server receiving requests does. + +>Note: The OpenTelemetry community is always creating new auto-instrumentation integrations for popular libraries and frameworks. The [OpenTelemetry registry](https://opentelemetry.io/registry/?s=kafka&component=&language=go) lists the integrations that are available. As support increases, new auto-instrumentation libraries can be integrated into the CloudEvents SDK. + +For this case, it might be useful to `inject` the `tracecontext` inside the event before sending it to a queue. Later, the process can `extract` it and continue the trace normally. For that we can use the `InjectDistributedTracingExtension` and `ExtractDistributedTracingExtension` helper functions. + +```go +func sendEventToQueue(ctx context.Context, event cloudevents.Event) { + + // assuming this function is properly instrumented, + // the ctx contains the current span + + // Before sending the event to the queue + // we can inject the tracecontext into the event as a DistributedTracingExtension + otelObs.InjectDistributedTracingExtension(ctx, event) +} +``` + +```go +func handleEvent(e cloudevents.Event) { + // here in our long-running process, we don't have a "context" + + // if we have the tracecontext in the event, we can + // re-create the context with it and continue the trace: + ctx := otelObs.ExtractDistributedTracingExtension(context.Background(), e) + + // ctx now has the tracecontext from the moment when the event was sent. + + // All subsequent requests made with this context will be part of the trace. + c, _ := otelObs.NewClientHTTP([]cehttp.Option{}, []client.Option{}) + ctx = cloudevents.ContextWithTarget(ctx, "my-other-cloudevents-app") + c.Send(ctx, e) +} +``` + +Because we used the `context` that was re-created from the event, the call to `my-other-cloudevents-app` will be correlated with the initial span. If `my-other-cloudevents-app` is also instrumented and itself make more calls, these will also be part of the trace. + +Most use-cases are covered by using the `InjectDistributedTracingExtension` and `ExtractDistributedTracingExtension` helper functions. + +### CloudEventCarrier + +The `CloudEventCarrier` is an implementation of the OpenTelemetry [TextMapCarrier](https://github.com/open-telemetry/opentelemetry-go/blob/v1.0.0-RC3/propagation/propagation.go#L23). Its purpose is to carry the `tracecontext`, that is used by propagators later. + +`CloudEventCarrier` exposes the `DistributedTracingExtension` which is populated by the propagator. It works similarly as the [HeaderCarrier](https://github.com/open-telemetry/opentelemetry-go/blob/v1.0.0-RC3/propagation/propagation.go#L44) which allows getting/setting the `traceparent` header. + +It can be used to get access to the "raw" `tracecontext` values (`traceparent` and `tracestate`). One use case is to inject the `tracecontext` from the `context` into the carrier, to gain access to the populated `DistributedTracingExtension`: + +```go +type MyEvent struct { + TraceParent string `json:"traceparent,omitempty"` + TraceState string `json:"tracestate,omitempty"` +} + +func injectAndReadTraceParentAndState(ctx context.Context, e cloudevents.Event) { + + me := MyEvent{} + + // the propagator from OpenTelemetry + prop := propagation.TraceContext{} + + carrier := otelObs.NewCloudEventCarrier() + + // Injects (writes) the tracecontext into the NewCloudEventCarrier + // Doing so, will set the DistributedTracingExtension fields + prop.Inject(ctx, carrier) + + // Here then we have the "raw" access to the tracecontext data + // https://www.w3.org/TR/trace-context/ + + // e.g. 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01 + me.TraceParent = carrier.Extension.TraceParent + + // e.g. congo=t61rcWkgMzE + me.TraceState = carrier.Extension.TraceState +} +``` \ No newline at end of file diff --git a/observability/opentelemetry/v2/client/client.go b/observability/opentelemetry/v2/client/client.go new file mode 100644 index 000000000..dab2bae51 --- /dev/null +++ b/observability/opentelemetry/v2/client/client.go @@ -0,0 +1,34 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package client + +import ( + obshttp "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/http" + "github.com/cloudevents/sdk-go/v2/client" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +// NewClientHTTP produces a new client instrumented with OpenTelemetry. +func NewClientHTTP(topt []cehttp.Option, copt []client.Option, obsOpts ...OTelObservabilityServiceOption) (client.Client, error) { + t, err := obshttp.NewObservedHTTP(topt...) + if err != nil { + return nil, err + } + + copt = append( + copt, + client.WithTimeNow(), + client.WithUUIDs(), + client.WithObservabilityService(NewOTelObservabilityService(obsOpts...)), + ) + + c, err := client.New(t, copt...) + if err != nil { + return nil, err + } + + return c, nil +} diff --git a/observability/opentelemetry/v2/client/cloudevents_carrier.go b/observability/opentelemetry/v2/client/cloudevents_carrier.go new file mode 100644 index 000000000..5b858bea4 --- /dev/null +++ b/observability/opentelemetry/v2/client/cloudevents_carrier.go @@ -0,0 +1,86 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package client + +import ( + "context" + + cloudevents "github.com/cloudevents/sdk-go/v2" + cecontext "github.com/cloudevents/sdk-go/v2/context" + "github.com/cloudevents/sdk-go/v2/extensions" + "go.opentelemetry.io/otel/propagation" +) + +// CloudEventCarrier wraps the distributed trace extension to satisfy the TextMapCarrier interface. +// https://github.com/open-telemetry/opentelemetry-go/blob/v1.0.0-RC3/propagation/propagation.go#L23 +type CloudEventCarrier struct { + Extension *extensions.DistributedTracingExtension +} + +// NewCloudEventCarrier creates a new CloudEventCarrier with an empty distributed tracing extension. +func NewCloudEventCarrier() CloudEventCarrier { + return CloudEventCarrier{Extension: &extensions.DistributedTracingExtension{}} +} + +// NewCloudEventCarrierWithEvent creates a new CloudEventCarrier with a distributed tracing extension +// populated with the trace data from the event. +func NewCloudEventCarrierWithEvent(ctx context.Context, event cloudevents.Event) CloudEventCarrier { + var te, ok = extensions.GetDistributedTracingExtension(event) + if !ok { + cecontext.LoggerFrom(ctx).Warn("Could not get the distributed tracing extension from the event.") + return CloudEventCarrier{Extension: &extensions.DistributedTracingExtension{}} + } + return CloudEventCarrier{Extension: &te} +} + +// Get returns the value associated with the passed key. +func (cec CloudEventCarrier) Get(key string) string { + switch key { + case extensions.TraceParentExtension: + return cec.Extension.TraceParent + case extensions.TraceStateExtension: + return cec.Extension.TraceState + default: + return "" + } +} + +// Set stores the key-value pair. +func (cec CloudEventCarrier) Set(key string, value string) { + switch key { + case extensions.TraceParentExtension: + cec.Extension.TraceParent = value + case extensions.TraceStateExtension: + cec.Extension.TraceState = value + } +} + +// Keys lists the keys stored in this carrier. +func (cec CloudEventCarrier) Keys() []string { + return []string{extensions.TraceParentExtension, extensions.TraceStateExtension} +} + +// InjectDistributedTracingExtension injects the tracecontext from the context into the event as a DistributedTracingExtension +// +// If a DistributedTracingExtension is present in the provided event, its current value is replaced with the +// tracecontext obtained from the context. +func InjectDistributedTracingExtension(ctx context.Context, event cloudevents.Event) { + tc := propagation.TraceContext{} + carrier := NewCloudEventCarrier() + tc.Inject(ctx, carrier) + carrier.Extension.AddTracingAttributes(&event) +} + +// ExtractDistributedTracingExtension extracts the tracecontext from the cloud event into the context. +// +// Calling this method will always replace the tracecontext in the context with the one extracted from the event. +// In case this is undesired, check first if the context has a recording span with: `trace.SpanFromContext(ctx)` +func ExtractDistributedTracingExtension(ctx context.Context, event cloudevents.Event) context.Context { + tc := propagation.TraceContext{} + carrier := NewCloudEventCarrierWithEvent(ctx, event) + + return tc.Extract(ctx, carrier) +} diff --git a/observability/opentelemetry/v2/client/otel_observability_service.go b/observability/opentelemetry/v2/client/otel_observability_service.go new file mode 100644 index 000000000..25ead8331 --- /dev/null +++ b/observability/opentelemetry/v2/client/otel_observability_service.go @@ -0,0 +1,217 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package client + +import ( + "context" + "runtime" + "strings" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/observability" + "github.com/cloudevents/sdk-go/v2/protocol" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +// OTelObservabilityService implements the ObservabilityService interface from cloudevents +type OTelObservabilityService struct { + tracer trace.Tracer + spanAttributesGetter func(cloudevents.Event) []attribute.KeyValue + spanNameFormatter func(cloudevents.Event) string +} + +// NewOTelObservabilityService returns an OpenTelemetry-enabled observability service +func NewOTelObservabilityService(opts ...OTelObservabilityServiceOption) *OTelObservabilityService { + tracerProvider := otel.GetTracerProvider() + + o := &OTelObservabilityService{ + tracer: tracerProvider.Tracer( + instrumentationName, + // TODO: Can we have the package version here? + // trace.WithInstrumentationVersion("1.0.0"), + ), + spanNameFormatter: defaultSpanNameFormatter, + } + + // apply passed options + for _, opt := range opts { + opt(o) + } + + return o +} + +// InboundContextDecorators returns a decorator function that allows enriching the context with the incoming parent trace. +// This method gets invoked automatically by passing the option 'WithObservabilityService' when creating the cloudevents HTTP client. +func (o OTelObservabilityService) InboundContextDecorators() []func(context.Context, binding.Message) context.Context { + return []func(context.Context, binding.Message) context.Context{tracePropagatorContextDecorator} +} + +// RecordReceivedMalformedEvent records the error from a malformed event in the span. +func (o OTelObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, err error) { + spanName := observability.ClientSpanName + ".malformed receive" + _, span := o.tracer.Start( + ctx, spanName, + trace.WithSpanKind(trace.SpanKindConsumer), + trace.WithAttributes(attribute.String(string(semconv.CodeFunctionKey), getFuncName()))) + + recordSpanError(span, err) + span.End() +} + +// RecordCallingInvoker starts a new span before calling the invoker upon a received event. +// In case the operation fails, the error is recorded and the span is marked as failed. +func (o OTelObservabilityService) RecordCallingInvoker(ctx context.Context, event *cloudevents.Event) (context.Context, func(errOrResult error)) { + spanName := o.getSpanName(event, "process") + ctx, span := o.tracer.Start( + ctx, spanName, + trace.WithSpanKind(trace.SpanKindConsumer), + trace.WithAttributes(GetDefaultSpanAttributes(event, getFuncName())...)) + + if span.IsRecording() && o.spanAttributesGetter != nil { + span.SetAttributes(o.spanAttributesGetter(*event)...) + } + + return ctx, func(errOrResult error) { + recordSpanError(span, errOrResult) + span.End() + } +} + +// RecordSendingEvent starts a new span before sending the event. +// In case the operation fails, the error is recorded and the span is marked as failed. +func (o OTelObservabilityService) RecordSendingEvent(ctx context.Context, event cloudevents.Event) (context.Context, func(errOrResult error)) { + spanName := o.getSpanName(&event, "send") + + ctx, span := o.tracer.Start( + ctx, spanName, + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes(GetDefaultSpanAttributes(&event, getFuncName())...)) + + if span.IsRecording() && o.spanAttributesGetter != nil { + span.SetAttributes(o.spanAttributesGetter(event)...) + } + + return ctx, func(errOrResult error) { + recordSpanError(span, errOrResult) + span.End() + } +} + +// RecordRequestEvent starts a new span before transmitting the given request. +// In case the operation fails, the error is recorded and the span is marked as failed. +func (o OTelObservabilityService) RecordRequestEvent(ctx context.Context, event cloudevents.Event) (context.Context, func(errOrResult error, event *cloudevents.Event)) { + spanName := o.getSpanName(&event, "send") + + ctx, span := o.tracer.Start( + ctx, spanName, + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes(GetDefaultSpanAttributes(&event, getFuncName())...)) + + if span.IsRecording() && o.spanAttributesGetter != nil { + span.SetAttributes(o.spanAttributesGetter(event)...) + } + + return ctx, func(errOrResult error, event *cloudevents.Event) { + recordSpanError(span, errOrResult) + span.End() + } +} + +// GetDefaultSpanAttributes returns the attributes that are always added to the spans +// created by the OTelObservabilityService. +func GetDefaultSpanAttributes(e *cloudevents.Event, method string) []attribute.KeyValue { + attr := []attribute.KeyValue{ + attribute.String(string(semconv.CodeFunctionKey), method), + attribute.String(observability.SpecversionAttr, e.SpecVersion()), + attribute.String(observability.IdAttr, e.ID()), + attribute.String(observability.TypeAttr, e.Type()), + attribute.String(observability.SourceAttr, e.Source()), + } + if sub := e.Subject(); sub != "" { + attr = append(attr, attribute.String(observability.SubjectAttr, sub)) + } + if dct := e.DataContentType(); dct != "" { + attr = append(attr, attribute.String(observability.DatacontenttypeAttr, dct)) + } + return attr +} + +// Extracts the traceparent from the msg and enriches the context to enable propagation +func tracePropagatorContextDecorator(ctx context.Context, msg binding.Message) context.Context { + var messageCtx context.Context + if mctx, ok := msg.(binding.MessageContext); ok { + messageCtx = mctx.Context() + } else if mctx, ok := binding.UnwrapMessage(msg).(binding.MessageContext); ok { + messageCtx = mctx.Context() + } + + if messageCtx == nil { + return ctx + } + span := trace.SpanFromContext(messageCtx) + if span == nil { + return ctx + } + return trace.ContextWithSpan(ctx, span) +} + +func recordSpanError(span trace.Span, errOrResult error) { + if protocol.IsACK(errOrResult) || !span.IsRecording() { + return + } + + var httpResult *cehttp.Result + if cloudevents.ResultAs(errOrResult, &httpResult) { + span.RecordError(httpResult) + if httpResult.StatusCode > 0 { + code, _ := semconv.SpanStatusFromHTTPStatusCode(httpResult.StatusCode) + span.SetStatus(code, httpResult.Error()) + } + } else { + span.RecordError(errOrResult) + } +} + +// getSpanName Returns the name of the span. +// +// When no spanNameFormatter is present in OTelObservabilityService, +// the default name will be "cloudevents.client. prefix" e.g. cloudevents.client.get.customers send. +// +// The prefix is always added at the end of the span name. This follows the semantic conventions for +// messasing systems as defined in https://github.com/open-telemetry/opentelemetry-specification/blob/v1.6.1/specification/trace/semantic_conventions/messaging.md#operation-names +func (o OTelObservabilityService) getSpanName(e *cloudevents.Event, suffix string) string { + name := o.spanNameFormatter(*e) + + // make sure the span name ends with the suffix from the semantic conventions (receive, send, process) + if !strings.HasSuffix(name, suffix) { + return name + " " + suffix + } + + return name +} + +func getFuncName() string { + pc := make([]uintptr, 1) + n := runtime.Callers(2, pc) + frames := runtime.CallersFrames(pc[:n]) + frame, _ := frames.Next() + + // frame.Function should be github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client.OTelObservabilityService.Func + parts := strings.Split(frame.Function, ".") + + // we are interested in the function name + if len(parts) != 4 { + return "" + } + return parts[3] +} diff --git a/observability/opentelemetry/v2/client/otel_options.go b/observability/opentelemetry/v2/client/otel_options.go new file mode 100644 index 000000000..053cd1558 --- /dev/null +++ b/observability/opentelemetry/v2/client/otel_options.go @@ -0,0 +1,42 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package client + +import ( + "go.opentelemetry.io/otel/attribute" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/observability" +) + +const ( + // The value for the `otel.library.name` span attribute + instrumentationName = "github.com/cloudevents/sdk-go/observability/opentelemetry/v2" +) + +type OTelObservabilityServiceOption func(*OTelObservabilityService) + +// WithSpanAttributesGetter appends the returned attributes from the function to the span. +func WithSpanAttributesGetter(attrGetter func(cloudevents.Event) []attribute.KeyValue) OTelObservabilityServiceOption { + return func(os *OTelObservabilityService) { + if attrGetter != nil { + os.spanAttributesGetter = attrGetter + } + } +} + +// WithSpanNameFormatter replaces the default span name with the string returned from the function +func WithSpanNameFormatter(nameFormatter func(cloudevents.Event) string) OTelObservabilityServiceOption { + return func(os *OTelObservabilityService) { + if nameFormatter != nil { + os.spanNameFormatter = nameFormatter + } + } +} + +var defaultSpanNameFormatter func(cloudevents.Event) string = func(e cloudevents.Event) string { + return observability.ClientSpanName + "." + e.Context.GetType() +} diff --git a/observability/opentelemetry/v2/go.mod b/observability/opentelemetry/v2/go.mod new file mode 100644 index 000000000..61196b52b --- /dev/null +++ b/observability/opentelemetry/v2/go.mod @@ -0,0 +1,12 @@ +module github.com/cloudevents/sdk-go/observability/opentelemetry/v2 + +go 1.14 + +require ( + github.com/cloudevents/sdk-go/v2 v2.5.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0 + go.opentelemetry.io/otel v1.0.0 + go.opentelemetry.io/otel/trace v1.0.0 +) + +replace github.com/cloudevents/sdk-go/v2 => ../../../v2 diff --git a/observability/opentelemetry/v2/go.sum b/observability/opentelemetry/v2/go.sum new file mode 100644 index 000000000..f9630635d --- /dev/null +++ b/observability/opentelemetry/v2/go.sum @@ -0,0 +1,65 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/felixge/httpsnoop v1.0.2 h1:+nS9g82KMXccJ/wp0zyRW9ZBHFETmMGtkk+2CTTrW4o= +github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= +github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +go.opentelemetry.io/contrib v0.23.0 h1:MgRuo0JZZX8J9WLRjyd7OpTSbaLOdQXXJa6SnZvlWLM= +go.opentelemetry.io/contrib v0.23.0/go.mod h1:EH4yDYeNoaTqn/8yCWQmfNB78VHfGX2Jt2bvnvzBlGM= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0 h1:hNSH6f4WUMDnRAvUCLItD0WKzQqAPoECvORj+ZChbnA= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0/go.mod h1:wLrbAf2Qb+kFsEjowrxOcuy2SE0dcY0VwFiiYCmUeFQ= +go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ= +go.opentelemetry.io/otel v1.0.0 h1:qTTn6x71GVBvoafHK/yaRUmFzI4LcONZD0/kXxl5PHI= +go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg= +go.opentelemetry.io/otel/internal/metric v0.23.0 h1:mPfzm9Iqhw7G2nDBmUAjFTfPqLZPbOW2k7QI57ITbaI= +go.opentelemetry.io/otel/internal/metric v0.23.0/go.mod h1:z+RPiDJe30YnCrOhFGivwBS+DU1JU/PiLKkk4re2DNY= +go.opentelemetry.io/otel/metric v0.23.0 h1:mYCcDxi60P4T27/0jchIDFa1WHEfQeU3zH9UEMpnj2c= +go.opentelemetry.io/otel/metric v0.23.0/go.mod h1:G/Nn9InyNnIv7J6YVkQfpc0JCfKBNJaERBGw08nqmVQ= +go.opentelemetry.io/otel/trace v1.0.0-RC3/go.mod h1:VUt2TUYd8S2/ZRX09ZDFZQwn2RqfMB5MzO17jBojGxo= +go.opentelemetry.io/otel/trace v1.0.0 h1:TSBr8GTEtKevYMG/2d21M989r5WJYVimhTHBKVEZuh4= +go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH814St6o6ajzIs= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/observability/opentelemetry/v2/http/http.go b/observability/opentelemetry/v2/http/http.go new file mode 100644 index 000000000..5a7d6670f --- /dev/null +++ b/observability/opentelemetry/v2/http/http.go @@ -0,0 +1,29 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package http + +import ( + "net/http" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +// NewObservedHTTP creates an HTTP protocol with OTel trace propagating middleware. +func NewObservedHTTP(opts ...cehttp.Option) (*cehttp.Protocol, error) { + // appends the OpenTelemetry Http transport + Middleware wrapper + // to properly trace outgoing and incoming requests from the client using this protocol + return cehttp.New(append( + []cehttp.Option{ + cehttp.WithRoundTripper(otelhttp.NewTransport(http.DefaultTransport)), + cehttp.WithMiddleware(func(next http.Handler) http.Handler { + return otelhttp.NewHandler(next, "cloudevents.http.receiver") + }), + }, + opts..., + )...) +} diff --git a/samples/http/go.mod b/samples/http/go.mod index b4da920e5..a26801734 100644 --- a/samples/http/go.mod +++ b/samples/http/go.mod @@ -6,11 +6,17 @@ require ( contrib.go.opencensus.io/exporter/prometheus v0.1.0 github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.5.0 github.com/cloudevents/sdk-go/observability/opencensus/v2 v2.5.0 + github.com/cloudevents/sdk-go/observability/opentelemetry/v2 v2.5.0 github.com/cloudevents/sdk-go/v2 v2.5.0 github.com/google/uuid v1.1.1 github.com/gorilla/mux v1.7.3 github.com/kelseyhightower/envconfig v1.4.0 go.opencensus.io v0.22.3 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0 + go.opentelemetry.io/otel v1.0.0 + go.opentelemetry.io/otel/exporters/jaeger v1.0.0 + go.opentelemetry.io/otel/sdk v1.0.0 + go.opentelemetry.io/otel/trace v1.0.0 google.golang.org/protobuf v1.26.0 ) @@ -19,3 +25,5 @@ replace github.com/cloudevents/sdk-go/v2 => ../../v2 replace github.com/cloudevents/sdk-go/binding/format/protobuf/v2 => ../../binding/format/protobuf/v2 replace github.com/cloudevents/sdk-go/observability/opencensus/v2 => ../../observability/opencensus/v2 + +replace github.com/cloudevents/sdk-go/observability/opentelemetry/v2 => ../../observability/opentelemetry/v2 diff --git a/samples/http/go.sum b/samples/http/go.sum index c04b474e0..f6e0204f0 100644 --- a/samples/http/go.sum +++ b/samples/http/go.sum @@ -9,6 +9,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/felixge/httpsnoop v1.0.2 h1:+nS9g82KMXccJ/wp0zyRW9ZBHFETmMGtkk+2CTTrW4o= +github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -32,8 +34,9 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -82,17 +85,37 @@ github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jO github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/contrib v0.23.0 h1:MgRuo0JZZX8J9WLRjyd7OpTSbaLOdQXXJa6SnZvlWLM= +go.opentelemetry.io/contrib v0.23.0/go.mod h1:EH4yDYeNoaTqn/8yCWQmfNB78VHfGX2Jt2bvnvzBlGM= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0 h1:hNSH6f4WUMDnRAvUCLItD0WKzQqAPoECvORj+ZChbnA= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0/go.mod h1:wLrbAf2Qb+kFsEjowrxOcuy2SE0dcY0VwFiiYCmUeFQ= +go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ= +go.opentelemetry.io/otel v1.0.0 h1:qTTn6x71GVBvoafHK/yaRUmFzI4LcONZD0/kXxl5PHI= +go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg= +go.opentelemetry.io/otel/exporters/jaeger v1.0.0 h1:cLhx8llHw02h5JTqGqaRbYn+QVKHmrzD9vEbKnSPk5U= +go.opentelemetry.io/otel/exporters/jaeger v1.0.0/go.mod h1:q10N1AolE1JjqKrFJK2tYw0iZpmX+HBaXBtuCzRnBGQ= +go.opentelemetry.io/otel/internal/metric v0.23.0 h1:mPfzm9Iqhw7G2nDBmUAjFTfPqLZPbOW2k7QI57ITbaI= +go.opentelemetry.io/otel/internal/metric v0.23.0/go.mod h1:z+RPiDJe30YnCrOhFGivwBS+DU1JU/PiLKkk4re2DNY= +go.opentelemetry.io/otel/metric v0.23.0 h1:mYCcDxi60P4T27/0jchIDFa1WHEfQeU3zH9UEMpnj2c= +go.opentelemetry.io/otel/metric v0.23.0/go.mod h1:G/Nn9InyNnIv7J6YVkQfpc0JCfKBNJaERBGw08nqmVQ= +go.opentelemetry.io/otel/sdk v1.0.0 h1:BNPMYUONPNbLneMttKSjQhOTlFLOD9U22HNG1KrIN2Y= +go.opentelemetry.io/otel/sdk v1.0.0/go.mod h1:PCrDHlSy5x1kjezSdL37PhbFUMjrsLRshJ2zCzeXwbM= +go.opentelemetry.io/otel/trace v1.0.0-RC3/go.mod h1:VUt2TUYd8S2/ZRX09ZDFZQwn2RqfMB5MzO17jBojGxo= +go.opentelemetry.io/otel/trace v1.0.0 h1:TSBr8GTEtKevYMG/2d21M989r5WJYVimhTHBKVEZuh4= +go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH814St6o6ajzIs= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= @@ -132,8 +155,9 @@ golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= @@ -172,4 +196,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/samples/http/otel-sender-receiver/README.md b/samples/http/otel-sender-receiver/README.md new file mode 100644 index 000000000..df231ff30 --- /dev/null +++ b/samples/http/otel-sender-receiver/README.md @@ -0,0 +1,54 @@ +# CloudEvents with OpenTelemetry instrumentation + +This sample demonstrates both client and server apps using CloudEvents instrumented with OpenTelemetry. + +> If this is your first time hearing about OpenTelemetry, take a look at the official documentation: [What is OpenTelemetry](https://opentelemetry.io/docs/concepts/what-is-opentelemetry/). + +The client app sends an event using CloudEvents to a server. The server listens for events also using CloudEvents. For each event it receives, it makes then an external HTTP request to `cloudevents.io`, just to illustrate the context propagation working. + +Here's a diagram to examplify the communication between the components: + +![client > server > external service](./request-flow.png "Sample app request flow") + +Both apps are configured to send the spans to a locally running [Jaeger](https://www.jaegertracing.io/) instance. See instructions below on how to run everything. + +## Requirements to run + +- Docker/Compose +- Go + +## Running the sample + +1. Download dependencies + +```shell +$ cd sdk-go\samples\http +$ go mod download +$ cd otel-sender-receiver +``` + +2. Start the Jaeger container + +```shell +$ docker-compose up +``` + +3. Run the server and the client + +```shell +$ go run server/server.go +``` + +```shell +$ go run client/client.go +``` + +4. Open the Jaeger UI (by default Jaeger exposes the UI at [http://localhost:16686](http://localhost:16686)) + +5. On the `Service` dropdown select `cloudevents-client`. Next, click on `Find Traces`. + +![Selecting the service on Jaeger](./jaeger-find-traces.png "Finding our traces in Jaeger") + +6. On the right side you should see a trace with name `cloudevents-client: cloudevents.client.example.type send`. Click there to see the full trace: + +![The complete trace on Jaeger](./jaeger-example.png "The full trace") diff --git a/samples/http/otel-sender-receiver/client/client.go b/samples/http/otel-sender-receiver/client/client.go new file mode 100644 index 000000000..736da74cc --- /dev/null +++ b/samples/http/otel-sender-receiver/client/client.go @@ -0,0 +1,43 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "log" + + otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client" + "github.com/cloudevents/sdk-go/samples/http/otel-sender-receiver/instrumentation" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/client" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +const ( + serviceName = "cloudevents-client" +) + +func main() { + shutdown := instrumentation.InitOTelSdk(serviceName) + defer shutdown() + + // create the cloudevents client instrumented with OpenTelemetry + c, err := otelObs.NewClientHTTP([]cehttp.Option{}, []client.Option{}) + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + event := cloudevents.NewEvent() + event.SetSource("example/uri") + event.SetType("example.type") + event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"}) + + ctx := cloudevents.ContextWithTarget(context.Background(), "http://localhost:8080/") + + if result := c.Send(ctx, event); cloudevents.IsUndelivered(result) { + log.Fatalf("failed to send, %v", result) + } +} diff --git a/samples/http/otel-sender-receiver/docker-compose.yaml b/samples/http/otel-sender-receiver/docker-compose.yaml new file mode 100644 index 000000000..afb4c8a6c --- /dev/null +++ b/samples/http/otel-sender-receiver/docker-compose.yaml @@ -0,0 +1,16 @@ +version: "2.4" +services: + jaeger-otel-cloudevents: + image: jaegertracing/all-in-one:1.25 + environment: + - COLLECTOR_ZIPKIN_HOST_PORT=9411 + ports: + - "5775:5775/udp" + - "6831:6831/udp" + - "6832:6832/udp" + - "5778:5778" + - "16686:16686" + - "14268:14268" + - "14250:14250" + - "9411:9411" + command: --query.max-clock-skew-adjustment 1s diff --git a/samples/http/otel-sender-receiver/instrumentation/sdkUtils.go b/samples/http/otel-sender-receiver/instrumentation/sdkUtils.go new file mode 100644 index 000000000..9c5c99dbc --- /dev/null +++ b/samples/http/otel-sender-receiver/instrumentation/sdkUtils.go @@ -0,0 +1,52 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package instrumentation + +import ( + "context" + "log" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" +) + +const ( + JaegerEndpoint = "http://localhost:14268/api/traces" +) + +func InitOTelSdk(serviceName string) func() { + ctx := context.Background() + + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(JaegerEndpoint))) + if err != nil { + return func() { log.Printf("Failed to create the trace exporter: %v", err) } + } + + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(serviceName), + )), + ) + + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + + return func() { + if tp == nil { + return + } + if err := tp.Shutdown(ctx); err != nil { + log.Printf("Error shutting down the tracer provider: %v", err) + } + } +} diff --git a/samples/http/otel-sender-receiver/jaeger-example.png b/samples/http/otel-sender-receiver/jaeger-example.png new file mode 100644 index 000000000..06bb1ce3c Binary files /dev/null and b/samples/http/otel-sender-receiver/jaeger-example.png differ diff --git a/samples/http/otel-sender-receiver/jaeger-find-traces.png b/samples/http/otel-sender-receiver/jaeger-find-traces.png new file mode 100644 index 000000000..f96c78103 Binary files /dev/null and b/samples/http/otel-sender-receiver/jaeger-find-traces.png differ diff --git a/samples/http/otel-sender-receiver/request-flow.png b/samples/http/otel-sender-receiver/request-flow.png new file mode 100644 index 000000000..172b21429 Binary files /dev/null and b/samples/http/otel-sender-receiver/request-flow.png differ diff --git a/samples/http/otel-sender-receiver/server/server.go b/samples/http/otel-sender-receiver/server/server.go new file mode 100644 index 000000000..d2f56e3dd --- /dev/null +++ b/samples/http/otel-sender-receiver/server/server.go @@ -0,0 +1,71 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "log" + "net/http" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client" + "github.com/cloudevents/sdk-go/samples/http/otel-sender-receiver/instrumentation" + cloudevents "github.com/cloudevents/sdk-go/v2" + ceclient "github.com/cloudevents/sdk-go/v2/client" + "github.com/cloudevents/sdk-go/v2/protocol" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +var tracer trace.Tracer + +const ( + serviceName = "cloudevents-server" +) + +func main() { + shutdown := instrumentation.InitOTelSdk(serviceName) + tracer = otel.Tracer(serviceName + "-main") + defer shutdown() + + // create the cloudevents client instrumented with OpenTelemetry + c, err := otelObs.NewClientHTTP([]cehttp.Option{}, []ceclient.Option{}) + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + log.Fatal(c.StartReceiver(context.Background(), handleReceivedEvent)) +} + +func handleReceivedEvent(ctx context.Context, event cloudevents.Event) protocol.Result { + + // Showcase injecting the incoming tracecontext into the event as a DistributedTraceExtension + otelObs.InjectDistributedTracingExtension(ctx, event) + + // Showcase extracting the tracecontext from the event into a context in order to continue the trace. + // This is useful for cases where events are read from a queue and no context is present. + ctx = otelObs.ExtractDistributedTracingExtension(ctx, event) + + // manually start a span for this http request + ctx, childSpan := tracer.Start(ctx, "externalHttpCall", trace.WithAttributes(attribute.String("id", "123"))) + defer childSpan.End() + + // manually creating a http client instrumented with OpenTelemetry to make an external request + client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)} + + req, _ := http.NewRequestWithContext(ctx, "GET", "https://cloudevents.io/", nil) + + res, err := client.Do(req) + if err != nil { + panic(err) + } + _ = res.Body.Close() + + return nil +} diff --git a/test/observability/go.mod b/test/observability/go.mod new file mode 100644 index 000000000..4eb39d190 --- /dev/null +++ b/test/observability/go.mod @@ -0,0 +1,16 @@ +module github.com/cloudevents/sdk-go/test/observability + +go 1.14 + +require ( + github.com/cloudevents/sdk-go/observability/opentelemetry/v2 v2.5.0 + github.com/cloudevents/sdk-go/v2 v2.5.0 + github.com/stretchr/testify v1.7.0 + go.opentelemetry.io/otel v1.0.0 + go.opentelemetry.io/otel/sdk v1.0.0 + go.opentelemetry.io/otel/trace v1.0.0 +) + +replace github.com/cloudevents/sdk-go/observability/opentelemetry/v2 => ../../observability/opentelemetry/v2 + +replace github.com/cloudevents/sdk-go/v2 => ../../v2 diff --git a/test/observability/go.sum b/test/observability/go.sum new file mode 100644 index 000000000..66f323cc4 --- /dev/null +++ b/test/observability/go.sum @@ -0,0 +1,69 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/felixge/httpsnoop v1.0.2 h1:+nS9g82KMXccJ/wp0zyRW9ZBHFETmMGtkk+2CTTrW4o= +github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= +github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +go.opentelemetry.io/contrib v0.23.0 h1:MgRuo0JZZX8J9WLRjyd7OpTSbaLOdQXXJa6SnZvlWLM= +go.opentelemetry.io/contrib v0.23.0/go.mod h1:EH4yDYeNoaTqn/8yCWQmfNB78VHfGX2Jt2bvnvzBlGM= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0 h1:hNSH6f4WUMDnRAvUCLItD0WKzQqAPoECvORj+ZChbnA= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.23.0/go.mod h1:wLrbAf2Qb+kFsEjowrxOcuy2SE0dcY0VwFiiYCmUeFQ= +go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ= +go.opentelemetry.io/otel v1.0.0 h1:qTTn6x71GVBvoafHK/yaRUmFzI4LcONZD0/kXxl5PHI= +go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg= +go.opentelemetry.io/otel/internal/metric v0.23.0 h1:mPfzm9Iqhw7G2nDBmUAjFTfPqLZPbOW2k7QI57ITbaI= +go.opentelemetry.io/otel/internal/metric v0.23.0/go.mod h1:z+RPiDJe30YnCrOhFGivwBS+DU1JU/PiLKkk4re2DNY= +go.opentelemetry.io/otel/metric v0.23.0 h1:mYCcDxi60P4T27/0jchIDFa1WHEfQeU3zH9UEMpnj2c= +go.opentelemetry.io/otel/metric v0.23.0/go.mod h1:G/Nn9InyNnIv7J6YVkQfpc0JCfKBNJaERBGw08nqmVQ= +go.opentelemetry.io/otel/sdk v1.0.0 h1:BNPMYUONPNbLneMttKSjQhOTlFLOD9U22HNG1KrIN2Y= +go.opentelemetry.io/otel/sdk v1.0.0/go.mod h1:PCrDHlSy5x1kjezSdL37PhbFUMjrsLRshJ2zCzeXwbM= +go.opentelemetry.io/otel/trace v1.0.0-RC3/go.mod h1:VUt2TUYd8S2/ZRX09ZDFZQwn2RqfMB5MzO17jBojGxo= +go.opentelemetry.io/otel/trace v1.0.0 h1:TSBr8GTEtKevYMG/2d21M989r5WJYVimhTHBKVEZuh4= +go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH814St6o6ajzIs= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/test/observability/opentelemetry/client_test.go b/test/observability/opentelemetry/client_test.go new file mode 100644 index 000000000..a6ee3bd9b --- /dev/null +++ b/test/observability/opentelemetry/client_test.go @@ -0,0 +1,259 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package opentelemetry + +import ( + "bytes" + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + + otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client" + obshttp "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/http" + "github.com/cloudevents/sdk-go/v2/client" + event "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/protocol" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/cloudevents/sdk-go/v2/types" +) + +func TestTracedClientReceive(t *testing.T) { + now := time.Now() + + testCases := map[string]struct { + event event.Event + expectedResult protocol.Result + ack bool + }{ + "simple binary v0.3": { + event: func() event.Event { + e := event.Event{ + Context: event.EventContextV03{ + Type: "unit.test.client", + Source: *types.ParseURIRef("/unit/test/client"), + Time: &types.Timestamp{Time: now}, + ID: "AABBCCDDEE", + }.AsV03(), + } + _ = e.SetData(event.ApplicationJSON, &map[string]string{ + "sq": "42", + "msg": "hello", + }) + return e + }(), + ack: true, + }, + "receive with error": { + event: func() event.Event { + e := event.Event{ + Context: event.EventContextV03{ + Type: "unit.test.client", + Source: *types.ParseURIRef("/unit/test/client"), + Time: &types.Timestamp{Time: now}, + ID: "AABBCCDDEE", + }.AsV03(), + } + _ = e.SetData(event.ApplicationJSON, &map[string]string{ + "sq": "42", + "msg": "hello", + }) + return e + }(), + expectedResult: cehttp.NewResult(500, "some error happened within the receiver"), + ack: false, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + sr, tracer := configureOtelTestSdk() + + // creates and starts the receiver + p, err := obshttp.NewObservedHTTP(cehttp.WithPort(0)) + require.NoError(t, err) + c, err := client.New(p, client.WithObservabilityService(otelObs.NewOTelObservabilityService())) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.TODO()) + go func() { + require.NoError(t, c.StartReceiver(ctx, func(ctx context.Context, e event.Event) protocol.Result { + return tc.expectedResult + })) + }() + time.Sleep(5 * time.Millisecond) // let the server start + + target := fmt.Sprintf("http://localhost:%d", p.GetListeningPort()) + sender, err := otelObs.NewClientHTTP([]cehttp.Option{cehttp.WithTarget(target)}, []client.Option{}) + require.NoError(t, err) + + // act + ctx, span := tracer.Start(ctx, "test-span") + result := sender.Send(ctx, tc.event) + span.End() + + require.Equal(t, tc.ack, protocol.IsACK(result)) + + spans := sr.Ended() + + // 1 span from the test + // 2 spans from sending the event (http client auto-instrumentation + obs service) + // 2 spans from receiving the event (http client middleware + obs service) + assert.Equal(t, 5, len(spans)) + + if !tc.ack { + // The span created by the observability service should have the error that came from the receiver fn + obsSpan := spans[0] + assert.Equal(t, codes.Error, obsSpan.Status().Code) + assert.Equal(t, "500: some error happened within the receiver", obsSpan.Status().Description) + } + + // Now stop the client + cancel() + }) + } +} + +func TestTracingClientSend(t *testing.T) { + now := time.Now() + + testCases := map[string]struct { + event event.Event + resp *http.Response + ack bool + }{ + "send with ok response": { + event: func() event.Event { + e := event.Event{ + Context: event.EventContextV1{ + Type: "unit.test.client", + Source: *types.ParseURIRef("/unit/test/client"), + Time: &types.Timestamp{Time: now}, + ID: "AABBCCDDEE", + }.AsV1(), + } + _ = e.SetData(event.ApplicationJSON, &map[string]interface{}{ + "sq": 42, + "msg": "hello", + }) + return e + }(), + resp: &http.Response{ + StatusCode: http.StatusAccepted, + }, + ack: true, + }, + "send with error response": { + event: func() event.Event { + e := event.Event{ + Context: event.EventContextV1{ + Type: "unit.test.client", + Source: *types.ParseURIRef("/unit/test/client"), + Time: &types.Timestamp{Time: now}, + ID: "AABBCCDDEE", + }.AsV1(), + } + _ = e.SetData(event.ApplicationJSON, &map[string]interface{}{ + "sq": 42, + "msg": "hello", + }) + return e + }(), + resp: &http.Response{ + StatusCode: http.StatusBadRequest, + }, + ack: false, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + sr, tracer := configureOtelTestSdk() + handler := &fakeHandler{ + t: t, + response: tc.resp, + requests: make([]*http.Request, 0), + } + server := httptest.NewServer(handler) + defer server.Close() + + sender, err := otelObs.NewClientHTTP([]cehttp.Option{cehttp.WithTarget(server.URL)}, []client.Option{}) + require.NoError(t, err) + + // act + ctx, span := tracer.Start(context.Background(), "test-span") + result := sender.Send(ctx, tc.event) + span.End() + + require.Equal(t, tc.ack, protocol.IsACK(result)) + + spans := sr.Ended() + + // 1 span from the test + // 2 spans from sending the event (http client auto-instrumentation + obs service) + // 2 spans from receiving the event (http client middleware + obs service) + assert.Equal(t, 3, len(spans)) + + // get the traceparent header from the outgoing request + r := handler.popRequest(t) + if tp := r.Header.Get("traceparent"); tp == "" { + t.Fatal("missing traceparent header") + } + + // The request should have been sent with the last spanID (from the auto-instrumentation lib) + ctx = prop.Extract(ctx, propagation.HeaderCarrier(r.Header)) + spanCtx := trace.SpanContextFromContext(ctx) + assert.Equal(t, spans[0].SpanContext().TraceID(), spanCtx.TraceID()) + assert.Equal(t, spans[0].SpanContext().SpanID(), spanCtx.SpanID()) + }) + } +} + +type fakeHandler struct { + t *testing.T + response *http.Response + requests []*http.Request +} + +func (f *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + // Make a copy of the request. + f.requests = append(f.requests, r) + + // Write the response. + if f.response != nil { + for h, vs := range f.response.Header { + for _, v := range vs { + w.Header().Add(h, v) + } + } + w.WriteHeader(f.response.StatusCode) + var buf bytes.Buffer + if f.response.ContentLength > 0 { + _, _ = buf.ReadFrom(f.response.Body) + _, _ = w.Write(buf.Bytes()) + } + } else { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("")) + } +} + +func (f *fakeHandler) popRequest(t *testing.T) *http.Request { + if len(f.requests) == 0 { + t.Error("Unable to pop request") + } + r := f.requests[0] + f.requests = f.requests[1:] + return r +} diff --git a/test/observability/opentelemetry/cloudevents_carrier_test.go b/test/observability/opentelemetry/cloudevents_carrier_test.go new file mode 100644 index 000000000..34d2a2e5e --- /dev/null +++ b/test/observability/opentelemetry/cloudevents_carrier_test.go @@ -0,0 +1,215 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package opentelemetry + +import ( + "context" + "fmt" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + sdkTrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" + + otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/extensions" +) + +var ( + traceparent = http.CanonicalHeaderKey("traceparent") + tracestate = http.CanonicalHeaderKey("tracestate") + + prop = propagation.TraceContext{} + eventTraceID = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + eventSpanID = "bbbbbbbbbbbbbbbb" + distributedExt = extensions.DistributedTracingExtension{ + TraceParent: fmt.Sprintf("00-%s-%s-00", eventTraceID, eventSpanID), + TraceState: "key1=value1,key2=value2", + } +) + +func TestExtractContextWithTraceContext(t *testing.T) { + type testcase struct { + name string + event cloudevents.Event + header http.Header + want string + } + + tests := []testcase{ + { + name: "tracecontext in the context is overwritten by the one from the event", + event: createCloudEvent(distributedExt), + header: http.Header{ + traceparent: []string{"00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00"}, + }, + want: "00-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb-bbbbbbbbbbbbbbbb-00", + }, + { + name: "context with tracecontext and event with invalid tracecontext", + event: createCloudEventWithInvalidTraceParent(), + header: http.Header{ + traceparent: []string{"00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00"}, + }, + want: "00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Simulates a case of an auto-instrumented client where the context + // has the incoming parent span + incomingCtx := prop.Extract(context.Background(), propagation.HeaderCarrier(tc.header)) + + // act + newCtx := otelObs.ExtractDistributedTracingExtension(incomingCtx, tc.event) + + prop := propagation.TraceContext{} + carrier := otelObs.NewCloudEventCarrier() + prop.Inject(newCtx, carrier) + + // the newCtx contains the expected traceparent + assert.Equal(t, tc.want, carrier.Extension.TraceParent) + }) + } +} + +func TestExtractContextWithoutTraceContext(t *testing.T) { + type testcase struct { + name string + event cloudevents.Event + header http.Header + } + _, _ = configureOtelTestSdk() + tests := []testcase{ + { + name: "context without tracecontext", + event: createCloudEvent(distributedExt), + }, + { + name: "context with invalid tracecontext and event with valid tracecontext", + event: createCloudEvent(distributedExt), + header: http.Header{ + traceparent: []string{"00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-1-00"}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + incomingCtx := context.Background() + + if tc.header != nil { + incomingCtx = prop.Extract(incomingCtx, propagation.HeaderCarrier(tc.header)) + } + + // act + newCtx := otelObs.ExtractDistributedTracingExtension(incomingCtx, tc.event) + sc := trace.SpanContextFromContext(newCtx) + + // the new context should be different since it was enriched with the tracecontext from the event + assert.NotEqual(t, trace.SpanContextFromContext(incomingCtx), sc) + + // make sure the IDs are as expected + assert.Equal(t, eventTraceID, sc.TraceID().String()) + assert.Equal(t, eventSpanID, sc.SpanID().String()) + assert.Equal(t, distributedExt.TraceState, sc.TraceState().String()) + }) + } +} + +func TestInjectDistributedTracingExtension(t *testing.T) { + type testcase struct { + name string + event cloudevents.Event + header http.Header + want extensions.DistributedTracingExtension + } + tests := []testcase{ + { + name: "inject tracecontext into event", + event: createCloudEvent(extensions.DistributedTracingExtension{}), + header: http.Header{ + traceparent: []string{"00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00"}, + tracestate: []string{"key1=value1,key2=value2"}, + }, + want: extensions.DistributedTracingExtension{ + TraceParent: "00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00", + TraceState: "key1=value1,key2=value2", + }, + }, + { + name: "overwrite tracecontext in the event from the context", + event: createCloudEvent(distributedExt), + header: http.Header{ + traceparent: []string{"00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00"}, + tracestate: []string{"key1=value1,key2=value2,key3=value3"}, + }, + want: extensions.DistributedTracingExtension{ + TraceParent: "00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-aaaaaaaaaaaaaaaa-00", + TraceState: "key1=value1,key2=value2,key3=value3", + }, + }, + { + name: "context without tracecontext", + event: createCloudEvent(distributedExt), + want: distributedExt, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + ctx = prop.Extract(ctx, propagation.HeaderCarrier(tc.header)) + + // act + otelObs.InjectDistributedTracingExtension(ctx, tc.event) + + actual, ok := extensions.GetDistributedTracingExtension(tc.event) + assert.True(t, ok) + assert.Equal(t, tc.want, actual) + }) + } + +} + +func createCloudEvent(distributedExt extensions.DistributedTracingExtension) cloudevents.Event { + event := cloudevents.NewEvent() + event.SetSource("example/uri") + event.SetType("example.type") + event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"}) + + if distributedExt.TraceParent != "" { + distributedExt.AddTracingAttributes(&event) + } + + return event +} + +func createCloudEventWithInvalidTraceParent() cloudevents.Event { + event := cloudevents.NewEvent() + event.SetSource("example/uri") + event.SetType("example.type") + event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"}) + + // set directly to force an invalid value + event.SetExtension(extensions.TraceParentExtension, 123) + + return event +} + +func configureOtelTestSdk() (*tracetest.SpanRecorder, trace.Tracer) { + sr := tracetest.NewSpanRecorder() + provider := sdkTrace.NewTracerProvider(sdkTrace.WithSpanProcessor(sr), sdkTrace.WithSampler(sdkTrace.AlwaysSample())) + otel.SetTracerProvider(provider) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + return sr, provider.Tracer("test-tracer") +} diff --git a/test/observability/opentelemetry/doc.go b/test/observability/opentelemetry/doc.go new file mode 100644 index 000000000..8e9326165 --- /dev/null +++ b/test/observability/opentelemetry/doc.go @@ -0,0 +1,13 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +/* +Package opentelemetry validates the carrier and observability service instrumentation with the default SDK. + +This package is in a separate module from the instrumentation it tests to +isolate the dependency of the default OTel SDK and not impose this as a transitive +dependency for users. +*/ +package opentelemetry diff --git a/test/observability/opentelemetry/otel_observability_service_test.go b/test/observability/opentelemetry/otel_observability_service_test.go new file mode 100644 index 000000000..da55dd81f --- /dev/null +++ b/test/observability/opentelemetry/otel_observability_service_test.go @@ -0,0 +1,422 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package opentelemetry + +import ( + "context" + "fmt" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" + + otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client" + cloudevents "github.com/cloudevents/sdk-go/v2" + event "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/extensions" + "github.com/cloudevents/sdk-go/v2/protocol" + "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +var ( + expectedEvent cloudevents.Event = createCloudEvent(extensions.DistributedTracingExtension{}) +) + +func TestRecordSendingEvent(t *testing.T) { + tests := []struct { + name string + expectedSpanName string + expectedStatus codes.Code + expectedAttrs []attribute.KeyValue + expectedResult protocol.Result + expectedSpanKind trace.SpanKind + nameFormatter func(cloudevents.Event) string + attributesGetter func(cloudevents.Event) []attribute.KeyValue + }{ + + { + name: "send with default options", + expectedSpanName: "cloudevents.client.example.type send", + expectedStatus: codes.Unset, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordSendingEvent"), + expectedSpanKind: trace.SpanKindProducer, + nameFormatter: nil, + }, + { + name: "send with custom span name", + expectedSpanName: "test.example.type send", + expectedStatus: codes.Unset, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordSendingEvent"), + expectedSpanKind: trace.SpanKindProducer, + nameFormatter: func(e cloudevents.Event) string { + return "test." + e.Context.GetType() + }, + }, + { + name: "send with custom attributes", + expectedSpanName: "test.example.type send", + expectedStatus: codes.Unset, + expectedAttrs: append(otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordSendingEvent"), attribute.String("my-attr", "some-value")), + expectedSpanKind: trace.SpanKindProducer, + nameFormatter: func(e cloudevents.Event) string { + return "test." + e.Context.GetType() + }, + attributesGetter: func(cloudevents.Event) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("my-attr", "some-value"), + } + }, + }, + { + name: "send with error response", + expectedSpanName: "cloudevents.client.example.type send", + expectedStatus: codes.Unset, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordSendingEvent"), + expectedSpanKind: trace.SpanKindProducer, + expectedResult: protocol.NewReceipt(false, "some error here"), + }, + { + name: "send with http error response", + expectedSpanName: "cloudevents.client.example.type send", + expectedStatus: codes.Error, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordSendingEvent"), + expectedSpanKind: trace.SpanKindProducer, + expectedResult: http.NewResult(500, "some server error"), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + sr, _ := configureOtelTestSdk() + ctx := context.Background() + + os := otelObs.NewOTelObservabilityService( + otelObs.WithSpanNameFormatter(tc.nameFormatter), + otelObs.WithSpanAttributesGetter(tc.attributesGetter)) + + // act + ctx, cb := os.RecordSendingEvent(ctx, expectedEvent) + cb(tc.expectedResult) + + spans := sr.Ended() + + // since the obs service started a span, the context should have the spancontext + assert.NotNil(t, trace.SpanContextFromContext(ctx)) + assert.Equal(t, 1, len(spans)) + + span := spans[0] + assert.Equal(t, tc.expectedSpanName, span.Name()) + assert.Equal(t, tc.expectedStatus, span.Status().Code) + assert.Equal(t, tc.expectedSpanKind, span.SpanKind()) + + if !reflect.DeepEqual(span.Attributes(), tc.expectedAttrs) { + t.Errorf("p = %v, want %v", span.Attributes(), tc.expectedAttrs) + } + + if tc.expectedResult != nil { + assert.Equal(t, 1, len(span.Events())) + assert.Equal(t, semconv.ExceptionEventName, span.Events()[0].Name) + + attrsMap := getSpanEventMap(span.Events()[0].Attributes) + assert.Equal(t, tc.expectedResult.Error(), attrsMap[string(semconv.ExceptionMessageKey)]) + } + }) + } +} + +func TestRecordRequestEvent(t *testing.T) { + tests := []struct { + name string + expectedSpanName string + expectedStatus codes.Code + expectedAttrs []attribute.KeyValue + expectedResult protocol.Result + expectedSpanKind trace.SpanKind + nameFormatter func(cloudevents.Event) string + attributesGetter func(cloudevents.Event) []attribute.KeyValue + }{ + + { + name: "request with default options", + expectedSpanName: "cloudevents.client.example.type send", + expectedStatus: codes.Unset, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordRequestEvent"), + expectedSpanKind: trace.SpanKindProducer, + nameFormatter: nil, + }, + { + name: "request with custom span name", + expectedSpanName: "test.example.type send", + expectedStatus: codes.Unset, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordRequestEvent"), + expectedSpanKind: trace.SpanKindProducer, + nameFormatter: func(e cloudevents.Event) string { + return "test." + e.Context.GetType() + }, + }, + { + name: "request with custom attributes", + expectedSpanName: "test.example.type send", + expectedStatus: codes.Unset, + expectedAttrs: append(otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordRequestEvent"), attribute.String("my-attr", "some-value")), + expectedSpanKind: trace.SpanKindProducer, + nameFormatter: func(e cloudevents.Event) string { + return "test." + e.Context.GetType() + }, + attributesGetter: func(cloudevents.Event) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("my-attr", "some-value"), + } + }, + }, + { + name: "send with error response", + expectedSpanName: "cloudevents.client.example.type send", + expectedStatus: codes.Unset, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordRequestEvent"), + expectedSpanKind: trace.SpanKindProducer, + expectedResult: protocol.NewReceipt(false, "some error here"), + }, + { + name: "request with http error response", + expectedSpanName: "cloudevents.client.example.type send", + expectedStatus: codes.Error, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordRequestEvent"), + expectedSpanKind: trace.SpanKindProducer, + expectedResult: http.NewResult(500, "some server error"), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + sr, _ := configureOtelTestSdk() + ctx := context.Background() + + os := otelObs.NewOTelObservabilityService( + otelObs.WithSpanNameFormatter(tc.nameFormatter), + otelObs.WithSpanAttributesGetter(tc.attributesGetter)) + + // act + ctx, cb := os.RecordRequestEvent(ctx, expectedEvent) + cb(tc.expectedResult, &expectedEvent) + + spans := sr.Ended() + + // since the obs service started a span, the context should have the spancontext + assert.NotNil(t, trace.SpanContextFromContext(ctx)) + assert.Equal(t, 1, len(spans)) + + span := spans[0] + assert.Equal(t, tc.expectedSpanName, span.Name()) + assert.Equal(t, tc.expectedStatus, span.Status().Code) + assert.Equal(t, tc.expectedSpanKind, span.SpanKind()) + + if !reflect.DeepEqual(span.Attributes(), tc.expectedAttrs) { + t.Errorf("p = %v, want %v", span.Attributes(), tc.expectedAttrs) + } + + if tc.expectedResult != nil { + assert.Equal(t, 1, len(span.Events())) + assert.Equal(t, semconv.ExceptionEventName, span.Events()[0].Name) + + attrsMap := getSpanEventMap(span.Events()[0].Attributes) + assert.Equal(t, tc.expectedResult.Error(), attrsMap[string(semconv.ExceptionMessageKey)]) + } + }) + } +} + +func TestRecordCallingInvoker(t *testing.T) { + tests := []struct { + name string + expectedSpanName string + expectedStatus codes.Code + expectedAttrs []attribute.KeyValue + expectedResult protocol.Result + expectedSpanKind trace.SpanKind + nameFormatter func(cloudevents.Event) string + attributesGetter func(cloudevents.Event) []attribute.KeyValue + }{ + + { + name: "invoker with default options", + expectedSpanName: "cloudevents.client.example.type process", + expectedStatus: codes.Unset, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordCallingInvoker"), + expectedSpanKind: trace.SpanKindConsumer, + nameFormatter: nil, + }, + { + name: "invoker with custom span name", + expectedSpanName: "test.example.type process", + expectedStatus: codes.Unset, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordCallingInvoker"), + expectedSpanKind: trace.SpanKindConsumer, + nameFormatter: func(e cloudevents.Event) string { + return "test." + e.Context.GetType() + }, + }, + { + name: "invoker with custom attributes", + expectedSpanName: "test.example.type process", + expectedStatus: codes.Unset, + expectedAttrs: append(otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordCallingInvoker"), attribute.String("my-attr", "some-value")), + expectedSpanKind: trace.SpanKindConsumer, + nameFormatter: func(e cloudevents.Event) string { + return "test." + e.Context.GetType() + }, + attributesGetter: func(cloudevents.Event) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("my-attr", "some-value"), + } + }, + }, + { + name: "invoker with error response", + expectedSpanName: "cloudevents.client.example.type process", + expectedStatus: codes.Unset, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordCallingInvoker"), + expectedSpanKind: trace.SpanKindConsumer, + expectedResult: protocol.NewReceipt(false, "some error here"), + }, + { + name: "invoker with http error response", + expectedSpanName: "cloudevents.client.example.type process", + expectedStatus: codes.Error, + expectedAttrs: otelObs.GetDefaultSpanAttributes(&expectedEvent, "RecordCallingInvoker"), + expectedSpanKind: trace.SpanKindConsumer, + expectedResult: http.NewResult(500, "some server error"), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + sr, _ := configureOtelTestSdk() + ctx := context.Background() + + os := otelObs.NewOTelObservabilityService( + otelObs.WithSpanNameFormatter(tc.nameFormatter), + otelObs.WithSpanAttributesGetter(tc.attributesGetter)) + + // act + ctx, cb := os.RecordCallingInvoker(ctx, &expectedEvent) + cb(tc.expectedResult) + + spans := sr.Ended() + + // since the obs service started a span, the context should have the spancontext + assert.NotNil(t, trace.SpanContextFromContext(ctx)) + assert.Equal(t, 1, len(spans)) + + span := spans[0] + assert.Equal(t, tc.expectedSpanName, span.Name()) + assert.Equal(t, tc.expectedStatus, span.Status().Code) + assert.Equal(t, tc.expectedSpanKind, span.SpanKind()) + + if !reflect.DeepEqual(span.Attributes(), tc.expectedAttrs) { + t.Errorf("p = %v, want %v", span.Attributes(), tc.expectedAttrs) + } + + if tc.expectedResult != nil { + assert.Equal(t, 1, len(span.Events())) + assert.Equal(t, semconv.ExceptionEventName, span.Events()[0].Name) + + attrsMap := getSpanEventMap(span.Events()[0].Attributes) + assert.Equal(t, tc.expectedResult.Error(), attrsMap[string(semconv.ExceptionMessageKey)]) + } + }) + } +} + +func TestRecordReceivedMalformedEvent(t *testing.T) { + tests := []struct { + name string + expectedSpanName string + expectedStatus codes.Code + expectedAttrs []attribute.KeyValue + expectedResult protocol.Result + expectedSpanKind trace.SpanKind + }{ + + { + name: "received simple error", + expectedSpanName: "cloudevents.client.malformed receive", + expectedStatus: codes.Unset, + expectedAttrs: []attribute.KeyValue{ + attribute.String(string(semconv.CodeFunctionKey), "RecordReceivedMalformedEvent"), + }, + expectedSpanKind: trace.SpanKindConsumer, + expectedResult: fmt.Errorf("unrecognized event version 0.1.1"), + }, + { + name: "received validation error", + expectedSpanName: "cloudevents.client.malformed receive", + expectedStatus: codes.Unset, + expectedAttrs: []attribute.KeyValue{ + attribute.String(string(semconv.CodeFunctionKey), "RecordReceivedMalformedEvent"), + }, + expectedSpanKind: trace.SpanKindConsumer, + expectedResult: event.ValidationError{"specversion": fmt.Errorf("missing Event.Context")}, + }, + { + name: "received http error", + expectedSpanName: "cloudevents.client.malformed receive", + expectedStatus: codes.Error, + expectedAttrs: []attribute.KeyValue{ + attribute.String(string(semconv.CodeFunctionKey), "RecordReceivedMalformedEvent"), + }, + expectedSpanKind: trace.SpanKindConsumer, + expectedResult: http.NewResult(400, "malformed event"), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + sr, _ := configureOtelTestSdk() + ctx := context.Background() + + os := otelObs.NewOTelObservabilityService() + + // act + os.RecordReceivedMalformedEvent(ctx, tc.expectedResult) + + spans := sr.Ended() + + // since the obs service started a span, the context should have the spancontext + assert.NotNil(t, trace.SpanContextFromContext(ctx)) + assert.Equal(t, 1, len(spans)) + + span := spans[0] + assert.Equal(t, tc.expectedSpanName, span.Name()) + assert.Equal(t, tc.expectedStatus, span.Status().Code) + assert.Equal(t, tc.expectedSpanKind, span.SpanKind()) + + if !reflect.DeepEqual(span.Attributes(), tc.expectedAttrs) { + t.Errorf("p = %v, want %v", span.Attributes(), tc.expectedAttrs) + } + + if tc.expectedResult != nil { + assert.Equal(t, 1, len(span.Events())) + assert.Equal(t, semconv.ExceptionEventName, span.Events()[0].Name) + + attrsMap := getSpanEventMap(span.Events()[0].Attributes) + assert.Equal(t, tc.expectedResult.Error(), attrsMap[string(semconv.ExceptionMessageKey)]) + } + }) + } +} + +func getSpanEventMap(evtAttrs []attribute.KeyValue) map[string]string { + attr := map[string]string{} + for _, v := range evtAttrs { + attr[string(v.Key)] = v.Value.AsString() + } + return attr +}