From cfe9f727e7c28ebc822ccdaf3ccbfba370e1bd7f Mon Sep 17 00:00:00 2001 From: Sean Glover Date: Sun, 20 Oct 2019 16:57:44 -0400 Subject: [PATCH] PR feedback --- .../containers/KafkaContainer.java | 29 +++++--------- .../containers/KafkaContainerCluster.java | 39 ++++++++++++------- .../containers/KafkaContainerTest.java | 34 +--------------- 3 files changed, 38 insertions(+), 64 deletions(-) diff --git a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java index 11c5e5c9078..e2eb2a5fcaa 100644 --- a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java @@ -14,14 +14,12 @@ * */ public class KafkaContainer extends GenericContainer { - public static final int DEFAULT_KAFKA_PORT = 9093; - - public static final String DEFAULT_KAFKA_BROKER_ID = "1"; - - public static final int DEFAULT_INTERNAL_TOPIC_RF = 1; + public static final int KAFKA_PORT = 9093; public static final int ZOOKEEPER_PORT = 2181; + public static final String DEFAULT_INTERNAL_TOPIC_RF = "1"; + public static final String CONFLUENT_PLATFORM_VERSION = "5.2.1"; private static final int PORT_NOT_ASSIGNED = -1; @@ -29,36 +27,29 @@ public class KafkaContainer extends GenericContainer { protected String externalZookeeperConnect = null; private int port = PORT_NOT_ASSIGNED; - private int exposedPort; public KafkaContainer() { this(CONFLUENT_PLATFORM_VERSION); } public KafkaContainer(String confluentPlatformVersion) { - this(confluentPlatformVersion, DEFAULT_KAFKA_PORT, DEFAULT_KAFKA_BROKER_ID, DEFAULT_INTERNAL_TOPIC_RF); - } - - public KafkaContainer(String confluentPlatformVersion, int exposedPort, String brokerId, int internalTopicRf) { super(TestcontainersConfiguration.getInstance().getKafkaImage() + ":" + confluentPlatformVersion); - this.exposedPort = exposedPort; // TODO Only for backward compatibility withNetwork(Network.newNetwork()); withNetworkAliases("kafka-" + Base58.randomString(6)); - withExposedPorts(exposedPort); + withExposedPorts(KAFKA_PORT); // Use two listeners with different names, it will force Kafka to communicate with itself via internal // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener - withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + exposedPort + ",BROKER://0.0.0.0:9092"); + withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092"); withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); - withEnv("KAFKA_BROKER_ID", brokerId); - withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicRf + ""); - withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicRf + ""); - withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicRf + ""); - withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicRf + ""); + withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF); withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + ""); withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); } @@ -96,7 +87,7 @@ protected void doStart() { protected void containerIsStarting(InspectContainerResponse containerInfo) { super.containerIsStarting(containerInfo); - port = getMappedPort(this.exposedPort); + port = getMappedPort(KAFKA_PORT); final String zookeeperConnect; if (externalZookeeperConnect != null) { diff --git a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainerCluster.java b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainerCluster.java index 4cb0a093a79..5eb8d8d67cf 100644 --- a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainerCluster.java +++ b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainerCluster.java @@ -1,27 +1,31 @@ package org.testcontainers.containers; +import lombok.SneakyThrows; +import org.testcontainers.lifecycle.Startable; +import org.testcontainers.lifecycle.Startables; + import java.util.Collection; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.testcontainers.containers.KafkaContainer.CONFLUENT_PLATFORM_VERSION; /** * Provides an easy way to launch a Kafka cluster with multiple brokers. */ -public class KafkaContainerCluster implements AutoCloseable { +public class KafkaContainerCluster implements Startable { - private final int startPort; private final Network network; private final GenericContainer zookeeper; private final Collection brokers; - public KafkaContainerCluster(int brokersNum, int internalTopicsRf, int startPort) { - this(CONFLUENT_PLATFORM_VERSION, brokersNum, internalTopicsRf, startPort); + public KafkaContainerCluster(int brokersNum, int internalTopicsRf) { + this(CONFLUENT_PLATFORM_VERSION, brokersNum, internalTopicsRf); } - public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf, int startPort) { + public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) { if (brokersNum < 0) { throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0"); } @@ -29,8 +33,6 @@ public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, in throw new IllegalArgumentException("internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0"); } - this.startPort = startPort; - this.network = Network.newNetwork(); this.zookeeper = new GenericContainer("confluentinc/cp-zookeeper:" + confluentPlatformVersion) @@ -41,9 +43,16 @@ public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, in this.brokers = IntStream .range(0, brokersNum) .mapToObj(brokerNum -> - new KafkaContainer(confluentPlatformVersion, this.startPort + brokerNum, String.valueOf(brokerNum), internalTopicsRf) + new KafkaContainer(confluentPlatformVersion) .withNetwork(this.network) + .withNetworkAliases("broker-" + brokerNum) + .dependsOn(this.zookeeper) .withExternalZookeeper("zookeeper:" + KafkaContainer.ZOOKEEPER_PORT) + .withEnv("KAFKA_BROKER_ID", brokerNum + "") + .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "") + .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "") ) .collect(Collectors.toList()); } @@ -72,16 +81,20 @@ private Stream allContainers() { return Stream.concat(genericBrokers, zookeeper); } - public void startAll() { - allContainers().parallel().forEach(GenericContainer::start); + @Override + @SneakyThrows + public void start() { + Stream startables = this.brokers.stream().map(b -> (Startable) b); + Startables.deepStart(startables).get(60, SECONDS); } - public void stopAll() { + @Override + public void stop() { allContainers().parallel().forEach(GenericContainer::stop); } @Override - public void close() throws Exception { - this.stopAll(); + public void close() { + this.stop(); } } diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java index 2a5b75e360a..9036e8df226 100644 --- a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java +++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java @@ -75,42 +75,12 @@ public void testExternalZookeeperWithExternalNetwork() throws Exception { } } - @Test - public void testMultiKafkaBrokerCluster() throws Exception { - try ( - Network network = Network.newNetwork(); - - KafkaContainer kafka1 = new KafkaContainer(CONFLUENT_PLATFORM_VERSION, 9093, "1", 2) - .withNetwork(network) - .withExternalZookeeper("zookeeper:2181"); - KafkaContainer kafka2 = new KafkaContainer(CONFLUENT_PLATFORM_VERSION, 9193, "2", 2) - .withNetwork(network) - .withExternalZookeeper("zookeeper:2181"); - KafkaContainer kafka3 = new KafkaContainer(CONFLUENT_PLATFORM_VERSION, 9293, "3", 2) - .withNetwork(network) - .withExternalZookeeper("zookeeper:2181"); - - GenericContainer zookeeper = new GenericContainer("confluentinc/cp-zookeeper:" + CONFLUENT_PLATFORM_VERSION) - .withNetwork(network) - .withNetworkAliases("zookeeper") - .withEnv("ZOOKEEPER_CLIENT_PORT", "2181"); - ) { - Stream.of(kafka1, kafka2, kafka3, zookeeper).parallel().forEach(GenericContainer::start); - String bootstrapServers = Stream - .of(kafka1, kafka2, kafka3) - .map(KafkaContainer::getBootstrapServers) - .collect(Collectors.joining(",")); - - testKafkaFunctionality(bootstrapServers, 3, 2); - } - } - @Test public void testKafkaContainerCluster() throws Exception { try ( - KafkaContainerCluster cluster = new KafkaContainerCluster(CONFLUENT_PLATFORM_VERSION, 3, 2, 9093) + KafkaContainerCluster cluster = new KafkaContainerCluster(CONFLUENT_PLATFORM_VERSION, 3, 2) ) { - cluster.startAll(); + cluster.start(); String bootstrapServers = cluster.getBootstrapServers(); assertThat(cluster.getBrokers()).hasSize(3);