Skip to content

Commit

Permalink
Toggle kafka-topic connect param based on version
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo committed Jun 17, 2020
1 parent 18152bf commit 9846b84
Showing 1 changed file with 70 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.core.command.ExecStartResultCallback;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
Expand Down Expand Up @@ -36,7 +38,10 @@ public class KafkaContainerCluster implements Startable {

private static final String TEST_SCRIPT = "/testcontainers_test.sh";
private static final String TEST_TOPIC = "test-kafka-container-cluster";
private static final Version BOOTSTRAP_PARAM_MIN_VERSION = new Version("5.2.0");

private final Logger log = LoggerFactory.getLogger(getClass());
private final Version confluentPlatformVersion;
private final int brokersNum;
private final Network network;
private final GenericContainer zookeeper;
Expand All @@ -59,6 +64,7 @@ public KafkaContainerCluster(
+ "' must be less than brokersNum and greater than 0");
}

this.confluentPlatformVersion = new Version(confluentPlatformVersion);
this.brokersNum = brokersNum;
this.network = Network.newNetwork();

Expand Down Expand Up @@ -131,15 +137,15 @@ public void start() {
.ifPresent(
broker -> {
broker.copyFileToContainer(
Transferable.of(testScript().getBytes(StandardCharsets.UTF_8), 700),
Transferable.of(readinessCheckScript().getBytes(StandardCharsets.UTF_8), 700),
TEST_SCRIPT);
});

// test produce & consume message with full cluster involvement
Unreliables.retryUntilTrue(
START_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
() -> this.brokers.stream().findFirst().map(this::runTest).orElse(false));
() -> this.brokers.stream().findFirst().map(this::runReadinessCheck).orElse(false));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
Expand Down Expand Up @@ -174,15 +180,20 @@ private String clusterBrokers(GenericContainer c) {
* (https://github.com/helm/charts)
* https://github.com/helm/charts/blob/master/incubator/kafka/templates/tests/test_topic_create_consume_produce.yaml
*/
private String testScript() {
private String readinessCheckScript() {
String connect = kafkaTopicConnectParam();
String command = "#!/bin/bash \n";
command += "set -e \n";
command +=
"kafka-topics --bootstrap-server localhost:9092 --delete --topic "
"kafka-topics "
+ connect
+ " --delete --topic "
+ TEST_TOPIC
+ " || echo \"topic does not exist\" \n";
command +=
"kafka-topics --bootstrap-server localhost:9092 --topic "
"kafka-topics "
+ connect
+ " --topic "
+ TEST_TOPIC
+ " --create --partitions "
+ this.brokersNum
Expand All @@ -200,25 +211,35 @@ private String testScript() {
"kafka-console-consumer --bootstrap-server localhost:9092 --topic "
+ TEST_TOPIC
+ " --from-beginning --timeout-ms 2000 --max-messages 1 | grep \"$MESSAGE\" \n";
command +=
"kafka-topics --bootstrap-server localhost:9092 --delete --topic " + TEST_TOPIC + " \n";
command += "kafka-topics " + connect + " --delete --topic " + TEST_TOPIC + " \n";
command += "echo \"test succeeded\" \n";
return command;
}

private Boolean runTest(GenericContainer c) {
private String kafkaTopicConnectParam() {
if (this.confluentPlatformVersion.compareTo(BOOTSTRAP_PARAM_MIN_VERSION) >= 0) {
return "--bootstrap-server localhost:9092";
} else {
return "--zookeeper zookeeper:" + AlpakkaKafkaContainer.ZOOKEEPER_PORT;
}
}

private Boolean runReadinessCheck(GenericContainer c) {
try {
ByteArrayOutputStream stdoutStream = new ByteArrayOutputStream();
ByteArrayOutputStream stderrStream = new ByteArrayOutputStream();
dockerClient
.execStartCmd(
dockerClient
.execCreateCmd(c.getContainerId())
.withAttachStdout(true)
.withAttachStderr(true)
.withCmd("sh", "-c", TEST_SCRIPT)
.exec()
.getId())
.exec(new ExecStartResultCallback(stdoutStream, null))
.exec(new ExecStartResultCallback(stdoutStream, stderrStream))
.awaitCompletion();
log.debug("Readiness check returned errors:\n{}", stderrStream.toString());
return stdoutStream.toString().contains("test succeeded");
} catch (Exception ex) {
throw new RuntimeException(ex);
Expand All @@ -230,3 +251,43 @@ public void stop() {
allContainers().parallel().forEach(GenericContainer::stop);
}
}

@InternalApi
class Version implements Comparable<Version> {

private String version;

public final String get() {
return this.version;
}

public Version(String version) {
if (version == null) throw new IllegalArgumentException("Version can not be null");
if (!version.matches("[0-9]+(\\.[0-9]+)*"))
throw new IllegalArgumentException("Invalid version format");
this.version = version;
}

@Override
public int compareTo(Version that) {
if (that == null) return 1;
String[] thisParts = this.get().split("\\.");
String[] thatParts = that.get().split("\\.");
int length = Math.max(thisParts.length, thatParts.length);
for (int i = 0; i < length; i++) {
int thisPart = i < thisParts.length ? Integer.parseInt(thisParts[i]) : 0;
int thatPart = i < thatParts.length ? Integer.parseInt(thatParts[i]) : 0;
if (thisPart < thatPart) return -1;
if (thisPart > thatPart) return 1;
}
return 0;
}

@Override
public boolean equals(Object that) {
if (this == that) return true;
if (that == null) return false;
if (this.getClass() != that.getClass()) return false;
return this.compareTo((Version) that) == 0;
}
}

0 comments on commit 9846b84

Please sign in to comment.