Skip to content

Commit

Permalink
Add consumer and producer unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Sep 16, 2021
1 parent be1076a commit 5bf89c9
Show file tree
Hide file tree
Showing 2 changed files with 290 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
return nil
}

out := make(chan kafka.Event, len(in))
out := make(chan kafka.Event, cap(in))
go func() {
defer close(out)
for evt := range in {
Expand Down Expand Up @@ -197,7 +197,7 @@ func (p *Producer) traceProduceChannel(out chan *kafka.Message) chan *kafka.Mess
return nil
}

in := make(chan *kafka.Message, len(out))
in := make(chan *kafka.Message, cap(out))
go func() {
for msg := range in {
span := p.startSpan(msg)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
// Copyright Splunk Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package splunkkafka

import (
"context"
"fmt"
"testing"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
)

// grpID is the Kafka consumer group ID shared by the consumer tests.
const grpID = "test group ID"

// TestNewConsumerCapturesGroupID ensures the group.id configuration value
// is recorded on the Consumer returned by NewConsumer.
func TestNewConsumerCapturesGroupID(t *testing.T) {
	c, err := NewConsumer(&kafka.ConfigMap{"group.id": grpID})
	require.NoError(t, err)
	// Release the underlying librdkafka handle when the test ends.
	defer c.Close()
	assert.Equal(t, grpID, c.group)
}

// TestNewConsumerType ensures NewConsumer returns this package's Consumer
// wrapper type.
func TestNewConsumerType(t *testing.T) {
	c, err := NewConsumer(&kafka.ConfigMap{"group.id": grpID})
	require.NoError(t, err)
	// Release the underlying librdkafka handle when the test ends.
	defer c.Close()
	assert.IsType(t, Consumer{}, *c)
}

// TestNewConsumerReturnsError checks that an invalid configuration is
// rejected by NewConsumer.
func TestNewConsumerReturnsError(t *testing.T) {
	// It is an error to not specify the group.id.
	if _, err := NewConsumer(&kafka.ConfigMap{}); err == nil {
		t.Fatal("expected an error when group.id is not specified")
	}
}

// TestWrapConsumerType ensures WrapConsumer returns this package's
// Consumer wrapper type.
func TestWrapConsumerType(t *testing.T) {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{"group.id": grpID})
	require.NoError(t, err)
	// Release the underlying librdkafka handle when the test ends.
	defer c.Close()
	assert.IsType(t, Consumer{}, *WrapConsumer(c))
}

// TestConsumerEventsChanCreated ensures the traced events channel exists
// and preserves the configured channel capacity.
func TestConsumerEventsChanCreated(t *testing.T) {
	chSize := 500
	c, err := NewConsumer(&kafka.ConfigMap{
		// required for the events channel to be turned on
		"go.events.channel.enable": true,
		"go.events.channel.size":   chSize,
		"group.id":                 grpID,
	})
	require.NoError(t, err)
	// Release the underlying librdkafka handle when the test ends.
	defer c.Close()
	assert.NotNil(t, c.Events())
	assert.Equal(t, chSize, cap(c.Events()))
}

// spanRecord captures the inputs of a single tracer Start call: the span
// context active in the caller's context and the span configuration
// resolved from the start options.
type spanRecord struct {
	SpanContext trace.SpanContext
	SpanConfig  trace.SpanConfig
}

// spanRecorder accumulates a record for every span started through it,
// keyed by span name.
type spanRecorder map[string][]spanRecord

// start records the parent span context and resolved start options under
// name, then delegates actual span creation to a no-op tracer.
func (s spanRecorder) start(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
	rec := spanRecord{
		SpanContext: trace.SpanContextFromContext(ctx),
		SpanConfig:  trace.NewSpanStartConfig(opts...),
	}
	s[name] = append(s[name], rec)
	return trace.NewNoopTracerProvider().Tracer("").Start(ctx, name, opts...)
}

// get returns all span records captured under name, or nil if none were.
func (s spanRecorder) get(name string) []spanRecord {
	records := s[name]
	return records
}

// TestConsumerSpan verifies that messages flowing through the traced
// events channel are forwarded unchanged, start one consumer span each,
// inherit the remote parent context injected into the message headers,
// and carry the expected messaging semantic-convention attributes.
func TestConsumerSpan(t *testing.T) {
	sr := make(spanRecorder)
	tp := &fnTracerProvider{
		tracer: func(string, ...trace.TracerOption) trace.Tracer {
			return &fnTracer{start: sr.start}
		},
	}
	prop := propagation.TraceContext{}
	c, err := NewConsumer(&kafka.ConfigMap{
		// required for the events channel to be turned on
		"go.events.channel.enable": true,
		"group.id":                 grpID,
	}, WithTracerProvider(tp), WithPropagator(prop))
	require.NoError(t, err)

	sc := trace.NewSpanContext(trace.SpanContextConfig{
		TraceID: trace.TraceID{0x01},
		SpanID:  trace.SpanID{0x01},
		Remote:  true,
	})
	ctx := trace.ContextWithSpanContext(context.Background(), sc)

	keys := []string{"key1", "key2"}
	testTopic := "gotest"
	for _, k := range keys {
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic:     &testTopic,
				Partition: 1,
				Offset:    1,
			},
			Key:   []byte(k),
			Value: []byte("value"),
		}
		// Test that context is propagated.
		prop.Inject(ctx, NewMessageCarrier(msg))
		c.Consumer.Events() <- msg
		got := (<-c.Events()).(*kafka.Message)
		assert.Equal(t, []byte(k), got.Key)
	}

	require.Len(t, sr, 1)
	expectedName := fmt.Sprintf("%s receive", testTopic)
	require.Contains(t, sr, expectedName)
	spans := sr.get(expectedName)
	assert.Len(t, spans, len(keys))
	for i, record := range spans {
		assert.Equal(t, sc, record.SpanContext)
		// testify's Equal takes the expected value first.
		assert.Equal(t, trace.SpanKindConsumer, record.SpanConfig.SpanKind())
		attrs := record.SpanConfig.Attributes()
		assert.Contains(t, attrs, semconv.MessagingSystemKey.String("kafka"))
		assert.Contains(t, attrs, semconv.MessagingDestinationKindTopic)
		assert.Contains(t, attrs, semconv.MessagingDestinationKey.String(testTopic))
		assert.Contains(t, attrs, semconv.MessagingOperationReceive)
		assert.Contains(t, attrs, semconv.MessagingMessageIDKey.String("1"))
		assert.Contains(t, attrs, semconv.MessagingKafkaMessageKeyKey.String(keys[i]))
		assert.Contains(t, attrs, semconv.MessagingKafkaConsumerGroupKey.String(grpID))
		assert.Contains(t, attrs, semconv.MessagingKafkaPartitionKey.Int64(1))
	}
}

