Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add instrumentation for confluent-kafka-go #100

Merged
merged 45 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
09793ec
Add instrumentation for confluent-kafka-go
MrAlias Sep 15, 2021
ceebafd
Add integration test
MrAlias Sep 15, 2021
2895192
Add test runner to CI for integration Kafka tests
MrAlias Sep 15, 2021
d186cb6
Merge branch 'main' into splunkkafka
MrAlias Sep 15, 2021
27507ff
Merge branch 'splunkkafka' of github.com:MrAlias/splunk-otel-go into …
MrAlias Sep 15, 2021
b3dc7bf
Fix lint errors
MrAlias Sep 15, 2021
dab07d5
Remove CI caching
MrAlias Sep 15, 2021
6f1e654
Increase test timeout to 1 minute
MrAlias Sep 15, 2021
75f8786
Use Go 1.15
MrAlias Sep 15, 2021
253da9d
Skip integration tests in short mode
MrAlias Sep 15, 2021
ccc0db6
Set instrumentation name to pkg name
MrAlias Sep 15, 2021
50f8c09
Add unit tests for carrier
MrAlias Sep 15, 2021
8d2f066
Add config unit tests
MrAlias Sep 15, 2021
01c54e8
Install needed kafka pkg on linux 386 and macos
MrAlias Sep 16, 2021
a7b3b98
Revert "Install needed kafka pkg on linux 386 and macos"
MrAlias Sep 16, 2021
be1076a
Make common strings pkg level const
MrAlias Sep 16, 2021
7f6cb64
Add consumer and producer unit tests
MrAlias Sep 16, 2021
bdddadb
Add build constraints to not build on windows
MrAlias Sep 16, 2021
8fcebca
Install needed kafka pkg on linux 386 and macos
MrAlias Sep 16, 2021
da73c21
Revert "Add build constraints to not build on windows"
MrAlias Sep 16, 2021
a4046fb
Skip kafka tests where not supported
MrAlias Sep 16, 2021
5fe3730
Do not build splunkkafka on Windows
pellared Sep 16, 2021
3fddf07
Fix build constrain for Go 1.16 and prior
MrAlias Sep 16, 2021
6e606cd
Add README
MrAlias Sep 16, 2021
ec74695
Restructure consumer and producer into own files
MrAlias Sep 16, 2021
305bba7
Fix whitespace in README
MrAlias Sep 16, 2021
4b80478
Merge branch 'main' into splunkkafka
MrAlias Sep 28, 2021
88b8abc
Add changes to changelog
MrAlias Sep 28, 2021
a109f26
Upgrade to 1.0.0 of go.opentelemetry.io/otel
MrAlias Sep 28, 2021
544d55e
make gendependabot
MrAlias Sep 28, 2021
2700b7c
Refactor integration tests to use dockertest
MrAlias Sep 28, 2021
112444f
Update build constraints
MrAlias Sep 28, 2021
e7b4bd9
Upgrade otel sdk to 1.0.0
MrAlias Sep 28, 2021
cf6e6d8
Add channel based producer integration test
MrAlias Sep 28, 2021
4842250
Update docs
MrAlias Sep 29, 2021
5ff4fd2
Add WithAttributes option
MrAlias Sep 29, 2021
e9ae4de
Synchronize Close with span ending for the Producer
MrAlias Sep 29, 2021
ac3cb8f
Fix race conditions with Consumer and implement ReadMessage
MrAlias Sep 29, 2021
a1bb77d
Move TestMain
MrAlias Sep 29, 2021
e9d8b35
Comment and restructure integration tests
MrAlias Oct 1, 2021
7957439
Replace atomic.Value with unsafe.Pointer algo
MrAlias Oct 1, 2021
5141ca7
Add documentation that delivery reports are required
MrAlias Oct 1, 2021
b16090f
Flush producer spans in the test
MrAlias Oct 1, 2021
ba6c12c
Comment decommission of the producer
MrAlias Oct 1, 2021
13eea07
Synchronize the Produce async callback goroutine with Close
MrAlias Oct 1, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ updates:
directory: "/instrumentation/database/sql/splunksql/test"
schedule:
interval: "daily"
- package-ecosystem: "gomod"
directory: "/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka"
schedule:
interval: "daily"
- package-ecosystem: "gomod"
directory: "/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka/test"
schedule:
interval: "daily"
- package-ecosystem: "gomod"
directory: "/instrumentation/github.com/go-sql-driver/mysql/splunkmysql"
schedule:
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add the
`github.com/signalfx/splunk-otel-go/instrumentation/github.com/jinzhu/gorm/splunkgorm`
instrumentation for the `github.com/jinzhu/gorm` package. (#98)
- Add the
`github.com/signalfx/splunk-otel-go/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka`
instrumentation for the `github.com/confluentinc/confluent-kafka-go/kafka`
package. (#100)

### Changed

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
SHELL := /bin/bash

GO = go
TIMEOUT = 15
TIMEOUT = 60
PKGS = ./...
BUILD_DIR = ./build
TEST_RESULTS = $(CURDIR)/test-results
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Additional recommended Splunk specific instrumentations:

- [`splunkgorm`](./instrumentation/github.com/jinzhu/gorm/splunkgorm)
- [`splunkhttp`](./instrumentation/net/http/splunkhttp)
- [`splunkkafka`](./instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka)
- [`splunkmysql`](./instrumentation/github.com/go-sql-driver/mysql/splunkmysql)
- [`splunkpgx`](./instrumentation/github.com/jackc/pgx/splunkpgx)
- [`splunkpq`](./instrumentation/github.com/lib/pq/splunkpq)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Splunk instrumentation for `github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka`

This instrumentation is for the
[github.com/confluentinc/confluent-kafka-go/kafka](https://github.com/confluentinc/confluent-kafka-go)
package.

## Compatibility

The Producer will end spans when a delivery report is returned. Setting
`"go.delivery.reports"` to `false` will disable the delivery reports and can
result in an build up of un-ended spans. If delivery reports are disabled, an
un-instrumented Producer should be used instead.

This instrumentation was built to support
[v1.7.0](https://github.com/confluentinc/confluent-kafka-go/releases/tag/v1.7.0)
of github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka. Similar to the
instrumented package, librdkafka 1.6.0+ is required. This means you will need
to use an environment that supports the [pre-built
binaries](https://github.com/confluentinc/confluent-kafka-go#librdkafka), or
[install](https://github.com/confluentinc/confluent-kafka-go#installing-librdkafka)
the library manually. Important to note, similar to the instrumented package,
**cgo is required** and **this instrumentation does not support the Windows
operating system**.

## Getting started

The `NewConsumer` and `NewProducer` functions are provided as drop-in
replacements of the equivalent from the `kafka` package. See [these
examples](./example_test.go) for how to use these functions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright Splunk Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build cgo && (linux || darwin)
// +build cgo
// +build linux darwin

package splunkkafka

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"go.opentelemetry.io/otel/propagation"
)

// textMapCarrier wraps a kafka.Message so it can be used used by a
// TextMapPropagator to propagate tracing context.
type textMapCarrier struct {
msg *kafka.Message
}

var _ propagation.TextMapCarrier = (*textMapCarrier)(nil)

// NewMessageCarrier returns a TextMapCarrier that will encode and decode
// tracing information to and from the passed message.
func NewMessageCarrier(message *kafka.Message) propagation.TextMapCarrier {
return &textMapCarrier{message}
}

// Get returns the value associated with the passed key.
func (c *textMapCarrier) Get(key string) string {
for _, h := range c.msg.Headers {
if h.Key == key {
return string(h.Value)
}
}
return ""
}

// Set stores the key-value pair.
func (c *textMapCarrier) Set(key, value string) {
// Ensure the uniqueness of the key.
for i := len(c.msg.Headers) - 1; i >= 0; i-- {
if c.msg.Headers[i].Key == key {
c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
}
}
c.msg.Headers = append(c.msg.Headers, kafka.Header{
Key: key,
Value: []byte(value),
})
}

// Keys lists the keys stored in this carrier.
func (c *textMapCarrier) Keys() []string {
out := make([]string, len(c.msg.Headers))
for i, h := range c.msg.Headers {
out[i] = h.Key
}
return out
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright Splunk Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build cgo && (linux || darwin)
// +build cgo
// +build linux darwin

package splunkkafka_test

import (
"testing"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/signalfx/splunk-otel-go/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka"
"github.com/stretchr/testify/assert"
)

const (
k, v = "key", "value"
)

func TestCarrierGet(t *testing.T) {
msg := &kafka.Message{
Headers: []kafka.Header{
{Key: k, Value: []byte(v)},
},
}
carrier := splunkkafka.NewMessageCarrier(msg)
assert.Equal(t, v, carrier.Get(k))
}

func TestCarrierGetEmpty(t *testing.T) {
msg := &kafka.Message{}
carrier := splunkkafka.NewMessageCarrier(msg)
assert.Equal(t, "", carrier.Get("key"))
}

func TestCarrierSet(t *testing.T) {
msg := &kafka.Message{}
carrier := splunkkafka.NewMessageCarrier(msg)
carrier.Set(k, v)
var got string
for _, h := range msg.Headers {
if h.Key == k {
got = string(h.Value)
}
}
assert.Equal(t, v, got)
}

func TestCarrierSetOverwrites(t *testing.T) {
msg := &kafka.Message{
Headers: []kafka.Header{
{Key: k, Value: []byte("not value")},
{Key: k, Value: []byte("also not value")},
},
}
carrier := splunkkafka.NewMessageCarrier(msg)
carrier.Set(k, v)
var got string
for _, h := range msg.Headers {
if h.Key == k {
got = string(h.Value)
}
}
assert.Equal(t, v, got)
}

func TestCarrierKeys(t *testing.T) {
keys := []string{"one", "two", "three"}
msg := &kafka.Message{
Headers: []kafka.Header{
{Key: keys[0], Value: []byte("")},
{Key: keys[1], Value: []byte("")},
{Key: keys[2], Value: []byte("")},
},
}
carrier := splunkkafka.NewMessageCarrier(msg)
assert.Equal(t, keys, carrier.Keys())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright Splunk Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build cgo && (linux || darwin)
// +build cgo
// +build linux darwin

package splunkkafka

import (
splunkotel "github.com/signalfx/splunk-otel-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
)

// instrumentationName is the instrumentation library identifier for a Tracer.
const instrumentationName = "github.com/signalfx/splunk-otel-go/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka"

// config contains tracing configuration options.
type config struct {
Tracer trace.Tracer
Propagator propagation.TextMapPropagator
Attributes []attribute.KeyValue
}

func newConfig(options ...Option) config {
c := config{
Attributes: []attribute.KeyValue{
semconv.MessagingSystemKey.String("kafka"),
},
}
for _, o := range options {
if o != nil {
o.apply(&c)
}
}
if c.Tracer == nil {
c.Tracer = otel.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(splunkotel.Version()),
)
}
if c.Propagator == nil {
c.Propagator = otel.GetTextMapPropagator()
}
return c
}

// Option applies options to a tracing configuration.
type Option interface {
apply(*config)
}

type optionFunc func(*config)

func (o optionFunc) apply(c *config) {
o(c)
}

// WithTracerProvider returns an Option that sets the TracerProvider used with
// this instrumentation library.
func WithTracerProvider(tp trace.TracerProvider) Option {
return optionFunc(func(c *config) {
c.Tracer = tp.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(splunkotel.Version()),
)
})
}

MrAlias marked this conversation as resolved.
Show resolved Hide resolved
// WithPropagator specifies the TextMapPropagator to use when extracting and
// injecting cross-cutting concerns. If none is specified, the global
// TextMapPropagator will be used.
func WithPropagator(propagator propagation.TextMapPropagator) Option {
return optionFunc(func(cfg *config) {
cfg.Propagator = propagator
})
}

// WithAttributes returns an Option that appends attr to the attributes set
// for every span created with this instrumentation library.
func WithAttributes(attr []attribute.KeyValue) Option {
return optionFunc(func(c *config) {
c.Attributes = append(c.Attributes, attr...)
})
}
Loading