diff --git a/contrib/cloud.google.com/go/pubsub.v1/option.go b/contrib/cloud.google.com/go/pubsub.v1/option.go new file mode 100644 index 0000000000..c28b560bc6 --- /dev/null +++ b/contrib/cloud.google.com/go/pubsub.v1/option.go @@ -0,0 +1,31 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022 Datadog, Inc. + +package pubsub + +type config struct { + serviceName string + measured bool +} + +// A Option is used to customize spans started by WrapReceiveHandler or Publish. +type Option func(cfg *config) + +// A ReceiveOption has been deprecated in favor of Option. +type ReceiveOption = Option + +// WithServiceName sets the service name tag for traces started by WrapReceiveHandler or Publish. +func WithServiceName(serviceName string) Option { + return func(cfg *config) { + cfg.serviceName = serviceName + } +} + +// WithMeasured sets the measured tag for traces started by WrapReceiveHandler or Publish. +func WithMeasured() Option { + return func(cfg *config) { + cfg.measured = true + } +} diff --git a/contrib/cloud.google.com/go/pubsub.v1/pubsub.go b/contrib/cloud.google.com/go/pubsub.v1/pubsub.go index 3e2082c47b..6daf0f88b3 100644 --- a/contrib/cloud.google.com/go/pubsub.v1/pubsub.go +++ b/contrib/cloud.google.com/go/pubsub.v1/pubsub.go @@ -24,14 +24,27 @@ import ( // the published message. // It is required to call (*PublishResult).Get(ctx) on the value returned by Publish to complete // the span. -func Publish(ctx context.Context, t *pubsub.Topic, msg *pubsub.Message) *PublishResult { - span, ctx := tracer.StartSpanFromContext( - ctx, - "pubsub.publish", +func Publish(ctx context.Context, t *pubsub.Topic, msg *pubsub.Message, opts ...Option) *PublishResult { + var cfg config + for _, opt := range opts { + opt(&cfg) + } + spanOpts := []ddtrace.StartSpanOption{ tracer.ResourceName(t.String()), tracer.SpanType(ext.SpanTypeMessageProducer), tracer.Tag("message_size", len(msg.Data)), tracer.Tag("ordering_key", msg.OrderingKey), + } + if cfg.serviceName != "" { + spanOpts = append(spanOpts, tracer.ServiceName(cfg.serviceName)) + } + if cfg.measured { + spanOpts = append(spanOpts, tracer.Measured()) + } + span, ctx := tracer.StartSpanFromContext( + ctx, + "pubsub.publish", + spanOpts..., ) if msg.Attributes == nil { msg.Attributes = make(map[string]string) @@ -64,24 +77,10 @@ func (r *PublishResult) Get(ctx context.Context) (string, error) { return serverID, err } -type config struct { - serviceName string -} - -// A ReceiveOption is used to customize spans started by WrapReceiveHandler. -type ReceiveOption func(cfg *config) - -// WithServiceName sets the service name tag for traces started by WrapReceiveHandler. -func WithServiceName(serviceName string) ReceiveOption { - return func(cfg *config) { - cfg.serviceName = serviceName - } -} - // WrapReceiveHandler returns a receive handler that wraps the supplied handler, // extracts any tracing metadata attached to the received message, and starts a // receive span. -func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub.Message), opts ...ReceiveOption) func(context.Context, *pubsub.Message) { +func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub.Message), opts ...Option) func(context.Context, *pubsub.Message) { var cfg config for _, opt := range opts { opt(&cfg) @@ -102,6 +101,9 @@ func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub. if cfg.serviceName != "" { opts = append(opts, tracer.ServiceName(cfg.serviceName)) } + if cfg.measured { + opts = append(opts, tracer.Measured()) + } span, ctx := tracer.StartSpanFromContext(ctx, "pubsub.receive", opts...) if msg.DeliveryAttempt != nil { span.SetTag("delivery_attempt", *msg.DeliveryAttempt)