Skip to content

Commit

Permalink
Testkit: Make Testcontainer logs visible (#1281)
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo committed Dec 21, 2020
1 parent f1d4431 commit e1ee032
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 18 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ stages:
if: repo = akka/alpakka-kafka AND ( ( branch = master AND type = push ) OR tag =~ ^v )

after_failure:
- docker-compose logs
- find . -name "*.log" -exec ./scripts/cat-log.sh {} \;

before_cache:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class AlpakkaKafkaContainer extends GenericContainer<AlpakkaKafkaContaine

protected String externalZookeeperConnect = null;

private int brokerNum = 1;

private int port = PORT_NOT_ASSIGNED;
private int jmxPort = PORT_NOT_ASSIGNED;

Expand All @@ -60,8 +62,11 @@ public AlpakkaKafkaContainer(String confluentPlatformVersion) {
TestcontainersConfiguration.getInstance().getKafkaImage() + ":" + confluentPlatformVersion);

super.withNetwork(Network.SHARED);

withExposedPorts(KAFKA_PORT);

withBrokerNum(this.brokerNum);

// Use two listeners with different names, it will force Kafka to communicate with itself via
// internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the
Expand All @@ -70,7 +75,6 @@ public AlpakkaKafkaContainer(String confluentPlatformVersion) {
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
Expand All @@ -83,6 +87,15 @@ public AlpakkaKafkaContainer withNetwork(Network network) {
return super.withNetwork(network);
}

public AlpakkaKafkaContainer withBrokerNum(int brokerNum) {
if (brokerNum != this.brokerNum) {
this.brokerNum = brokerNum;
return super.withNetworkAliases("broker-" + this.brokerNum)
.withEnv("KAFKA_BROKER_ID", "" + this.brokerNum);
}
return this;
}

@Override
public Network getNetwork() {
if (useImplicitNetwork) {
Expand All @@ -97,6 +110,10 @@ public Network getNetwork() {
return super.getNetwork();
}

public int getBrokerNum() {
return brokerNum;
}

public void stopKafka() {
try {
ExecResult execResult = execInContainer("sh", "-c", "touch /tmp/stop");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -33,7 +35,9 @@ public class KafkaContainerCluster implements Startable {
public static final String CONFLUENT_PLATFORM_VERSION =
AlpakkaKafkaContainer.DEFAULT_CONFLUENT_PLATFORM_VERSION;
public static final int START_TIMEOUT_SECONDS = 120;
public static final int READINESS_CHECK_TIMEOUT = START_TIMEOUT_SECONDS;

private static final String LOGGING_NAMESPACE_PREFIX = "akka.kafka.testkit.testcontainers.logs";
private static final String READINESS_CHECK_SCRIPT = "/testcontainers_readiness_check.sh";
private static final String READINESS_CHECK_TOPIC = "ready-kafka-container-cluster";
private static final Version BOOTSTRAP_PARAM_MIN_VERSION = new Version("5.2.0");
Expand All @@ -42,20 +46,22 @@ public class KafkaContainerCluster implements Startable {
private final Version confluentPlatformVersion;
private final int brokersNum;
private final Boolean useSchemaRegistry;
private final Boolean containerLogging;
private final Network network;
private final GenericContainer zookeeper;
private final Collection<AlpakkaKafkaContainer> brokers;
private Optional<SchemaRegistryContainer> schemaRegistry;
private Optional<SchemaRegistryContainer> schemaRegistry = Optional.empty();

public KafkaContainerCluster(int brokersNum, int internalTopicsRf) {
this(CONFLUENT_PLATFORM_VERSION, brokersNum, internalTopicsRf, false);
this(CONFLUENT_PLATFORM_VERSION, brokersNum, internalTopicsRf, false, false);
}

public KafkaContainerCluster(
String confluentPlatformVersion,
int brokersNum,
int internalTopicsRf,
boolean useSchemaRegistry) {
boolean useSchemaRegistry,
boolean containerLogging) {
if (brokersNum < 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
}
Expand All @@ -69,6 +75,7 @@ public KafkaContainerCluster(
this.confluentPlatformVersion = new Version(confluentPlatformVersion);
this.brokersNum = brokersNum;
this.useSchemaRegistry = useSchemaRegistry;
this.containerLogging = containerLogging;
this.network = Network.newNetwork();

this.zookeeper =
Expand All @@ -83,7 +90,7 @@ public KafkaContainerCluster(
brokerNum ->
new AlpakkaKafkaContainer(confluentPlatformVersion)
.withNetwork(this.network)
.withNetworkAliases("broker-" + brokerNum)
.withBrokerNum(brokerNum)
.withRemoteJmxService()
.dependsOn(this.zookeeper)
.withExternalZookeeper("zookeeper:" + AlpakkaKafkaContainer.ZOOKEEPER_PORT)
Expand Down Expand Up @@ -139,6 +146,7 @@ private Stream<GenericContainer> allContainers() {
@Override
public void start() {
try {
configureContainerLogging();
Stream<Startable> startables = this.brokers.stream().map(Startable.class::cast);
Startables.deepStart(startables).get(START_TIMEOUT_SECONDS, SECONDS);

Expand Down Expand Up @@ -169,11 +177,29 @@ public void start() {
}
}

private void configureContainerLogging() {
if (containerLogging) {
log.debug("Testcontainer logging enabled");
this.brokers.forEach(
broker ->
setContainerLogger(
LOGGING_NAMESPACE_PREFIX + ".broker.broker-" + broker.getBrokerNum(), broker));
setContainerLogger(LOGGING_NAMESPACE_PREFIX + ".zookeeper", this.zookeeper);
this.schemaRegistry.ifPresent(
container -> setContainerLogger(LOGGING_NAMESPACE_PREFIX + ".schemaregistry", container));
}
}

private void setContainerLogger(String loggerName, GenericContainer<?> container) {
Logger logger = LoggerFactory.getLogger(loggerName);
Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger);
container.withLogConsumer(logConsumer);
}

private void waitForClusterFormation() {
// assert that cluster has formed
Unreliables.retryUntilTrue(
START_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
runReadinessCheck(
"Readiness check (1/2). ZooKeeper state updated.",
() -> {
Container.ExecResult result =
this.zookeeper.execInContainer(
Expand All @@ -186,10 +212,8 @@ private void waitForClusterFormation() {
return brokers != null && brokers.split(",").length == this.brokersNum;
});

// test produce & consume message with full cluster involvement
Unreliables.retryUntilTrue(
START_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
runReadinessCheck(
"Readiness check (2/2). Run producer consumer with acks=all.",
() -> this.brokers.stream().findFirst().map(this::runReadinessCheck).orElse(false));
}

Expand All @@ -202,6 +226,17 @@ public void startKafka() {
waitForClusterFormation();
}

private void runReadinessCheck(String logLine, Callable<Boolean> fn) {
try {
log.debug("Start: {}", logLine);
Unreliables.retryUntilTrue(READINESS_CHECK_TIMEOUT, TimeUnit.SECONDS, fn);
} catch (Throwable t) {
log.error("Failed: {}", logLine);
throw t;
}
log.debug("Passed: {}", logLine);
}

private String readinessCheckScript() {
String connect = kafkaTopicConnectParam();
String command = "#!/bin/bash \n";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.testkit.KafkaTestkitTestcontainersSettings.this")
4 changes: 4 additions & 0 deletions testkit/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,9 @@ akka.kafka.testkit.testcontainers {

# set this to true to use launch a testcontainer for Confluent Schema Registry
use-schema-registry = false

# set this to true to stream the STDOUT and STDERR of containers to SLF4J loggers
# this requires the SLF4J dependency to be on the classpath and the loggers enabled in your logging configuration
container-logging = false
}
# // #testkit-testcontainers-settings
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ final class KafkaTestkitTestcontainersSettings private (
val numBrokers: Int,
val internalTopicsReplicationFactor: Int,
val useSchemaRegistry: Boolean,
val containerLogging: Boolean,
val configureKafka: Vector[AlpakkaKafkaContainer] => Unit = _ => (),
val configureKafkaConsumer: java.util.function.Consumer[java.util.Collection[AlpakkaKafkaContainer]] =
new Consumer[java.util.Collection[AlpakkaKafkaContainer]]() {
Expand Down Expand Up @@ -49,6 +50,11 @@ final class KafkaTestkitTestcontainersSettings private (
*/
def getSchemaRegistry(): Boolean = useSchemaRegistry

/**
* Java Api
*/
def getContainerLogging(): Boolean = containerLogging

/**
* Sets the Confluent Platform Version
*/
Expand Down Expand Up @@ -104,11 +110,18 @@ final class KafkaTestkitTestcontainersSettings private (
def withSchemaRegistry(useSchemaRegistry: Boolean): KafkaTestkitTestcontainersSettings =
copy(useSchemaRegistry = useSchemaRegistry);

/**
* Stream container output to SLF4J logger(s).
*/
def withContainerLogging(containerLogging: Boolean): KafkaTestkitTestcontainersSettings =
copy(containerLogging = containerLogging)

private def copy(
confluentPlatformVersion: String = confluentPlatformVersion,
numBrokers: Int = numBrokers,
internalTopicsReplicationFactor: Int = internalTopicsReplicationFactor,
useSchemaRegistry: Boolean = useSchemaRegistry,
containerLogging: Boolean = containerLogging,
configureKafka: Vector[AlpakkaKafkaContainer] => Unit = configureKafka,
configureKafkaConsumer: java.util.function.Consumer[java.util.Collection[AlpakkaKafkaContainer]] =
configureKafkaConsumer,
Expand All @@ -119,6 +132,7 @@ final class KafkaTestkitTestcontainersSettings private (
numBrokers,
internalTopicsReplicationFactor,
useSchemaRegistry,
containerLogging,
configureKafka,
configureKafkaConsumer,
configureZooKeeper,
Expand All @@ -129,7 +143,8 @@ final class KafkaTestkitTestcontainersSettings private (
s"confluentPlatformVersion=$confluentPlatformVersion," +
s"numBrokers=$numBrokers," +
s"internalTopicsReplicationFactor=$internalTopicsReplicationFactor," +
s"useSchemaRegistry=$useSchemaRegistry)"
s"useSchemaRegistry=$useSchemaRegistry," +
s"containerLogging=$containerLogging)"
}

object KafkaTestkitTestcontainersSettings {
Expand All @@ -156,11 +171,13 @@ object KafkaTestkitTestcontainersSettings {
val numBrokers = config.getInt("num-brokers")
val internalTopicsReplicationFactor = config.getInt("internal-topics-replication-factor")
val useSchemaRegistry = config.getBoolean("use-schema-registry")
val containerLogging = config.getBoolean("container-logging")

new KafkaTestkitTestcontainersSettings(confluentPlatformVersion,
numBrokers,
internalTopicsReplicationFactor,
useSchemaRegistry)
useSchemaRegistry,
containerLogging)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ object TestcontainersKafka {
cluster = new KafkaContainerCluster(settings.confluentPlatformVersion,
numBrokers,
internalTopicsReplicationFactor,
settings.useSchemaRegistry)
settings.useSchemaRegistry,
settings.containerLogging)
configureKafka(brokerContainers)
configureKafkaConsumer.accept(brokerContainers.asJavaCollection)
configureZooKeeper(zookeeperContainer)
Expand Down
1 change: 1 addition & 0 deletions tests/src/it/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

<logger name="akka" level="DEBUG"/>
<logger name="akka.kafka" level="DEBUG"/>
<logger name="akka.kafka.test.testcontainers.logs" level="INFO" />
<logger name="docs.scaladsl" levle="DEBUG"/>

<logger name="org.apache.zookeeper" level="WARN"/>
Expand Down
2 changes: 2 additions & 0 deletions tests/src/it/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ akka {
single-expect-default = 10s
}

kafka.testkit.testcontainers.container-logging = true

kafka.consumer {
stop-timeout = 3 s
}
Expand Down
7 changes: 5 additions & 2 deletions tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,8 @@ our-kafka-consumer: ${akka.kafka.consumer} {
}
# #consumer-config-inheritance

# enabled for all tests because the cluster is only started once per test run
akka.kafka.testkit.testcontainers.use-schema-registry = true
akka.kafka.testkit.testcontainers {
# enabled for all tests because the cluster is only started once per test run
use-schema-registry = true
container-logging = true
}

0 comments on commit e1ee032

Please sign in to comment.