Skip to content

Commit

Permalink
Merge branch 'v1' into pr/686
Browse files Browse the repository at this point in the history
  • Loading branch information
knusbaum committed Oct 20, 2020
2 parents 5cd279d + 977bbb8 commit 8c22a72
Show file tree
Hide file tree
Showing 57 changed files with 3,033 additions and 212 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ jobs:
command: |
go get k8s.io/client-go@v0.17.0
go get k8s.io/apimachinery@v0.17.0
go get cloud.google.com/go/pubsub@v1.6.1
- run:
name: Wait for MySQL
Expand Down
1 change: 1 addition & 0 deletions contrib/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ First, find the library which you'd like to integrate with. The naming conventio
* If the package is hosted on Github (eg. `github.com/user/repo`) and has version `v2.1.0`, it will be located at the shorthand path `user/repo.v2`.
* If the package is from anywhere else (eg. `google.golang.org/grpc`) and has no stable version, it can be found under the full import path, followed by the version suffix (in this example `.v0`).
* All new integrations should be suffixed with `.vN` where `N` is the major version that is being covered.
* The package itself should retain its un-versioned name. For example, the integration under `user/repo.v2` stays as `package repo`, and does not become `package repo.v2`

Each integration comes with thorough documentation and usage examples. A good overview can be seen on our
[godoc](https://godoc.org/gopkg.in/DataDog/dd-trace-go.v1/contrib) page.
12 changes: 9 additions & 3 deletions contrib/Shopify/sarama/sarama_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ func TestConsumer(t *testing.T) {
SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")).
SetMessage("test-topic", 0, 1, sarama.StringEncoder("world")),
})

client, err := sarama.NewClient([]string{broker.Addr()}, sarama.NewConfig())
cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion
client, err := sarama.NewClient([]string{broker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -113,6 +114,7 @@ func TestSyncProducer(t *testing.T) {
leader.Returns(prodSuccess)

cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion
cfg.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg)
Expand Down Expand Up @@ -160,6 +162,7 @@ func TestSyncProducerSendMessages(t *testing.T) {
leader.Returns(prodSuccess)

cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion
cfg.Producer.Return.Successes = true
cfg.Producer.Flush.Messages = 2

Expand Down Expand Up @@ -200,7 +203,9 @@ func TestAsyncProducer(t *testing.T) {

broker := newMockBroker(t)

producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, nil)
cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion
producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -234,6 +239,7 @@ func TestAsyncProducer(t *testing.T) {
broker := newMockBroker(t)

cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion
cfg.Producer.Return.Successes = true

producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg)
Expand Down
43 changes: 43 additions & 0 deletions contrib/cloud.google.com/go/pubsub.v1/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-2020 Datadog, Inc.

package pubsub_test

import (
"context"
"log"

pubsubtrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/cloud.google.com/go/pubsub.v1"

"cloud.google.com/go/pubsub"
)

func ExamplePublish() {
client, err := pubsub.NewClient(context.Background(), "project-id")
if err != nil {
log.Fatal(err)
}

topic := client.Topic("topic")
_, err = pubsubtrace.Publish(context.Background(), topic, &pubsub.Message{Data: []byte("hello world!")}).Get(context.Background())
if err != nil {
log.Fatal(err)
}
}

func ExampleReceive() {
client, err := pubsub.NewClient(context.Background(), "project-id")
if err != nil {
log.Fatal(err)
}

sub := client.Subscription("subscription")
err = sub.Receive(context.Background(), pubsubtrace.WrapReceiveHandler(sub, func(ctx context.Context, msg *pubsub.Message) {
// TODO: Handle message.
}))
if err != nil {
log.Fatal(err)
}
}
90 changes: 90 additions & 0 deletions contrib/cloud.google.com/go/pubsub.v1/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-2020 Datadog, Inc.

// Package pubsub provides functions to trace the cloud.google.com/pubsub/go package.
package pubsub

import (
"context"
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"

"cloud.google.com/go/pubsub"
)

// Publish publishes a message on the specified topic and returns a PublishResult.
// This function is functionally equivalent to t.Publish(ctx, msg), but it also starts a publish
// span and it ensures that the tracing metadata is propagated as attributes attached to
// the published message.
// It is required to call (*PublishResult).Get(ctx) on the value returned by Publish to complete
// the span.
func Publish(ctx context.Context, t *pubsub.Topic, msg *pubsub.Message) *PublishResult {
span, ctx := tracer.StartSpanFromContext(
ctx,
"pubsub.publish",
tracer.ResourceName(t.String()),
tracer.SpanType(ext.SpanTypeMessageProducer),
tracer.Tag("message_size", len(msg.Data)),
tracer.Tag("ordering_key", msg.OrderingKey),
)
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
}
if err := tracer.Inject(span.Context(), tracer.TextMapCarrier(msg.Attributes)); err != nil {
log.Debug("contrib/cloud.google.com/go/pubsub.v1/: failed injecting tracing attributes: %v", err)
}
span.SetTag("num_attributes", len(msg.Attributes))
return &PublishResult{
PublishResult: t.Publish(ctx, msg),
span: span,
}
}

// PublishResult wraps *pubsub.PublishResult
type PublishResult struct {
*pubsub.PublishResult
once sync.Once
span tracer.Span
}

// Get wraps (pubsub.PublishResult).Get(ctx). When this function returns the publish
// span created in Publish is completed.
func (r *PublishResult) Get(ctx context.Context) (string, error) {
serverID, err := r.PublishResult.Get(ctx)
r.once.Do(func() {
r.span.SetTag("server_id", serverID)
r.span.Finish(tracer.WithError(err))
})
return serverID, err
}

// WrapReceiveHandler returns a receive handler that wraps the supplied handler,
// extracts any tracing metadata attached to the received message, and starts a
// receive span.
func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub.Message)) func(context.Context, *pubsub.Message) {
return func(ctx context.Context, msg *pubsub.Message) {
parentSpanCtx, _ := tracer.Extract(tracer.TextMapCarrier(msg.Attributes))
span, ctx := tracer.StartSpanFromContext(
ctx,
"pubsub.receive",
tracer.ResourceName(s.String()),
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag("message_size", len(msg.Data)),
tracer.Tag("num_attributes", len(msg.Attributes)),
tracer.Tag("ordering_key", msg.OrderingKey),
tracer.Tag("message_id", msg.ID),
tracer.Tag("publish_time", msg.PublishTime.String()),
tracer.ChildOf(parentSpanCtx),
)
if msg.DeliveryAttempt != nil {
span.SetTag("delivery_attempt", *msg.DeliveryAttempt)
}
defer span.Finish()
f(ctx, msg)
}
}
Loading

0 comments on commit 8c22a72

Please sign in to comment.