diff --git a/examples/kafka-cluster/build.gradle b/examples/kafka-cluster/build.gradle
new file mode 100644
index 00000000000..db07c6859f3
--- /dev/null
+++ b/examples/kafka-cluster/build.gradle
@@ -0,0 +1,17 @@
+plugins {
+    id 'java'
+}
+
+repositories {
+    jcenter()
+}
+
+dependencies {
+    testCompileOnly "org.projectlombok:lombok:1.18.10"
+    testAnnotationProcessor "org.projectlombok:lombok:1.18.10"
+    testCompile 'org.testcontainers:kafka'
+    testCompile 'org.apache.kafka:kafka-clients:2.3.1'
+    testCompile 'org.assertj:assertj-core:3.14.0'
+    testCompile 'com.google.guava:guava:23.0'
+    testCompile 'org.slf4j:slf4j-simple:1.7.30'
+}
diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java
new file mode 100644
index 00000000000..fd5b1a0a0b6
--- /dev/null
+++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java
@@ -0,0 +1,102 @@
+package com.example.kafkacluster;
+
+import lombok.SneakyThrows;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.lifecycle.Startable;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Provides an easy way to launch a Kafka cluster with multiple brokers.
+ */
+public class KafkaContainerCluster implements Startable {
+
+    private final int brokersNum;
+    private final Network network;
+    private final GenericContainer<?> zookeeper;
+    private final Collection<KafkaContainer> brokers;
+
+    public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
+        if (brokersNum < 0) {
+            throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
+        }
+        if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
+            throw new IllegalArgumentException("internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0");
+        }
+
+        this.brokersNum = brokersNum;
+        this.network = Network.newNetwork();
+
+        this.zookeeper = new GenericContainer<>(DockerImageName.parse("confluentinc/cp-zookeeper").withTag(confluentPlatformVersion))
+            .withNetwork(network)
+            .withNetworkAliases("zookeeper")
+            .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(KafkaContainer.ZOOKEEPER_PORT));
+
+        this.brokers = IntStream
+            .range(0, this.brokersNum)
+            .mapToObj(brokerNum -> {
+                return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag(confluentPlatformVersion))
+                    .withNetwork(this.network)
+                    .withNetworkAliases("broker-" + brokerNum)
+                    .dependsOn(this.zookeeper)
+                    .withExternalZookeeper("zookeeper:" + KafkaContainer.ZOOKEEPER_PORT)
+                    .withEnv("KAFKA_BROKER_ID", brokerNum + "")
+                    .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
+                    .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
+                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
+                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "");
+            })
+            .collect(Collectors.toList());
+    }
+
+    public Collection<KafkaContainer> getBrokers() {
+        return this.brokers;
+    }
+
+    public String getBootstrapServers() {
+        return brokers.stream()
+            .map(KafkaContainer::getBootstrapServers)
+            .collect(Collectors.joining(","));
+    }
+
+    private Stream<GenericContainer<?>> allContainers() {
+        return Stream.concat(
+            this.brokers.stream(),
+            Stream.of(this.zookeeper)
+        );
+    }
+
+    @Override
+    @SneakyThrows
+    public void start() {
+        Stream<Startable> startables = this.brokers.stream().map(Startable.class::cast);
+        Startables.deepStart(startables).get(60, SECONDS);
+
+        Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> {
+            Container.ExecResult result = this.zookeeper.execInContainer(
+                "sh", "-c",
+                "zookeeper-shell zookeeper:" + KafkaContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1"
+            );
+            String brokers = result.getStdout();
+
+            return brokers != null && brokers.split(",").length == this.brokersNum;
+        });
+    }
+
+    @Override
+    public void stop() {
+        allContainers().parallel().forEach(GenericContainer::stop);
+    }
+}
diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java
new file mode 100644
index 00000000000..0756b24d58a
--- /dev/null
+++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java
@@ -0,0 +1,97 @@
+package com.example.kafkacluster;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Test;
+import org.rnorth.ducttape.unreliables.Unreliables;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.tuple;
+
+public class KafkaContainerClusterTest {
+
+    @Test
+    public void testKafkaContainerCluster() throws Exception {
+        try (
+            KafkaContainerCluster cluster = new KafkaContainerCluster("5.2.1", 3, 2)
+        ) {
+            cluster.start();
+            String bootstrapServers = cluster.getBootstrapServers();
+
+            assertThat(cluster.getBrokers()).hasSize(3);
+
+            testKafkaFunctionality(bootstrapServers, 3, 2);
+        }
+    }
+
+    protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
+        try (
+            AdminClient adminClient = AdminClient.create(ImmutableMap.of(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers
+            ));
+
+            KafkaProducer<String, String> producer = new KafkaProducer<>(
+                ImmutableMap.of(
+                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
+                    ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()
+                ),
+                new StringSerializer(),
+                new StringSerializer()
+            );
+
+            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
+                ImmutableMap.of(
+                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
+                    ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(),
+                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+                ),
+                new StringDeserializer(),
+                new StringDeserializer()
+            );
+        ) {
+            String topicName = "messages";
+
+            Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
+            adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
+
+            consumer.subscribe(Collections.singletonList(topicName));
+
+            producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
+
+            Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+
+                if (records.isEmpty()) {
+                    return false;
+                }
+
+                assertThat(records)
+                    .hasSize(1)
+                    .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
+                    .containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
+
+                return true;
+            });
+
+            consumer.unsubscribe();
+        }
+    }
+
+}
diff --git a/examples/settings.gradle b/examples/settings.gradle
index 25da14a0931..ea988b4ada8 100644
--- a/examples/settings.gradle
+++ b/examples/settings.gradle
@@ -19,6 +19,7 @@ includeBuild '..'
 
 // explicit include to allow Dependabot to autodiscover subprojects
 include 'disque-job-queue'
+include 'kafka-cluster'
 include 'linked-container'
 include 'mongodb-container'
 include 'redis-backed-cache'
diff --git a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
index 12867239251..b1bc3d735f9 100644
--- a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
+++ b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java
@@ -24,6 +24,8 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
 
     public static final int ZOOKEEPER_PORT = 2181;
 
+    private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
+
     private static final int PORT_NOT_ASSIGNED = -1;
 
     protected String externalZookeeperConnect = null;
@@ -60,8 +62,10 @@ public KafkaContainer(final DockerImageName dockerImageName) {
         withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
         withEnv("KAFKA_BROKER_ID", "1");
 
-        withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
-        withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
+        withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
         withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
         withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
     }
diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
index 263653822e6..d219aa17b13 100644
--- a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
+++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
@@ -6,8 +6,12 @@
 import com.google.common.collect.ImmutableMap;
 
 import java.time.Duration;
+import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -110,7 +114,15 @@ public void testConfluentPlatformVersion6() throws Exception {
     }
 
     protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
+        testKafkaFunctionality(bootstrapServers, 1, 1);
+    }
+
+    protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
         try (
+            AdminClient adminClient = AdminClient.create(ImmutableMap.of(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers
+            ));
+
             KafkaProducer<String, String> producer = new KafkaProducer<>(
                 ImmutableMap.of(
                     ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
@@ -131,6 +143,10 @@ protected void testKafkaFunctionality(String bootstrapServers) throws Exception
             );
         ) {
             String topicName = "messages-" + UUID.randomUUID();
+
+            Collection<NewTopic> topics = singletonList(new NewTopic(topicName, partitions, (short) rf));
+            adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
+
             consumer.subscribe(singletonList(topicName));
 
             producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
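
For reference, a minimal sketch of how the KafkaContainerCluster introduced above could be driven from plain client code outside the bundled test. The cluster API (constructor arguments, start(), getBootstrapServers()) is taken from the diff; the class name ClusterUsageSketch, the topic name and the acks setting are illustrative assumptions, not part of the change.

package com.example.kafkacluster;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ClusterUsageSketch {

    public static void main(String[] args) throws Exception {
        // Three brokers with internal topics replicated twice, as in the bundled test.
        // Startable extends AutoCloseable, so try-with-resources stops the containers.
        try (KafkaContainerCluster cluster = new KafkaContainerCluster("5.2.1", 3, 2)) {
            cluster.start();

            // getBootstrapServers() joins the advertised listeners of all brokers.
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
            props.put(ProducerConfig.ACKS_CONFIG, "all");

            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
                // "example-topic" is an illustrative topic name (auto-created by the broker).
                producer.send(new ProducerRecord<>("example-topic", "hello", "world")).get();
            }
        }
    }
}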