Skip to content

Commit

Permalink
test: migrate kafka to testcontainers (#11206)
Browse files Browse the repository at this point in the history
  • Loading branch information
powersj authored and MyaLongmire committed Jul 6, 2022
1 parent 3b25746 commit f9cb7c0
Showing 1 changed file with 53 additions and 10 deletions.
63 changes: 53 additions & 10 deletions plugins/outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package kafka

import (
"context"
"fmt"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
Expand All @@ -23,7 +27,54 @@ func TestConnectAndWriteIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}

brokers := []string{testutil.GetLocalHost() + ":9092"}
ctx := context.Background()
networkName := "kafka-test-network"
net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
CheckDuplicate: true,
},
})
require.NoError(t, err)
defer func() {
require.NoError(t, net.Remove(ctx), "terminating network failed")
}()

zookeeper := testutil.Container{
Image: "wurstmeister/zookeeper",
ExposedPorts: []string{"2181:2181"},
Networks: []string{networkName},
WaitingFor: wait.ForLog("binding to port"),
Name: "telegraf-test-zookeeper",
}
err = zookeeper.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, zookeeper.Terminate(), "terminating container failed")
}()

container := testutil.Container{
Image: "wurstmeister/kafka",
ExposedPorts: []string{"9092:9092"},
Env: map[string]string{
"KAFKA_ADVERTISED_HOST_NAME": "localhost",
"KAFKA_ADVERTISED_PORT": "9092",
"KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("telegraf-test-zookeeper:%s", zookeeper.Ports["2181"]),
},
Networks: []string{networkName},
WaitingFor: wait.ForLog("[KafkaServer id=1001] started"),
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

brokers := []string{
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
}

s, _ := serializers.NewInfluxSerializer()
k := &Kafka{
Brokers: brokers,
Expand All @@ -33,7 +84,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}

// Verify that we can connect to the Kafka broker
err := k.Init()
err = k.Init()
require.NoError(t, err)
err = k.Connect()
require.NoError(t, err)
Expand All @@ -45,10 +96,6 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}

func TestTopicSuffixesIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

topic := "Test"

m := testutil.TestMetric(1)
Expand Down Expand Up @@ -92,10 +139,6 @@ func TestTopicSuffixesIntegration(t *testing.T) {
}

func TestValidateTopicSuffixMethodIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

err := ValidateTopicSuffixMethod("invalid_topic_suffix_method")
require.Error(t, err, "Topic suffix method used should be invalid.")

Expand Down

0 comments on commit f9cb7c0

Please sign in to comment.