Skip to content

Commit

Permalink
javafmt
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo committed Jun 16, 2020
1 parent 545ad6b commit 18152bf
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
/**
* This container wraps Confluent Kafka and Zookeeper (optionally)
*
* This is a copy of KafkaContainer from testcontainers/testcontainers-java that we can tweak as needed
* <p>This is a copy of KafkaContainer from testcontainers/testcontainers-java that we can tweak as
* needed
*/
@InternalApi
public class AlpakkaKafkaContainer extends GenericContainer<AlpakkaKafkaContainer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@

import static java.util.concurrent.TimeUnit.SECONDS;

/**
* Provides an easy way to launch a Kafka cluster with multiple brokers.
*/
/** Provides an easy way to launch a Kafka cluster with multiple brokers. */
@InternalApi
public class KafkaContainerCluster implements Startable {

public static final String CONFLUENT_PLATFORM_VERSION =
AlpakkaKafkaContainer.DEFAULT_CP_PLATFORM_VERSION;
AlpakkaKafkaContainer.DEFAULT_CP_PLATFORM_VERSION;
public static final int START_TIMEOUT_SECONDS = 1200;

private static final String TEST_SCRIPT = "/testcontainers_test.sh";
Expand All @@ -50,43 +48,43 @@ public KafkaContainerCluster(int brokersNum, int internalTopicsRf) {
}

public KafkaContainerCluster(
String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
if (brokersNum < 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
}
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
throw new IllegalArgumentException(
"internalTopicsRf '"
+ internalTopicsRf
+ "' must be less than brokersNum and greater than 0");
"internalTopicsRf '"
+ internalTopicsRf
+ "' must be less than brokersNum and greater than 0");
}

this.brokersNum = brokersNum;
this.network = Network.newNetwork();

this.zookeeper =
new GenericContainer("confluentinc/cp-zookeeper:" + confluentPlatformVersion)
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(AlpakkaKafkaContainer.ZOOKEEPER_PORT));
new GenericContainer("confluentinc/cp-zookeeper:" + confluentPlatformVersion)
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(AlpakkaKafkaContainer.ZOOKEEPER_PORT));

this.brokers =
IntStream.range(0, this.brokersNum)
.mapToObj(
brokerNum ->
new AlpakkaKafkaContainer(confluentPlatformVersion)
.withNetwork(this.network)
.withNetworkAliases("broker-" + brokerNum)
.withRemoteJmxService()
.dependsOn(this.zookeeper)
.withExternalZookeeper("zookeeper:" + AlpakkaKafkaContainer.ZOOKEEPER_PORT)
.withEnv("KAFKA_BROKER_ID", brokerNum + "")
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
.withEnv(
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + ""))
.collect(Collectors.toList());
IntStream.range(0, this.brokersNum)
.mapToObj(
brokerNum ->
new AlpakkaKafkaContainer(confluentPlatformVersion)
.withNetwork(this.network)
.withNetworkAliases("broker-" + brokerNum)
.withRemoteJmxService()
.dependsOn(this.zookeeper)
.withExternalZookeeper("zookeeper:" + AlpakkaKafkaContainer.ZOOKEEPER_PORT)
.withEnv("KAFKA_BROKER_ID", brokerNum + "")
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
.withEnv(
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + ""))
.collect(Collectors.toList());
}

