diff --git a/tests/scalers/kafka/kafka_test.go b/tests/scalers/kafka/kafka_test.go index d2d3eac4831..f231a78c4fd 100644 --- a/tests/scalers/kafka/kafka_test.go +++ b/tests/scalers/kafka/kafka_test.go @@ -444,8 +444,10 @@ func TestScaler(t *testing.T) { testEarliestPolicy(t, kc, data) testLatestPolicy(t, kc, data) testMultiTopic(t, kc, data) - testZeroOnInvalidOffset(t, kc, data) - testOneOnInvalidOffset(t, kc, data) + testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data) + testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data) + testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data) + testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data) testPersistentLag(t, kc, data) testScalingOnlyPartitionsWithLag(t, kc, data) } @@ -545,8 +547,8 @@ func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) { "replica count should be %d after 2 minute", 2) } -func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { - t.Log("--- testing zeroInvalidOffsetTopic: scale out ---") +func testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing zeroInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---") data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup) data.Commit = StringTrue data.TopicName = zeroInvalidOffsetTopic @@ -556,6 +558,20 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) +} + +func testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing zeroInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---") + data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = StringTrue + data.TopicName = zeroInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = StringTrue + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) @@ -563,8 +579,8 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) } -func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { - t.Log("--- testing oneInvalidOffsetTopic: scale out ---") +func testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing oneInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---") data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup) data.Commit = StringTrue data.TopicName = oneInvalidOffsetTopic @@ -574,6 +590,28 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templat defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + + // Should scale to 1 + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), + "replica count should be %d after 2 minute", 1) + + commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup) + publishMessage(t, oneInvalidOffsetTopic) + + // Should scale to 0 + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10), + "replica count should be %d after 10 minute", 0) +} + +func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing oneInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---") + data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = StringTrue + data.TopicName = oneInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = StringFalse + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)