Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StrimziKafkaCluster KRaft mode #89

Merged
merged 4 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 88 additions & 31 deletions src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -35,19 +36,22 @@ public class StrimziKafkaCluster implements KafkaContainer {

// class attributes
private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaCluster.class);
private static final String NETWORK_ALIAS_PREFIX = "broker-";

// instance attributes
private int brokersNum;
private int internalTopicReplicationFactor;
private final int brokersNum;
private final int internalTopicReplicationFactor;
private Map<String, String> additionalKafkaConfiguration;
private ToxiproxyContainer proxyContainer;
private boolean enableSharedNetwork;
private String kafkaVersion;
private boolean enableKraft;

// not editable
private final Network network;
private final StrimziZookeeperContainer zookeeper;
private StrimziZookeeperContainer zookeeper;
private Collection<KafkaContainer> brokers;
private String clusterId;

/**
* Constructor for @StrimziKafkaCluster class, which allows you to specify number of brokers @see{brokersNum},
Expand Down Expand Up @@ -141,12 +145,16 @@ private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) {
this.additionalKafkaConfiguration = builder.additionalKafkaConfiguration;
this.proxyContainer = builder.proxyContainer;
this.kafkaVersion = builder.kafkaVersion;
this.enableKraft = builder.enableKRaft;
this.clusterId = builder.clusterId;

validateBrokerNum(this.brokersNum);
validateInternalTopicReplicationFactor(this.internalTopicReplicationFactor, this.brokersNum);

this.zookeeper = new StrimziZookeeperContainer()
.withNetwork(this.network);
if (this.isZooKeeperBasedKafkaCluster()) {
this.zookeeper = new StrimziZookeeperContainer()
.withNetwork(this.network);
}

if (this.proxyContainer != null) {
this.proxyContainer.setNetwork(this.network);
Expand All @@ -162,6 +170,11 @@ private void prepareKafkaCluster(final Map<String, String> additionalKafkaConfig
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.min.isr", String.valueOf(internalTopicReplicationFactor));

if (this.isKraftKafkaCluster()) {
// we have to configure quorum voters but also we simplify process because we use network aliases (i.e., broker-<id>)
this.configureQuorumVoters(additionalKafkaConfiguration);
}

if (additionalKafkaConfiguration != null) {
defaultKafkaConfigurationForMultiNode.putAll(additionalKafkaConfiguration);
}
Expand All @@ -172,15 +185,28 @@ private void prepareKafkaCluster(final Map<String, String> additionalKafkaConfig
.mapToObj(brokerId -> {
LOGGER.info("Starting broker with id {}", brokerId);
// adding broker id for each kafka container
KafkaContainer kafkaContainer = new StrimziKafkaContainer()
StrimziKafkaContainer kafkaContainer = new StrimziKafkaContainer()
.withBrokerId(brokerId)
.withKafkaConfigurationMap(defaultKafkaConfigurationForMultiNode)
.withExternalZookeeperConnect("zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT)
.withNetwork(this.network)
.withProxyContainer(proxyContainer)
.withNetworkAliases("broker-" + brokerId)
.withKafkaVersion(kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : kafkaVersion)
.dependsOn(this.zookeeper);
.withNetworkAliases(NETWORK_ALIAS_PREFIX + brokerId)
.withKafkaVersion(kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : kafkaVersion);

// if it's ZK-based Kafka cluster we depend on ZK container and we need to specify external ZK connect
if (this.isZooKeeperBasedKafkaCluster()) {
kafkaContainer.withExternalZookeeperConnect("zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT)
.dependsOn(this.zookeeper);
} else {
kafkaContainer
// if KRaft we need to enable it
.withKraft()
// One must set `node.id` to the same value as `broker.id` if we use KRaft mode
.withNodeId(brokerId)
// pass shared `cluster.id` to each broker
.withClusterId(this.clusterId)
.waitForRunning();
}

LOGGER.info("Started broker with id: {}", kafkaContainer);

Expand Down Expand Up @@ -208,6 +234,8 @@ public static class StrimziKafkaClusterBuilder {
private ToxiproxyContainer proxyContainer;
private boolean enableSharedNetwork;
private String kafkaVersion;
private boolean enableKRaft;
private String clusterId;

/**
* Sets the number of Kafka brokers in the cluster.
Expand Down Expand Up @@ -280,12 +308,20 @@ public StrimziKafkaClusterBuilder withKafkaVersion(String kafkaVersion) {
return this;
}

public StrimziKafkaClusterBuilder withKraftEnabled() {
this.enableKRaft = true;
return this;
}

/**
* Builds and returns a {@code StrimziKafkaCluster} instance based on the provided configurations.
*
* @return a new instance of {@code StrimziKafkaCluster}
*/
public StrimziKafkaCluster build() {
// Generate a single cluster ID, which will be shared by all brokers
this.clusterId = UUID.randomUUID().toString();

return new StrimziKafkaCluster(this);
}
}
Expand Down Expand Up @@ -319,6 +355,14 @@ public String getBootstrapServers() {
.collect(Collectors.joining(","));
}

