[ST] Add STs for Connector offset management feature #10904

Merged - 2 commits merged on Dec 4, 2024
@@ -57,6 +57,30 @@
* [connect](labels/connect.md)


## testConnectorOffsetManagement

**Description:** The test verifies the functionality of the connector offset management feature by going through all three operations that it supports - `list`, `alter`, and `reset`. To do that, it uses one ConfigMap, which is initially created by the Cluster Operator during the `list` operation and is then used for altering the offsets in the `alter` phase. To use a particular ConfigMap, we need to specify it in two places - `.spec.listOffsets.toConfigMap` and `.spec.alterOffsets.fromConfigMap`. To verify that everything is really reflected correctly, the test calls the Connect API for the particular Connector throughout - that is where the real offset is gathered. In order to `alter` and `reset` the offsets, the particular Connector has to be stopped - via `.spec.state` - otherwise a warning is raised in the status section of the Connector and the offsets are not updated. A minimal client-side sketch of the annotation flow is shown below the steps table.

**Steps:**

| Step | Action | Result |
| - | - | - |
| 1. | Create KafkaNodePools, Kafka, and KafkaConnect with the use of KafkaConnector resources enabled and the File Sink plugin. | KafkaNodePools, Kafka, and KafkaConnect are deployed successfully. |
| 2. | Create a KafkaConnector for the File Sink plugin with the offset management configuration - the ConfigMap specified in `.spec.listOffsets.toConfigMap` and `.spec.alterOffsets.fromConfigMap`. | KafkaConnector is deployed successfully. |
| 3. | Together with KafkaConnect and KafkaConnector, deploy a scraper Pod for obtaining the offsets from the Connect API, and NetworkPolicies for accessing KafkaConnect from the scraper Pod. | The scraper Pod and NetworkPolicies are deployed; the Connect API is now accessible from the scraper Pod. |
| 4. | Produce and consume 100 messages and wait for the offset to be updated on the Connector. | Messages are successfully transmitted and the offset is updated. |
| 5. | List the offsets - annotate the KafkaConnector with the `strimzi.io/connector-offsets` annotation set to `list` and wait for the creation of the ConfigMap containing the offsets. | Offsets are successfully listed and written into the newly created ConfigMap. |
| 6. | Verify that the ConfigMap contains the correct (expected) offset. | Offsets in the ConfigMap are correct. |
| 7. | Change the offset in the ConfigMap to `20`, stop the Connector, and apply the `strimzi.io/connector-offsets` annotation set to `alter` to alter the offsets. | The offset in the ConfigMap is changed to `20`, the Connector is stopped, and the annotation is applied. |
| 8. | Wait for the removal of the annotation from the KafkaConnector (indicating that the offsets were altered) and check via the Connect API that the values are correct. | The annotation is removed and the offsets from the Connect API are correct (set to `20`). |
| 9. | Apply the `strimzi.io/connector-offsets` annotation set to `reset` (the KafkaConnector is still stopped). | The annotation is applied to the KafkaConnector resource. |
| 10. | Finally, verify via the Connect API endpoint that the offsets are empty (the reset was successful). | The response from the Connect API shows that the offsets are indeed empty. |
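
To make the annotation flow concrete, below is a minimal, hypothetical sketch of how steps 5 and onwards could be driven with the fabric8 Kubernetes client. This is not the actual ST code; the namespace, connector, and ConfigMap names are placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class ConnectorOffsetListingSketch {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Step 5: annotate the KafkaConnector so that the Cluster Operator lists
            // the connector offsets into the ConfigMap referenced in
            // .spec.listOffsets.toConfigMap.
            client.genericKubernetesResources("kafka.strimzi.io/v1beta2", "KafkaConnector")
                .inNamespace("my-namespace")
                .withName("my-connector")
                .edit(connector -> {
                    Map<String, String> annotations = connector.getMetadata().getAnnotations();
                    if (annotations == null) {
                        annotations = new HashMap<>();
                        connector.getMetadata().setAnnotations(annotations);
                    }
                    annotations.put("strimzi.io/connector-offsets", "list");
                    return connector;
                });

            // Wait until the operator creates the ConfigMap with the listed offsets
            // (the predicate also receives null while the ConfigMap does not exist yet).
            client.configMaps()
                .inNamespace("my-namespace")
                .withName("my-offsets-cm")
                .waitUntilCondition(cm -> cm != null, 2, TimeUnit.MINUTES);
        }
    }
}
```

The `alter` and `reset` phases follow the same pattern: the Connector is first stopped via `.spec.state`, the corresponding annotation value is applied, and the operator removes the annotation once the operation has completed.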

**Labels:**

* [connect](labels/connect.md)


## testConnectorTaskAutoRestart

**Description:** Test the automatic restart functionality of Kafka Connect tasks when they fail.
15 changes: 8 additions & 7 deletions development-docs/systemtests/labels/connect.md
@@ -11,24 +11,25 @@ maintaining data consistency and availability in a streaming ecosystem.
<!-- generated part -->
**Tests:**
- [testBuildFailsWithWrongChecksumOfArtifact](../io.strimzi.systemtest.connect.ConnectBuilderST.md)
- [testScaleConnectWithoutConnectorToZero](../io.strimzi.systemtest.connect.ConnectST.md)
- [testBuildWithJarTgzAndZip](../io.strimzi.systemtest.connect.ConnectBuilderST.md)
- [testKafkaConnectWithPlainAndScramShaAuthentication](../io.strimzi.systemtest.connect.ConnectST.md)
- [testKafkaConnectScaleUpScaleDown](../io.strimzi.systemtest.connect.ConnectST.md)
- [testDeployRollUndeploy](../io.strimzi.systemtest.connect.ConnectST.md)
- [testKafkaConnectAndConnectorStateWithFileSinkPlugin](../io.strimzi.systemtest.connect.ConnectST.md)
- [testScaleConnectWithConnectorToZero](../io.strimzi.systemtest.connect.ConnectST.md)
- [testKafkaConnectAndConnectorFileSinkPlugin](../io.strimzi.systemtest.connect.ConnectST.md)
- [testConnectorOffsetManagement](../io.strimzi.systemtest.connect.ConnectST.md)
- [testCustomAndUpdatedValues](../io.strimzi.systemtest.connect.ConnectST.md)
- [testSecretsWithKafkaConnectWithTlsAndTlsClientAuthentication](../io.strimzi.systemtest.connect.ConnectST.md)
- [testScaleConnectAndConnectorSubresource](../io.strimzi.systemtest.connect.ConnectST.md)
- [testMountingSecretAndConfigMapAsVolumesAndEnvVars](../io.strimzi.systemtest.connect.ConnectST.md)
- [testMultiNodeKafkaConnectWithConnectorCreation](../io.strimzi.systemtest.connect.ConnectST.md)
- [testConnectorTaskAutoRestart](../io.strimzi.systemtest.connect.ConnectST.md)
- [testKafkaConnectWithScramShaAuthenticationRolledAfterPasswordChanged](../io.strimzi.systemtest.connect.ConnectST.md)
- [testPushIntoImageStream](../io.strimzi.systemtest.connect.ConnectBuilderST.md)
- [testJvmAndResources](../io.strimzi.systemtest.connect.ConnectST.md)
- [testConnectTlsAuthWithWeirdUserName](../io.strimzi.systemtest.connect.ConnectST.md)
- [testBuildOtherPluginTypeWithAndWithoutFileName](../io.strimzi.systemtest.connect.ConnectBuilderST.md)
- [testSecretsWithKafkaConnectWithTlsAndScramShaAuthentication](../io.strimzi.systemtest.connect.ConnectST.md)
- [testConnectScramShaAuthWithWeirdUserName](../io.strimzi.systemtest.connect.ConnectST.md)
@@ -4,6 +4,9 @@
*/
package io.strimzi.systemtest.utils.kafkaUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.strimzi.api.kafka.model.connect.KafkaConnectResources;
import io.strimzi.api.kafka.model.connector.KafkaConnector;
import io.strimzi.api.kafka.model.connector.KafkaConnectorStatus;
@@ -224,4 +227,101 @@ public static void loggerStabilityWait(String namespaceName, String connectClust
}
);
}

/**
* Waits for the removal of an annotation from the KafkaConnector resource.
*
* @param namespaceName name of the Namespace where the KafkaConnector resource is present
* @param connectorName name of the KafkaConnector which should be checked
* @param annotationName name of the annotation that should be deleted from the KafkaConnector's metadata
*/
public static void waitForRemovalOfTheAnnotation(String namespaceName, String connectorName, String annotationName) {
LOGGER.info("Waiting for annotation: {} to be removed from KafkaConnector: {}/{}", annotationName, namespaceName, connectorName);

TestUtils.waitFor(String.format("annotation %s to be removed from KafkaConnector: %s/%s", annotationName, namespaceName, connectorName),
TestConstants.GLOBAL_POLL_INTERVAL_5_SECS,
TestConstants.GLOBAL_STATUS_TIMEOUT,
() -> KafkaConnectorResource.kafkaConnectorClient().inNamespace(namespaceName).withName(connectorName).get().getMetadata().getAnnotations().get(annotationName) == null
);
}
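
// Illustrative usage: the Cluster Operator removes the strimzi.io/connector-offsets
// annotation once the requested operation finishes, so a test can block on:
//   waitForRemovalOfTheAnnotation(namespaceName, connectorName, "strimzi.io/connector-offsets");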

/**
* Using the Connect API, collects the offsets from the /offsets endpoint of the particular connector.
* Returns the result as a JsonNode object for easier handling in the particular tests.
*
* @param namespaceName name of the Namespace where the scraper Pod and Connect are running
* @param scraperPodName name of the scraper Pod used for executing the cURL command
* @param serviceName name of the service which exposes the 8083 port
* @param connectorName name of the connector that should be checked for the offsets
* @return JsonNode object with the offsets (the result of the API call)
* @throws JsonProcessingException when the API response cannot be parsed into a JsonNode
*/
public static JsonNode getOffsetOfConnectorFromConnectAPI(
String namespaceName,
String scraperPodName,
String serviceName,
String connectorName
) throws JsonProcessingException {
final ObjectMapper mapper = new ObjectMapper();

return mapper.readTree(cmdKubeClient().namespace(namespaceName).execInPod(scraperPodName,
"curl", "-X", "GET",
"http://" + serviceName + ":8083/connectors/" + connectorName + "/offsets").out().trim());
}

/**
* Waits for a specific offset to be present in the File Sink Connector.
*
* @param namespaceName name of the Namespace where the scraper Pod and Connect are running
* @param scraperPodName name of the scraper Pod used for executing the cURL command
* @param connectName name of the KafkaConnect resource which should be used for building the service name
* @param connectorName name of the connector that should be checked for the offsets
* @param expectedOffset offset for which we should wait
*/
public static void waitForOffsetInFileSinkConnector(
String namespaceName,
String scraperPodName,
String connectName,
String connectorName,
int expectedOffset
) {
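// The Connect REST API (GET /connectors/{name}/offsets) returns, for sink connectors,
// JSON shaped like:
//   {"offsets": [{"partition": {"kafka_topic": "...", "kafka_partition": 0},
//                 "offset": {"kafka_offset": <n>}}]}
// so the pointer below selects the committed Kafka offset of the first listed partition.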
waitForOffsetInConnector(
namespaceName,
scraperPodName,
KafkaConnectResources.serviceName(connectName),
connectorName,
"/offsets/0/offset/kafka_offset",
expectedOffset
);
}

/**
* Waits for a specific offset to be present in the Connector.
*
* @param namespaceName name of the Namespace where the scraper Pod and Connect are running
* @param scraperPodName name of the scraper Pod used for executing the cURL command
* @param serviceName name of the service which exposes the 8083 port
* @param connectorName name of the connector that should be checked for the offsets
* @param pathInJsonToOffsetObject JSON pointer (e.g. `/offsets/0/offset/kafka_offset`) to the offset value inside the API response
* @param expectedOffset offset for which we should wait
*/
public static void waitForOffsetInConnector(
String namespaceName,
String scraperPodName,
String serviceName,
String connectorName,
String pathInJsonToOffsetObject,
int expectedOffset
) {
TestUtils.waitFor("offset on the Connector will contain expected number",
TestConstants.GLOBAL_POLL_INTERVAL_5_SECS,
TestConstants.GLOBAL_STATUS_TIMEOUT,
() -> {
try {
JsonNode offsets = getOffsetOfConnectorFromConnectAPI(namespaceName, scraperPodName, serviceName, connectorName);
return offsets.at(pathInJsonToOffsetObject).asInt() == expectedOffset;
} catch (Exception e) {
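// Transient failures (scraper Pod not ready, offsets not committed yet,
// unparsable response) just mean "not yet" - keep polling until timeout.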
return false;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,19 @@ public static void waitForConfigMapLabelsChange(String namespaceName, String con
}
}
}

/**
* Waits for a ConfigMap with the specified name to be created in the specified Namespace.
*
* @param namespaceName name of the Namespace where the ConfigMap should be created
* @param configMapName name of the ConfigMap that should be created
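*
* <p>Illustrative usage (the ConfigMap name is a placeholder), e.g. waiting for the ConfigMap
* requested via {@code .spec.listOffsets.toConfigMap}:
* {@code waitForCreationOfConfigMap("my-namespace", "my-offsets-cm");}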
*/
public static void waitForCreationOfConfigMap(String namespaceName, String configMapName) {
LOGGER.info("Waiting for ConfigMap: {}/{} to be created", namespaceName, configMapName);
TestUtils.waitFor(String.format("ConfigMap: %s/%s to be created", namespaceName, configMapName),
TestConstants.POLL_INTERVAL_FOR_RESOURCE_READINESS,
TestConstants.GLOBAL_TIMEOUT,
() -> kubeClient().getConfigMap(namespaceName, configMapName) != null
);
}
}