Various Java cleanups #114

Merged · 1 commit · Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -37,5 +37,5 @@ public enum AuthenticationType {
/**
* GSSAPI (Kerberos) authentication.
*/
-    GSSAPI;
+    GSSAPI
}
@@ -85,7 +85,7 @@ protected static String strimziTestContainerImageName(String kafkaVersion) {
}
throw new UnknownKafkaVersionException("Doesn't know the specified Kafka version: " + kafkaVersion + ". " +
"The supported Kafka versions are: " +
-    KafkaVersionService.getInstance().logicalKafkaVersionEntities.stream().map(KafkaVersion::getVersion).collect(Collectors.toList()).toString());
+    KafkaVersionService.getInstance().logicalKafkaVersionEntities.stream().map(KafkaVersion::getVersion).collect(Collectors.toList()));
}
}
return imageName;
@@ -197,7 +197,7 @@ public String toString() {
*/
public KafkaVersion latestRelease() {
// at least one release in the json schema is needed
-    if (this.logicalKafkaVersionEntities == null || this.logicalKafkaVersionEntities.size() < 1) {
+    if (this.logicalKafkaVersionEntities.isEmpty()) {
Review comment (Contributor Author): The field is final and initialized when declared, so it should never be null.
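
A minimal sketch of the idiom (class and field names here are hypothetical, not the project's code): a final field initialized at declaration can never be null, so only the emptiness check carries meaning.

```java
import java.util.ArrayList;
import java.util.List;

class VersionCatalog {
    // Final and initialized at declaration, so the reference is never null.
    private final List<String> versions = new ArrayList<>();

    String latest() {
        // A null check here would be dead code; isEmpty() is the only useful guard.
        if (versions.isEmpty()) {
            throw new IllegalStateException("Wrong json schema! It must have at least one release");
        }
        return versions.get(versions.size() - 1);
    }
}
```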

throw new IllegalStateException("Wrong json schema! It must have at least one release");
}

@@ -227,7 +227,7 @@ public KafkaVersion latestRelease() {
* *
* | 2.8.1 &lt;---&gt; 2.8.2 &lt;---&gt; 3.0.0 |
* ============================================================
- * Assuming that test container images `kafka_versions.yaml` has following content:
+ * Assuming that test container images `kafka_versions.yaml` has the following content:
* {
* "version": 1,
* "kafkaVersions": {
@@ -239,7 +239,7 @@ public KafkaVersion latestRelease() {
* @return LogicalKafkaVersion the previous minor release
*/
public KafkaVersion previousMinor() {
-    if (this.logicalKafkaVersionEntities == null || this.logicalKafkaVersionEntities.size() < 1) {
+    if (this.logicalKafkaVersionEntities.isEmpty()) {
throw new IllegalStateException("Wrong json schema! It must have at least one release");
}

18 changes: 9 additions & 9 deletions src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java
@@ -41,17 +41,17 @@ public class StrimziKafkaCluster implements KafkaContainer {
// instance attributes
private final int brokersNum;
private final int internalTopicReplicationFactor;
-    private Map<String, String> additionalKafkaConfiguration;
-    private ToxiproxyContainer proxyContainer;
-    private boolean enableSharedNetwork;
-    private String kafkaVersion;
-    private boolean enableKraft;
+    private final Map<String, String> additionalKafkaConfiguration;
+    private final ToxiproxyContainer proxyContainer;
+    private final boolean enableSharedNetwork;
+    private final String kafkaVersion;
+    private final boolean enableKraft;

// not editable
private final Network network;
private StrimziZookeeperContainer zookeeper;
private Collection<KafkaContainer> brokers;
-    private String clusterId;
+    private final String clusterId;

private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) {
this.brokersNum = builder.brokersNum;
@@ -267,7 +267,7 @@ public Collection<KafkaContainer> getBrokers() {
@DoNotMutate
public boolean hasKraftOrExternalZooKeeperConfigured() {
KafkaContainer broker0 = brokers.iterator().next();
-    return broker0.hasKraftOrExternalZooKeeperConfigured() ? true : false;
+    return broker0.hasKraftOrExternalZooKeeperConfigured();
}

@Override
@@ -347,7 +347,7 @@ public void start() {
}

if (this.isZooKeeperBasedKafkaCluster()) {
Utils.waitFor("Kafka brokers nodes to be connected to the ZooKeeper", Duration.ofSeconds(1).toMillis(), Duration.ofMinutes(1).toMillis(),
Utils.waitFor("Kafka brokers nodes to be connected to the ZooKeeper", Duration.ofSeconds(1), Duration.ofMinutes(1),
() -> {
Container.ExecResult result;
try {
@@ -367,7 +367,7 @@ public void start() {
});
} else if (this.isKraftKafkaCluster()) {
// Readiness check for KRaft mode
Utils.waitFor("Kafka brokers to form a quorum", Duration.ofSeconds(1).toMillis(), Duration.ofMinutes(1).toMillis(),
Utils.waitFor("Kafka brokers to form a quorum", Duration.ofSeconds(1), Duration.ofMinutes(1),
() -> {
try {
for (KafkaContainer kafkaContainer : this.brokers) {
17 changes: 9 additions & 8 deletions src/main/java/io/strimzi/test/container/Utils.java
@@ -11,6 +11,7 @@
import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.Path;
+ import java.time.Duration;
import java.util.Optional;
import java.util.function.BooleanSupplier;

@@ -27,19 +28,19 @@ class Utils {
private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);

/**
- * Poll the given {@code ready} function every {@code pollIntervalMs} milliseconds until it returns true,
- * or throw a WaitException if it doesn't returns true within {@code timeoutMs} milliseconds.
+ * Poll the given {@code ready} function every {@code pollInterval} until it returns true,
+ * or throw a WaitException if it doesn't return true within {@code timeout}.
* @return The remaining time left until timeout occurs
* (helpful if you have several calls which need to share a common timeout),
*
* @param description waiting for `description`
- * @param pollIntervalMs poll interval in milliseconds
- * @param timeoutMs timeout in milliseconds
+ * @param pollInterval poll interval
+ * @param timeout timeout
* @param ready lambda predicate
*/
- static long waitFor(String description, long pollIntervalMs, long timeoutMs, BooleanSupplier ready) {
+ static long waitFor(String description, Duration pollInterval, Duration timeout, BooleanSupplier ready) {
Review comment (Contributor Author): The class is not public, so I assume it's only used by this project and we can change its declaration.
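
A self-contained sketch of the Duration-based polling idiom the new signature enables; this is an illustrative re-implementation, not the project's Utils class itself.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class WaitForDemo {
    // Polls `ready` every `pollInterval` until it returns true or `timeout` elapses.
    static long waitFor(String description, Duration pollInterval, Duration timeout, BooleanSupplier ready) {
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (!ready.getAsBoolean()) {
            long timeLeft = deadline - System.currentTimeMillis();
            if (timeLeft <= 0) {
                throw new IllegalStateException("Timeout after " + timeout.toMillis() + " ms waiting for " + description);
            }
            try {
                // Sleep no longer than the remaining time budget.
                Thread.sleep(Math.min(pollInterval.toMillis(), timeLeft));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for " + description, e);
            }
        }
        return deadline - System.currentTimeMillis();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        waitFor("two seconds to elapse", Duration.ofMillis(100), Duration.ofSeconds(5),
                () -> System.currentTimeMillis() - start >= 2_000);
        System.out.println("Condition satisfied");
    }
}
```

Call sites read more clearly as well: Duration.ofMinutes(1) carries its unit, where a bare 60000 does not.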

LOGGER.debug("Waiting for {}", description);
-    long deadline = System.currentTimeMillis() + timeoutMs;
+    long deadline = System.currentTimeMillis() + timeout.toMillis();
String exceptionMessage = null;
int exceptionCount = 0;
StringWriter stackTraceError = new StringWriter();
@@ -71,11 +72,11 @@ static long waitFor(String description, long pollIntervalMs, long timeoutMs, BooleanSupplier ready) {
LOGGER.error(stackTraceError.toString());
}
}
-    WaitException waitException = new WaitException("Timeout after " + timeoutMs + " ms waiting for " + description);
+    WaitException waitException = new WaitException("Timeout after " + timeout.toMillis() + " ms waiting for " + description);
waitException.addSuppressed(waitException);
throw waitException;
}
-    long sleepTime = Math.min(pollIntervalMs, timeLeft);
+    long sleepTime = Math.min(pollInterval.toMillis(), timeLeft);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("{} not satisfied, will try again in {} ms ({}ms till timeout)", description, sleepTime, timeLeft);
}
@@ -6,7 +6,6 @@

import java.util.ArrayList;
import java.util.List;
- import java.util.stream.Collectors;

import eu.rekawek.toxiproxy.Proxy;
import org.apache.kafka.clients.admin.AdminClient;
@@ -111,7 +110,7 @@ void testStartClusterWithProxyContainer() {
}

assertThat(systemUnderTest.getBootstrapServers(),
-    is(bootstrapUrls.stream().collect(Collectors.joining(","))));
+    is(String.join(",", bootstrapUrls)));
} finally {
systemUnderTest.stop();
}
@@ -173,7 +172,7 @@ private void verifyFunctionalityOfKafkaCluster() throws ExecutionException, InterruptedException {

producer.send(new ProducerRecord<>(topicName, recordKey, recordValue)).get();

Utils.waitFor("Consumer records are present", Duration.ofSeconds(10).toMillis(), Duration.ofMinutes(2).toMillis(),
Utils.waitFor("Consumer records are present", Duration.ofSeconds(10), Duration.ofMinutes(2),
() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

@@ -72,7 +72,7 @@ void testStartContainerWithEmptyConfiguration(final String imageName) {
systemUnderTest.start();

assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://"
-    + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092)));
+    + systemUnderTest.getHost() + ":" + systemUnderTest.getMappedPort(9092)));
Review comment (Contributor Author): getContainerIpAddress() is deprecated, so switching to getHost(), which is the preferred method now.
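
A standalone Testcontainers sketch of the replacement call; the image and port below are placeholders, not this project's configuration.

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

public class HostLookupExample {
    public static void main(String[] args) {
        try (GenericContainer<?> redis = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
                .withExposedPorts(6379)) {
            redis.start();
            // getHost() is the supported replacement for the deprecated getContainerIpAddress();
            // getMappedPort(...) resolves the host-side port for the exposed container port.
            System.out.println("Reachable at " + redis.getHost() + ":" + redis.getMappedPort(6379));
        }
    }
}
```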

}
}

@@ -123,7 +123,7 @@ void testStartContainerWithSSLBootstrapServers(final String imageName) {
systemUnderTest.start();

assertThat(systemUnderTest.getBootstrapServers(), is("SSL://"
-    + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092)));
+    + systemUnderTest.getHost() + ":" + systemUnderTest.getMappedPort(9092)));
}

@ParameterizedTest(name = "testStartContainerWithServerProperties-{0}")
@@ -140,7 +140,7 @@ void testStartContainerWithServerProperties(final String imageName) {
assertThat(logsFromKafka, containsString("auto.create.topics.enable = false"));

assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://"
-    + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092)));
+    + systemUnderTest.getHost() + ":" + systemUnderTest.getMappedPort(9092)));
}

@Test
@@ -155,7 +155,7 @@ void testStartContainerWithStrimziKafkaImage() {
systemUnderTest.start();

assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://"
-    + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092)));
+    + systemUnderTest.getHost() + ":" + systemUnderTest.getMappedPort(9092)));

assertThat(systemUnderTest.getDockerImageName(), is(imageName));

@@ -172,7 +172,7 @@ void testStartContainerWithCustomImage() {
systemUnderTest.start();

assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://"
-    + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092)));
+    + systemUnderTest.getHost() + ":" + systemUnderTest.getMappedPort(9092)));

assertThat(systemUnderTest.getDockerImageName(), is(imageName));
}
@@ -188,7 +188,7 @@ void testStartContainerWithCustomNetwork() {
systemUnderTest.start();

assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://"
-    + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092)));
+    + systemUnderTest.getHost() + ":" + systemUnderTest.getMappedPort(9092)));

assertThat(systemUnderTest.getNetwork().getId(), is(network.getId()));
}
@@ -349,7 +349,7 @@ void testKafkaContainerFunctionality() {

producer.send(new ProducerRecord<>(topicName, recordKey, recordValue)).get();

Utils.waitFor("Consumer records are present", Duration.ofSeconds(10).toMillis(), Duration.ofMinutes(2).toMillis(),
Utils.waitFor("Consumer records are present", Duration.ofSeconds(10), Duration.ofMinutes(2),
() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

@@ -18,8 +18,8 @@
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
- import static org.junit.Assert.assertTrue;
Review comment (Contributor Author): This is the JUnit 4 dependency brought in via testcontainers. We should instead use org.junit.jupiter.api.Assertions, which is JUnit 5 (the declared dependency in pom.xml).
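
For illustration, the JUnit 5 form (test class and method names here are hypothetical); note that Assertions places the failure message last, the reverse of JUnit 4's Assert.

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

class AssertionStyleTest {
    @Test
    void usesJupiterAssertions() {
        // JUnit 4: assertTrue("message", condition)
        // JUnit 5: assertTrue(condition, "message")
        assertTrue("abc".startsWith("a"), "prefix check should hold");
    }
}
```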

import static org.junit.jupiter.api.Assertions.assertThrows;
+ import static org.junit.jupiter.api.Assertions.assertTrue;

public class StrimziKafkaContainerMockTest {

@@ -27,10 +27,10 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
- import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
+ import static org.junit.jupiter.api.Assertions.assertTrue;

class StrimziKafkaContainerTest {

@@ -358,8 +358,7 @@ void testConfigureOAuthBearer() {
assertThat(properties.getProperty("sasl.mechanism.controller.protocol"), is("OAUTHBEARER"));
assertThat(properties.getProperty("principal.builder.class"), is("io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder"));

-    String expectedJaasConfig = String.format(
-        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;");
+    String expectedJaasConfig = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;";

String listenerNameLowerCase = "plaintext";

@@ -38,7 +38,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
- import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
@@ -128,7 +127,7 @@ void testStartClusterWithProxyContainer() {
}

assertThat(systemUnderTest.getBootstrapServers(),
-    is(bootstrapUrls.stream().collect(Collectors.joining(","))));
+    is(String.join(",", bootstrapUrls)));
} finally {
if (systemUnderTest != null) {
systemUnderTest.stop();
@@ -194,7 +193,7 @@ private void verifyFunctionalityOfKafkaCluster() throws ExecutionException, InterruptedException {

producer.send(new ProducerRecord<>(topicName, recordKey, recordValue)).get();

Utils.waitFor("Consumer records are present", Duration.ofSeconds(10).toMillis(), Duration.ofMinutes(2).toMillis(),
Utils.waitFor("Consumer records are present", Duration.ofSeconds(10), Duration.ofMinutes(2),
() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

@@ -69,7 +69,7 @@ void testStartContainerWithEmptyConfiguration(final String imageName, final Stri
verify();

assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://" +
-    systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092)));
+    systemUnderTest.getHost() + ":" + systemUnderTest.getMappedPort(9092)));
} finally {
systemUnderTest.stop();
}