public boolean isZooKeeperBasedKafkaCluster() {
return !this.enableKraft;
}

public boolean isKraftKafkaCluster() {
return this.enableKraft;
}

/* test */ int getInternalTopicReplicationFactor() {
return this.internalTopicReplicationFactor;
}
Expand All @@ -331,6 +375,15 @@ public String getBootstrapServers() {
return this.additionalKafkaConfiguration;
}

private void configureQuorumVoters(final Map<String, String> additionalKafkaConfiguration) {
// Construct controller.quorum.voters based on network aliases (broker-1, broker-2, etc.)
final String quorumVoters = IntStream.range(0, this.brokersNum)
.mapToObj(brokerId -> String.format("%d@" + NETWORK_ALIAS_PREFIX + "%d:9094", brokerId, brokerId))
.collect(Collectors.joining(","));

additionalKafkaConfiguration.put("controller.quorum.voters", quorumVoters);
}

@Override
public void start() {
Stream<KafkaContainer> startables = this.brokers.stream();
Expand All @@ -341,31 +394,35 @@ public void start() {
e.printStackTrace();
}

Utils.waitFor("Kafka brokers nodes to be connected to the ZooKeeper", Duration.ofSeconds(5).toMillis(), Duration.ofMinutes(1).toMillis(),
() -> {
Container.ExecResult result;
try {
result = this.zookeeper.execInContainer(
"sh", "-c",
"bin/zookeeper-shell.sh zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1"
);
String brokers = result.getStdout();

LOGGER.info("Running Kafka brokers: {}", result.getStdout());

return brokers != null && brokers.split(",").length == this.brokersNum;
} catch (IOException | InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
return false;
}
});
if (this.isZooKeeperBasedKafkaCluster()) {
Utils.waitFor("Kafka brokers nodes to be connected to the ZooKeeper", Duration.ofSeconds(5).toMillis(), Duration.ofMinutes(1).toMillis(),
() -> {
Container.ExecResult result;
try {
result = this.zookeeper.execInContainer(
"sh", "-c",
"bin/zookeeper-shell.sh zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1"
);
String brokers = result.getStdout();

LOGGER.info("Running Kafka brokers: {}", result.getStdout());

return brokers != null && brokers.split(",").length == this.brokersNum;
} catch (IOException | InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
return false;
}
});
}
}

@Override
public void stop() {
// firstly we shut-down zookeeper -> reason: 'On the command line if I kill ZK first it sometimes prevents a broker from shutting down quickly.'
this.zookeeper.stop();
if (this.isZooKeeperBasedKafkaCluster()) {
// firstly we shut-down zookeeper -> reason: 'On the command line if I kill ZK first it sometimes prevents a broker from shutting down quickly.'
this.zookeeper.stop();
}

// stop all kafka containers in parallel
this.brokers.stream()
Expand Down
Loading
Loading