Skip to content

Commit

Permalink
feat: add kafka (KRaft mode only) module (#1610)
Browse files Browse the repository at this point in the history
* chore: scaffolding for kafka module

* chore: add test for reading from a topic

* docs: document methods and options

* fix: golangci-lint

* fix: wrong copy&paste

* docs: link to KRaft docs

* chore: better wait for KRaft log

* chore: support for set cluster ID

* chore: configure controller quorum voters based on the networks

* chore: validate KRaft version is above 7.0.0

* fix: lint

* chore: validate that the image namespace is the official one

* docs: improve message for image validation

* chore: run go mod tidy

* chore: rename file to only exists for testing

* chore: use concluent-local Docker image
  • Loading branch information
mdelapenya authored Sep 18, 2023
1 parent d6d5b0b commit 9a340c3
Show file tree
Hide file tree
Showing 12 changed files with 883 additions and 1 deletion.
7 changes: 7 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ updates:
day: sunday
open-pull-requests-limit: 3
rebase-strategy: disabled
- package-ecosystem: gomod
directory: /modules/kafka
schedule:
interval: monthly
day: sunday
open-pull-requests-limit: 3
rebase-strategy: disabled
- package-ecosystem: gomod
directory: /modules/localstack
schedule:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ jobs:
matrix:
go-version: [1.20.x, 1.x]
platform: [ubuntu-latest, macos-latest]
module: [artemis, clickhouse, compose, couchbase, elasticsearch, k3s, localstack, mariadb, mongodb, mysql, nats, neo4j, postgres, pulsar, redis, redpanda, vault]
module: [artemis, clickhouse, compose, couchbase, elasticsearch, k3s, kafka, localstack, mariadb, mongodb, mysql, nats, neo4j, postgres, pulsar, redis, redpanda, vault]
uses: ./.github/workflows/ci-test-go.yml
with:
go-version: ${{ matrix.go-version }}
Expand Down
4 changes: 4 additions & 0 deletions .vscode/.testcontainers-go.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
"name": "module / k3s",
"path": "../modules/k3s"
},
{
"name": "module / kafka",
"path": "../modules/kafka"
},
{
"name": "module / localstack",
"path": "../modules/localstack"
Expand Down
94 changes: 94 additions & 0 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Kafka (KRaft)

Not available until the next release of testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>

## Introduction

The Testcontainers module for KRaft: [Apache Kafka Without ZooKeeper](https://developer.confluent.io/learn/kraft).

## Adding this module to your project dependencies

Please run the following command to add the Kafka module to your Go dependencies:

```
go get github.com/testcontainers/testcontainers-go/modules/kafka
```

## Usage example

<!--codeinclude-->
[Creating a Kafka container](../../modules/kafka/examples_test.go) inside_block:runKafkaContainer
<!--/codeinclude-->

## Module reference

The Kafka module exposes one entrypoint function to create the Kafka container, and this function receives two parameters:

```golang
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error)
```

- `context.Context`, the Go context.
- `testcontainers.ContainerCustomizer`, a variadic argument for passing options.

### Container Options

When starting the Kafka container, you can pass options in a variadic way to configure it.

#### Image

If you need to set a different Kafka Docker image, you can use `testcontainers.WithImage` with a valid Docker image
for Kafka. E.g. `testcontainers.WithImage("confluentinc/confluent-local:7.5.0")`.

!!! warning
The minimal required version of Kafka for KRaft mode is `confluentinc/confluent-local:7.4.0`. If you are using an image that
is different from the official one, please make sure that it's compatible with KRaft mode, as the module won't check
the version for you.

#### Init script

The Kafka container will be started using a custom shell script:

<!--codeinclude-->
[Init script](../../modules/kafka/kafka.go) inside_block:starterScript
<!--/codeinclude-->

#### Environment variables

The environment variables that are already set by default are:

<!--codeinclude-->
[Environment variables](../../modules/kafka/kafka.go) inside_block:envVars
<!--/codeinclude-->

#### Wait Strategies

If you need to set a different wait strategy for Kafka, you can use `testcontainers.WithWaitStrategy` with a valid wait strategy
for Kafka.

!!!info
The default deadline for the wait strategy is 60 seconds.

At the same time, it's possible to set a wait strategy and a custom deadline with `testcontainers.WithWaitStrategyAndDeadline`.

#### Docker type modifiers

If you need an advanced configuration for Kafka, you can leverage the following Docker type modifiers:

- `testcontainers.WithConfigModifier`
- `testcontainers.WithHostConfigModifier`
- `testcontainers.WithEndpointSettingsModifier`

Please read the [Create containers: Advanced Settings](../features/creating_container.md#advanced-settings) documentation for more information.

### Container Methods

The Kafka container exposes the following methods:

#### Brokers

The `Brokers(ctx)` method returns the Kafka brokers as a string slice, containing the host and the random port defined by Kafka's public port (`9093/tcp`).

<!--codeinclude-->
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:getBrokers
<!--/codeinclude-->
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ nav:
- modules/couchbase.md
- modules/elasticsearch.md
- modules/k3s.md
- modules/kafka.md
- modules/localstack.md
- modules/mariadb.md
- modules/mongodb.md
Expand Down
5 changes: 5 additions & 0 deletions modules/kafka/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
include ../../commons-test.mk

.PHONY: test
test:
$(MAKE) test-kafka
55 changes: 55 additions & 0 deletions modules/kafka/consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package kafka

import (
"testing"

"github.com/IBM/sarama"
)

// TestKafkaConsumer is a test consumer for Kafka
type TestKafkaConsumer struct {
t *testing.T
ready chan bool
done chan bool
cancel chan bool
message *sarama.ConsumerMessage
}

func NewTestKafkaConsumer(t *testing.T) (consumer *TestKafkaConsumer, ready <-chan bool, done <-chan bool, cancel func()) {
kc := &TestKafkaConsumer{
t: t,
ready: make(chan bool, 1),
done: make(chan bool, 1),
cancel: make(chan bool, 1),
}
return kc, kc.ready, kc.done, func() {
kc.cancel <- true
}
}

func (k *TestKafkaConsumer) Setup(_ sarama.ConsumerGroupSession) error {
return nil
}

func (k *TestKafkaConsumer) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}

// ConsumeClaim is called by the Kafka client library when a message is received
func (k *TestKafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
k.ready <- true
for {
select {
case message := <-claim.Messages():
k.message = message
session.MarkMessage(message, "")
k.done <- true

case <-k.cancel:
return nil

case <-session.Context().Done():
return nil
}
}
}
42 changes: 42 additions & 0 deletions modules/kafka/examples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package kafka_test

import (
"context"
"fmt"

"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/kafka"
)

func ExampleRunContainer() {
// runKafkaContainer {
ctx := context.Background()

kafkaContainer, err := kafka.RunContainer(ctx,
kafka.WithClusterID("test-cluster"),
testcontainers.WithImage("confluentinc/confluent-local:7.5.0"),
)
if err != nil {
panic(err)
}

// Clean up the container after
defer func() {
if err := kafkaContainer.Terminate(ctx); err != nil {
panic(err)
}
}()
// }

state, err := kafkaContainer.State(ctx)
if err != nil {
panic(err)
}

fmt.Println(kafkaContainer.ClusterID)
fmt.Println(state.Running)

// Output:
// test-cluster
// true
}
62 changes: 62 additions & 0 deletions modules/kafka/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
module github.com/testcontainers/testcontainers-go/modules/kafka

go 1.20

require (
github.com/IBM/sarama v1.41.1
github.com/docker/go-connections v0.4.0
github.com/testcontainers/testcontainers-go v0.23.0
golang.org/x/mod v0.12.0
)

require (
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Microsoft/hcsshim v0.11.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/containerd/containerd v1.7.6 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker v24.0.6+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/moby/patternmatcher v0.5.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc4 // indirect
github.com/opencontainers/runc v1.1.5 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/tools v0.7.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)

replace github.com/testcontainers/testcontainers-go => ../..
Loading

0 comments on commit 9a340c3

Please sign in to comment.