public Network getNetwork() {
Expand All @@ -103,8 +101,8 @@ public Collection<AlpakkaKafkaContainer> getBrokers() {

public String getBootstrapServers() {
return brokers.stream()
.map(AlpakkaKafkaContainer::getBootstrapServers)
.collect(Collectors.joining(","));
.map(AlpakkaKafkaContainer::getBootstrapServers)
.collect(Collectors.joining(","));
}

private Stream<GenericContainer> allContainers() {
Expand All @@ -121,23 +119,27 @@ public void start() {

// assert that cluster has formed
Unreliables.retryUntilTrue(
START_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
() -> Stream.of(this.zookeeper)
.map(this::clusterBrokers)
.anyMatch(brokers -> brokers.split(",").length == this.brokersNum));

this.brokers.stream().findFirst().ifPresent(broker -> {
broker.copyFileToContainer(
Transferable.of(testScript().getBytes(StandardCharsets.UTF_8), 700), TEST_SCRIPT);
});
START_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
() ->
Stream.of(this.zookeeper)
.map(this::clusterBrokers)
.anyMatch(brokers -> brokers.split(",").length == this.brokersNum));

this.brokers.stream()
.findFirst()
.ifPresent(
broker -> {
broker.copyFileToContainer(
Transferable.of(testScript().getBytes(StandardCharsets.UTF_8), 700),
TEST_SCRIPT);
});

// test produce & consume message with full cluster involvement
Unreliables.retryUntilTrue(
START_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
() -> this.brokers.stream().findFirst().map(this::runTest).orElse(false)
);
START_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
() -> this.brokers.stream().findFirst().map(this::runTest).orElse(false));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
Expand All @@ -147,39 +149,59 @@ private String clusterBrokers(GenericContainer c) {
try {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
dockerClient
.execStartCmd(
dockerClient
.execCreateCmd(c.getContainerId())
.withAttachStdout(true)
.withCmd(
"sh",
"-c",
"zookeeper-shell zookeeper:"
+ AlpakkaKafkaContainer.ZOOKEEPER_PORT
+ " ls /brokers/ids | tail -n 1")
.exec()
.getId())
.exec(new ExecStartResultCallback(outputStream, null))
.awaitCompletion();
.execStartCmd(
dockerClient
.execCreateCmd(c.getContainerId())
.withAttachStdout(true)
.withCmd(
"sh",
"-c",
"zookeeper-shell zookeeper:"
+ AlpakkaKafkaContainer.ZOOKEEPER_PORT
+ " ls /brokers/ids | tail -n 1")
.exec()
.getId())
.exec(new ExecStartResultCallback(outputStream, null))
.awaitCompletion();
return outputStream.toString();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

/**
* Adapted from Kafka test template in incubator/kafka from Helm hub (https://github.com/helm/charts)
* Adapted from Kafka test template in incubator/kafka from Helm hub
* (https://github.com/helm/charts)
* https://github.com/helm/charts/blob/master/incubator/kafka/templates/tests/test_topic_create_consume_produce.yaml
*/
private String testScript() {
String command = "#!/bin/bash \n";
command += "set -e \n";
command += "kafka-topics --bootstrap-server localhost:9092 --delete --topic " + TEST_TOPIC + " || echo \"topic does not exist\" \n";
command += "kafka-topics --bootstrap-server localhost:9092 --topic " + TEST_TOPIC + " --create --partitions " + this.brokersNum + " --replication-factor " + this.brokersNum + " --config min.insync.replicas=" + this.brokersNum + " \n";
command +=
"kafka-topics --bootstrap-server localhost:9092 --delete --topic "
+ TEST_TOPIC
+ " || echo \"topic does not exist\" \n";
command +=
"kafka-topics --bootstrap-server localhost:9092 --topic "
+ TEST_TOPIC
+ " --create --partitions "
+ this.brokersNum
+ " --replication-factor "
+ this.brokersNum
+ " --config min.insync.replicas="
+ this.brokersNum
+ " \n";
command += "MESSAGE=\"`date -u`\" \n";
command += "echo \"$MESSAGE\" | kafka-console-producer --broker-list localhost:9092 --topic " + TEST_TOPIC + " --producer-property acks=all \n";
command += "kafka-console-consumer --bootstrap-server localhost:9092 --topic " + TEST_TOPIC + " --from-beginning --timeout-ms 2000 --max-messages 1 | grep \"$MESSAGE\" \n";
command += "kafka-topics --bootstrap-server localhost:9092 --delete --topic " + TEST_TOPIC + " \n";
command +=
"echo \"$MESSAGE\" | kafka-console-producer --broker-list localhost:9092 --topic "
+ TEST_TOPIC
+ " --producer-property acks=all \n";
command +=
"kafka-console-consumer --bootstrap-server localhost:9092 --topic "
+ TEST_TOPIC
+ " --from-beginning --timeout-ms 2000 --max-messages 1 | grep \"$MESSAGE\" \n";
command +=
"kafka-topics --bootstrap-server localhost:9092 --delete --topic " + TEST_TOPIC + " \n";
command += "echo \"test succeeded\" \n";
return command;
}
Expand All @@ -188,18 +210,15 @@ private Boolean runTest(GenericContainer c) {
try {
ByteArrayOutputStream stdoutStream = new ByteArrayOutputStream();
dockerClient
.execStartCmd(
dockerClient
.execCreateCmd(c.getContainerId())
.withAttachStdout(true)
.withCmd(
"sh",
"-c",
TEST_SCRIPT)
.exec()
.getId())
.exec(new ExecStartResultCallback(stdoutStream, null))
.awaitCompletion();
.execStartCmd(
dockerClient
.execCreateCmd(c.getContainerId())
.withAttachStdout(true)
.withCmd("sh", "-c", TEST_SCRIPT)
.exec()
.getId())
.exec(new ExecStartResultCallback(stdoutStream, null))
.awaitCompletion();
return stdoutStream.toString().contains("test succeeded");
} catch (Exception ex) {
throw new RuntimeException(ex);
Expand Down

0 comments on commit 18152bf

Please sign in to comment.