Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/v1' into felix.geisendoerfer/pro…
Browse files Browse the repository at this point in the history
…filer-upload-timeout
  • Loading branch information
felixge committed Mar 25, 2021
2 parents 4451a5c + 7a9f4e2 commit 8d4e562
Show file tree
Hide file tree
Showing 6 changed files with 601 additions and 67 deletions.
17 changes: 16 additions & 1 deletion contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/co

import (
"math"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
Expand Down Expand Up @@ -131,7 +132,7 @@ func (c *Consumer) Events() chan kafka.Event {
return c.events
}

// Poll polls the consumer for messages or events. Message events will be
// Poll polls the consumer for messages or events. Message will be
// traced.
func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
if c.prev != nil {
Expand All @@ -145,6 +146,20 @@ func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
return evt
}

// ReadMessage polls the consumer for a message. Message will be traced.
func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
if c.prev != nil {
c.prev.Finish()
c.prev = nil
}
msg, err := c.Consumer.ReadMessage(timeout)
if err != nil {
return nil, err
}
c.prev = c.startSpan(msg)
return msg, nil
}

// A Producer wraps a kafka.Producer.
type Producer struct {
*kafka.Producer
Expand Down
159 changes: 93 additions & 66 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
package kafka

import (
"errors"
"os"
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
Expand Down Expand Up @@ -108,75 +110,100 @@ to run the integration test locally:
confluentinc/cp-kafka:5.0.0
*/

func TestConsumerPoll(t *testing.T) {
func TestConsumerFunctional(t *testing.T) {
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
t.Skip("to enable integration test, set the INTEGRATION environment variable")
}

mt := mocktracer.Start()
defer mt.Stop()

// first write a message to the topic

p, err := NewProducer(&kafka.ConfigMap{
"group.id": testGroupID,
"bootstrap.servers": "127.0.0.1:9092",
"go.delivery.reports": true,
}, WithAnalyticsRate(0.1))
assert.NoError(t, err)
delivery := make(chan kafka.Event, 1)
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &testTopic,
Partition: 0,
for _, tt := range []struct {
name string
action func(c *Consumer) (*kafka.Message, error)
}{
{
name: "Poll",
action: func(c *Consumer) (*kafka.Message, error) {
switch e := c.Poll(3000).(type) {
case *kafka.Message:
return e, nil
default:
return nil, errors.New("some error")
}
},
},
Key: []byte("key2"),
Value: []byte("value2"),
}, delivery)
assert.NoError(t, err)
msg1, _ := (<-delivery).(*kafka.Message)
p.Close()

// next attempt to consume the message

c, err := NewConsumer(&kafka.ConfigMap{
"group.id": testGroupID,
"bootstrap.servers": "127.0.0.1:9092",
"socket.timeout.ms": 1000,
"session.timeout.ms": 1000,
"enable.auto.offset.store": false,
})
assert.NoError(t, err)

err = c.Assign([]kafka.TopicPartition{
{Topic: &testTopic, Partition: 0, Offset: msg1.TopicPartition.Offset},
})
assert.NoError(t, err)

msg2, _ := c.Poll(3000).(*kafka.Message)
assert.Equal(t, msg1.String(), msg2.String())

c.Close()

// now verify the spans
spans := mt.FinishedSpans()
assert.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())

s0 := spans[0] // produce
assert.Equal(t, "kafka.produce", s0.OperationName())
assert.Equal(t, "kafka", s0.Tag(ext.ServiceName))
assert.Equal(t, "Produce Topic gotest", s0.Tag(ext.ResourceName))
assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, int32(0), s0.Tag("partition"))

