diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java index c263dd9..05f6b9d 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java @@ -16,6 +16,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -37,17 +38,19 @@ public class StrimziKafkaCluster implements KafkaContainer { private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaCluster.class); // instance attributes - private int brokersNum; - private int internalTopicReplicationFactor; + private final int brokersNum; + private final int internalTopicReplicationFactor; private Map additionalKafkaConfiguration; private ToxiproxyContainer proxyContainer; private boolean enableSharedNetwork; private String kafkaVersion; + private boolean enableKraft; // not editable private final Network network; - private final StrimziZookeeperContainer zookeeper; + private StrimziZookeeperContainer zookeeper; private Collection brokers; + private String clusterId; /** * Constructor for @StrimziKafkaCluster class, which allows you to specify number of brokers @see{brokersNum}, @@ -141,12 +144,16 @@ private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) { this.additionalKafkaConfiguration = builder.additionalKafkaConfiguration; this.proxyContainer = builder.proxyContainer; this.kafkaVersion = builder.kafkaVersion; + this.enableKraft = builder.enableKRaft; + this.clusterId = builder.clusterId; validateBrokerNum(this.brokersNum); validateInternalTopicReplicationFactor(this.internalTopicReplicationFactor, this.brokersNum); - this.zookeeper = new StrimziZookeeperContainer() - .withNetwork(this.network); + if (this.isZooKeeperBasedKafkaCluster()) { + this.zookeeper = new StrimziZookeeperContainer() + .withNetwork(this.network); + } if (this.proxyContainer != null) { this.proxyContainer.setNetwork(this.network); @@ -162,6 +169,11 @@ private void prepareKafkaCluster(final Map additionalKafkaConfig defaultKafkaConfigurationForMultiNode.put("transaction.state.log.replication.factor", String.valueOf(internalTopicReplicationFactor)); defaultKafkaConfigurationForMultiNode.put("transaction.state.log.min.isr", String.valueOf(internalTopicReplicationFactor)); + if (this.isKraftKafkaCluster()) { + // we have to configure quorum voters but also we simplify process because we use network aliases (i.e., broker-) + this.configureQuorumVoters(additionalKafkaConfiguration); + } + if (additionalKafkaConfiguration != null) { defaultKafkaConfigurationForMultiNode.putAll(additionalKafkaConfiguration); } @@ -172,15 +184,27 @@ private void prepareKafkaCluster(final Map additionalKafkaConfig .mapToObj(brokerId -> { LOGGER.info("Starting broker with id {}", brokerId); // adding broker id for each kafka container - KafkaContainer kafkaContainer = new StrimziKafkaContainer() + StrimziKafkaContainer kafkaContainer = new StrimziKafkaContainer() .withBrokerId(brokerId) .withKafkaConfigurationMap(defaultKafkaConfigurationForMultiNode) - .withExternalZookeeperConnect("zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT) .withNetwork(this.network) .withProxyContainer(proxyContainer) - .withNetworkAliases("broker-" + brokerId) - .withKafkaVersion(kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : kafkaVersion) - .dependsOn(this.zookeeper); + .withKafkaVersion(kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : kafkaVersion); + + // if it's ZK-based Kafka cluster we depend on ZK container and we need to specify external ZK connect + if (this.isZooKeeperBasedKafkaCluster()) { + kafkaContainer.withExternalZookeeperConnect("zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT) + .dependsOn(this.zookeeper); + } else { + kafkaContainer + // if KRaft we need to enable it + .withKraft() + // One must set `node.id` to the same value as `broker.id` if we use KRaft mode + .withNodeId(brokerId) + // pass shared `cluster.id` to each broker + .withClusterId(this.clusterId) + .waitForRunning(); + } LOGGER.info("Started broker with id: {}", kafkaContainer); @@ -208,6 +232,8 @@ public static class StrimziKafkaClusterBuilder { private ToxiproxyContainer proxyContainer; private boolean enableSharedNetwork; private String kafkaVersion; + private boolean enableKRaft; + private String clusterId; /** * Sets the number of Kafka brokers in the cluster. @@ -280,12 +306,20 @@ public StrimziKafkaClusterBuilder withKafkaVersion(String kafkaVersion) { return this; } + public StrimziKafkaClusterBuilder withKraftEnabled() { + this.enableKRaft = true; + return this; + } + /** * Builds and returns a {@code StrimziKafkaCluster} instance based on the provided configurations. * * @return a new instance of {@code StrimziKafkaCluster} */ public StrimziKafkaCluster build() { + // Generate a single cluster ID, which will be shared by all brokers + this.clusterId = UUID.randomUUID().toString(); + return new StrimziKafkaCluster(this); } } @@ -319,6 +353,14 @@ public String getBootstrapServers() { .collect(Collectors.joining(",")); } + public boolean isZooKeeperBasedKafkaCluster() { + return !this.enableKraft; + } + + public boolean isKraftKafkaCluster() { + return this.enableKraft; + } + /* test */ int getInternalTopicReplicationFactor() { return this.internalTopicReplicationFactor; } @@ -331,6 +373,15 @@ public String getBootstrapServers() { return this.additionalKafkaConfiguration; } + private void configureQuorumVoters(final Map additionalKafkaConfiguration) { + // Construct controller.quorum.voters based on network aliases (broker-1, broker-2, etc.) + final String quorumVoters = IntStream.range(0, this.brokersNum) + .mapToObj(brokerId -> String.format("%d@" + StrimziKafkaContainer.NETWORK_ALIAS_PREFIX + "%d:9094", brokerId, brokerId)) + .collect(Collectors.joining(",")); + + additionalKafkaConfiguration.put("controller.quorum.voters", quorumVoters); + } + @Override public void start() { Stream startables = this.brokers.stream(); @@ -341,31 +392,35 @@ public void start() { e.printStackTrace(); } - Utils.waitFor("Kafka brokers nodes to be connected to the ZooKeeper", Duration.ofSeconds(5).toMillis(), Duration.ofMinutes(1).toMillis(), - () -> { - Container.ExecResult result; - try { - result = this.zookeeper.execInContainer( - "sh", "-c", - "bin/zookeeper-shell.sh zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1" - ); - String brokers = result.getStdout(); - - LOGGER.info("Running Kafka brokers: {}", result.getStdout()); - - return brokers != null && brokers.split(",").length == this.brokersNum; - } catch (IOException | InterruptedException e) { - Thread.currentThread().interrupt(); - e.printStackTrace(); - return false; - } - }); + if (this.isZooKeeperBasedKafkaCluster()) { + Utils.waitFor("Kafka brokers nodes to be connected to the ZooKeeper", Duration.ofSeconds(5).toMillis(), Duration.ofMinutes(1).toMillis(), + () -> { + Container.ExecResult result; + try { + result = this.zookeeper.execInContainer( + "sh", "-c", + "bin/zookeeper-shell.sh zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1" + ); + String brokers = result.getStdout(); + + LOGGER.info("Running Kafka brokers: {}", result.getStdout()); + + return brokers != null && brokers.split(",").length == this.brokersNum; + } catch (IOException | InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + return false; + } + }); + } } @Override public void stop() { - // firstly we shut-down zookeeper -> reason: 'On the command line if I kill ZK first it sometimes prevents a broker from shutting down quickly.' - this.zookeeper.stop(); + if (this.isZooKeeperBasedKafkaCluster()) { + // firstly we shut-down zookeeper -> reason: 'On the command line if I kill ZK first it sometimes prevents a broker from shutting down quickly.' + this.zookeeper.stop(); + } // stop all kafka containers in parallel this.brokers.stream() diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java index 779aad6..e08c4e7 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java @@ -19,6 +19,8 @@ import org.testcontainers.utility.MountableFile; import java.io.IOException; +import java.io.StringWriter; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -28,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -60,6 +63,8 @@ public class StrimziKafkaContainer extends GenericContainer kafkaConfigurationMap; private String externalZookeeperConnect; private int brokerId; + private Integer nodeId; private String kafkaVersion; private boolean useKraft; private Function bootstrapServersProvider = c -> String.format("PLAINTEXT://%s:%s", getHost(), this.kafkaExposedPort); @@ -139,6 +145,7 @@ protected void doStart() { // expose internal ZooKeeper internal port iff external ZooKeeper or KRaft is not specified/enabled super.addExposedPort(StrimziZookeeperContainer.ZOOKEEPER_PORT); } + super.withNetworkAliases(NETWORK_ALIAS_PREFIX + this.brokerId); // we need it for the startZookeeper(); and startKafka(); to run container before... super.setCommand("sh", "-c", runStarterScript()); super.doStart(); @@ -180,7 +187,7 @@ public StrimziKafkaContainer waitForRunning() { } @Override - @SuppressWarnings({"JavaNCSS", "NPathComplexity"}) + @SuppressWarnings({"JavaNCSS", "NPathComplexity", "CyclomaticComplexity"}) protected void containerIsStarting(final InspectContainerResponse containerInfo, final boolean reused) { super.containerIsStarting(containerInfo, reused); @@ -211,7 +218,7 @@ protected void containerIsStarting(final InspectContainerResponse containerInfo, advertisedListenerNumber++; } - LOGGER.info("This is all advertised listeners for Kafka {}", advertisedListeners.toString()); + LOGGER.info("This is all advertised listeners for Kafka {}", advertisedListeners); StringBuilder kafkaListeners = new StringBuilder(); StringBuilder kafkaListenerSecurityProtocol = new StringBuilder(); @@ -237,7 +244,8 @@ protected void containerIsStarting(final InspectContainerResponse containerInfo, if (this.useKraft) { // adding Controller listener for Kraft mode - kafkaListeners.append(",").append("CONTROLLER://localhost:9094"); + // (wildcard address for multi-node setup; that way we other nodes can connect and communicate between each other) + kafkaListeners.append(",").append("CONTROLLER://0.0.0.0:9094"); kafkaListenerSecurityProtocol.append(",").append("CONTROLLER:PLAINTEXT"); } @@ -249,24 +257,25 @@ protected void containerIsStarting(final InspectContainerResponse containerInfo, kafkaConfiguration.put("inter.broker.listener.name", "BROKER1"); kafkaConfiguration.put("broker.id", String.valueOf(this.brokerId)); - if (this.useKraft) { - // explicitly say, which listener will be controller (in this case CONTROLLER) - kafkaConfiguration.put("controller.quorum.voters", this.brokerId + "@localhost:9094"); - kafkaConfiguration.put("controller.listener.names", "CONTROLLER"); - } else if (this.externalZookeeperConnect != null) { - LOGGER.info("Using external ZooKeeper 'zookeeper.connect={}'.", this.externalZookeeperConnect); - kafkaConfiguration.put("zookeeper.connect", this.externalZookeeperConnect); - } else { - // using internal ZooKeeper - LOGGER.info("Using internal ZooKeeper 'zookeeper.connect={}.'", "localhost:" + StrimziZookeeperContainer.ZOOKEEPER_PORT); - kafkaConfiguration.put("zookeeper.connect", "localhost:" + StrimziZookeeperContainer.ZOOKEEPER_PORT); - } - // additional kafka config if (this.kafkaConfigurationMap != null) { kafkaConfiguration.putAll(this.kafkaConfigurationMap); } - String kafkaConfigurationOverride = writeOverrideString(kafkaConfiguration); + + if (this.nodeId == null) { + LOGGER.warn("Node ID is not set. Using broker ID {} as the default node ID.", this.brokerId); + this.nodeId = this.brokerId; + } + + final Properties defaultServerProperties = this.buildDefaultServerProperties(); + final String serverPropertiesWithOverride = this.overrideProperties(defaultServerProperties, kafkaConfiguration); + + // copy override file to the container + if (this.useKraft) { + copyFileToContainer(Transferable.of(serverPropertiesWithOverride.getBytes(StandardCharsets.UTF_8)), "/opt/kafka/config/kraft/server.properties"); + } else { + copyFileToContainer(Transferable.of(serverPropertiesWithOverride.getBytes(StandardCharsets.UTF_8)), "/opt/kafka/config/server.properties"); + } String command = "#!/bin/bash \n"; @@ -276,11 +285,15 @@ protected void containerIsStarting(final InspectContainerResponse containerInfo, } else { command += "bin/zookeeper-server-start.sh config/zookeeper.properties &\n"; } - command += "bin/kafka-server-start.sh config/server.properties" + kafkaConfigurationOverride; + command += "bin/kafka-server-start.sh config/server.properties"; } else { - clusterId = this.randomUuid(); - command += "bin/kafka-storage.sh format -t=\"" + clusterId + "\" -c config/kraft/server.properties \n"; - command += "bin/kafka-server-start.sh config/kraft/server.properties" + kafkaConfigurationOverride; + if (this.clusterId == null) { + this.clusterId = this.randomUuid(); + LOGGER.info("New `cluster.id` has been generated: {}", this.clusterId); + } + + command += "bin/kafka-storage.sh format -t=\"" + this.clusterId + "\" -c /opt/kafka/config/kraft/server.properties \n"; + command += "bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties \n"; } Utils.asTransferableBytes(serverPropertiesFile).ifPresent(properties -> copyFileToContainer( @@ -341,6 +354,63 @@ private String randomUuid() { return Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytesArray); } + private Properties buildDefaultServerProperties() { + // Default properties for server.properties + Properties properties = new Properties(); + + // Common settings for both KRaft and non-KRaft modes + properties.setProperty("listeners", "PLAINTEXT://:9092"); + properties.setProperty("inter.broker.listener.name", "PLAINTEXT"); + properties.setProperty("advertised.listeners", "PLAINTEXT://localhost:9092"); + properties.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL"); + properties.setProperty("num.network.threads", "3"); + properties.setProperty("num.io.threads", "8"); + properties.setProperty("socket.send.buffer.bytes", "102400"); + properties.setProperty("socket.receive.buffer.bytes", "102400"); + properties.setProperty("socket.request.max.bytes", "104857600"); + properties.setProperty("log.dirs", "/tmp/default-log-dir"); + properties.setProperty("num.partitions", "1"); + properties.setProperty("num.recovery.threads.per.data.dir", "1"); + properties.setProperty("offsets.topic.replication.factor", "1"); + properties.setProperty("transaction.state.log.replication.factor", "1"); + properties.setProperty("transaction.state.log.min.isr", "1"); + properties.setProperty("log.retention.hours", "168"); + properties.setProperty("log.retention.check.interval.ms", "300000"); + + // Add KRaft-specific settings if useKraft is enabled + if (this.useKraft) { + properties.setProperty("process.roles", "broker,controller"); + properties.setProperty("node.id", String.valueOf(this.nodeId)); // Use dynamic node id + properties.setProperty("controller.quorum.voters", String.format("%d@" + NETWORK_ALIAS_PREFIX + this.nodeId + ":9094", this.nodeId)); + properties.setProperty("listeners", "PLAINTEXT://:9092,CONTROLLER://:9093"); + properties.setProperty("controller.listener.names", "CONTROLLER"); + } else if (this.externalZookeeperConnect != null) { + LOGGER.info("Using external ZooKeeper 'zookeeper.connect={}'.", this.externalZookeeperConnect); + properties.put("zookeeper.connect", this.externalZookeeperConnect); + } else { + // using internal ZooKeeper + LOGGER.info("Using internal ZooKeeper 'zookeeper.connect={}.'", "localhost:" + StrimziZookeeperContainer.ZOOKEEPER_PORT); + properties.put("zookeeper.connect", "localhost:" + StrimziZookeeperContainer.ZOOKEEPER_PORT); + } + + return properties; + } + + private String overrideProperties(Properties defaultProperties, Map overrides) { + // Apply overrides + overrides.forEach(defaultProperties::setProperty); + + // Write properties to string + StringWriter writer = new StringWriter(); + try { + defaultProperties.store(writer, null); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return writer.toString(); + } + @Override public String getInternalZooKeeperConnect() { if (this.hasKraftOrExternalZooKeeperConfigured()) { @@ -400,10 +470,19 @@ public StrimziKafkaContainer withExternalZookeeperConnect(final String externalZ * @return StrimziKafkaContainer instance */ public StrimziKafkaContainer withBrokerId(final int brokerId) { + if (this.useKraft && this.brokerId != this.nodeId) { + throw new IllegalStateException("`broker.id` and `node.id` must have same value!"); + } + this.brokerId = brokerId; return self(); } + public StrimziKafkaContainer withNodeId(final int nodeId) { + this.nodeId = nodeId; + return self(); + } + /** * Fluent method, which sets @code{kafkaVersion}. * @@ -487,6 +566,11 @@ public StrimziKafkaContainer withProxyContainer(final ToxiproxyContainer proxyCo return self(); } + protected StrimziKafkaContainer withClusterId(String clusterId) { + this.clusterId = clusterId; + return this; + } + /** * Retrieves a synchronized Proxy instance for this Kafka broker. * diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaKRaftClusterIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaKRaftClusterIT.java new file mode 100644 index 0000000..777a3b9 --- /dev/null +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaKRaftClusterIT.java @@ -0,0 +1,222 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.test.container; + +import eu.rekawek.toxiproxy.Proxy; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.ToxiproxyContainer; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +@SuppressWarnings({"ClassFanOutComplexity", "ClassDataAbstractionCoupling"}) +public class StrimziKafkaKRaftClusterIT extends AbstractIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaContainerIT.class); + private static final int NUMBER_OF_REPLICAS = 3; + + private StrimziKafkaCluster systemUnderTest; + + @Test + void testKafkaClusterStartup() throws InterruptedException, ExecutionException { + try { + setUpKafkaKRaftCluster(); + + verifyReadinessOfKRaftCluster(); + } finally { + systemUnderTest.stop(); + } + } + + @Test + void testKafkaClusterStartupWithSharedNetwork() throws InterruptedException, ExecutionException { + try { + systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(NUMBER_OF_REPLICAS) + .withSharedNetwork() + .withKraftEnabled() + .build(); + systemUnderTest.start(); + + verifyReadinessOfKRaftCluster(); + } finally { + systemUnderTest.stop(); + } + } + + @Test + void testKafkaClusterFunctionality() throws ExecutionException, InterruptedException, TimeoutException { + setUpKafkaKRaftCluster(); + + try { + verifyFunctionalityOfKafkaCluster(); + } finally { + systemUnderTest.stop(); + } + } + + @Test + void testKafkaClusterWithSharedNetworkFunctionality() throws ExecutionException, InterruptedException, TimeoutException { + try { + systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(NUMBER_OF_REPLICAS) + .withSharedNetwork() + .withKraftEnabled() + .build(); + systemUnderTest.start(); + + verifyFunctionalityOfKafkaCluster(); + } finally { + systemUnderTest.stop(); + } + } + + @Test + void testStartClusterWithProxyContainer() { + setUpKafkaKRaftCluster(); + + ToxiproxyContainer proxyContainer = new ToxiproxyContainer( + DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.6.0") + .asCompatibleSubstituteFor("shopify/toxiproxy")); + + StrimziKafkaCluster kafkaCluster = null; + + try { + kafkaCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(NUMBER_OF_REPLICAS) + .withProxyContainer(proxyContainer) + .withKraftEnabled() + .build(); + + kafkaCluster.start(); + List bootstrapUrls = new ArrayList<>(); + for (KafkaContainer kafkaContainer : kafkaCluster.getBrokers()) { + Proxy proxy = ((StrimziKafkaContainer) kafkaContainer).getProxy(); + assertThat(proxy, notNullValue()); + bootstrapUrls.add(kafkaContainer.getBootstrapServers()); + } + + assertThat(kafkaCluster.getBootstrapServers(), + is(bootstrapUrls.stream().collect(Collectors.joining(",")))); + } finally { + kafkaCluster.stop(); + systemUnderTest.stop(); + } + } + + private void setUpKafkaKRaftCluster() { + systemUnderTest = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(NUMBER_OF_REPLICAS) + .withKraftEnabled() + .build(); + systemUnderTest.start(); + } + + private void verifyReadinessOfKRaftCluster() throws InterruptedException, ExecutionException { + try (Admin adminClient = AdminClient.create(ImmutableMap.of( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers()))) { + // Check broker availability + Collection brokers = adminClient.describeCluster().nodes().get(); + assertThat(brokers, notNullValue()); + // Check if we have 3 brokers + assertThat(brokers.size(), CoreMatchers.equalTo(3)); + + // Optionally check controller status + Node controller = adminClient.describeCluster().controller().get(); + assertThat(controller, notNullValue()); + + LOGGER.info("Brokers are {}", systemUnderTest.getBootstrapServers()); + } + } + + private void verifyFunctionalityOfKafkaCluster() throws ExecutionException, InterruptedException, TimeoutException { + // using try-with-resources for AdminClient, KafkaProducer and KafkaConsumer (implicit closing connection) + try (final AdminClient adminClient = AdminClient.create(ImmutableMap.of( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers())); + KafkaProducer producer = new KafkaProducer<>( + ImmutableMap.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers(), + ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString() + ), + new StringSerializer(), + new StringSerializer() + ); + KafkaConsumer consumer = new KafkaConsumer<>( + ImmutableMap.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, systemUnderTest.getBootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase(Locale.ROOT) + ), + new StringDeserializer(), + new StringDeserializer() + ) + ) { + final String topicName = "example-topic"; + final String recordKey = "strimzi"; + final String recordValue = "the-best-project-in-the-world"; + + final Collection topics = Collections.singletonList(new NewTopic(topicName, NUMBER_OF_REPLICAS, (short) NUMBER_OF_REPLICAS)); + adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS); + + consumer.subscribe(Collections.singletonList(topicName)); + + producer.send(new ProducerRecord<>(topicName, recordKey, recordValue)).get(); + + Utils.waitFor("Consumer records are present", Duration.ofSeconds(10).toMillis(), Duration.ofMinutes(2).toMillis(), + () -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + if (records.isEmpty()) { + return false; + } + + // verify count + assertThat(records.count(), is(1)); + + ConsumerRecord consumerRecord = records.records(topicName).iterator().next(); + + // verify content of the record + assertThat(consumerRecord.topic(), is(topicName)); + assertThat(consumerRecord.key(), is(recordKey)); + assertThat(consumerRecord.value(), is(recordValue)); + + return true; + }); + } + } +}