-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka cluster example #1984
Kafka cluster example #1984
Conversation
dd4d9dd
to
26721e9
Compare
modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
Outdated
Show resolved
Hide resolved
modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
Outdated
Show resolved
Hide resolved
modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainerCluster.java
Outdated
Show resolved
Hide resolved
modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainerCluster.java
Outdated
Show resolved
Hide resolved
modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainerCluster.java
Outdated
Show resolved
Hide resolved
modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
Outdated
Show resolved
Hide resolved
cfe9f72
to
060cf70
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome, thanks a lot for the review @bsideup. I reverted most of the changes to KafkaContainer
.
Is KafkaContainerCluster
something you're willing to consider adding to the API? It should make it easier for users to setup multi-broker clusters without knowing much about Kafka.
modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
Outdated
Show resolved
Hide resolved
modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
Outdated
Show resolved
Hide resolved
modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
Outdated
Show resolved
Hide resolved
modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainerCluster.java
Outdated
Show resolved
Hide resolved
e119105
to
3860de3
Compare
3860de3
to
95b28d7
Compare
Rebased on master. @bsideup LMK if you have any concerns. |
@SneakyThrows | ||
private String clusterBrokers(KafkaContainer c) { | ||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); | ||
dockerClient |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered using zookeeper.execInContainer(cmd)
?
Also, what would be the use of this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used in start()
to assert that Kafka cluster has formed, by checking ZooKeeper (its coordinator) to see if the right number of brokers have joined together.
Good idea about using the ZooKeeper container. It looks like the zookeeper-shell
script is present on the path there as well. I've pushed an update.
modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainerCluster.java
Outdated
Show resolved
Hide resolved
Stream<Startable> startables = this.brokers.stream().map(b -> (Startable) b); | ||
Startables.deepStart(startables).get(60, SECONDS); | ||
|
||
Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> Stream.of(this.zookeeper) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it can be simplified to () -> clusterBrokersIDs().length == brokersNum
(where clusterBrokersIDs
returns an array (or list) of broker IDs (or, if execInContainer
is possible, can be inlined since it becomes a one liner)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The container command returns a string (internally it's stored as a string, I guess, i.e. 1,2,3
). I could inline a shell command to split or count the string instead if that's what you prefer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just realized that there is no error handling here, which means that if the start fails then the containers will be dangling until someone calls #stop()
.
That made me think of the rules use cases (some people still got used to running things with JUnit's rules) whether we should have it as part of the module at all, or perhaps we should make it an example instead (We have the examples/ folder for things that feel more like a very special use case rather than a thing that most of the users will be doing)
@seglo I remember that you were using it in Alpakka.
Given some cleanups we were able to do, do you still feel that this amount of code is preferable to have as a part of the Kafka module (and being maintained), or maybe you're okay with having it as an example instead?
P.S. the KafkaContainer's rf fix is still very much needed 👍
@bsideup By error handling do you mean if the containers fail to start? I don't mind scrapping this PR (creating a separate one for the tx RF stuff), and making this code an example. As you know I have a copy in Alpakka that works well for us, though I would probably port it to Scala instead. LMK. |
When some containers fail to start, or when the
No need to, you can just adjust this PR! (Well, unless you want to make 2 separate contributions, totally deserved 👍)
Let's do it like that: we will keep it as an example for now (I feel sorry for not suggesting it on a first place, btw! P.S. thanks a lot for the fast replies and your dedication! Really appreciate that 👍 💯 |
7a7364e
to
6d6b838
Compare
Thanks @bsideup . I'm having a gradle dep issue perhaps you can help me resolve. I added a
|
@seglo I see that you defined it as |
…#3335) Bumps s3 from 2.14.21 to 2.15.7. Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
) Bumps [cucumber-java](https://github.com/cucumber/cucumber-jvm) from 6.6.0 to 6.8.1. - [Release notes](https://github.com/cucumber/cucumber-jvm/releases) - [Changelog](https://github.com/cucumber/cucumber-jvm/blob/main/CHANGELOG.md) - [Commits](cucumber/cucumber-jvm@v6.6.0...v6.8.1) Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
For more specific cache matching
Co-authored-by: Richard North <rich.north@gmail.com>
…Spanner emulators (testcontainers#2690) Co-authored-by: Richard North <rich.north@gmail.com>
…rs#3343) * Use a lighter weight image for MultiplePortsExposedTest * Update helloworld container version
…m daemon port (testcontainers#2769) (testcontainers#3237) Co-authored-by: Vitalii Chura <c-vitalii.chura@hulu.com>
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If you believe this is a mistake, please reply to this comment to keep it open. If there isn't one already, a PR to fix or at least reproduce the problem in a test case will always help us get back on track to tackle this. |
@seglo you won't believe! 😂 I just merged your PR into our own branch (for some polishing) and will merge it into master in a moment! Thanks A LOT for submitting it and staying with us :D And I am so sorry that it took so long 😅 |
Nice, thanks. Longest PR ever :) I'll admit that I've been iterating this more over in Alpakka Kafka. I can take a look at the end result and see if there is anything worth carrying over. |
@seglo the longest indeed 😅 And with such a nice id, btw ;) Looking forward to the updates, and I promise it won't take that much this time 😂 |
This PR introduces the
KafkaContainerCluster
to run a cluster ofKafkaContainers
. I discussed this with @bsideup on slack and thought it would be useful for others too.Cluster-related changes
KafkaContainerCluster
to facilitate creating a multi-broker cluster.Startable
and usedeepStart
to launch cluster containers.AdminClient
to create topics in tests so we can define topics with partitions and replication factors greater than 1.Other changes
KafkaConsumer.poll(int)
withKafkaConsumer.poll(Duration)
in tests.