Skip to content

Commit

Permalink
provide new option to enable scaling past partition count
Browse files Browse the repository at this point in the history
Signed-off-by: Lionel Villard <villard@us.ibm.com>
  • Loading branch information
lionelvillard committed Mar 25, 2021
1 parent b9d3501 commit 5f8ed08
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
- Add `publishRate` trigger to RabbitMQ scaler ([#1653](https://github.com/kedacore/keda/pull/1653))
- AWS SQS Scaler: Add Visible + NotVisible messages for scaling considerations ([#1664](https://github.com/kedacore/keda/pull/1664))
- Fixing behavior on ScaledJob with incorrect External Scaler ([#1672](https://github.com/kedacore/keda/pull/1672))
- Apache Kafka Scaler: Add `allowIdleConsumers` to the list of trigger parameters ([#1684](https://github.com/kedacore/keda/pull/1684))

### Breaking Changes

Expand Down
28 changes: 20 additions & 8 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ type kafkaScaler struct {
}

type kafkaMetadata struct {
bootstrapServers []string
group string
topic string
lagThreshold int64
offsetResetPolicy offsetResetPolicy
bootstrapServers []string
group string
topic string
lagThreshold int64
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool

// SASL
saslType kafkaSaslType
Expand Down Expand Up @@ -181,6 +182,15 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
}
}

meta.allowIdleConsumers = false
if val, ok := config.TriggerMetadata["allowIdleConsumers"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing allowIdleConsumers: %s", err)
}
meta.allowIdleConsumers = t
}

return meta, nil
}

Expand Down Expand Up @@ -360,9 +370,11 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS

kafkaLog.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, partitions %v, threshold %v", totalLag, len(partitions), s.metadata.lagThreshold))

// don't scale out beyond the number of partitions
if (totalLag / s.metadata.lagThreshold) > int64(len(partitions)) {
totalLag = int64(len(partitions)) * s.metadata.lagThreshold
if !s.metadata.allowIdleConsumers {
// don't scale out beyond the number of partitions
if (totalLag / s.metadata.lagThreshold) > int64(len(partitions)) {
totalLag = int64(len(partitions)) * s.metadata.lagThreshold
}
}

metric := external_metrics.ExternalMetricValue{
Expand Down
7 changes: 4 additions & 3 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ type kafkaMetricIdentifier struct {

// A complete valid metadata example for reference
var validKafkaMetadata = map[string]string{
"bootstrapServers": "broker1:9092,broker2:9092",
"consumerGroup": "my-group",
"topic": "my-topic",
"bootstrapServers": "broker1:9092,broker2:9092",
"consumerGroup": "my-group",
"topic": "my-topic",
"allowIdleConsumers": "false",
}

// A complete valid authParams example for sasl, with username and passwd
Expand Down

0 comments on commit 5f8ed08

Please sign in to comment.