// TestNewProducerType ensures NewProducer returns this package's Producer
// wrapper type.
func TestNewProducerType(t *testing.T) {
	p, err := NewProducer(&kafka.ConfigMap{})
	require.NoError(t, err)
	// Release the underlying librdkafka handle when the test ends.
	defer p.Close()
	assert.IsType(t, Producer{}, *p)
}

// TestNewProducerReturnsError checks that an invalid configuration is
// rejected by NewProducer.
func TestNewProducerReturnsError(t *testing.T) {
	// go.batch.producer not being a bool type will cause an error.
	if _, err := NewProducer(&kafka.ConfigMap{"go.batch.producer": 1}); err == nil {
		t.Fatal("expected an error for non-bool go.batch.producer")
	}
}

// TestWrapProducerType ensures WrapProducer returns this package's
// Producer wrapper type.
func TestWrapProducerType(t *testing.T) {
	p, err := kafka.NewProducer(&kafka.ConfigMap{})
	require.NoError(t, err)
	// Release the underlying librdkafka handle when the test ends.
	defer p.Close()
	assert.IsType(t, Producer{}, *WrapProducer(p))
}

// TestProducerEventsChanCreated ensures the traced produce channel exists
// and preserves the configured channel capacity.
func TestProducerEventsChanCreated(t *testing.T) {
	chSize := 500
	p, err := NewProducer(&kafka.ConfigMap{
		"go.produce.channel.size": chSize,
	})
	require.NoError(t, err)
	// Release the underlying librdkafka handle when the test ends.
	defer p.Close()
	assert.NotNil(t, p.ProduceChannel())
	assert.Equal(t, chSize, cap(p.ProduceChannel()))
}

