diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b13e4022f6..42903ad5013 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ - Add `publishRate` trigger to RabbitMQ scaler ([#1653](https://github.com/kedacore/keda/pull/1653)) - AWS SQS Scaler: Add Visible + NotVisible messages for scaling considerations ([#1664](https://github.com/kedacore/keda/pull/1664)) - Fixing behavior on ScaledJob with incorrect External Scaler ([#1672](https://github.com/kedacore/keda/pull/1672)) +- Apache Kafka Scaler: Add `allowIdleConsumers` to the list of trigger parameters ([#1684](https://github.com/kedacore/keda/pull/1684)) ### Breaking Changes diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index a1227b27015..5772341bb2f 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -25,11 +25,12 @@ type kafkaScaler struct { } type kafkaMetadata struct { - bootstrapServers []string - group string - topic string - lagThreshold int64 - offsetResetPolicy offsetResetPolicy + bootstrapServers []string + group string + topic string + lagThreshold int64 + offsetResetPolicy offsetResetPolicy + allowIdleConsumers bool // SASL saslType kafkaSaslType @@ -181,6 +182,15 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { } } + meta.allowIdleConsumers = false + if val, ok := config.TriggerMetadata["allowIdleConsumers"]; ok { + t, err := strconv.ParseBool(val) + if err != nil { + return meta, fmt.Errorf("error parsing allowIdleConsumers: %s", err) + } + meta.allowIdleConsumers = t + } + return meta, nil } @@ -360,9 +370,11 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS kafkaLog.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, partitions %v, threshold %v", totalLag, len(partitions), s.metadata.lagThreshold)) - // don't scale out beyond the number of partitions - if (totalLag / s.metadata.lagThreshold) > int64(len(partitions)) { - totalLag = int64(len(partitions)) * s.metadata.lagThreshold + if !s.metadata.allowIdleConsumers { + // don't scale out beyond the number of partitions + if (totalLag / s.metadata.lagThreshold) > int64(len(partitions)) { + totalLag = int64(len(partitions)) * s.metadata.lagThreshold + } } metric := external_metrics.ExternalMetricValue{ diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 1a140a248fe..71eeb0047cd 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -6,13 +6,14 @@ import ( ) type parseKafkaMetadataTestData struct { - metadata map[string]string - isError bool - numBrokers int - brokers []string - group string - topic string - offsetResetPolicy offsetResetPolicy + metadata map[string]string + isError bool + numBrokers int + brokers []string + group string + topic string + offsetResetPolicy offsetResetPolicy + allowIdleConsumers bool } type parseKafkaAuthParamsTestData struct { @@ -28,9 +29,10 @@ type kafkaMetricIdentifier struct { // A complete valid metadata example for reference var validKafkaMetadata = map[string]string{ - "bootstrapServers": "broker1:9092,broker2:9092", - "consumerGroup": "my-group", - "topic": "my-topic", + "bootstrapServers": "broker1:9092,broker2:9092", + "consumerGroup": "my-group", + "topic": "my-topic", + "allowIdleConsumers": "false", } // A complete valid authParams example for sasl, with username and passwd @@ -45,21 +47,25 @@ var validWithoutAuthParams = map[string]string{} var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ // failure, no bootstrapServers - {map[string]string{}, true, 0, nil, "", "", ""}, + {map[string]string{}, true, 0, nil, "", "", "", false}, // failure, no consumer group - {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest"}, + {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest", false}, // failure, no topic - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest")}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false}, // success - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // success, more brokers - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // success, offsetResetPolicy policy latest - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // failure, offsetResetPolicy policy wrong - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", ""}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", "", false}, // success, offsetResetPolicy policy earliest - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("earliest")}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("earliest"), false}, + // failure, allowIdleConsumers malformed + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + // success, allowIdleConsumers is true + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), true}, } var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ @@ -156,6 +162,9 @@ func TestGetBrokers(t *testing.T) { if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy { t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy) } + if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers { + t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers) + } } }