Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add possiblity to specify Kafka version in StrimziKafkaCluster #87

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class StrimziKafkaCluster implements KafkaContainer {
private Map<String, String> additionalKafkaConfiguration;
private ToxiproxyContainer proxyContainer;
private boolean enableSharedNetwork;
private String kafkaVersion;

// not editable
private final Network network;
Expand Down Expand Up @@ -80,7 +81,7 @@ public StrimziKafkaCluster(final int brokersNum,
proxyContainer.setNetwork(this.network);
}

prepareKafkaCluster(additionalKafkaConfiguration);
prepareKafkaCluster(additionalKafkaConfiguration, null);
}

/**
Expand Down Expand Up @@ -138,6 +139,7 @@ private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) {
this.internalTopicReplicationFactor = builder.internalTopicReplicationFactor == 0 ? this.brokersNum : builder.internalTopicReplicationFactor;
this.additionalKafkaConfiguration = builder.additionalKafkaConfiguration;
this.proxyContainer = builder.proxyContainer;
this.kafkaVersion = builder.kafkaVersion;

validateBrokerNum(this.brokersNum);
validateInternalTopicReplicationFactor(this.internalTopicReplicationFactor);
Expand All @@ -149,10 +151,10 @@ private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) {
this.proxyContainer.setNetwork(this.network);
}

prepareKafkaCluster(this.additionalKafkaConfiguration);
prepareKafkaCluster(this.additionalKafkaConfiguration, this.kafkaVersion);
}

private void prepareKafkaCluster(final Map<String, String> additionalKafkaConfiguration) {
private void prepareKafkaCluster(final Map<String, String> additionalKafkaConfiguration, final String kafkaVersion) {
final Map<String, String> defaultKafkaConfigurationForMultiNode = new HashMap<>();
defaultKafkaConfigurationForMultiNode.put("offsets.topic.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("num.partitions", String.valueOf(internalTopicReplicationFactor));
Expand All @@ -176,6 +178,7 @@ private void prepareKafkaCluster(final Map<String, String> additionalKafkaConfig
.withNetwork(this.network)
.withProxyContainer(proxyContainer)
.withNetworkAliases("broker-" + brokerId)
.withKafkaVersion(kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : kafkaVersion)
.dependsOn(this.zookeeper);

LOGGER.info("Started broker with id: {}", kafkaContainer);
Expand Down Expand Up @@ -203,6 +206,7 @@ public static class StrimziKafkaClusterBuilder {
private Map<String, String> additionalKafkaConfiguration = new HashMap<>();
private ToxiproxyContainer proxyContainer;
private boolean enableSharedNetwork;
private String kafkaVersion;

/**
* Sets the number of Kafka brokers in the cluster.
Expand Down Expand Up @@ -263,6 +267,18 @@ public StrimziKafkaClusterBuilder withSharedNetwork() {
return this;
}

/**
* Specifies the Kafka version to be used for the brokers in the cluster.
* If no version is provided, the latest Kafka version available from {@link KafkaVersionService} will be used.
*
* @param kafkaVersion the desired Kafka version for the cluster
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withKafkaVersion(String kafkaVersion) {
this.kafkaVersion = kafkaVersion;
return this;
}

/**
* Builds and returns a {@code StrimziKafkaCluster} instance based on the provided configurations.
*
Expand Down Expand Up @@ -302,6 +318,18 @@ public String getBootstrapServers() {
.collect(Collectors.joining(","));
}

/* test */ int getInternalTopicReplicationFactor() {
return this.internalTopicReplicationFactor;
}

/* test */ boolean isSharedNetworkEnabled() {
return this.enableSharedNetwork;
}

/* test */ Map<String, String> getAdditionalKafkaConfiguration() {
return this.additionalKafkaConfiguration;
}

@Override
public void start() {
Stream<KafkaContainer> startables = this.brokers.stream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,4 +517,8 @@ public synchronized Proxy getProxy() {
}
return this.proxy;
}

/* test */ String getKafkaVersion() {
return this.kafkaVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
*/
package io.strimzi.test.container;

import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ToxiproxyContainer;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand Down Expand Up @@ -105,4 +107,55 @@ void testKafkaClusterWithAdditionalConfiguration() {
.build()
);
}

@Test
void testKafkaClusterWithSpecificKafkaVersion() {
assertDoesNotThrow(() ->
new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(3)
.withInternalTopicReplicationFactor(3)
.withKafkaVersion("3.7.1")
.build()
);
}

@Test
void testKafkaClusterWithMultipleBrokersAndReplicationFactor() {
StrimziKafkaCluster cluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(5)
.withInternalTopicReplicationFactor(3)
.build();

assertThat(cluster.getBrokers().size(), CoreMatchers.is(5));
assertThat(cluster.getInternalTopicReplicationFactor(), CoreMatchers.is(3));
}

@Test
void testKafkaClusterWithCustomNetworkConfiguration() {
StrimziKafkaCluster cluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(4)
.withSharedNetwork()
.withInternalTopicReplicationFactor(2)
.build();

assertThat(cluster.getBrokers().size(), CoreMatchers.is(4));
assertThat(cluster.isSharedNetworkEnabled(), CoreMatchers.is(true));
}

@Test
void testKafkaClusterWithKafkaVersionAndAdditionalConfigs() {
Map<String, String> additionalConfigs = new HashMap<>();
additionalConfigs.put("log.retention.bytes", "10485760");

StrimziKafkaCluster cluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(3)
.withInternalTopicReplicationFactor(3)
.withKafkaVersion("3.7.1")
.withAdditionalKafkaConfiguration(additionalConfigs)
.build();

assertThat(cluster.getBrokers().size(), CoreMatchers.is(3));
assertThat(((StrimziKafkaContainer) cluster.getBrokers().iterator().next()).getKafkaVersion(), CoreMatchers.is("3.7.1"));
assertThat(cluster.getAdditionalKafkaConfiguration().get("log.retention.bytes"), CoreMatchers.is("10485760"));
}
}
Loading