From f71526f01b1e16e865c75629663c46766d5650ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Kr=C3=A1l?= <53821852+im-konge@users.noreply.github.com> Date: Fri, 8 Dec 2023 16:24:31 +0100 Subject: [PATCH] [ST] KRaft to KRaft upgrades/downgrades in STs (#9442) Signed-off-by: Lukas Kral --- .../jobs/system-tests/upgrade_jobs.yaml | 10 + systemtest/pom.xml | 24 +- .../io/strimzi/systemtest/TestConstants.java | 8 + .../resources/crd/KafkaNodePoolResource.java | 21 ++ .../templates/crd/KafkaNodePoolTemplates.java | 34 +++ .../upgrade/UpgradeKafkaVersion.java | 19 +- .../VersionModificationDataLoader.java | 63 ++++ .../systemtest/utils/RollingUpdateUtils.java | 13 +- .../systemtest/utils/TestKafkaVersion.java | 7 + .../utils/kafkaUtils/KafkaUtils.java | 52 ++++ .../systemtest/upgrade/AbstractUpgradeST.java | 131 ++++----- .../upgrade/kraft/AbstractKRaftUpgradeST.java | 228 ++++++++++++++ .../kraft/KRaftKafkaUpgradeDowngradeST.java | 278 ++++++++++++++++++ .../kraft/KRaftStrimziDowngradeST.java | 113 +++++++ .../upgrade/kraft/KRaftStrimziUpgradeST.java | 226 ++++++++++++++ .../KafkaUpgradeDowngradeST.java | 3 +- .../upgrade/{ => regular}/OlmUpgradeST.java | 9 +- .../{ => regular}/StrimziDowngradeST.java | 10 +- .../{ => regular}/StrimziUpgradeST.java | 28 +- .../resources/upgrade/BundleDowngrade.yaml | 1 + 20 files changed, 1184 insertions(+), 94 deletions(-) create mode 100644 systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/AbstractKRaftUpgradeST.java create mode 100644 systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftKafkaUpgradeDowngradeST.java create mode 100644 systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java create mode 100644 systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java rename systemtest/src/test/java/io/strimzi/systemtest/upgrade/{ => regular}/KafkaUpgradeDowngradeST.java (99%) rename systemtest/src/test/java/io/strimzi/systemtest/upgrade/{ => regular}/OlmUpgradeST.java (96%) rename systemtest/src/test/java/io/strimzi/systemtest/upgrade/{ => regular}/StrimziDowngradeST.java (93%) rename systemtest/src/test/java/io/strimzi/systemtest/upgrade/{ => regular}/StrimziUpgradeST.java (92%) diff --git a/.azure/templates/jobs/system-tests/upgrade_jobs.yaml b/.azure/templates/jobs/system-tests/upgrade_jobs.yaml index bfe5a495374..fc540be7f80 100644 --- a/.azure/templates/jobs/system-tests/upgrade_jobs.yaml +++ b/.azure/templates/jobs/system-tests/upgrade_jobs.yaml @@ -18,3 +18,13 @@ jobs: timeout: 360 releaseVersion: '${{ parameters.releaseVersion }}' kafkaVersion: '${{ parameters.kafkaVersion }}' + + - template: '../../steps/system_test_general.yaml' + parameters: + name: 'strimzi_upgrade_kraft' + display_name: 'strimzi-upgrade-kraft-bundle' + profile: 'azp_kraft_upgrade' + cluster_operator_install_type: 'bundle' + timeout: 360 + releaseVersion: '${{ parameters.releaseVersion }}' + kafkaVersion: '${{ parameters.kafkaVersion }}' \ No newline at end of file diff --git a/systemtest/pom.xml b/systemtest/pom.xml index a40daa1087a..b512474f887 100644 --- a/systemtest/pom.xml +++ b/systemtest/pom.xml @@ -354,6 +354,14 @@ + + kraft_upgrade + + false + kraftupgrade + + + operators @@ -526,13 +534,25 @@ + + azp_kraft_upgrade + + false + kraftupgrade + + !KRaftKafkaUpgradeDowngradeST + + + + azp_kafka_upgrade false - upgrade + upgrade,kraftupgrade - KafkaUpgradeDowngradeST + KafkaUpgradeDowngradeST, + KRaftKafkaUpgradeDowngradeST diff --git a/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java b/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java index d3952623fb3..c19d203608b 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java @@ -201,7 +201,10 @@ public interface TestConstants { */ String USE_KRAFT_MODE = "+UseKRaft"; String DONT_USE_KAFKA_NODE_POOLS = "-KafkaNodePools"; + // kept for upgrade/downgrade tests in KRaft + String USE_KAFKA_NODE_POOLS = "+KafkaNodePools"; String DONT_USE_UNIDIRECTIONAL_TOPIC_OPERATOR = "-UnidirectionalTopicOperator"; + String USE_UNIDIRECTIONAL_TOPIC_OPERATOR = "+UnidirectionalTopicOperator"; /** * Default value which allows execution of tests with any tags @@ -223,6 +226,11 @@ public interface TestConstants { */ String UPGRADE = "upgrade"; + /** + * Tag for KRaft to KRaft tests. + */ + String KRAFT_UPGRADE = "kraftupgrade"; + /** * Tag for olm upgrade tests */ diff --git a/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaNodePoolResource.java b/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaNodePoolResource.java index 83607fa7677..ba9929197e9 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaNodePoolResource.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaNodePoolResource.java @@ -4,6 +4,8 @@ */ package io.strimzi.systemtest.resources.crd; +import io.fabric8.kubernetes.api.model.LabelSelector; +import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.Resource; import io.strimzi.api.kafka.Crds; @@ -21,7 +23,9 @@ import io.strimzi.systemtest.utils.kubeUtils.objects.PersistentVolumeClaimUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Consumer; import static io.strimzi.operator.common.Util.hashStub; @@ -111,4 +115,21 @@ public static KafkaNodePool convertKafkaResourceToKafkaNodePool(Kafka resource) return builder.build(); } + + public static LabelSelector getLabelSelector(String clusterName, String poolName, ProcessRoles processRole) { + Map matchLabels = new HashMap<>(); + matchLabels.put(Labels.STRIMZI_CLUSTER_LABEL, clusterName); + matchLabels.put(Labels.STRIMZI_KIND_LABEL, Kafka.RESOURCE_KIND); + matchLabels.put(Labels.STRIMZI_POOL_NAME_LABEL, poolName); + + switch (processRole) { + case BROKER -> matchLabels.put(Labels.STRIMZI_BROKER_ROLE_LABEL, "true"); + case CONTROLLER -> matchLabels.put(Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "true"); + default -> throw new RuntimeException("No role for KafkaNodePool specified"); + } + + return new LabelSelectorBuilder() + .withMatchLabels(matchLabels) + .build(); + } } diff --git a/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaNodePoolTemplates.java b/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaNodePoolTemplates.java index e5f125ba417..babf2138ad0 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaNodePoolTemplates.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaNodePoolTemplates.java @@ -30,6 +30,40 @@ public static KafkaNodePoolBuilder defaultKafkaNodePool(String namespaceName, St .endSpec(); } + public static KafkaNodePoolBuilder kafkaNodePoolWithControllerRole(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) { + return defaultKafkaNodePool(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas) + .editOrNewSpec() + .addToRoles(ProcessRoles.CONTROLLER) + .endSpec(); + } + + public static KafkaNodePoolBuilder kafkaNodePoolWithControllerRoleAndPersistentStorage(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) { + return kafkaNodePoolWithControllerRole(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas) + .editOrNewSpec() + .withNewPersistentClaimStorage() + .withSize("1Gi") + .withDeleteClaim(true) + .endPersistentClaimStorage() + .endSpec(); + } + + public static KafkaNodePoolBuilder kafkaNodePoolWithBrokerRole(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) { + return defaultKafkaNodePool(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas) + .editOrNewSpec() + .addToRoles(ProcessRoles.BROKER) + .endSpec(); + } + + public static KafkaNodePoolBuilder kafkaNodePoolWithBrokerRoleAndPersistentStorage(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) { + return kafkaNodePoolWithBrokerRole(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas) + .editOrNewSpec() + .withNewPersistentClaimStorage() + .withSize("1Gi") + .withDeleteClaim(true) + .endPersistentClaimStorage() + .endSpec(); + } + /** * Creates a KafkaNodePoolBuilder for a Kafka instance (mirroring its mandatory specification) with roles based * on the environment setting (TestConstants.USE_KRAFT_MODE) having BROKER role in Zookeeper and Kraft mode alike diff --git a/systemtest/src/main/java/io/strimzi/systemtest/upgrade/UpgradeKafkaVersion.java b/systemtest/src/main/java/io/strimzi/systemtest/upgrade/UpgradeKafkaVersion.java index 29f2f928412..78072f927ab 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/upgrade/UpgradeKafkaVersion.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/upgrade/UpgradeKafkaVersion.java @@ -15,12 +15,18 @@ public class UpgradeKafkaVersion { private String version; private String logMessageVersion; private String interBrokerVersion; + private String metadataVersion; - UpgradeKafkaVersion(TestKafkaVersion testKafkaVersion) { + public UpgradeKafkaVersion(TestKafkaVersion testKafkaVersion) { this(testKafkaVersion.version(), testKafkaVersion.messageVersion(), testKafkaVersion.protocolVersion()); } - UpgradeKafkaVersion(String version) { + public UpgradeKafkaVersion(String version, String desiredMetadataVersion) { + this.version = version; + this.metadataVersion = desiredMetadataVersion; + } + + public UpgradeKafkaVersion(String version) { String shortVersion = version; if (version != null && !version.equals("")) { @@ -31,17 +37,18 @@ public class UpgradeKafkaVersion { this.version = version; this.logMessageVersion = shortVersion; this.interBrokerVersion = shortVersion; + this.metadataVersion = shortVersion; } /** * Leaving empty, so original Kafka version in `kafka-persistent.yaml` will be used * LMFV and IBPV should be null, so the test steps will for updating the config will be skipped */ - UpgradeKafkaVersion() { + public UpgradeKafkaVersion() { this("", null, null); } - UpgradeKafkaVersion(String version, String logMessageVersion, String interBrokerVersion) { + public UpgradeKafkaVersion(String version, String logMessageVersion, String interBrokerVersion) { this.version = version; this.logMessageVersion = logMessageVersion; this.interBrokerVersion = interBrokerVersion; @@ -63,6 +70,10 @@ public String getInterBrokerVersion() { return this.interBrokerVersion; } + public String getMetadataVersion() { + return this.metadataVersion; + } + public static UpgradeKafkaVersion getKafkaWithVersionFromUrl(String kafkaVersionsUrl, String kafkaVersion) { if (kafkaVersionsUrl.equals("HEAD")) { return new UpgradeKafkaVersion(TestKafkaVersion.getSpecificVersion(kafkaVersion)); diff --git a/systemtest/src/main/java/io/strimzi/systemtest/upgrade/VersionModificationDataLoader.java b/systemtest/src/main/java/io/strimzi/systemtest/upgrade/VersionModificationDataLoader.java index 7611b56fdd0..d1145e327b2 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/upgrade/VersionModificationDataLoader.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/upgrade/VersionModificationDataLoader.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.type.CollectionType; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import io.strimzi.systemtest.Environment; +import io.strimzi.systemtest.TestConstants; import io.strimzi.systemtest.utils.TestKafkaVersion; import io.strimzi.test.TestUtils; import org.apache.logging.log4j.LogManager; @@ -33,6 +34,7 @@ public enum ModificationType { private static final Logger LOGGER = LogManager.getLogger(VersionModificationDataLoader.class); private OlmVersionModificationData olmUpgradeData; private List bundleVersionModificationDataList; + private static final String KRAFT_UPGRADE_FEATURE_GATES = String.join(",", TestConstants.USE_KRAFT_MODE, TestConstants.USE_KAFKA_NODE_POOLS, TestConstants.USE_UNIDIRECTIONAL_TOPIC_OPERATOR); public VersionModificationDataLoader(ModificationType upgradeType) { if (upgradeType == ModificationType.OLM_UPGRADE) { @@ -106,6 +108,14 @@ public int getBundleUpgradeOrDowngradeDataSize() { return bundleVersionModificationDataList.size(); } + public BundleVersionModificationData buildDataForUpgradeAcrossVersionsForKRaft() { + BundleVersionModificationData acrossUpgradeData = buildDataForUpgradeAcrossVersions(); + + acrossUpgradeData = updateUpgradeDataWithFeatureGates(acrossUpgradeData, KRAFT_UPGRADE_FEATURE_GATES); + + return acrossUpgradeData; + } + public BundleVersionModificationData buildDataForUpgradeAcrossVersions() { List sortedVersions = TestKafkaVersion.getSupportedKafkaVersions(); TestKafkaVersion latestKafkaSupported = sortedVersions.get(sortedVersions.size() - 1); @@ -133,10 +143,28 @@ public BundleVersionModificationData buildDataForUpgradeAcrossVersions() { } public static Stream loadYamlDowngradeData() { + return loadYamlDowngradeDataWithFeatureGates(null); + } + + public static Stream loadYamlDowngradeDataForKRaft() { + return loadYamlDowngradeDataWithFeatureGates(KRAFT_UPGRADE_FEATURE_GATES); + } + + public static Stream loadYamlDowngradeDataWithFeatureGates(String featureGates) { VersionModificationDataLoader dataLoader = new VersionModificationDataLoader(ModificationType.BUNDLE_DOWNGRADE); List parameters = new LinkedList<>(); + List testKafkaVersions = TestKafkaVersion.getSupportedKafkaVersions(); + TestKafkaVersion testKafkaVersion = testKafkaVersions.get(0); + + // Generate procedures for upgrade + UpgradeKafkaVersion procedures = new UpgradeKafkaVersion(testKafkaVersion.version()); + dataLoader.getBundleUpgradeOrDowngradeDataList().forEach(downgradeData -> { + downgradeData.setProcedures(procedures); + + downgradeData = updateUpgradeDataWithFeatureGates(downgradeData, featureGates); + parameters.add(Arguments.of(downgradeData.getFromVersion(), downgradeData.getToVersion(), downgradeData)); }); @@ -144,6 +172,14 @@ public static Stream loadYamlDowngradeData() { } public static Stream loadYamlUpgradeData() { + return loadYamlUpgradeDataWithFeatureGates(null); + } + + public static Stream loadYamlUpgradeDataForKRaft() { + return loadYamlUpgradeDataWithFeatureGates(KRAFT_UPGRADE_FEATURE_GATES); + } + + public static Stream loadYamlUpgradeDataWithFeatureGates(String featureGates) { VersionModificationDataLoader upgradeDataList = new VersionModificationDataLoader(ModificationType.BUNDLE_UPGRADE); List parameters = new LinkedList<>(); @@ -155,6 +191,9 @@ public static Stream loadYamlUpgradeData() { upgradeDataList.getBundleUpgradeOrDowngradeDataList().forEach(upgradeData -> { upgradeData.setProcedures(procedures); + + upgradeData = updateUpgradeDataWithFeatureGates(upgradeData, featureGates); + parameters.add(Arguments.of( upgradeData.getFromVersion(), upgradeData.getToVersion(), upgradeData.getFeatureGatesBefore(), upgradeData.getFeatureGatesAfter(), @@ -164,4 +203,28 @@ public static Stream loadYamlUpgradeData() { return parameters.stream(); } + + private static BundleVersionModificationData updateUpgradeDataWithFeatureGates(BundleVersionModificationData upgradeData, String featureGates) { + if (featureGates != null && !featureGates.isEmpty()) { + String fgBefore = upgradeData.getFeatureGatesBefore(); + String fgAfter = upgradeData.getFeatureGatesAfter(); + + // in case that we would like to keep some feature gates, we should replace those from the YAML and use the specified one instead + // for example in case that we are disabling UTO in YAML, but we need it for KRaft upgrade, we should remove it from the list and + // keep just specified + for (String fg : featureGates.split(",")) { + String fgNameWithoutSign = fg.replace("+", "").replace("-", ""); + + fgBefore = fgBefore.replaceFirst("(,?)(\\+|-)" + fgNameWithoutSign, ""); + fgAfter = fgAfter.replaceFirst("(,?)(\\+|-)" + fgNameWithoutSign, ""); + } + + upgradeData.setFeatureGatesBefore(fgBefore.isEmpty() ? + featureGates : String.join(",", fgBefore, featureGates)); + upgradeData.setFeatureGatesAfter(fgAfter.isEmpty() ? + featureGates : String.join(",", fgAfter, featureGates)); + } + + return upgradeData; + } } diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/RollingUpdateUtils.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/RollingUpdateUtils.java index d3fa64f6517..ad8992e63cb 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/utils/RollingUpdateUtils.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/RollingUpdateUtils.java @@ -71,8 +71,11 @@ public static boolean componentHasRolled(String namespaceName, LabelSelector sel * @return The snapshot of the component (StrimziPodSet, Deployment) after rolling update with Uid for every pod */ public static Map waitTillComponentHasRolled(String namespaceName, LabelSelector selector, Map snapshot) { + String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL); String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL); + componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName; + LOGGER.info("Waiting for component matching {} -> {}/{} rolling update", selector, namespaceName, componentName); TestUtils.waitFor("rolling update of component: " + namespaceName + "/" + componentName, TestConstants.WAIT_FOR_ROLLING_UPDATE_INTERVAL, ResourceOperation.timeoutForPodsOperation(snapshot.size()), () -> { @@ -92,6 +95,8 @@ public static Map waitTillComponentHasRolledAndPodsReady(String String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL); String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL); + componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName; + waitTillComponentHasRolled(namespaceName, selector, snapshot); LOGGER.info("Waiting for {} Pod(s) of {}/{} to be ready", expectedPods, namespaceName, componentName); @@ -116,8 +121,10 @@ public static Map waitTillComponentHasRolled(String namespaceNam * @return The new Snapshot of actually present Pods after the first successful roll */ public static Map waitTillComponentHasStartedRolling(String namespaceName, LabelSelector selector, Map snapshot) { + String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL); + String componentName = selector.getMatchLabels().get(Labels.STRIMZI_CONTROLLER_NAME_LABEL); - String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL); + componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName; LOGGER.info("Waiting for component matching {} -> {}/{} first rolled Pod", selector, namespaceName, componentName); TestUtils.waitFor("first pod's roll : " + namespaceName + "/" + componentName, @@ -156,7 +163,9 @@ public static Map waitTillComponentHasStartedRolling(String name public static void waitForComponentAndPodsReady(String namespaceName, LabelSelector selector, int expectedPods) { final String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL); - final String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL); + String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL); + + componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName; LOGGER.info("Waiting for {} Pod(s) of {}/{} to be ready", expectedPods, namespaceName, componentName); diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/TestKafkaVersion.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/TestKafkaVersion.java index 1dc46cf00ee..8bf3469ddcf 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/utils/TestKafkaVersion.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/TestKafkaVersion.java @@ -56,6 +56,9 @@ public static List parseKafkaVersionsFromUrl(String url) throw @JsonProperty("format") String messageVersion; + @JsonProperty("metadata") + String metadataVersion; + @JsonProperty("zookeeper") String zookeeperVersion; @@ -89,6 +92,10 @@ public String messageVersion() { return messageVersion; } + public String metadataVersion() { + return metadataVersion; + } + public String zookeeperVersion() { return zookeeperVersion; } diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java index 8ddbbe54897..8af665e87c3 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java @@ -4,10 +4,13 @@ */ package io.strimzi.systemtest.utils.kafkaUtils; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLParser; import io.fabric8.kubernetes.api.model.LabelSelector; import io.fabric8.kubernetes.api.model.Pod; import io.strimzi.api.kafka.model.Kafka; @@ -494,6 +497,55 @@ public static String changeOrRemoveKafkaConfiguration(File file, String version, } } + public static String changeOrRemoveKafkaInKRaft(File file, String version) { + return changeOrRemoveKafkaConfigurationInKRaft(file, version, null); + } + + public static String changeOrRemoveKafkaConfigurationInKRaft(File file, String version, String metadataVersion) { + YAMLFactory yamlFactory = new YAMLFactory(); + ObjectMapper mapper = new ObjectMapper(); + YAMLMapper yamlMapper = new YAMLMapper(); + + try { + YAMLParser yamlParser = yamlFactory.createParser(file); + List objects = mapper.readValues(yamlParser, new TypeReference() { }).readAll(); + + ObjectNode kafkaResourceNode = objects.get(2); + ObjectNode kafkaNode = (ObjectNode) kafkaResourceNode.at("/spec/kafka"); + + ObjectNode entity = (ObjectNode) kafkaResourceNode.at("/spec/entityOperator"); + entity.set("topicOperator", mapper.createObjectNode()); + + // workaround for current Strimzi upgrade (before we will have release containing metadataVersion in examples + CRDs) + boolean metadataVersionFieldSupported = !cmdKubeClient().exec(false, "explain", "kafka.spec.kafka.metadataVersion").err().contains("does not exist"); + + if (version == null) { + kafkaNode.remove("version"); + kafkaNode.remove("metadataVersion"); + } else if (!version.equals("")) { + kafkaNode.put("version", version); + + if (metadataVersionFieldSupported) { + kafkaNode.put("metadataVersion", TestKafkaVersion.getSpecificVersion(version).messageVersion()); + } + } + + if (metadataVersion != null && metadataVersionFieldSupported) { + kafkaNode.put("metadataVersion", metadataVersion); + } + + StringBuilder output = new StringBuilder(); + + for (ObjectNode objectNode : objects) { + output.append(yamlMapper.writeValueAsString(objectNode)); + } + + return output.toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public static String namespacedPlainBootstrapAddress(String clusterName, String namespace) { return namespacedBootstrapAddress(clusterName, namespace, 9092); } diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractUpgradeST.java index 4a6bcf22de7..73a4a7709ef 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractUpgradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractUpgradeST.java @@ -5,6 +5,7 @@ package io.strimzi.systemtest.upgrade; import io.fabric8.kubernetes.api.model.LabelSelector; +import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; import io.fabric8.kubernetes.api.model.Pod; import io.strimzi.api.kafka.model.Constants; import io.strimzi.api.kafka.model.Kafka; @@ -89,6 +90,8 @@ public class AbstractUpgradeST extends AbstractST { protected final LabelSelector kafkaSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.kafkaStatefulSetName(clusterName)); protected final LabelSelector zkSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.zookeeperStatefulSetName(clusterName)); + protected final LabelSelector eoSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.entityOperatorDeploymentName(clusterName)); + protected final LabelSelector coSelector = new LabelSelectorBuilder().withMatchLabels(Map.of(Labels.STRIMZI_KIND_LABEL, "cluster-operator")).build(); protected final LabelSelector connectLabelSelector = KafkaConnectResource.getLabelSelector(clusterName, KafkaConnectResources.deploymentName(clusterName)); protected final String topicName = "my-topic"; @@ -98,6 +101,10 @@ public class AbstractUpgradeST extends AbstractST { protected final int expectedTopicCount = upgradeTopicCount + 3; protected File kafkaYaml; + protected int getExpectedTopicCount() { + return expectedTopicCount; + } + protected void makeSnapshots() { coPods = DeploymentUtils.depSnapshot(TestConstants.CO_NAMESPACE, ResourceManager.getCoDeploymentName()); zkPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, zkSelector); @@ -109,11 +116,11 @@ protected void makeSnapshots() { @SuppressWarnings("CyclomaticComplexity") protected void changeKafkaAndLogFormatVersion(CommonVersionModificationData versionModificationData, ExtensionContext extensionContext) throws IOException { // Get Kafka configurations - String operatorVersion = versionModificationData.getToVersion(); - String currentLogMessageFormat = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, ".spec.kafka.config.log\\.message\\.format\\.version"); - String currentInterBrokerProtocol = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, ".spec.kafka.config.inter\\.broker\\.protocol\\.version"); + String currentLogMessageFormat = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.config.log\\.message\\.format\\.version"); + String currentInterBrokerProtocol = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.config.inter\\.broker\\.protocol\\.version"); + // Get Kafka version - String kafkaVersionFromCR = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, ".spec.kafka.version"); + String kafkaVersionFromCR = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.version"); kafkaVersionFromCR = kafkaVersionFromCR.equals("") ? null : kafkaVersionFromCR; String kafkaVersionFromProcedure = versionModificationData.getProcedures().getVersion(); @@ -147,7 +154,7 @@ protected void changeKafkaAndLogFormatVersion(CommonVersionModificationData vers if (versionModificationData.getProcedures() != null && (!currentLogMessageFormat.isEmpty() || !currentInterBrokerProtocol.isEmpty())) { if (kafkaVersionFromProcedure != null && !kafkaVersionFromProcedure.isEmpty() && !kafkaVersionFromCR.contains(kafkaVersionFromProcedure) && extensionContext.getTestClass().get().getSimpleName().toLowerCase(Locale.ROOT).contains("upgrade")) { LOGGER.info("Set Kafka version to " + kafkaVersionFromProcedure); - cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure); + cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure); LOGGER.info("Waiting for Kafka rolling update to finish"); kafkaPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, kafkaSelector, 3, kafkaPods); } @@ -158,13 +165,13 @@ protected void changeKafkaAndLogFormatVersion(CommonVersionModificationData vers if (logMessageVersion != null && !logMessageVersion.isEmpty() || interBrokerProtocolVersion != null && !interBrokerProtocolVersion.isEmpty()) { if (!logMessageVersion.isEmpty()) { LOGGER.info("Set log message format version to {} (current version is {})", logMessageVersion, currentLogMessageFormat); - cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, "/spec/kafka/config/log.message.format.version", logMessageVersion); + cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/config/log.message.format.version", logMessageVersion); } if (!interBrokerProtocolVersion.isEmpty()) { LOGGER.info("Set inter-broker protocol version to {} (current version is {})", interBrokerProtocolVersion, currentInterBrokerProtocol); LOGGER.info("Set inter-broker protocol version to " + interBrokerProtocolVersion); - cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, "/spec/kafka/config/inter.broker.protocol.version", interBrokerProtocolVersion); + cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/config/inter.broker.protocol.version", interBrokerProtocolVersion); } if ((currentInterBrokerProtocol != null && !currentInterBrokerProtocol.equals(interBrokerProtocolVersion)) || @@ -177,26 +184,31 @@ protected void changeKafkaAndLogFormatVersion(CommonVersionModificationData vers if (kafkaVersionFromProcedure != null && !kafkaVersionFromProcedure.isEmpty() && !kafkaVersionFromCR.contains(kafkaVersionFromProcedure) && extensionContext.getTestClass().get().getSimpleName().toLowerCase(Locale.ROOT).contains("downgrade")) { LOGGER.info("Set Kafka version to " + kafkaVersionFromProcedure); - cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure); + cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure); LOGGER.info("Waiting for Kafka rolling update to finish"); kafkaPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, kafkaSelector, kafkaPods); } } } - protected void logPodImages(String clusterName) { - List pods = kubeClient().listPods(KafkaResource.getLabelSelector(clusterName, KafkaResources.zookeeperStatefulSetName(clusterName))); - for (Pod pod : pods) { - LOGGER.info("Pod: {}/{} has image {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), pod.getSpec().getContainers().get(0).getImage()); - } - pods = kubeClient().listPods(KafkaResource.getLabelSelector(clusterName, KafkaResources.kafkaStatefulSetName(clusterName))); - for (Pod pod : pods) { - LOGGER.info("Pod: {}/{} has image {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), pod.getSpec().getContainers().get(0).getImage()); - } - pods = kubeClient().listPods(kubeClient().getDeploymentSelectors(KafkaResources.entityOperatorDeploymentName(clusterName))); - for (Pod pod : pods) { - LOGGER.info("Pod: {}/{} has image {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), pod.getSpec().getContainers().get(0).getImage()); - LOGGER.info("Pod: {}/{} has image {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), pod.getSpec().getContainers().get(1).getImage()); + protected void logPodImages(String namespaceName) { + logPodImages(namespaceName, zkSelector, kafkaSelector, eoSelector, coSelector); + } + + protected void logPodImagesWithConnect(String namespaceName) { + logPodImages(namespaceName, zkSelector, kafkaSelector, eoSelector, connectLabelSelector, coSelector); + } + + protected void logPodImages(String namespaceName, LabelSelector... labelSelectors) { + for (LabelSelector labelSelector : labelSelectors) { + List pods = kubeClient().listPods(namespaceName, labelSelector); + + pods.forEach(pod -> + pod.getSpec().getContainers().forEach(container -> + LOGGER.info("Pod: {}/{} has image {}", + pod.getMetadata().getNamespace(), pod.getMetadata().getName(), pod.getSpec().getContainers().get(0).getImage()) + ) + ); } } @@ -219,7 +231,7 @@ protected void waitForReadinessOfKafkaCluster() { DeploymentUtils.waitForDeploymentAndPodsReady(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1); } - protected void changeClusterOperator(BundleVersionModificationData versionModificationData, String namespace, ExtensionContext extensionContext) throws IOException { + protected void changeClusterOperator(BundleVersionModificationData versionModificationData, String namespace, ExtensionContext extensionContext) throws IOException { File coDir; // Modify + apply installation files LOGGER.info("Update CO from {} to {}", versionModificationData.getFromVersion(), versionModificationData.getToVersion()); @@ -231,13 +243,13 @@ protected void changeClusterOperator(BundleVersionModificationData versionModif coDir = new File(dir, versionModificationData.getToExamples() + "/install/cluster-operator/"); } - copyModifyApply(coDir, namespace, extensionContext, versionModificationData.getFeatureGatesAfter()); + copyModifyApply(coDir, namespace, versionModificationData.getFeatureGatesAfter()); LOGGER.info("Waiting for CO upgrade"); DeploymentUtils.waitTillDepHasRolled(namespace, ResourceManager.getCoDeploymentName(), 1, coPods); } - protected void copyModifyApply(File root, String namespace, ExtensionContext extensionContext, final String strimziFeatureGatesValue) { + protected void copyModifyApply(File root, String namespace, final String strimziFeatureGatesValue) { Arrays.stream(Objects.requireNonNull(root.listFiles())).sorted().forEach(f -> { if (f.getName().matches(".*RoleBinding.*")) { cmdKubeClient().replaceContent(TestUtils.changeRoleBindingSubject(f, namespace)); @@ -283,30 +295,18 @@ protected void checkAllImages(BundleVersionModificationData versionModificationD fail("There are no expected images"); } - Map zkSelector = Labels.EMPTY - .withStrimziKind(Kafka.RESOURCE_KIND) - .withStrimziCluster(clusterName) - .withStrimziName(KafkaResources.zookeeperStatefulSetName(clusterName)) - .toMap(); - - Map kafkaSelector = Labels.EMPTY - .withStrimziKind(Kafka.RESOURCE_KIND) - .withStrimziCluster(clusterName) - .withStrimziName(KafkaResources.kafkaStatefulSetName(clusterName)) - .toMap(); - checkContainerImages(zkSelector, versionModificationData.getZookeeperImage()); checkContainerImages(kafkaSelector, versionModificationData.getKafkaImage()); - checkContainerImages(kubeClient().getDeployment(namespaceName, KafkaResources.entityOperatorDeploymentName(clusterName)).getSpec().getSelector().getMatchLabels(), versionModificationData.getTopicOperatorImage()); - checkContainerImages(kubeClient().getDeployment(namespaceName, KafkaResources.entityOperatorDeploymentName(clusterName)).getSpec().getSelector().getMatchLabels(), 1, versionModificationData.getUserOperatorImage()); + checkContainerImages(eoSelector, versionModificationData.getTopicOperatorImage()); + checkContainerImages(eoSelector, 1, versionModificationData.getUserOperatorImage()); } - protected void checkContainerImages(Map matchLabels, String image) { - checkContainerImages(matchLabels, 0, image); + protected void checkContainerImages(LabelSelector labelSelector, String image) { + checkContainerImages(labelSelector, 0, image); } - protected void checkContainerImages(Map matchLabels, int container, String image) { - List pods1 = kubeClient().listPods(matchLabels); + protected void checkContainerImages(LabelSelector labelSelector, int container, String image) { + List pods1 = kubeClient().listPods(labelSelector); for (Pod pod : pods1) { if (!image.equals(pod.getSpec().getContainers().get(container).getImage())) { LOGGER.debug("Expected image for Pod: {}/{}: {} \nCurrent image: {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), image, pod.getSpec().getContainers().get(container).getImage()); @@ -319,8 +319,6 @@ protected void setupEnvAndUpgradeClusterOperator(ExtensionContext extensionConte LOGGER.info("Test upgrade of Cluster Operator from version: {} to version: {}", upgradeData.getFromVersion(), upgradeData.getToVersion()); cluster.setNamespace(namespace); - String operatorVersion = upgradeData.getFromVersion(); - this.deployCoWithWaitForReadiness(extensionContext, upgradeData, namespace); this.deployKafkaClusterWithWaitForReadiness(extensionContext, upgradeData, upgradeKafkaVersion); this.deployKafkaUserWithWaitForReadiness(extensionContext, upgradeData, namespace); @@ -343,7 +341,7 @@ protected void setupEnvAndUpgradeClusterOperator(ExtensionContext extensionConte // Attach clients which will continuously produce/consume messages to/from Kafka brokers during rolling update // ############################## // Setup topic, which has 3 replicas and 2 min.isr to see if producer will be able to work during rolling update - if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL, operatorVersion)).contains(testStorage.getTopicName())) { + if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL)).contains(testStorage.getTopicName())) { String pathToTopicExamples = upgradeData.getFromExamples().equals("HEAD") ? PATH_TO_KAFKA_TOPIC_CONFIG : upgradeData.getFromExamples() + "/examples/topic/kafka-topic.yaml"; kafkaTopicYaml = new File(dir, pathToTopicExamples); @@ -353,7 +351,7 @@ protected void setupEnvAndUpgradeClusterOperator(ExtensionContext extensionConte .replace("replicas: 1", "replicas: 3") + " min.insync.replicas: 2"); - ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL, operatorVersion), testStorage.getTopicName()); + ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL), testStorage.getTopicName()); } String producerAdditionConfiguration = "delivery.timeout.ms=20000\nrequest.timeout.ms=20000"; @@ -374,7 +372,6 @@ protected void setupEnvAndUpgradeClusterOperator(ExtensionContext extensionConte } makeSnapshots(); - logPodImages(clusterName); } protected void verifyProcedure(BundleVersionModificationData upgradeData, String producerName, String consumerName, String namespace) { @@ -383,7 +380,7 @@ protected void verifyProcedure(BundleVersionModificationData upgradeData, String // Check that topics weren't deleted/duplicated during upgrade procedures String listedTopics = cmdKubeClient().getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL)); int additionalTopics = upgradeData.getAdditionalTopics(); - assertThat("KafkaTopic list doesn't have expected size", Long.valueOf(listedTopics.lines().count() - 1).intValue(), is(expectedTopicCount + additionalTopics)); + assertThat("KafkaTopic list doesn't have expected size", Long.valueOf(listedTopics.lines().count() - 1).intValue(), is(getExpectedTopicCount() + additionalTopics)); assertThat("KafkaTopic " + topicName + " is not in expected Topic list", listedTopics.contains(topicName), is(true)); for (int x = 0; x < upgradeTopicCount; x++) { @@ -399,17 +396,8 @@ protected void verifyProcedure(BundleVersionModificationData upgradeData, String // ############################## } } - protected String getResourceApiVersion(String resourcePlural) { - return getResourceApiVersion(resourcePlural, "HEAD"); - } - - protected String getResourceApiVersion(String resourcePlural, String coVersion) { - if (coVersion.equals("HEAD") || TestKafkaVersion.compareDottedVersions(coVersion, "0.22.0") >= 0) { - return resourcePlural + "." + Constants.V1BETA2 + "." + Constants.RESOURCE_GROUP_NAME; - } else { - return resourcePlural + "." + Constants.V1BETA1 + "." + Constants.RESOURCE_GROUP_NAME; - } + return resourcePlural + "." + Constants.V1BETA2 + "." + Constants.RESOURCE_GROUP_NAME; } protected void deployCoWithWaitForReadiness(final ExtensionContext extensionContext, final BundleVersionModificationData upgradeData, @@ -425,7 +413,7 @@ protected void deployCoWithWaitForReadiness(final ExtensionContext extensionCont } // Modify + apply installation files - copyModifyApply(coDir, namespaceName, extensionContext, upgradeData.getFeatureGatesBefore()); + copyModifyApply(coDir, namespaceName, upgradeData.getFeatureGatesBefore()); LOGGER.info("Waiting for Deployment: {}", ResourceManager.getCoDeploymentName()); DeploymentUtils.waitForDeploymentAndPodsReady(namespaceName, ResourceManager.getCoDeploymentName(), 1); @@ -437,7 +425,7 @@ protected void deployKafkaClusterWithWaitForReadiness(final ExtensionContext ext final UpgradeKafkaVersion upgradeKafkaVersion) { LOGGER.info("Deploying Kafka: {} in Namespace: {}", clusterName, kubeClient().getNamespace()); - if (!cmdKubeClient().getResources(getResourceApiVersion(Kafka.RESOURCE_PLURAL, upgradeData.getFromVersion())).contains(clusterName)) { + if (!cmdKubeClient().getResources(getResourceApiVersion(Kafka.RESOURCE_PLURAL)).contains(clusterName)) { // Deploy a Kafka cluster if (upgradeData.getFromExamples().equals("HEAD")) { resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaPersistent(clusterName, 3, 3) @@ -468,14 +456,14 @@ protected void deployKafkaUserWithWaitForReadiness(final ExtensionContext extens final String namespaceName) { LOGGER.info("Deploying KafkaUser: {}/{}", kubeClient().getNamespace(), userName); - if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaUser.RESOURCE_PLURAL, upgradeData.getFromVersion())).contains(userName)) { + if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaUser.RESOURCE_PLURAL)).contains(userName)) { if (upgradeData.getFromVersion().equals("HEAD")) { resourceManager.createResourceWithWait(extensionContext, KafkaUserTemplates.tlsUser(namespaceName, clusterName, userName).build()); } else { kafkaUserYaml = new File(dir, upgradeData.getFromExamples() + "/examples/user/kafka-user.yaml"); LOGGER.info("Deploying KafkaUser from: {}", kafkaUserYaml.getPath()); cmdKubeClient().applyContent(KafkaUserUtils.removeKafkaUserPart(kafkaUserYaml, "authorization")); - ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaUser.RESOURCE_PLURAL, upgradeData.getFromVersion()), userName); + ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaUser.RESOURCE_PLURAL), userName); } } } @@ -483,7 +471,7 @@ protected void deployKafkaUserWithWaitForReadiness(final ExtensionContext extens protected void deployKafkaTopicWithWaitForReadiness(final BundleVersionModificationData upgradeData) { LOGGER.info("Deploying KafkaTopic: {}/{}", kubeClient().getNamespace(), topicName); - if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL, upgradeData.getFromVersion())).contains(topicName)) { + if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL)).contains(topicName)) { if (upgradeData.getFromVersion().equals("HEAD")) { kafkaTopicYaml = new File(dir, PATH_TO_PACKAGING_EXAMPLES + "/topic/kafka-topic.yaml"); } else { @@ -491,7 +479,7 @@ protected void deployKafkaTopicWithWaitForReadiness(final BundleVersionModificat } LOGGER.info("Deploying KafkaTopic from: {}", kafkaTopicYaml.getPath()); cmdKubeClient().create(kafkaTopicYaml); - ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL, upgradeData.getFromVersion()), topicName); + ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL), topicName); } } @@ -500,7 +488,7 @@ protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(final Ext final UpgradeKafkaVersion upgradeKafkaVersion, final TestStorage testStorage) { // setup KafkaConnect + KafkaConnector - if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL, acrossUpgradeData.getFromVersion())).contains(clusterName)) { + if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL)).contains(clusterName)) { if (acrossUpgradeData.getFromVersion().equals("HEAD")) { resourceManager.createResourceWithWait(extensionContext, KafkaConnectTemplates.kafkaConnectWithFilePlugin(clusterName, testStorage.getNamespaceName(), 1) .editMetadata() @@ -559,7 +547,7 @@ protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(final Ext LOGGER.info("Deploying KafkaConnect from: {}", kafkaConnectYaml.getPath()); cmdKubeClient().applyContent(TestUtils.toYamlString(kafkaConnect)); - ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL, acrossUpgradeData.getFromVersion()), kafkaConnect.getMetadata().getName()); + ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL), kafkaConnect.getMetadata().getName()); // in our examples is no sink connector and thus we are using the same as in HEAD verification resourceManager.createResourceWithWait(extensionContext, KafkaConnectorTemplates.kafkaConnector(clusterName) @@ -601,7 +589,7 @@ protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(final ClientUtils.waitForProducerClientSuccess(testStorage); makeSnapshots(); - logPodImages(clusterName); + logPodImagesWithConnect(TestConstants.CO_NAMESPACE); // Verify FileSink KafkaConnector before upgrade String connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName(); @@ -627,4 +615,13 @@ protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(final // Verify that pods are stable PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), clusterName); } + + protected String downloadExamplesAndGetPath(CommonVersionModificationData versionModificationData) throws IOException { + if (versionModificationData.getToUrl().equals("HEAD")) { + return PATH_TO_PACKAGING_EXAMPLES; + } else { + File dir = FileUtils.downloadAndUnzip(versionModificationData.getToUrl()); + return dir.getAbsolutePath() + "/" + versionModificationData.getToExamples() + "/examples"; + } + } } diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/AbstractKRaftUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/AbstractKRaftUpgradeST.java new file mode 100644 index 00000000000..edd9a6c4e0a --- /dev/null +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/AbstractKRaftUpgradeST.java @@ -0,0 +1,228 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.systemtest.upgrade.kraft; + +import io.fabric8.kubernetes.api.model.LabelSelector; +import io.strimzi.api.kafka.model.Kafka; +import io.strimzi.api.kafka.model.KafkaResources; +import io.strimzi.api.kafka.model.nodepool.ProcessRoles; +import io.strimzi.operator.common.Annotations; +import io.strimzi.systemtest.TestConstants; +import io.strimzi.systemtest.resources.ResourceManager; +import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource; +import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates; +import io.strimzi.systemtest.templates.crd.KafkaTemplates; +import io.strimzi.systemtest.upgrade.AbstractUpgradeST; +import io.strimzi.systemtest.upgrade.BundleVersionModificationData; +import io.strimzi.systemtest.upgrade.CommonVersionModificationData; +import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion; +import io.strimzi.systemtest.utils.RollingUpdateUtils; +import io.strimzi.systemtest.utils.TestKafkaVersion; +import io.strimzi.systemtest.utils.kafkaUtils.KafkaUserUtils; +import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils; +import io.strimzi.systemtest.utils.kubeUtils.controllers.DeploymentUtils; +import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils; +import io.strimzi.test.TestUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.extension.ExtensionContext; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static io.strimzi.test.k8s.KubeClusterResource.cmdKubeClient; +import static io.strimzi.test.k8s.KubeClusterResource.kubeClient; +import static org.junit.jupiter.api.Assertions.fail; + +public class AbstractKRaftUpgradeST extends AbstractUpgradeST { + + private static final Logger LOGGER = LogManager.getLogger(AbstractKRaftUpgradeST.class); + + protected Map brokerPods; + protected Map controllerPods; + + protected static final String CONTROLLER_NODE_NAME = "controller"; + protected static final String BROKER_NODE_NAME = "broker"; + + protected final LabelSelector controllerSelector = KafkaNodePoolResource.getLabelSelector(clusterName, CONTROLLER_NODE_NAME, ProcessRoles.CONTROLLER); + protected final LabelSelector brokerSelector = KafkaNodePoolResource.getLabelSelector(clusterName, BROKER_NODE_NAME, ProcessRoles.BROKER); + + // topics that are just present in Kafka itself are not created as CRs in UTO, thus -3 topics in comparison to regular upgrade + protected final int expectedTopicCount = upgradeTopicCount; + + protected int getExpectedTopicCount() { + return expectedTopicCount; + } + + @Override + protected void makeSnapshots() { + coPods = DeploymentUtils.depSnapshot(TestConstants.CO_NAMESPACE, ResourceManager.getCoDeploymentName()); + eoPods = DeploymentUtils.depSnapshot(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName)); + controllerPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, controllerSelector); + brokerPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, brokerSelector); + connectPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, connectLabelSelector); + } + + @Override + protected void deployKafkaClusterWithWaitForReadiness(final ExtensionContext extensionContext, final BundleVersionModificationData upgradeData, + final UpgradeKafkaVersion upgradeKafkaVersion) { + LOGGER.info("Deploying Kafka: {} in Namespace: {}", clusterName, kubeClient().getNamespace()); + + if (!cmdKubeClient().getResources(getResourceApiVersion(Kafka.RESOURCE_PLURAL)).contains(clusterName)) { + // Deploy a Kafka cluster + if (upgradeData.getFromExamples().equals("HEAD")) { + resourceManager.createResourceWithWait(extensionContext, + KafkaNodePoolTemplates.kafkaNodePoolWithControllerRoleAndPersistentStorage(TestConstants.CO_NAMESPACE, CONTROLLER_NODE_NAME, clusterName, 3).build(), + KafkaNodePoolTemplates.kafkaNodePoolWithBrokerRoleAndPersistentStorage(TestConstants.CO_NAMESPACE, BROKER_NODE_NAME, clusterName, 3).build(), + KafkaTemplates.kafkaPersistent(clusterName, 3, 3) + .editMetadata() + .addToAnnotations(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled") + .addToAnnotations(Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled") + .endMetadata() + .editSpec() + .editKafka() + .withVersion(upgradeKafkaVersion.getVersion()) + .withMetadataVersion(upgradeKafkaVersion.getMetadataVersion()) + .endKafka() + .endSpec() + .build()); + } else { + kafkaYaml = new File(dir, upgradeData.getFromExamples() + "/examples/kafka/nodepools/kafka-with-kraft.yaml"); + LOGGER.info("Deploying Kafka from: {}", kafkaYaml.getPath()); + // Change kafka version of it's empty (null is for remove the version) + if (upgradeKafkaVersion == null) { + cmdKubeClient().applyContent(KafkaUtils.changeOrRemoveKafkaInKRaft(kafkaYaml, null)); + } else { + cmdKubeClient().applyContent(KafkaUtils.changeOrRemoveKafkaConfigurationInKRaft(kafkaYaml, upgradeKafkaVersion.getVersion(), upgradeKafkaVersion.getMetadataVersion())); + } + // Wait for readiness + waitForReadinessOfKafkaCluster(); + } + } + } + + @Override + protected void waitForKafkaClusterRollingUpdate() { + LOGGER.info("Waiting for Kafka Pods with controller role to be rolled"); + controllerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, controllerSelector, 3, controllerPods); + LOGGER.info("Waiting for Kafka Pods with broker role to be rolled"); + brokerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, brokerSelector, 3, brokerPods); + LOGGER.info("Waiting for EO Deployment to be rolled"); + // Check the TO and UO also got upgraded + eoPods = DeploymentUtils.waitTillDepHasRolled(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoPods); + } + + @Override + protected void waitForReadinessOfKafkaCluster() { + LOGGER.info("Waiting for Kafka Pods with controller role to be ready"); + RollingUpdateUtils.waitForComponentAndPodsReady(TestConstants.CO_NAMESPACE, controllerSelector, 3); + LOGGER.info("Waiting for Kafka Pods with broker role to be ready"); + RollingUpdateUtils.waitForComponentAndPodsReady(TestConstants.CO_NAMESPACE, brokerSelector, 3); + LOGGER.info("Waiting for EO Deployment"); + DeploymentUtils.waitForDeploymentAndPodsReady(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1); + } + + protected void changeKafkaAndMetadataVersion(CommonVersionModificationData versionModificationData, ExtensionContext extensionContext) throws IOException { + changeKafkaAndMetadataVersion(versionModificationData, false, extensionContext); + } + + /** + * Method for changing Kafka `version` and `metadataVersion` fields in Kafka CR based on the current scenario + * @param versionModificationData data structure holding information about the desired steps/versions that should be applied + * @param replaceEvenIfMissing current workaround for the situation when `metadataVersion` is not set in Kafka CR -> that's because previous version of operator + * doesn't contain this kind of field, so even if we set this field in the Kafka CR, it is removed by the operator + * this is needed for correct functionality of the `testUpgradeAcrossVersionsWithUnsupportedKafkaVersion` test + * @param extensionContext context of the test + * @throws IOException exception during application of YAML files + */ + @SuppressWarnings("CyclomaticComplexity") + protected void changeKafkaAndMetadataVersion(CommonVersionModificationData versionModificationData, boolean replaceEvenIfMissing, ExtensionContext extensionContext) throws IOException { + // Get Kafka version + String kafkaVersionFromCR = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.version"); + kafkaVersionFromCR = kafkaVersionFromCR.equals("") ? null : kafkaVersionFromCR; + // Get Kafka metadata version + String currentMetadataVersion = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.metadataVersion"); + + String kafkaVersionFromProcedure = versionModificationData.getProcedures().getVersion(); + + // ####################################################################### + // ################# Update CRs to latest version ################### + // ####################################################################### + String examplesPath = downloadExamplesAndGetPath(versionModificationData); + + applyCustomResourcesFromPath(examplesPath, kafkaVersionFromCR); + + // ####################################################################### + + if (versionModificationData.getProcedures() != null && (!currentMetadataVersion.isEmpty() || replaceEvenIfMissing)) { + + if (kafkaVersionFromProcedure != null && !kafkaVersionFromProcedure.isEmpty() && !kafkaVersionFromCR.contains(kafkaVersionFromProcedure)) { + LOGGER.info("Set Kafka version to " + kafkaVersionFromProcedure); + cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure); + + waitForKafkaControllersAndBrokersFinishRollingUpdate(); + } + + String metadataVersion = versionModificationData.getProcedures().getMetadataVersion(); + + if (metadataVersion != null && !metadataVersion.isEmpty()) { + LOGGER.info("Set metadata version to {} (current version is {})", metadataVersion, currentMetadataVersion); + cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/metadataVersion", metadataVersion); + + makeSnapshots(); + } + } + } + + @Override + protected void checkAllImages(BundleVersionModificationData versionModificationData, String namespaceName) { + if (versionModificationData.getImagesAfterOperations().isEmpty()) { + fail("There are no expected images"); + } + + checkContainerImages(controllerSelector, versionModificationData.getKafkaImage()); + checkContainerImages(brokerSelector, versionModificationData.getKafkaImage()); + checkContainerImages(eoSelector, versionModificationData.getTopicOperatorImage()); + checkContainerImages(eoSelector, 1, versionModificationData.getUserOperatorImage()); + } + + @Override + protected void logPodImages(String namespaceName) { + logPodImages(namespaceName, controllerSelector, brokerSelector, eoSelector, coSelector); + } + + @Override + protected void logPodImagesWithConnect(String namespaceName) { + logPodImages(namespaceName, controllerSelector, brokerSelector, eoSelector, coSelector); + } + + protected void waitForKafkaControllersAndBrokersFinishRollingUpdate() { + LOGGER.info("Waiting for Kafka rolling update to finish"); + controllerPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, controllerSelector, 3, controllerPods); + brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, brokerSelector, 3, brokerPods); + } + + protected void applyKafkaCustomResourceFromPath(String examplesPath, String kafkaVersionFromCR) { + // Change kafka version of it's empty (null is for remove the version) + String metadataVersion = kafkaVersionFromCR == null ? null : TestKafkaVersion.getSpecificVersion(kafkaVersionFromCR).metadataVersion(); + + kafkaYaml = new File(examplesPath + "/kafka/nodepools/kafka-with-kraft.yaml"); + LOGGER.info("Deploying Kafka from: {}", kafkaYaml.getPath()); + cmdKubeClient().applyContent(KafkaUtils.changeOrRemoveKafkaConfigurationInKRaft(kafkaYaml, kafkaVersionFromCR, metadataVersion)); + } + + protected void applyCustomResourcesFromPath(String examplesPath, String kafkaVersionFromCR) { + applyKafkaCustomResourceFromPath(examplesPath, kafkaVersionFromCR); + + kafkaUserYaml = new File(examplesPath + "/user/kafka-user.yaml"); + LOGGER.info("Deploying KafkaUser from: {}", kafkaUserYaml.getPath()); + cmdKubeClient().applyContent(KafkaUserUtils.removeKafkaUserPart(kafkaUserYaml, "authorization")); + + kafkaTopicYaml = new File(examplesPath + "/topic/kafka-topic.yaml"); + LOGGER.info("Deploying KafkaTopic from: {}", kafkaTopicYaml.getPath()); + cmdKubeClient().applyContent(TestUtils.readFile(kafkaTopicYaml)); + } +} diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftKafkaUpgradeDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftKafkaUpgradeDowngradeST.java new file mode 100644 index 00000000000..fe7280f897c --- /dev/null +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftKafkaUpgradeDowngradeST.java @@ -0,0 +1,278 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.systemtest.upgrade.kraft; + +import io.fabric8.kubernetes.api.model.EnvVar; +import io.strimzi.api.kafka.Crds; +import io.strimzi.api.kafka.model.KafkaBuilder; +import io.strimzi.api.kafka.model.KafkaResources; +import io.strimzi.operator.common.Annotations; +import io.strimzi.systemtest.Environment; +import io.strimzi.systemtest.TestConstants; +import io.strimzi.systemtest.annotations.IsolatedTest; +import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients; +import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClientsBuilder; +import io.strimzi.systemtest.resources.crd.KafkaResource; +import io.strimzi.systemtest.storage.TestStorage; +import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates; +import io.strimzi.systemtest.templates.crd.KafkaTemplates; +import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates; +import io.strimzi.systemtest.utils.ClientUtils; +import io.strimzi.systemtest.utils.RollingUpdateUtils; +import io.strimzi.systemtest.utils.TestKafkaVersion; +import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils; +import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtensionContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static io.strimzi.systemtest.TestConstants.KRAFT_UPGRADE; +import static io.strimzi.test.k8s.KubeClusterResource.kubeClient; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** + * This test class contains tests for Kafka upgrade/downgrade from version X to X +/- 1, running in KRaft mode. + * Metadata for upgrade/downgrade procedure are loaded from kafka-versions.yaml in root dir of this repository. + */ +@Tag(KRAFT_UPGRADE) +public class KRaftKafkaUpgradeDowngradeST extends AbstractKRaftUpgradeST { + private static final Logger LOGGER = LogManager.getLogger(KRaftKafkaUpgradeDowngradeST.class); + + private final String continuousTopicName = "continuous-topic"; + private final int continuousClientsMessageCount = 1000; + + @IsolatedTest + void testKafkaClusterUpgrade(ExtensionContext testContext) { + List sortedVersions = TestKafkaVersion.getSupportedKafkaVersions(); + + String producerName = clusterName + "-producer"; + String consumerName = clusterName + "-consumer"; + + for (int x = 0; x < sortedVersions.size() - 1; x++) { + TestKafkaVersion initialVersion = sortedVersions.get(x); + TestKafkaVersion newVersion = sortedVersions.get(x + 1); + + // If it is an upgrade test we keep the metadata version as the lower version number + String metadataVersion = initialVersion.metadataVersion(); + + runVersionChange(initialVersion, newVersion, producerName, consumerName, metadataVersion, 3, 3, testContext); + } + + // ############################## + // Validate that continuous clients finished successfully + // ############################## + ClientUtils.waitForClientsSuccess(producerName, consumerName, TestConstants.CO_NAMESPACE, continuousClientsMessageCount); + // ############################## + } + + @IsolatedTest + void testKafkaClusterDowngrade(ExtensionContext testContext) { + final TestStorage testStorage = storageMap.get(testContext); + List sortedVersions = TestKafkaVersion.getSupportedKafkaVersions(); + + String clusterName = testStorage.getClusterName(); + String producerName = clusterName + "-producer"; + String consumerName = clusterName + "-consumer"; + + for (int x = sortedVersions.size() - 1; x > 0; x--) { + TestKafkaVersion initialVersion = sortedVersions.get(x); + TestKafkaVersion newVersion = sortedVersions.get(x - 1); + + // If it is a downgrade then we make sure that we are using the lowest metadataVersion from the whole list + String metadataVersion = sortedVersions.get(0).metadataVersion(); + runVersionChange(initialVersion, newVersion, producerName, consumerName, metadataVersion, 3, 3, testContext); + } + + // ############################## + // Validate that continuous clients finished successfully + // ############################## + ClientUtils.waitForClientsSuccess(producerName, consumerName, TestConstants.CO_NAMESPACE, continuousClientsMessageCount); + // ############################## + } + + @IsolatedTest + void testUpgradeWithNoMetadataVersionSet(ExtensionContext testContext) { + List sortedVersions = TestKafkaVersion.getSupportedKafkaVersions(); + + String producerName = clusterName + "-producer"; + String consumerName = clusterName + "-consumer"; + + for (int x = 0; x < sortedVersions.size() - 1; x++) { + TestKafkaVersion initialVersion = sortedVersions.get(x); + TestKafkaVersion newVersion = sortedVersions.get(x + 1); + + runVersionChange(initialVersion, newVersion, producerName, consumerName, null, 3, 3, testContext); + } + + // ############################## + // Validate that continuous clients finished successfully + // ############################## + ClientUtils.waitForClientsSuccess(producerName, consumerName, TestConstants.CO_NAMESPACE, continuousClientsMessageCount); + // ############################## + } + + @BeforeAll + void setupEnvironment(final ExtensionContext extensionContext) { + List coEnvVars = new ArrayList<>(); + coEnvVars.add(new EnvVar(Environment.STRIMZI_FEATURE_GATES_ENV, String.join(",", + TestConstants.USE_KRAFT_MODE, TestConstants.USE_KAFKA_NODE_POOLS, TestConstants.USE_UNIDIRECTIONAL_TOPIC_OPERATOR), null)); + + clusterOperator + .defaultInstallation(extensionContext) + .withExtraEnvVars(coEnvVars) + .createInstallation() + .runInstallation(); + } + + @SuppressWarnings({"checkstyle:MethodLength"}) + void runVersionChange(TestKafkaVersion initialVersion, TestKafkaVersion newVersion, String producerName, String consumerName, String initMetadataVersion, int controllerReplicas, int brokerReplicas, ExtensionContext testContext) { + boolean isUpgrade = initialVersion.isUpgrade(newVersion); + Map controllerPods; + Map brokerPods; + + boolean sameMinorVersion = initialVersion.metadataVersion().equals(newVersion.metadataVersion()); + + if (KafkaResource.kafkaClient().inNamespace(TestConstants.CO_NAMESPACE).withName(clusterName).get() == null) { + LOGGER.info("Deploying initial Kafka version {} with metadataVersion={}", initialVersion.version(), initMetadataVersion); + + KafkaBuilder kafka = KafkaTemplates.kafkaPersistent(clusterName, controllerReplicas, brokerReplicas) + .editMetadata() + .addToAnnotations(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled") + .addToAnnotations(Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled") + .endMetadata() + .editSpec() + .editKafka() + .withVersion(initialVersion.version()) + .withConfig(null) + .endKafka() + .endSpec(); + + // Do not set metadataVersion if it's not passed to method + if (initMetadataVersion != null) { + kafka + .editSpec() + .editKafka() + .withMetadataVersion(initMetadataVersion) + .endKafka() + .endSpec(); + } + + resourceManager.createResourceWithWait(testContext, + KafkaNodePoolTemplates.kafkaNodePoolWithControllerRoleAndPersistentStorage(TestConstants.CO_NAMESPACE, CONTROLLER_NODE_NAME, clusterName, controllerReplicas).build(), + KafkaNodePoolTemplates.kafkaNodePoolWithBrokerRoleAndPersistentStorage(TestConstants.CO_NAMESPACE, BROKER_NODE_NAME, clusterName, brokerReplicas).build(), + kafka.build() + ); + + // ############################## + // Attach clients which will continuously produce/consume messages to/from Kafka brokers during rolling update + // ############################## + // Setup topic, which has 3 replicas and 2 min.isr to see if producer will be able to work during rolling update + resourceManager.createResourceWithWait(testContext, KafkaTopicTemplates.topic(clusterName, continuousTopicName, 3, 3, 2, TestConstants.CO_NAMESPACE).build()); + String producerAdditionConfiguration = "delivery.timeout.ms=20000\nrequest.timeout.ms=20000"; + + KafkaClients kafkaBasicClientJob = new KafkaClientsBuilder() + .withProducerName(producerName) + .withConsumerName(consumerName) + .withBootstrapAddress(KafkaResources.plainBootstrapAddress(clusterName)) + .withTopicName(continuousTopicName) + .withMessageCount(continuousClientsMessageCount) + .withAdditionalConfig(producerAdditionConfiguration) + .withDelayMs(1000) + .build(); + + resourceManager.createResourceWithWait(testContext, kafkaBasicClientJob.producerStrimzi()); + resourceManager.createResourceWithWait(testContext, kafkaBasicClientJob.consumerStrimzi()); + // ############################## + } + + LOGGER.info("Deployment of initial Kafka version (" + initialVersion.version() + ") complete"); + + String controllerVersionResult = KafkaResource.kafkaClient().inNamespace(TestConstants.CO_NAMESPACE).withName(clusterName).get().getStatus().getKafkaVersion(); + LOGGER.info("Pre-change Kafka version: " + controllerVersionResult); + + controllerPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, controllerSelector); + brokerPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, brokerSelector); + + LOGGER.info("Updating Kafka CR version field to " + newVersion.version()); + + // Change the version in Kafka CR + KafkaResource.replaceKafkaResourceInSpecificNamespace(clusterName, kafka -> { + kafka.getSpec().getKafka().setVersion(newVersion.version()); + }, TestConstants.CO_NAMESPACE); + + LOGGER.info("Waiting for readiness of new Kafka version (" + newVersion.version() + ") to complete"); + + // Wait for the controllers' version change roll + controllerPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, controllerSelector, controllerReplicas, controllerPods); + LOGGER.info("1st Controllers roll (image change) is complete"); + + // Wait for the brokers' version change roll + brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, brokerSelector, brokerReplicas, brokerPods); + LOGGER.info("1st Brokers roll (image change) is complete"); + + String currentMetadataVersion = KafkaResource.kafkaClient().inNamespace(TestConstants.CO_NAMESPACE).withName(clusterName).get().getSpec().getKafka().getMetadataVersion(); + + LOGGER.info("Deployment of Kafka (" + newVersion.version() + ") complete"); + + PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName); + + String controllerPodName = kubeClient().listPodsByPrefixInName(TestConstants.CO_NAMESPACE, KafkaResource.getStrimziPodSetName(clusterName, CONTROLLER_NODE_NAME)).get(0).getMetadata().getName(); + String brokerPodName = kubeClient().listPodsByPrefixInName(TestConstants.CO_NAMESPACE, KafkaResource.getStrimziPodSetName(clusterName, BROKER_NODE_NAME)).get(0).getMetadata().getName(); + + // Extract the Kafka version number from the jars in the lib directory + controllerVersionResult = KafkaUtils.getVersionFromKafkaPodLibs(controllerPodName); + LOGGER.info("Post-change Kafka version query returned: " + controllerVersionResult); + + assertThat("Kafka container had version " + controllerVersionResult + " where " + newVersion.version() + + " was expected", controllerVersionResult, is(newVersion.version())); + + // Extract the Kafka version number from the jars in the lib directory + String brokerVersionResult = KafkaUtils.getVersionFromKafkaPodLibs(brokerPodName); + LOGGER.info("Post-change Kafka version query returned: " + brokerVersionResult); + + assertThat("Kafka container had version " + brokerVersionResult + " where " + newVersion.version() + + " was expected", brokerVersionResult, is(newVersion.version())); + + if (isUpgrade && !sameMinorVersion) { + LOGGER.info("Updating Kafka config attribute 'metadataVersion' from '{}' to '{}' version", initialVersion.metadataVersion(), newVersion.metadataVersion()); + + KafkaResource.replaceKafkaResourceInSpecificNamespace(clusterName, kafka -> { + LOGGER.info("Kafka config before updating '{}'", kafka.getSpec().getKafka().toString()); + + kafka.getSpec().getKafka().setMetadataVersion(newVersion.metadataVersion()); + + LOGGER.info("Kafka config after updating '{}'", kafka.getSpec().getKafka().toString()); + }, TestConstants.CO_NAMESPACE); + + LOGGER.info("Metadata version changed, it doesn't require rolling update, so the Pods should be stable"); + PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName); + assertFalse(RollingUpdateUtils.componentHasRolled(TestConstants.CO_NAMESPACE, controllerSelector, controllerPods)); + assertFalse(RollingUpdateUtils.componentHasRolled(TestConstants.CO_NAMESPACE, brokerSelector, brokerPods)); + } + + if (!isUpgrade) { + LOGGER.info("Verifying that metadataVersion attribute updated correctly to version {}", initMetadataVersion); + assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(TestConstants.CO_NAMESPACE).withName(clusterName) + .get().getStatus().getKafkaMetadataVersion().contains(initMetadataVersion), is(true)); + } else { + if (currentMetadataVersion != null) { + LOGGER.info("Verifying that metadataVersion attribute updated correctly to version {}", newVersion.metadataVersion()); + assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(TestConstants.CO_NAMESPACE).withName(clusterName) + .get().getStatus().getKafkaMetadataVersion().contains(newVersion.metadataVersion()), is(true)); + } + } + + LOGGER.info("Waiting till Kafka Cluster {}/{} with specified version {} has the same version in status and specification", TestConstants.CO_NAMESPACE, clusterName, newVersion.version()); + KafkaUtils.waitUntilStatusKafkaVersionMatchesExpectedVersion(clusterName, TestConstants.CO_NAMESPACE, newVersion.version()); + } +} diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java new file mode 100644 index 00000000000..cdebcfc6aa1 --- /dev/null +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java @@ -0,0 +1,113 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.systemtest.upgrade.kraft; + +import io.strimzi.systemtest.TestConstants; +import io.strimzi.systemtest.storage.TestStorage; +import io.strimzi.systemtest.upgrade.BundleVersionModificationData; +import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion; +import io.strimzi.systemtest.upgrade.VersionModificationDataLoader; +import io.strimzi.systemtest.utils.StUtils; +import io.strimzi.systemtest.utils.kubeUtils.objects.NamespaceUtils; +import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.List; + +import static io.strimzi.systemtest.TestConstants.CO_NAMESPACE; +import static io.strimzi.systemtest.TestConstants.INTERNAL_CLIENTS_USED; +import static io.strimzi.systemtest.TestConstants.KRAFT_UPGRADE; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** + * Class for testing downgrade process of Strimzi with its components when running in KRaft mode + * -> KRaft to KRaft downgrades + * Metadata for the following tests are collected from systemtest/src/test/resources/upgrade/BundleDowngrade.yaml + */ +@Tag(KRAFT_UPGRADE) +@Disabled("The tests are currently disabled, as the KRaft to KRaft downgrade (with operator downgrade) is not handled in Strimzi 0.38.0") +public class KRaftStrimziDowngradeST extends AbstractKRaftUpgradeST { + private static final Logger LOGGER = LogManager.getLogger(KRaftStrimziDowngradeST.class); + private final List bundleDowngradeMetadata = new VersionModificationDataLoader(VersionModificationDataLoader.ModificationType.BUNDLE_DOWNGRADE).getBundleUpgradeOrDowngradeDataList(); + + @ParameterizedTest(name = "testDowngradeStrimziVersion-{0}-{1}") + @MethodSource("io.strimzi.systemtest.upgrade.VersionModificationDataLoader#loadYamlDowngradeDataForKRaft") + @Tag(INTERNAL_CLIENTS_USED) + void testDowngradeStrimziVersion(String from, String to, BundleVersionModificationData parameters, ExtensionContext extensionContext) throws Exception { + assumeTrue(StUtils.isAllowOnCurrentEnvironment(parameters.getEnvFlakyVariable())); + assumeTrue(StUtils.isAllowedOnCurrentK8sVersion(parameters.getEnvMaxK8sVersion())); + + LOGGER.debug("Running downgrade test from version {} to {}", from, to); + performDowngrade(parameters, extensionContext); + } + + @Test + void testDowngradeOfKafkaConnectAndKafkaConnector(final ExtensionContext extensionContext) throws IOException { + final TestStorage testStorage = new TestStorage(extensionContext, TestConstants.CO_NAMESPACE); + final BundleVersionModificationData bundleDowngradeDataWithFeatureGates = bundleDowngradeMetadata.stream() + .filter(bundleMetadata -> bundleMetadata.getFeatureGatesBefore() != null && !bundleMetadata.getFeatureGatesBefore().isEmpty() || + bundleMetadata.getFeatureGatesAfter() != null && !bundleMetadata.getFeatureGatesAfter().isEmpty()).toList().get(0); + UpgradeKafkaVersion upgradeKafkaVersion = new UpgradeKafkaVersion(bundleDowngradeDataWithFeatureGates.getDeployKafkaVersion()); + + doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(extensionContext, bundleDowngradeDataWithFeatureGates, testStorage, upgradeKafkaVersion); + } + + @SuppressWarnings("MethodLength") + private void performDowngrade(BundleVersionModificationData downgradeData, ExtensionContext extensionContext) throws IOException { + TestStorage testStorage = new TestStorage(extensionContext); + + String lowerMetadataVersion = downgradeData.getProcedures().getMetadataVersion(); + UpgradeKafkaVersion testUpgradeKafkaVersion = new UpgradeKafkaVersion(downgradeData.getDeployKafkaVersion(), lowerMetadataVersion); + + // Setup env + // We support downgrade only when you didn't upgrade to new inter.broker.protocol.version and log.message.format.version + // https://strimzi.io/docs/operators/latest/full/deploying.html#con-target-downgrade-version-str + + setupEnvAndUpgradeClusterOperator(extensionContext, downgradeData, testStorage, testUpgradeKafkaVersion, TestConstants.CO_NAMESPACE); + + logPodImages(TestConstants.CO_NAMESPACE); + + // Downgrade CO + changeClusterOperator(downgradeData, TestConstants.CO_NAMESPACE, extensionContext); + + // Wait for Kafka cluster rolling update + waitForKafkaClusterRollingUpdate(); + + logPodImages(TestConstants.CO_NAMESPACE); + + // Downgrade kafka + changeKafkaAndMetadataVersion(downgradeData, extensionContext); + + // Verify that pods are stable + PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName); + + checkAllImages(downgradeData, TestConstants.CO_NAMESPACE); + + // Verify upgrade + verifyProcedure(downgradeData, testStorage.getProducerName(), testStorage.getConsumerName(), TestConstants.CO_NAMESPACE); + } + + @BeforeEach + void setupEnvironment() { + cluster.createNamespace(TestConstants.CO_NAMESPACE); + StUtils.copyImagePullSecrets(TestConstants.CO_NAMESPACE); + } + + @AfterEach + void afterEach() { + deleteInstalledYamls(coDir, TestConstants.CO_NAMESPACE); + NamespaceUtils.deleteNamespaceWithWait(CO_NAMESPACE); + } +} diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java new file mode 100644 index 00000000000..c05063f5d34 --- /dev/null +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java @@ -0,0 +1,226 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.systemtest.upgrade.kraft; + +import io.strimzi.api.kafka.model.KafkaResources; +import io.strimzi.api.kafka.model.KafkaTopic; +import io.strimzi.systemtest.TestConstants; +import io.strimzi.systemtest.annotations.IsolatedTest; +import io.strimzi.systemtest.resources.ResourceManager; +import io.strimzi.systemtest.resources.crd.KafkaResource; +import io.strimzi.systemtest.storage.TestStorage; +import io.strimzi.systemtest.upgrade.BundleVersionModificationData; +import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion; +import io.strimzi.systemtest.upgrade.VersionModificationDataLoader; +import io.strimzi.systemtest.utils.RollingUpdateUtils; +import io.strimzi.systemtest.utils.StUtils; +import io.strimzi.systemtest.utils.TestKafkaVersion; +import io.strimzi.systemtest.utils.kafkaUtils.KafkaTopicUtils; +import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils; +import io.strimzi.systemtest.utils.kubeUtils.controllers.DeploymentUtils; +import io.strimzi.systemtest.utils.kubeUtils.objects.NamespaceUtils; +import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.Map; + +import static io.strimzi.systemtest.TestConstants.INTERNAL_CLIENTS_USED; +import static io.strimzi.systemtest.TestConstants.KRAFT_UPGRADE; +import static io.strimzi.test.k8s.KubeClusterResource.cmdKubeClient; +import static io.strimzi.test.k8s.KubeClusterResource.kubeClient; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** + * Class for testing upgrade process of Strimzi with its components when running in KRaft mode + * -> KRaft to KRaft upgrades + * Metadata for the following tests are collected from systemtest/src/test/resources/upgrade/BundleUpgrade.yaml + */ +@Tag(KRAFT_UPGRADE) +public class KRaftStrimziUpgradeST extends AbstractKRaftUpgradeST { + + private static final Logger LOGGER = LogManager.getLogger(KRaftStrimziUpgradeST.class); + private final BundleVersionModificationData acrossUpgradeData = new VersionModificationDataLoader(VersionModificationDataLoader.ModificationType.BUNDLE_UPGRADE).buildDataForUpgradeAcrossVersionsForKRaft(); + + @ParameterizedTest(name = "from: {0} (using FG <{2}>) to: {1} (using FG <{3}>) ") + @MethodSource("io.strimzi.systemtest.upgrade.VersionModificationDataLoader#loadYamlUpgradeDataForKRaft") + @Tag(INTERNAL_CLIENTS_USED) + void testUpgradeStrimziVersion(String fromVersion, String toVersion, String fgBefore, String fgAfter, BundleVersionModificationData upgradeData, ExtensionContext extensionContext) throws Exception { + assumeTrue(StUtils.isAllowOnCurrentEnvironment(upgradeData.getEnvFlakyVariable())); + assumeTrue(StUtils.isAllowedOnCurrentK8sVersion(upgradeData.getEnvMaxK8sVersion())); + + performUpgrade(upgradeData, extensionContext); + } + + @IsolatedTest + void testUpgradeKafkaWithoutVersion(ExtensionContext extensionContext) throws IOException { + UpgradeKafkaVersion upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(acrossUpgradeData.getFromKafkaVersionsUrl(), acrossUpgradeData.getStartingKafkaVersion()); + upgradeKafkaVersion.setVersion(null); + + TestStorage testStorage = new TestStorage(extensionContext); + + // Setup env + setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE); + + Map controllerSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, controllerSelector); + Map brokerSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, brokerSelector); + Map eoSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, eoSelector); + + // Make snapshots of all Pods + makeSnapshots(); + + // Upgrade CO + changeClusterOperator(acrossUpgradeData, TestConstants.CO_NAMESPACE, extensionContext); + + logPodImages(TestConstants.CO_NAMESPACE); + + RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, controllerSelector, 3, controllerSnapshot); + RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, brokerSelector, 3, brokerSnapshot); + DeploymentUtils.waitTillDepHasRolled(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoSnapshot); + + logPodImages(TestConstants.CO_NAMESPACE); + checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE); + + // Verify that Pods are stable + PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName); + // Verify upgrade + verifyProcedure(acrossUpgradeData, testStorage.getProducerName(), testStorage.getConsumerName(), TestConstants.CO_NAMESPACE); + + String controllerPodName = kubeClient().listPodsByPrefixInName(TestConstants.CO_NAMESPACE, KafkaResource.getStrimziPodSetName(clusterName, CONTROLLER_NODE_NAME)).get(0).getMetadata().getName(); + String brokerPodName = kubeClient().listPodsByPrefixInName(TestConstants.CO_NAMESPACE, KafkaResource.getStrimziPodSetName(clusterName, BROKER_NODE_NAME)).get(0).getMetadata().getName(); + + assertThat(KafkaUtils.getVersionFromKafkaPodLibs(controllerPodName), containsString(acrossUpgradeData.getProcedures().getVersion())); + assertThat(KafkaUtils.getVersionFromKafkaPodLibs(brokerPodName), containsString(acrossUpgradeData.getProcedures().getVersion())); + } + + @IsolatedTest + void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion(ExtensionContext extensionContext) throws IOException { + TestStorage testStorage = new TestStorage(extensionContext); + UpgradeKafkaVersion upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(acrossUpgradeData.getFromKafkaVersionsUrl(), acrossUpgradeData.getStartingKafkaVersion()); + + // Setup env + setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE); + + // Make snapshots of all Pods + makeSnapshots(); + + // Upgrade CO + changeClusterOperator(acrossUpgradeData, TestConstants.CO_NAMESPACE, extensionContext); + + waitForKafkaClusterRollingUpdate(); + + logPodImages(TestConstants.CO_NAMESPACE); + + // Upgrade kafka + changeKafkaAndMetadataVersion(acrossUpgradeData, true, extensionContext); + + logPodImages(TestConstants.CO_NAMESPACE); + + checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE); + + // Verify that Pods are stable + PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName); + + // Verify upgrade + verifyProcedure(acrossUpgradeData, testStorage.getProducerName(), testStorage.getConsumerName(), TestConstants.CO_NAMESPACE); + } + + @IsolatedTest + void testUpgradeAcrossVersionsWithNoKafkaVersion(ExtensionContext extensionContext) throws IOException { + TestStorage testStorage = new TestStorage(extensionContext); + + // Setup env + setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, null, TestConstants.CO_NAMESPACE); + + // Upgrade CO + changeClusterOperator(acrossUpgradeData, TestConstants.CO_NAMESPACE, extensionContext); + + // Wait till first upgrade finished + controllerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, controllerSelector, 3, controllerPods); + brokerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, brokerSelector, 3, brokerPods); + eoPods = DeploymentUtils.waitTillDepHasRolled(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoPods); + + LOGGER.info("Rolling to new images has finished!"); + logPodImages(TestConstants.CO_NAMESPACE); + + // Upgrade kafka + changeKafkaAndMetadataVersion(acrossUpgradeData, extensionContext); + + logPodImages(TestConstants.CO_NAMESPACE); + + checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE); + + // Verify that Pods are stable + PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName); + + // Verify upgrade + verifyProcedure(acrossUpgradeData, testStorage.getProducerName(), testStorage.getConsumerName(), TestConstants.CO_NAMESPACE); + } + + @IsolatedTest + void testUpgradeOfKafkaConnectAndKafkaConnector(final ExtensionContext extensionContext) throws IOException { + final TestStorage testStorage = new TestStorage(extensionContext, TestConstants.CO_NAMESPACE); + final UpgradeKafkaVersion upgradeKafkaVersion = new UpgradeKafkaVersion(acrossUpgradeData.getDefaultKafka()); + + doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(extensionContext, acrossUpgradeData, testStorage, upgradeKafkaVersion); + } + + private void performUpgrade(BundleVersionModificationData upgradeData, ExtensionContext extensionContext) throws IOException { + TestStorage testStorage = new TestStorage(extensionContext); + + // leave empty, so the original Kafka version from appropriate Strimzi's yaml will be used + UpgradeKafkaVersion upgradeKafkaVersion = new UpgradeKafkaVersion(); + + // Setup env + setupEnvAndUpgradeClusterOperator(extensionContext, upgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE); + + // Upgrade CO to HEAD + logPodImages(TestConstants.CO_NAMESPACE); + + changeClusterOperator(upgradeData, TestConstants.CO_NAMESPACE, extensionContext); + + if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeData.getDefaultKafkaVersionPerStrimzi())) { + waitForKafkaClusterRollingUpdate(); + } + + logPodImages(TestConstants.CO_NAMESPACE); + + // Upgrade kafka + changeKafkaAndMetadataVersion(upgradeData, true, extensionContext); + + logPodImages(TestConstants.CO_NAMESPACE); + + checkAllImages(upgradeData, TestConstants.CO_NAMESPACE); + + // Verify that Pods are stable + PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName); + + // Verify upgrade + verifyProcedure(upgradeData, testStorage.getProducerName(), testStorage.getConsumerName(), TestConstants.CO_NAMESPACE); + } + + @BeforeEach + void setupEnvironment() { + cluster.createNamespace(TestConstants.CO_NAMESPACE); + StUtils.copyImagePullSecrets(TestConstants.CO_NAMESPACE); + } + + protected void afterEachMayOverride(ExtensionContext extensionContext) { + // delete all topics created in test + cmdKubeClient(TestConstants.CO_NAMESPACE).deleteAllByResource(KafkaTopic.RESOURCE_KIND); + KafkaTopicUtils.waitForTopicWithPrefixDeletion(TestConstants.CO_NAMESPACE, topicName); + + ResourceManager.getInstance().deleteResources(extensionContext); + NamespaceUtils.deleteNamespaceWithWait(TestConstants.CO_NAMESPACE); + } +} diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KafkaUpgradeDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/KafkaUpgradeDowngradeST.java similarity index 99% rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/KafkaUpgradeDowngradeST.java rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/KafkaUpgradeDowngradeST.java index c8c1528aab8..4951111093a 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KafkaUpgradeDowngradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/KafkaUpgradeDowngradeST.java @@ -2,7 +2,7 @@ * Copyright Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.systemtest.upgrade; +package io.strimzi.systemtest.upgrade.regular; import io.strimzi.api.kafka.Crds; import io.strimzi.api.kafka.model.KafkaBuilder; @@ -16,6 +16,7 @@ import io.strimzi.systemtest.storage.TestStorage; import io.strimzi.systemtest.templates.crd.KafkaTemplates; import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates; +import io.strimzi.systemtest.upgrade.AbstractUpgradeST; import io.strimzi.systemtest.utils.ClientUtils; import io.strimzi.systemtest.utils.RollingUpdateUtils; import io.strimzi.systemtest.utils.TestKafkaVersion; diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/OlmUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/OlmUpgradeST.java similarity index 96% rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/OlmUpgradeST.java rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/OlmUpgradeST.java index 51c657ab23c..ef9cb9af731 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/OlmUpgradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/OlmUpgradeST.java @@ -2,7 +2,7 @@ * Copyright Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.systemtest.upgrade; +package io.strimzi.systemtest.upgrade.regular; import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; import io.strimzi.api.kafka.model.KafkaResources; @@ -17,6 +17,9 @@ import io.strimzi.systemtest.resources.operator.configuration.OlmConfiguration; import io.strimzi.systemtest.resources.operator.configuration.OlmConfigurationBuilder; import io.strimzi.systemtest.storage.TestStorage; +import io.strimzi.systemtest.upgrade.AbstractUpgradeST; +import io.strimzi.systemtest.upgrade.OlmVersionModificationData; +import io.strimzi.systemtest.upgrade.VersionModificationDataLoader; import io.strimzi.systemtest.utils.ClientUtils; import io.strimzi.systemtest.utils.FileUtils; import io.strimzi.systemtest.utils.RollingUpdateUtils; @@ -139,9 +142,9 @@ void testStrimziUpgrade(ExtensionContext extensionContext) throws IOException { // ======== Cluster Operator upgrade ends ======== // ======== Kafka upgrade starts ======== - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); changeKafkaAndLogFormatVersion(olmUpgradeData, extensionContext); - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); // ======== Kafka upgrade ends ======== // Wait for messages of previously created clients diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java similarity index 93% rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziDowngradeST.java rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java index e3577aa7d21..c7f03cd7c8c 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziDowngradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java @@ -2,11 +2,15 @@ * Copyright Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.systemtest.upgrade; +package io.strimzi.systemtest.upgrade.regular; import io.strimzi.systemtest.TestConstants; import io.strimzi.systemtest.annotations.KRaftNotSupported; import io.strimzi.systemtest.storage.TestStorage; +import io.strimzi.systemtest.upgrade.AbstractUpgradeST; +import io.strimzi.systemtest.upgrade.BundleVersionModificationData; +import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion; +import io.strimzi.systemtest.upgrade.VersionModificationDataLoader; import io.strimzi.systemtest.utils.StUtils; import io.strimzi.systemtest.utils.kubeUtils.objects.NamespaceUtils; import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils; @@ -71,12 +75,12 @@ private void performDowngrade(BundleVersionModificationData downgradeData, Exten // We support downgrade only when you didn't upgrade to new inter.broker.protocol.version and log.message.format.version // https://strimzi.io/docs/operators/latest/full/deploying.html#con-target-downgrade-version-str setupEnvAndUpgradeClusterOperator(extensionContext, downgradeData, testStorage, testUpgradeKafkaVersion, TestConstants.CO_NAMESPACE); - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); // Downgrade CO changeClusterOperator(downgradeData, TestConstants.CO_NAMESPACE, extensionContext); // Wait for Kafka cluster rolling update waitForKafkaClusterRollingUpdate(); - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); // Verify that pods are stable PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName); checkAllImages(downgradeData, TestConstants.CO_NAMESPACE); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java similarity index 92% rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziUpgradeST.java rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java index fb1c7aad43f..dcba8f333b9 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziUpgradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java @@ -2,7 +2,7 @@ * Copyright Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.systemtest.upgrade; +package io.strimzi.systemtest.upgrade.regular; import io.strimzi.api.kafka.model.KafkaResources; import io.strimzi.api.kafka.model.KafkaTopic; @@ -10,6 +10,10 @@ import io.strimzi.systemtest.annotations.KRaftNotSupported; import io.strimzi.systemtest.resources.ResourceManager; import io.strimzi.systemtest.storage.TestStorage; +import io.strimzi.systemtest.upgrade.AbstractUpgradeST; +import io.strimzi.systemtest.upgrade.BundleVersionModificationData; +import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion; +import io.strimzi.systemtest.upgrade.VersionModificationDataLoader; import io.strimzi.systemtest.utils.RollingUpdateUtils; import io.strimzi.systemtest.utils.StUtils; import io.strimzi.systemtest.utils.TestKafkaVersion; @@ -74,7 +78,7 @@ void testUpgradeKafkaWithoutVersion(ExtensionContext extensionContext) throws IO setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE); Map zooSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, zkSelector); - Map kafkaSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, zkSelector); + Map kafkaSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, kafkaSelector); Map eoSnapshot = DeploymentUtils.depSnapshot(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName)); // Make snapshots of all Pods @@ -83,13 +87,13 @@ void testUpgradeKafkaWithoutVersion(ExtensionContext extensionContext) throws IO // Upgrade CO changeClusterOperator(acrossUpgradeData, TestConstants.CO_NAMESPACE, extensionContext); - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, zkSelector, 3, zooSnapshot); - RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, zkSelector, 3, kafkaSnapshot); + RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, kafkaSelector, 3, kafkaSnapshot); DeploymentUtils.waitTillDepHasRolled(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoSnapshot); - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE); // Verify that Pods are stable @@ -112,10 +116,10 @@ void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion(ExtensionContext exten // Upgrade CO changeClusterOperator(acrossUpgradeData, TestConstants.CO_NAMESPACE, extensionContext); - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); // Upgrade kafka changeKafkaAndLogFormatVersion(acrossUpgradeData, extensionContext); - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE); // Verify that Pods are stable PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName); @@ -136,10 +140,10 @@ void testUpgradeAcrossVersionsWithNoKafkaVersion(ExtensionContext extensionConte eoPods = DeploymentUtils.waitTillDepHasRolled(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoPods); LOGGER.info("Rolling to new images has finished!"); - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); // Upgrade kafka changeKafkaAndLogFormatVersion(acrossUpgradeData, extensionContext); - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE); // Verify that Pods are stable PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName); @@ -164,17 +168,17 @@ private void performUpgrade(BundleVersionModificationData upgradeData, Extension setupEnvAndUpgradeClusterOperator(extensionContext, upgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE); // Upgrade CO to HEAD - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); changeClusterOperator(upgradeData, TestConstants.CO_NAMESPACE, extensionContext); if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeData.getDefaultKafkaVersionPerStrimzi())) { waitForKafkaClusterRollingUpdate(); } - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); // Upgrade kafka changeKafkaAndLogFormatVersion(upgradeData, extensionContext); - logPodImages(clusterName); + logPodImages(TestConstants.CO_NAMESPACE); checkAllImages(upgradeData, TestConstants.CO_NAMESPACE); // Verify that Pods are stable diff --git a/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml b/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml index 5948ba11dcf..06b1757a5ba 100644 --- a/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml +++ b/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml @@ -70,3 +70,4 @@ flakyEnvVariable: none reason: Test is working on all environment used by QE. featureGatesBefore: "-KafkaNodePools" + featureGatesAfter: ""