Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/cloud.google.com/go/pubsub.v1: add Option to Publish #1332

Merged
merged 10 commits into from
Oct 26, 2022
26 changes: 26 additions & 0 deletions contrib/cloud.google.com/go/pubsub.v1/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package pubsub

type config struct {
serviceName string
measured bool
}

// A Option is used to customize spans started by WrapReceiveHandler or Publish.
type Option func(cfg *config)

// A ReceiveOption has been deprecated in favor of Option.
type ReceiveOption = Option

// WithServiceName sets the service name tag for traces started by WrapReceiveHandler or Publish.
func WithServiceName(serviceName string) Option {
return func(cfg *config) {
cfg.serviceName = serviceName
}
}

// WithMeasured sets the measured tag for traces started by WrapReceiveHandler or Publish.
func WithMeasured() Option {
return func(cfg *config) {
cfg.measured = true
}
}
41 changes: 22 additions & 19 deletions contrib/cloud.google.com/go/pubsub.v1/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,28 @@ import (
// the published message.
// It is required to call (*PublishResult).Get(ctx) on the value returned by Publish to complete
// the span.
func Publish(ctx context.Context, t *pubsub.Topic, msg *pubsub.Message) *PublishResult {
span, ctx := tracer.StartSpanFromContext(
ctx,
"pubsub.publish",
func Publish(ctx context.Context, t *pubsub.Topic, msg *pubsub.Message, opts ...Option) *PublishResult {
var cfg config
for _, opt := range opts {
opt(&cfg)
}

hakankutluay marked this conversation as resolved.
Show resolved Hide resolved
spanOpts := []ddtrace.StartSpanOption{
tracer.ResourceName(t.String()),
tracer.SpanType(ext.SpanTypeMessageProducer),
tracer.Tag("message_size", len(msg.Data)),
tracer.Tag("ordering_key", msg.OrderingKey),
}
if cfg.serviceName != "" {
spanOpts = append(spanOpts, tracer.ServiceName(cfg.serviceName))
}
if cfg.measured {
spanOpts = append(spanOpts, tracer.Measured())
}
span, ctx := tracer.StartSpanFromContext(
ctx,
"pubsub.publish",
spanOpts...,
)
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
Expand Down Expand Up @@ -64,24 +78,10 @@ func (r *PublishResult) Get(ctx context.Context) (string, error) {
return serverID, err
}

type config struct {
serviceName string
}

// A ReceiveOption is used to customize spans started by WrapReceiveHandler.
type ReceiveOption func(cfg *config)

// WithServiceName sets the service name tag for traces started by WrapReceiveHandler.
func WithServiceName(serviceName string) ReceiveOption {
return func(cfg *config) {
cfg.serviceName = serviceName
}
}

// WrapReceiveHandler returns a receive handler that wraps the supplied handler,
// extracts any tracing metadata attached to the received message, and starts a
// receive span.
func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub.Message), opts ...ReceiveOption) func(context.Context, *pubsub.Message) {
func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub.Message), opts ...Option) func(context.Context, *pubsub.Message) {
var cfg config
for _, opt := range opts {
opt(&cfg)
Expand All @@ -102,6 +102,9 @@ func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub.
if cfg.serviceName != "" {
opts = append(opts, tracer.ServiceName(cfg.serviceName))
}
if cfg.measured {
opts = append(opts, tracer.Measured())
}
span, ctx := tracer.StartSpanFromContext(ctx, "pubsub.receive", opts...)
if msg.DeliveryAttempt != nil {
span.SetTag("delivery_attempt", *msg.DeliveryAttempt)
Expand Down