From d6657fb3448e265becc619b0847f4dd05730c11b Mon Sep 17 00:00:00 2001 From: see-quick Date: Wed, 16 Oct 2024 14:03:55 +0200 Subject: [PATCH 1/2] Add KRaft readiness check in StrimziKafkaCluster Signed-off-by: see-quick --- .../test/container/StrimziKafkaCluster.java | 47 +++++++++++++++++++ .../test/container/StrimziKafkaContainer.java | 4 ++ 2 files changed, 51 insertions(+) diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java index 05f6b9d..2d5f321 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java @@ -20,6 +20,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -382,6 +384,7 @@ private void configureQuorumVoters(final Map additionalKafkaConf additionalKafkaConfiguration.put("controller.quorum.voters", quorumVoters); } + @SuppressWarnings({"CyclomaticComplexity"}) @Override public void start() { Stream startables = this.brokers.stream(); @@ -412,6 +415,50 @@ public void start() { return false; } }); + } else if (this.isKraftKafkaCluster()) { + // Readiness check for KRaft mode + Utils.waitFor("Kafka brokers to form a quorum", Duration.ofSeconds(5).toMillis(), Duration.ofMinutes(1).toMillis(), + () -> { + try { + for (KafkaContainer kafkaContainer : this.brokers) { + Container.ExecResult result = ((StrimziKafkaContainer) kafkaContainer).execInContainer( + "bash", "-c", + "bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9093 describe --status" + ); + String output = result.getStdout(); + + LOGGER.info("Metadata quorum status from broker {}: {}", ((StrimziKafkaContainer) kafkaContainer).getBrokerId(), output); + + if (output == null || output.isEmpty()) { + return false; + } + + // Check if LeaderId is present and valid + final Pattern leaderIdPattern = Pattern.compile("LeaderId:\\s+(\\d+)"); + final Matcher leaderIdMatcher = leaderIdPattern.matcher(output); + + if (!leaderIdMatcher.find()) { + return false; // LeaderId not found + } + + String leaderIdStr = leaderIdMatcher.group(1); + try { + int leaderId = Integer.parseInt(leaderIdStr); + if (leaderId < 0) { + return false; // Invalid LeaderId + } + } catch (NumberFormatException e) { + return false; // LeaderId is not a valid integer + } + + // If LeaderId is present and valid, we assume the broker is ready + } + return true; // All brokers have a valid LeaderId + } catch (IOException | InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Failed to execute command in Kafka container", e); + } + }); } } diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java index e08c4e7..6871422 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java @@ -605,4 +605,8 @@ public synchronized Proxy getProxy() { /* test */ String getKafkaVersion() { return this.kafkaVersion; } + + /* test */ int getBrokerId() { + return brokerId; + } } From 4edb54715178bc725100231376993a4e7cf2a57e Mon Sep 17 00:00:00 2001 From: see-quick Date: Wed, 16 Oct 2024 14:51:31 +0200 Subject: [PATCH 2/2] try 1s polling Signed-off-by: see-quick --- .../java/io/strimzi/test/container/StrimziKafkaCluster.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java index 2d5f321..659a7f0 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java @@ -396,7 +396,7 @@ public void start() { } if (this.isZooKeeperBasedKafkaCluster()) { - Utils.waitFor("Kafka brokers nodes to be connected to the ZooKeeper", Duration.ofSeconds(5).toMillis(), Duration.ofMinutes(1).toMillis(), + Utils.waitFor("Kafka brokers nodes to be connected to the ZooKeeper", Duration.ofSeconds(1).toMillis(), Duration.ofMinutes(1).toMillis(), () -> { Container.ExecResult result; try { @@ -417,7 +417,7 @@ public void start() { }); } else if (this.isKraftKafkaCluster()) { // Readiness check for KRaft mode - Utils.waitFor("Kafka brokers to form a quorum", Duration.ofSeconds(5).toMillis(), Duration.ofMinutes(1).toMillis(), + Utils.waitFor("Kafka brokers to form a quorum", Duration.ofSeconds(1).toMillis(), Duration.ofMinutes(1).toMillis(), () -> { try { for (KafkaContainer kafkaContainer : this.brokers) {