// TestProducerChannelSpan verifies that messages sent on the traced
// produce channel are forwarded unchanged, start one producer span each,
// inherit the remote parent context injected into the message headers,
// and carry the expected messaging semantic-convention attributes.
func TestProducerChannelSpan(t *testing.T) {
	sr := make(spanRecorder)
	tp := &fnTracerProvider{
		tracer: func(string, ...trace.TracerOption) trace.Tracer {
			return &fnTracer{start: sr.start}
		},
	}
	prop := propagation.TraceContext{}
	p, err := NewProducer(&kafka.ConfigMap{}, WithTracerProvider(tp), WithPropagator(prop))
	require.NoError(t, err)

	keys := []string{"key1", "key2"}
	produceChannel := make(chan *kafka.Message, len(keys))
	p.produceChannel = p.traceProduceChannel(produceChannel)

	sc := trace.NewSpanContext(trace.SpanContextConfig{
		TraceID: trace.TraceID{0x01},
		SpanID:  trace.SpanID{0x01},
		Remote:  true,
	})
	ctx := trace.ContextWithSpanContext(context.Background(), sc)

	testTopic := "gotest"
	for _, k := range keys {
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic:     &testTopic,
				Partition: 1,
				Offset:    1,
			},
			Key:   []byte(k),
			Value: []byte("value"),
		}
		// Test that context is propagated.
		prop.Inject(ctx, NewMessageCarrier(msg))
		p.ProduceChannel() <- msg
		got := <-produceChannel
		assert.Equal(t, []byte(k), got.Key)
	}

	require.Len(t, sr, 1)
	expectedName := fmt.Sprintf("%s send", testTopic)
	require.Contains(t, sr, expectedName)
	spans := sr.get(expectedName)
	assert.Len(t, spans, len(keys))
	for i, record := range spans {
		assert.Equal(t, sc, record.SpanContext)
		// testify's Equal takes the expected value first.
		assert.Equal(t, trace.SpanKindProducer, record.SpanConfig.SpanKind())
		attrs := record.SpanConfig.Attributes()
		assert.Contains(t, attrs, semconv.MessagingSystemKey.String("kafka"))
		assert.Contains(t, attrs, semconv.MessagingDestinationKindTopic)
		assert.Contains(t, attrs, semconv.MessagingDestinationKey.String(testTopic))
		assert.Contains(t, attrs, semconv.MessagingMessageIDKey.String("1"))
		assert.Contains(t, attrs, semconv.MessagingKafkaMessageKeyKey.String(keys[i]))
		assert.Contains(t, attrs, semconv.MessagingKafkaPartitionKey.Int64(1))
	}
}

// TestProduceSpan verifies that Produce starts one producer span per
// message, inherits the remote parent context injected into the message
// headers, and sets the expected messaging semantic-convention
// attributes.
func TestProduceSpan(t *testing.T) {
	sr := make(spanRecorder)
	tp := &fnTracerProvider{
		tracer: func(string, ...trace.TracerOption) trace.Tracer {
			return &fnTracer{start: sr.start}
		},
	}
	prop := propagation.TraceContext{}
	p, err := NewProducer(&kafka.ConfigMap{}, WithTracerProvider(tp), WithPropagator(prop))
	require.NoError(t, err)

	sc := trace.NewSpanContext(trace.SpanContextConfig{
		TraceID: trace.TraceID{0x01},
		SpanID:  trace.SpanID{0x01},
		Remote:  true,
	})
	ctx := trace.ContextWithSpanContext(context.Background(), sc)

	testTopic := "gotest"
	keys := []string{"key1", "key2"}
	for _, k := range keys {
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic:     &testTopic,
				Partition: 1,
				Offset:    1,
			},
			Key:   []byte(k),
			Value: []byte("value"),
		}
		// Test that context is propagated.
		prop.Inject(ctx, NewMessageCarrier(msg))
		p.Produce(msg, nil)
	}

	require.Len(t, sr, 1)
	expectedName := fmt.Sprintf("%s send", testTopic)
	require.Contains(t, sr, expectedName)
	spans := sr.get(expectedName)
	assert.Len(t, spans, len(keys))
	for i, record := range spans {
		assert.Equal(t, sc, record.SpanContext)
		// testify's Equal takes the expected value first.
		assert.Equal(t, trace.SpanKindProducer, record.SpanConfig.SpanKind())
		attrs := record.SpanConfig.Attributes()
		assert.Contains(t, attrs, semconv.MessagingSystemKey.String("kafka"))
		assert.Contains(t, attrs, semconv.MessagingDestinationKindTopic)
		assert.Contains(t, attrs, semconv.MessagingDestinationKey.String(testTopic))
		assert.Contains(t, attrs, semconv.MessagingMessageIDKey.String("1"))
		assert.Contains(t, attrs, semconv.MessagingKafkaMessageKeyKey.String(keys[i]))
		assert.Contains(t, attrs, semconv.MessagingKafkaPartitionKey.Int64(1))
	}
}

0 comments on commit 5bf89c9

Please sign in to comment.