Skip to content

Commit

Permalink
[ST] KRaft to KRaft upgrades/downgrades in STs (strimzi#9442)
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Kral <lukywill16@gmail.com>
  • Loading branch information
im-konge authored Dec 8, 2023
1 parent aff31bc commit f71526f
Show file tree
Hide file tree
Showing 20 changed files with 1,184 additions and 94 deletions.
10 changes: 10 additions & 0 deletions .azure/templates/jobs/system-tests/upgrade_jobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,13 @@ jobs:
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'

- template: '../../steps/system_test_general.yaml'
parameters:
name: 'strimzi_upgrade_kraft'
display_name: 'strimzi-upgrade-kraft-bundle'
profile: 'azp_kraft_upgrade'
cluster_operator_install_type: 'bundle'
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
24 changes: 22 additions & 2 deletions systemtest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,14 @@
</properties>
</profile>

<profile>
<id>kraft_upgrade</id>
<properties>
<skipTests>false</skipTests>
<groups>kraftupgrade</groups>
</properties>
</profile>

<profile>
<id>operators</id>
<properties>
Expand Down Expand Up @@ -526,13 +534,25 @@
</properties>
</profile>

<profile>
<id>azp_kraft_upgrade</id>
<properties>
<skipTests>false</skipTests>
<groups>kraftupgrade</groups>
<it.test>
!KRaftKafkaUpgradeDowngradeST
</it.test>
</properties>
</profile>

<profile>
<id>azp_kafka_upgrade</id>
<properties>
<skipTests>false</skipTests>
<groups>upgrade</groups>
<groups>upgrade,kraftupgrade</groups>
<it.test>
KafkaUpgradeDowngradeST
KafkaUpgradeDowngradeST,
KRaftKafkaUpgradeDowngradeST
</it.test>
</properties>
</profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,10 @@ public interface TestConstants {
*/
String USE_KRAFT_MODE = "+UseKRaft";
String DONT_USE_KAFKA_NODE_POOLS = "-KafkaNodePools";
// kept for upgrade/downgrade tests in KRaft
String USE_KAFKA_NODE_POOLS = "+KafkaNodePools";
String DONT_USE_UNIDIRECTIONAL_TOPIC_OPERATOR = "-UnidirectionalTopicOperator";
String USE_UNIDIRECTIONAL_TOPIC_OPERATOR = "+UnidirectionalTopicOperator";

/**
* Default value which allows execution of tests with any tags
Expand All @@ -223,6 +226,11 @@ public interface TestConstants {
*/
String UPGRADE = "upgrade";

/**
* Tag for KRaft to KRaft upgrade/downgrade tests.
*/
String KRAFT_UPGRADE = "kraftupgrade";

/**
* Tag for olm upgrade tests
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/
package io.strimzi.systemtest.resources.crd;

import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.strimzi.api.kafka.Crds;
Expand All @@ -21,7 +23,9 @@
import io.strimzi.systemtest.utils.kubeUtils.objects.PersistentVolumeClaimUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import static io.strimzi.operator.common.Util.hashStub;
Expand Down Expand Up @@ -111,4 +115,21 @@ public static KafkaNodePool convertKafkaResourceToKafkaNodePool(Kafka resource)

return builder.build();
}

/**
 * Builds a {@link LabelSelector} matching the Pods of one KafkaNodePool with the given role.
 *
 * @param clusterName name of the Kafka cluster the pool belongs to
 * @param poolName    name of the KafkaNodePool
 * @param processRole role of the pool's nodes; only BROKER and CONTROLLER are supported
 * @return selector matching the cluster, kind, pool-name and role labels
 * @throws IllegalArgumentException when {@code processRole} is neither BROKER nor CONTROLLER
 */
public static LabelSelector getLabelSelector(String clusterName, String poolName, ProcessRoles processRole) {
    Map<String, String> matchLabels = new HashMap<>();
    matchLabels.put(Labels.STRIMZI_CLUSTER_LABEL, clusterName);
    matchLabels.put(Labels.STRIMZI_KIND_LABEL, Kafka.RESOURCE_KIND);
    matchLabels.put(Labels.STRIMZI_POOL_NAME_LABEL, poolName);

    switch (processRole) {
        case BROKER -> matchLabels.put(Labels.STRIMZI_BROKER_ROLE_LABEL, "true");
        case CONTROLLER -> matchLabels.put(Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "true");
        // IllegalArgumentException is the conventional type for an unsupported enum value
        // and is still a RuntimeException, so existing callers are unaffected
        default -> throw new IllegalArgumentException("KafkaNodePool role: " + processRole + " is not supported here");
    }

    return new LabelSelectorBuilder()
        .withMatchLabels(matchLabels)
        .build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,40 @@ public static KafkaNodePoolBuilder defaultKafkaNodePool(String namespaceName, St
.endSpec();
}

/**
 * Returns a builder for a KafkaNodePool carrying only the CONTROLLER role.
 *
 * @param namespaceName    Namespace where the pool should live
 * @param nodePoolName     name of the KafkaNodePool resource
 * @param kafkaClusterName name of the Kafka cluster the pool belongs to
 * @param kafkaReplicas    number of nodes in the pool
 * @return builder based on {@code defaultKafkaNodePool} with the CONTROLLER role added
 */
public static KafkaNodePoolBuilder kafkaNodePoolWithControllerRole(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
    KafkaNodePoolBuilder poolBuilder = defaultKafkaNodePool(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas);
    return poolBuilder
        .editOrNewSpec()
            .addToRoles(ProcessRoles.CONTROLLER)
        .endSpec();
}

/**
 * Returns a builder for a CONTROLLER-only KafkaNodePool backed by a 1Gi persistent claim
 * that is deleted together with the pool.
 *
 * @param namespaceName    Namespace where the pool should live
 * @param nodePoolName     name of the KafkaNodePool resource
 * @param kafkaClusterName name of the Kafka cluster the pool belongs to
 * @param kafkaReplicas    number of nodes in the pool
 * @return builder combining the CONTROLLER role with persistent-claim storage
 */
public static KafkaNodePoolBuilder kafkaNodePoolWithControllerRoleAndPersistentStorage(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
    KafkaNodePoolBuilder controllerPool = kafkaNodePoolWithControllerRole(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas);
    return controllerPool
        .editOrNewSpec()
            .withNewPersistentClaimStorage()
                .withDeleteClaim(true)
                .withSize("1Gi")
            .endPersistentClaimStorage()
        .endSpec();
}

/**
 * Returns a builder for a KafkaNodePool carrying only the BROKER role.
 *
 * @param namespaceName    Namespace where the pool should live
 * @param nodePoolName     name of the KafkaNodePool resource
 * @param kafkaClusterName name of the Kafka cluster the pool belongs to
 * @param kafkaReplicas    number of nodes in the pool
 * @return builder based on {@code defaultKafkaNodePool} with the BROKER role added
 */
public static KafkaNodePoolBuilder kafkaNodePoolWithBrokerRole(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
    KafkaNodePoolBuilder poolBuilder = defaultKafkaNodePool(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas);
    return poolBuilder
        .editOrNewSpec()
            .addToRoles(ProcessRoles.BROKER)
        .endSpec();
}

/**
 * Returns a builder for a BROKER-only KafkaNodePool backed by a 1Gi persistent claim
 * that is deleted together with the pool.
 *
 * @param namespaceName    Namespace where the pool should live
 * @param nodePoolName     name of the KafkaNodePool resource
 * @param kafkaClusterName name of the Kafka cluster the pool belongs to
 * @param kafkaReplicas    number of nodes in the pool
 * @return builder combining the BROKER role with persistent-claim storage
 */
public static KafkaNodePoolBuilder kafkaNodePoolWithBrokerRoleAndPersistentStorage(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
    KafkaNodePoolBuilder brokerPool = kafkaNodePoolWithBrokerRole(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas);
    return brokerPool
        .editOrNewSpec()
            .withNewPersistentClaimStorage()
                .withDeleteClaim(true)
                .withSize("1Gi")
            .endPersistentClaimStorage()
        .endSpec();
}

/**
* Creates a KafkaNodePoolBuilder for a Kafka instance (mirroring its mandatory specification) with roles based
* on the environment setting (TestConstants.USE_KRAFT_MODE) having BROKER role in Zookeeper and Kraft mode alike
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ public class UpgradeKafkaVersion {
private String version;
private String logMessageVersion;
private String interBrokerVersion;
private String metadataVersion;

UpgradeKafkaVersion(TestKafkaVersion testKafkaVersion) {
/**
 * Creates upgrade version data from a supported Kafka version, taking over its
 * version, message-format version and protocol version.
 *
 * @param testKafkaVersion Kafka version entry to copy the version values from
 */
public UpgradeKafkaVersion(TestKafkaVersion testKafkaVersion) {
    this(testKafkaVersion.version(), testKafkaVersion.messageVersion(), testKafkaVersion.protocolVersion());
}

UpgradeKafkaVersion(String version) {
/**
 * Creates upgrade version data for KRaft-based tests, where only the Kafka version
 * and the desired metadata version are relevant (LMFV/IBPV are left unset).
 *
 * @param version                Kafka version to use
 * @param desiredMetadataVersion metadata version that should be configured for KRaft
 */
public UpgradeKafkaVersion(String version, String desiredMetadataVersion) {
    this.version = version;
    this.metadataVersion = desiredMetadataVersion;
}

public UpgradeKafkaVersion(String version) {
String shortVersion = version;

if (version != null && !version.equals("")) {
Expand All @@ -31,17 +37,18 @@ public class UpgradeKafkaVersion {
this.version = version;
this.logMessageVersion = shortVersion;
this.interBrokerVersion = shortVersion;
this.metadataVersion = shortVersion;
}

/**
* Leaving empty, so original Kafka version in `kafka-persistent.yaml` will be used
* LMFV and IBPV should be null, so the test steps for updating the config will be skipped
*/
UpgradeKafkaVersion() {
public UpgradeKafkaVersion() {
this("", null, null);
}

UpgradeKafkaVersion(String version, String logMessageVersion, String interBrokerVersion) {
public UpgradeKafkaVersion(String version, String logMessageVersion, String interBrokerVersion) {
this.version = version;
this.logMessageVersion = logMessageVersion;
this.interBrokerVersion = interBrokerVersion;
Expand All @@ -63,6 +70,10 @@ public String getInterBrokerVersion() {
return this.interBrokerVersion;
}

/**
 * @return metadata version configured for KRaft-based upgrade/downgrade steps
 */
public String getMetadataVersion() {
    return metadataVersion;
}

public static UpgradeKafkaVersion getKafkaWithVersionFromUrl(String kafkaVersionsUrl, String kafkaVersion) {
if (kafkaVersionsUrl.equals("HEAD")) {
return new UpgradeKafkaVersion(TestKafkaVersion.getSpecificVersion(kafkaVersion));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.type.CollectionType;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.utils.TestKafkaVersion;
import io.strimzi.test.TestUtils;
import org.apache.logging.log4j.LogManager;
Expand All @@ -33,6 +34,7 @@ public enum ModificationType {
private static final Logger LOGGER = LogManager.getLogger(VersionModificationDataLoader.class);
private OlmVersionModificationData olmUpgradeData;
private List<BundleVersionModificationData> bundleVersionModificationDataList;
private static final String KRAFT_UPGRADE_FEATURE_GATES = String.join(",", TestConstants.USE_KRAFT_MODE, TestConstants.USE_KAFKA_NODE_POOLS, TestConstants.USE_UNIDIRECTIONAL_TOPIC_OPERATOR);

public VersionModificationDataLoader(ModificationType upgradeType) {
if (upgradeType == ModificationType.OLM_UPGRADE) {
Expand Down Expand Up @@ -106,6 +108,14 @@ public int getBundleUpgradeOrDowngradeDataSize() {
return bundleVersionModificationDataList.size();
}

/**
 * Builds the across-version upgrade data and applies the KRaft-related feature gates
 * ({@code KRAFT_UPGRADE_FEATURE_GATES}) on top of it.
 *
 * @return across-version upgrade data with KRaft feature gates set
 */
public BundleVersionModificationData buildDataForUpgradeAcrossVersionsForKRaft() {
    return updateUpgradeDataWithFeatureGates(buildDataForUpgradeAcrossVersions(), KRAFT_UPGRADE_FEATURE_GATES);
}

public BundleVersionModificationData buildDataForUpgradeAcrossVersions() {
List<TestKafkaVersion> sortedVersions = TestKafkaVersion.getSupportedKafkaVersions();
TestKafkaVersion latestKafkaSupported = sortedVersions.get(sortedVersions.size() - 1);
Expand Down Expand Up @@ -133,17 +143,43 @@ public BundleVersionModificationData buildDataForUpgradeAcrossVersions() {
}

/**
 * Method source providing the YAML-based downgrade test data without any
 * additional feature gates applied.
 *
 * @return stream of downgrade test arguments (from-version, to-version, data)
 */
public static Stream<Arguments> loadYamlDowngradeData() {
    return loadYamlDowngradeDataWithFeatureGates(null);
}

/**
 * Method source providing the YAML-based downgrade test data with the KRaft-related
 * feature gates ({@code KRAFT_UPGRADE_FEATURE_GATES}) applied.
 *
 * @return stream of downgrade test arguments (from-version, to-version, data)
 */
public static Stream<Arguments> loadYamlDowngradeDataForKRaft() {
    return loadYamlDowngradeDataWithFeatureGates(KRAFT_UPGRADE_FEATURE_GATES);
}

/**
 * Loads the YAML-based downgrade data, sets the downgrade procedures to target the first
 * supported Kafka version, and optionally applies extra feature gates to each entry.
 *
 * @param featureGates comma-separated feature gates to apply on top of the YAML data,
 *                     or {@code null} to keep the YAML values untouched
 * @return stream of downgrade test arguments (from-version, to-version, data)
 */
public static Stream<Arguments> loadYamlDowngradeDataWithFeatureGates(String featureGates) {
    VersionModificationDataLoader loader = new VersionModificationDataLoader(ModificationType.BUNDLE_DOWNGRADE);
    List<Arguments> testCases = new LinkedList<>();

    // the downgrade procedures always target the first of the supported Kafka versions
    TestKafkaVersion firstSupportedVersion = TestKafkaVersion.getSupportedKafkaVersions().get(0);
    UpgradeKafkaVersion downgradeProcedures = new UpgradeKafkaVersion(firstSupportedVersion.version());

    for (BundleVersionModificationData downgradeData : loader.getBundleUpgradeOrDowngradeDataList()) {
        downgradeData.setProcedures(downgradeProcedures);

        BundleVersionModificationData updatedData = updateUpgradeDataWithFeatureGates(downgradeData, featureGates);

        testCases.add(Arguments.of(updatedData.getFromVersion(), updatedData.getToVersion(), updatedData));
    }

    return testCases.stream();
}

/**
 * Method source providing the YAML-based upgrade test data without any
 * additional feature gates applied.
 *
 * @return stream of upgrade test arguments
 */
public static Stream<Arguments> loadYamlUpgradeData() {
    return loadYamlUpgradeDataWithFeatureGates(null);
}

/**
 * Method source providing the YAML-based upgrade test data with the KRaft-related
 * feature gates ({@code KRAFT_UPGRADE_FEATURE_GATES}) applied.
 *
 * @return stream of upgrade test arguments
 */
public static Stream<Arguments> loadYamlUpgradeDataForKRaft() {
    return loadYamlUpgradeDataWithFeatureGates(KRAFT_UPGRADE_FEATURE_GATES);
}

public static Stream<Arguments> loadYamlUpgradeDataWithFeatureGates(String featureGates) {
VersionModificationDataLoader upgradeDataList = new VersionModificationDataLoader(ModificationType.BUNDLE_UPGRADE);
List<Arguments> parameters = new LinkedList<>();

Expand All @@ -155,6 +191,9 @@ public static Stream<Arguments> loadYamlUpgradeData() {

upgradeDataList.getBundleUpgradeOrDowngradeDataList().forEach(upgradeData -> {
upgradeData.setProcedures(procedures);

upgradeData = updateUpgradeDataWithFeatureGates(upgradeData, featureGates);

parameters.add(Arguments.of(
upgradeData.getFromVersion(), upgradeData.getToVersion(),
upgradeData.getFeatureGatesBefore(), upgradeData.getFeatureGatesAfter(),
Expand All @@ -164,4 +203,28 @@ public static Stream<Arguments> loadYamlUpgradeData() {

return parameters.stream();
}

/**
 * Replaces, in both the "before" and "after" feature-gate lists of the given upgrade/downgrade
 * data, every gate that is also specified in {@code featureGates}, and then appends
 * {@code featureGates} — so the explicitly requested gates always win over the ones loaded
 * from the YAML data (e.g. a "-UnidirectionalTopicOperator" from YAML is dropped when KRaft
 * upgrade needs "+UnidirectionalTopicOperator").
 *
 * @param upgradeData  upgrade/downgrade data to update; mutated in place
 * @param featureGates comma-separated feature gates with +/- prefix
 *                     (e.g. "+UseKRaft,+KafkaNodePools"); {@code null} or empty is a no-op
 * @return the same (mutated) {@code upgradeData} instance, for chaining
 */
private static BundleVersionModificationData updateUpgradeDataWithFeatureGates(BundleVersionModificationData upgradeData, String featureGates) {
    if (featureGates != null && !featureGates.isEmpty()) {
        upgradeData.setFeatureGatesBefore(mergeFeatureGates(upgradeData.getFeatureGatesBefore(), featureGates));
        upgradeData.setFeatureGatesAfter(mergeFeatureGates(upgradeData.getFeatureGatesAfter(), featureGates));
    }

    return upgradeData;
}

/**
 * Removes from {@code current} every gate named in {@code overrides} (whatever its +/- sign)
 * and appends {@code overrides}, keeping the result free of dangling commas.
 *
 * @param current   comma-separated gates loaded from YAML; may be {@code null} or empty
 * @param overrides comma-separated gates that must end up in the list; never empty here
 * @return merged comma-separated feature-gate list
 */
private static String mergeFeatureGates(String current, String overrides) {
    // a missing YAML value means "no gates configured" — don't fail with NPE
    String remaining = current == null ? "" : current;

    for (String fg : overrides.split(",")) {
        String fgNameWithoutSign = fg.replace("+", "").replace("-", "");

        // gate names are alphanumeric, so it is safe to inline them into the regex;
        // the (?=,|$) lookahead stops a gate from matching another gate whose name it prefixes
        remaining = remaining.replaceFirst("(,?)[+-]" + fgNameWithoutSign + "(?=,|$)", "");
    }

    // removing the first entry of the list leaves a leading comma behind — strip it
    if (remaining.startsWith(",")) {
        remaining = remaining.substring(1);
    }

    return remaining.isEmpty() ? overrides : String.join(",", remaining, overrides);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ public static boolean componentHasRolled(String namespaceName, LabelSelector sel
* @return The snapshot of the component (StrimziPodSet, Deployment) after rolling update with Uid for every pod
*/
public static Map<String, String> waitTillComponentHasRolled(String namespaceName, LabelSelector selector, Map<String, String> snapshot) {
String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);

componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName;

LOGGER.info("Waiting for component matching {} -> {}/{} rolling update", selector, namespaceName, componentName);
TestUtils.waitFor("rolling update of component: " + namespaceName + "/" + componentName,
TestConstants.WAIT_FOR_ROLLING_UPDATE_INTERVAL, ResourceOperation.timeoutForPodsOperation(snapshot.size()), () -> {
Expand All @@ -92,6 +95,8 @@ public static Map<String, String> waitTillComponentHasRolledAndPodsReady(String
String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);

componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName;

waitTillComponentHasRolled(namespaceName, selector, snapshot);

LOGGER.info("Waiting for {} Pod(s) of {}/{} to be ready", expectedPods, namespaceName, componentName);
Expand All @@ -116,8 +121,10 @@ public static Map<String, String> waitTillComponentHasRolled(String namespaceNam
* @return The new Snapshot of actually present Pods after the first successful roll
*/
public static Map<String, String> waitTillComponentHasStartedRolling(String namespaceName, LabelSelector selector, Map<String, String> snapshot) {
String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
String componentName = selector.getMatchLabels().get(Labels.STRIMZI_CONTROLLER_NAME_LABEL);

String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);
componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName;

LOGGER.info("Waiting for component matching {} -> {}/{} first rolled Pod", selector, namespaceName, componentName);
TestUtils.waitFor("first pod's roll : " + namespaceName + "/" + componentName,
Expand Down Expand Up @@ -156,7 +163,9 @@ public static Map<String, String> waitTillComponentHasStartedRolling(String name

public static void waitForComponentAndPodsReady(String namespaceName, LabelSelector selector, int expectedPods) {
final String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
final String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);
String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);

componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName;

LOGGER.info("Waiting for {} Pod(s) of {}/{} to be ready", expectedPods, namespaceName, componentName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public static List<TestKafkaVersion> parseKafkaVersionsFromUrl(String url) throw
@JsonProperty("format")
String messageVersion;

@JsonProperty("metadata")
String metadataVersion;

@JsonProperty("zookeeper")
String zookeeperVersion;

Expand Down Expand Up @@ -89,6 +92,10 @@ public String messageVersion() {
return messageVersion;
}

/**
 * @return the metadata version parsed from the "metadata" field of the Kafka versions file
 */
public String metadataVersion() {
    return metadataVersion;
}

public String zookeeperVersion() {
return zookeeperVersion;
}
Expand Down
Loading

0 comments on commit f71526f

Please sign in to comment.