Skip to content

Commit

Permalink
Added custom time horizon in GCP scaler (#5778)
Browse files Browse the repository at this point in the history
* added custom time horizon in gcp scaler

Signed-off-by: Yaxhveer <yaxhcod@gmail.com>

* updated changelog

Signed-off-by: Yaxhveer <yaxhcod@gmail.com>

* add custom duration in other gcp stackdriver

Signed-off-by: Yaxhveer <yaxhcod@gmail.com>

* Update CHANGELOG.md

Co-authored-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
Signed-off-by: Yashveer <101015836+Yaxhveer@users.noreply.github.com>

---------

Signed-off-by: Yaxhveer <yaxhcod@gmail.com>
Signed-off-by: Yashveer <101015836+Yaxhveer@users.noreply.github.com>
Co-authored-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
  • Loading branch information
Yaxhveer and JorTurFer authored May 4, 2024
1 parent b7d9bd4 commit 66e824d
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 20 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))

### Fixes

Expand Down
21 changes: 14 additions & 7 deletions pkg/scalers/gcp/gcp_stackdriver_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,13 @@ func (s StackDriverClient) GetMetrics(
filter string,
projectID string,
aggregation *monitoringpb.Aggregation,
valueIfNull *float64) (float64, error) {
// Set the start time to 1 minute ago
startTime := time.Now().UTC().Add(time.Minute * -2)
valueIfNull *float64,
filterDuration int64) (float64, error) {
// Set the start time (default 2 minute ago)
if filterDuration <= 0 {
filterDuration = 2
}
startTime := time.Now().UTC().Add(time.Minute * -time.Duration(filterDuration))

// Set the end time to now
endTime := time.Now().UTC()
Expand Down Expand Up @@ -339,10 +343,13 @@ func getActualProjectID(s *StackDriverClient, projectID string) string {
// | align delta(3m)
// | every 3m
// | group_by [], count(value)
func (s StackDriverClient) BuildMQLQuery(projectID, resourceType, metric, resourceName, aggregation string) (string, error) {
th := defaultTimeHorizon
if aggregation != "" {
th = aggregationTimeHorizon
func (s StackDriverClient) BuildMQLQuery(projectID, resourceType, metric, resourceName, aggregation, timeHorizon string) (string, error) {
th := timeHorizon
if th == "" {
th = defaultTimeHorizon
if aggregation != "" {
th = aggregationTimeHorizon
}
}

pid := getActualProjectID(&s, projectID)
Expand Down
19 changes: 10 additions & 9 deletions pkg/scalers/gcp/gcp_stackdriver_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,54 +13,55 @@ func TestBuildMQLQuery(t *testing.T) {
metric string
resourceName string
aggregation string
timeHorizon string

expected string
isError bool
}{
{
"topic with aggregation",
"topic", "pubsub.googleapis.com/topic/x", "mytopic", "count",
"topic", "pubsub.googleapis.com/topic/x", "mytopic", "count", "1m",
"fetch pubsub_topic | metric 'pubsub.googleapis.com/topic/x' | filter (resource.project_id == 'myproject' && resource.topic_id == 'mytopic')" +
" | within 5m | align delta(3m) | every 3m | group_by [], count(value)",
" | within 1m | align delta(3m) | every 3m | group_by [], count(value)",
false,
},
{
"topic without aggregation",
"topic", "pubsub.googleapis.com/topic/x", "mytopic", "",
"topic", "pubsub.googleapis.com/topic/x", "mytopic", "", "",
"fetch pubsub_topic | metric 'pubsub.googleapis.com/topic/x' | filter (resource.project_id == 'myproject' && resource.topic_id == 'mytopic')" +
" | within 2m",
false,
},
{
"subscription with aggregation",
"subscription", "pubsub.googleapis.com/subscription/x", "mysubscription", "percentile99",
"subscription", "pubsub.googleapis.com/subscription/x", "mysubscription", "percentile99", "",
"fetch pubsub_subscription | metric 'pubsub.googleapis.com/subscription/x' | filter (resource.project_id == 'myproject' && resource.subscription_id == 'mysubscription')" +
" | within 5m | align delta(3m) | every 3m | group_by [], percentile(value, 99)",
false,
},
{
"subscription without aggregation",
"subscription", "pubsub.googleapis.com/subscription/x", "mysubscription", "",
"subscription", "pubsub.googleapis.com/subscription/x", "mysubscription", "", "4m",
"fetch pubsub_subscription | metric 'pubsub.googleapis.com/subscription/x' | filter (resource.project_id == 'myproject' && resource.subscription_id == 'mysubscription')" +
" | within 2m",
" | within 4m",
false,
},
{
"invalid percentile",
"topic", "pubsub.googleapis.com/topic/x", "mytopic", "percentile101",
"topic", "pubsub.googleapis.com/topic/x", "mytopic", "percentile101", "1m",
"invalid percentile value: 101",
true,
},
{
"unsupported aggregation function",
"topic", "pubsub.googleapis.com/topic/x", "mytopic", "max",
"topic", "pubsub.googleapis.com/topic/x", "mytopic", "max", "",
"unsupported aggregation function: max",
true,
},
} {
s := &StackDriverClient{}
t.Run(tc.name, func(t *testing.T) {
q, err := s.BuildMQLQuery("myproject", tc.resourceType, tc.metric, tc.resourceName, tc.aggregation)
q, err := s.BuildMQLQuery("myproject", tc.resourceType, tc.metric, tc.resourceName, tc.aggregation, tc.timeHorizon)
if tc.isError {
assert.Error(t, err)
assert.Equal(t, tc.expected, err.Error())
Expand Down
11 changes: 10 additions & 1 deletion pkg/scalers/gcp_cloud_tasks_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type gcpCloudTasksScaler struct {
type gcpCloudTaskMetadata struct {
value float64
activationValue float64
filterDuration int64

queueName string
projectID string
Expand Down Expand Up @@ -80,6 +81,14 @@ func parseGcpCloudTasksMetadata(config *scalersconfig.ScalerConfig) (*gcpCloudTa
return nil, fmt.Errorf("no queue name given")
}

if val, ok := config.TriggerMetadata["filterDuration"]; ok {
filterDuration, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("filterDuration parsing error %w", err)
}
meta.filterDuration = filterDuration
}

meta.activationValue = 0
if val, ok := config.TriggerMetadata["activationValue"]; ok {
activationValue, err := strconv.ParseFloat(val, 64)
Expand Down Expand Up @@ -180,5 +189,5 @@ func (s *gcpCloudTasksScaler) getMetrics(ctx context.Context, metricType string)

// Cloud Tasks metrics are collected every 60 seconds so no need to aggregate them.
// See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudtasks
return s.client.GetMetrics(ctx, filter, s.metadata.projectID, nil, nil)
return s.client.GetMetrics(ctx, filter, s.metadata.projectID, nil, nil, s.metadata.filterDuration)
}
5 changes: 4 additions & 1 deletion pkg/scalers/gcp_pubsub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type pubsubMetadata struct {
gcpAuthorization *gcp.AuthorizationMetadata
triggerIndex int
aggregation string
timeHorizon string
}

// NewPubSubScaler creates a new pubsubScaler
Expand Down Expand Up @@ -180,6 +181,8 @@ func parsePubSubMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger)

meta.aggregation = config.TriggerMetadata["aggregation"]

meta.timeHorizon = config.TriggerMetadata["timeHorizon"]

err := parsePubSubResourceConfig(config, &meta)
if err != nil {
return nil, err
Expand Down Expand Up @@ -280,7 +283,7 @@ func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (float
}
resourceID, projectID := getResourceData(s)
query, err := s.client.BuildMQLQuery(
projectID, s.metadata.resourceType, metricType, resourceID, s.metadata.aggregation,
projectID, s.metadata.resourceType, metricType, resourceID, s.metadata.aggregation, s.metadata.timeHorizon,
)
if err != nil {
return -1, err
Expand Down
11 changes: 10 additions & 1 deletion pkg/scalers/gcp_stackdriver_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type stackdriverMetadata struct {
activationTargetValue float64
metricName string
valueIfNull *float64
filterDuration int64

gcpAuthorization *gcp.AuthorizationMetadata
aggregation *monitoringpb.Aggregation
Expand Down Expand Up @@ -120,6 +121,14 @@ func parseStackdriverMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
meta.valueIfNull = &valueIfNull
}

if val, ok := config.TriggerMetadata["filterDuration"]; ok {
filterDuration, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("filterDuration parsing error %w", err)
}
meta.filterDuration = filterDuration
}

auth, err := gcp.GetGCPAuthorization(config)
if err != nil {
return nil, err
Expand Down Expand Up @@ -217,7 +226,7 @@ func (s *stackdriverScaler) GetMetricsAndActivity(ctx context.Context, metricNam

// getMetrics gets metric type value from stackdriver api
func (s *stackdriverScaler) getMetrics(ctx context.Context) (float64, error) {
val, err := s.client.GetMetrics(ctx, s.metadata.filter, s.metadata.projectID, s.metadata.aggregation, s.metadata.valueIfNull)
val, err := s.client.GetMetrics(ctx, s.metadata.filter, s.metadata.projectID, s.metadata.aggregation, s.metadata.valueIfNull, s.metadata.filterDuration)
if err == nil {
s.logger.V(1).Info(
fmt.Sprintf("Getting metrics for project %s, filter %s and aggregation %v. Result: %f",
Expand Down

0 comments on commit 66e824d

Please sign in to comment.