s1 := spans[1] // consume
assert.Equal(t, "kafka.consume", s1.OperationName())
assert.Equal(t, "kafka", s1.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic gotest", s1.Tag(ext.ResourceName))
assert.Equal(t, nil, s1.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, int32(0), s1.Tag("partition"))
{
name: "ReadMessage",
action: func(c *Consumer) (*kafka.Message, error) {
return c.ReadMessage(3000 * time.Millisecond)
},
},
} {
t.Run(tt.name, func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

// first write a message to the topic

p, err := NewProducer(&kafka.ConfigMap{
"group.id": testGroupID,
"bootstrap.servers": "127.0.0.1:9092",
"go.delivery.reports": true,
}, WithAnalyticsRate(0.1))
assert.NoError(t, err)
delivery := make(chan kafka.Event, 1)
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &testTopic,
Partition: 0,
},
Key: []byte("key2"),
Value: []byte("value2"),
}, delivery)
assert.NoError(t, err)
msg1, _ := (<-delivery).(*kafka.Message)
p.Close()

// next attempt to consume the message

c, err := NewConsumer(&kafka.ConfigMap{
"group.id": testGroupID,
"bootstrap.servers": "127.0.0.1:9092",
"socket.timeout.ms": 1000,
"session.timeout.ms": 1000,
"enable.auto.offset.store": false,
})
assert.NoError(t, err)

err = c.Assign([]kafka.TopicPartition{
{Topic: &testTopic, Partition: 0, Offset: msg1.TopicPartition.Offset},
})
assert.NoError(t, err)

msg2, err := tt.action(c)
assert.NoError(t, err)
assert.Equal(t, msg1.String(), msg2.String())
c.Close()

// now verify the spans
spans := mt.FinishedSpans()
assert.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())

s0 := spans[0] // produce
assert.Equal(t, "kafka.produce", s0.OperationName())
assert.Equal(t, "kafka", s0.Tag(ext.ServiceName))
assert.Equal(t, "Produce Topic gotest", s0.Tag(ext.ResourceName))
assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, int32(0), s0.Tag("partition"))

s1 := spans[1] // consume
assert.Equal(t, "kafka.consume", s1.OperationName())
assert.Equal(t, "kafka", s1.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic gotest", s1.Tag(ext.ResourceName))
assert.Equal(t, nil, s1.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, int32(0), s1.Tag("partition"))
})
}
}
78 changes: 78 additions & 0 deletions contrib/go-chi/chi.v5/chi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

// Package chi provides tracing functions for tracing the go-chi/chi/v5 package (https://github.com/go-chi/chi).
package chi // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/go-chi/chi.v5"

import (
"fmt"
"math"
"net/http"
"strconv"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"

"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
)

// Middleware returns middleware that will trace incoming requests.
func Middleware(opts ...Option) func(next http.Handler) http.Handler {
cfg := new(config)
defaults(cfg)
for _, fn := range opts {
fn(cfg)
}
log.Debug("contrib/go-chi/chi.v5: Configuring Middleware: %#v", cfg)
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
opts := []ddtrace.StartSpanOption{
tracer.SpanType(ext.SpanTypeWeb),
tracer.ServiceName(cfg.serviceName),
tracer.Tag(ext.HTTPMethod, r.Method),
tracer.Tag(ext.HTTPURL, r.URL.Path),
tracer.Measured(),
}
if !math.IsNaN(cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
}
if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(r.Header)); err == nil {
opts = append(opts, tracer.ChildOf(spanctx))
}
opts = append(opts, cfg.spanOpts...)
span, ctx := tracer.StartSpanFromContext(r.Context(), "http.request", opts...)
defer span.Finish()

ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)

// pass the span through the request context and serve the request to the next middleware
next.ServeHTTP(ww, r.WithContext(ctx))

// set the resource name as we get it only once the handler is executed
resourceName := chi.RouteContext(r.Context()).RoutePattern()
if resourceName == "" {
resourceName = "unknown"
}
resourceName = r.Method + " " + resourceName
span.SetTag(ext.ResourceName, resourceName)

// set the status code
status := ww.Status()
// 0 status means one has not yet been sent in which case net/http library will write StatusOK
if ww.Status() == 0 {
status = http.StatusOK
}
span.SetTag(ext.HTTPCode, strconv.Itoa(status))

if cfg.isStatusError(status) {
// mark 5xx server error
span.SetTag(ext.Error, fmt.Errorf("%d: %s", status, http.StatusText(status)))
}
})
}
}
Loading

0 comments on commit 8d4e562

Please sign in to comment.