Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a scaler based on a GCP Stackdriver metric #2758

Merged
merged 13 commits into from
Mar 23, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

- **General:** Introduce new AWS DynamoDB Scaler ([#2482](https://github.com/kedacore/keda/issues/2482))
- **General:** Introduce new Azure Data Explorer Scaler ([#1488](https://github.com/kedacore/keda/issues/1488))
- **General:** Introduce new GCP Stackdriver Scaler ([#2661](https://github.com/kedacore/keda/issues/2661))
- **General:** Introduce new GCP Storage Scaler ([#2628](https://github.com/kedacore/keda/issues/2628))
- **General:** Introduce ARM-based container image for KEDA ([#2263](https://github.com/kedacore/keda/issues/2263)|[#2262](https://github.com/kedacore/keda/issues/2262))
- **General:** Provide support for authentication via Azure Key Vault ([#900](https://github.com/kedacore/keda/issues/900))
Expand Down
189 changes: 189 additions & 0 deletions pkg/scalers/gcp_stackdriver_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package scalers

import (
"context"
"fmt"
"strconv"

"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
	// defaultStackdriverTargetValue is the scaling target used when the
	// trigger metadata does not provide an explicit "targetValue".
	defaultStackdriverTargetValue = 5
)

// stackdriverScaler scales based on a GCP Stackdriver (Cloud Monitoring)
// metric selected by the configured filter.
type stackdriverScaler struct {
	client   *StackDriverClient   // Stackdriver API client; set to nil by Close
	metadata *stackdriverMetadata // parsed trigger configuration
}

// stackdriverMetadata holds the parsed trigger configuration for the
// Stackdriver scaler.
type stackdriverMetadata struct {
	projectID   string // GCP project to query (trigger key "projectId"), required
	filter      string // Stackdriver time series filter (trigger key "filter"), required
	targetValue int64  // scaling target; defaults to defaultStackdriverTargetValue
	metricName  string // generated external metric name, unique per scaler index

	gcpAuthorization *gcpAuthorizationMetadata // resolved GCP auth (pod identity or credentials)
}

// gcpStackdriverLog is the package-level logger for this scaler.
var gcpStackdriverLog = logf.Log.WithName("gcp_stackdriver_scaler")

// NewStackdriverScaler creates a new stackdriverScaler
func NewStackdriverScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) {
meta, err := parseStackdriverMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing Stackdriver metadata: %s", err)
}

client, err := initializeStackdriverClient(ctx, meta.gcpAuthorization)
if err != nil {
gcpStackdriverLog.Error(err, "Failed to create stack driver client")
return nil, err
}

return &stackdriverScaler{
metadata: meta,
client: client,
}, nil
}

// parseStackdriverMetadata extracts and validates the scaler configuration
// from the trigger metadata.
//
// Required keys: "projectId" (GCP project) and "filter" (Stackdriver time
// series filter). Optional: "targetValue" (base-10 int64; defaults to
// defaultStackdriverTargetValue). GCP authorization is resolved via
// getGcpAuthorization from the trigger config and resolved environment.
func parseStackdriverMetadata(config *ScalerConfig) (*stackdriverMetadata, error) {
	meta := stackdriverMetadata{
		targetValue: defaultStackdriverTargetValue,
	}

	// A missing key and an empty value are both rejected with the same error.
	if val, ok := config.TriggerMetadata["projectId"]; ok && val != "" {
		meta.projectID = val
	} else {
		return nil, fmt.Errorf("no projectId name given")
	}

	if val, ok := config.TriggerMetadata["filter"]; ok && val != "" {
		meta.filter = val
	} else {
		return nil, fmt.Errorf("no filter given")
	}

	// Namespace the external metric name with the scaler index so multiple
	// triggers on the same project stay distinct.
	name := kedautil.NormalizeString(fmt.Sprintf("gcp-stackdriver-%s", meta.projectID))
	meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, name)

	if val, ok := config.TriggerMetadata["targetValue"]; ok {
		targetValue, err := strconv.ParseInt(val, 10, 64)
		if err != nil {
			gcpStackdriverLog.Error(err, "Error parsing targetValue")
			// %w preserves the original message while keeping the chain intact.
			return nil, fmt.Errorf("error parsing targetValue: %w", err)
		}
		meta.targetValue = targetValue
	}

	auth, err := getGcpAuthorization(config, config.ResolvedEnv)
	if err != nil {
		return nil, err
	}
	meta.gcpAuthorization = auth
	return &meta, nil
}

// initializeStackdriverClient builds a StackDriver client, choosing pod
// identity when it is enabled and falling back to explicit Google
// application credentials otherwise.
func initializeStackdriverClient(ctx context.Context, gcpAuthorization *gcpAuthorizationMetadata) (*StackDriverClient, error) {
	var (
		sdClient *StackDriverClient
		err      error
	)

	if gcpAuthorization.podIdentityProviderEnabled {
		sdClient, err = NewStackDriverClientPodIdentity(ctx)
	} else {
		sdClient, err = NewStackDriverClient(ctx, gcpAuthorization.GoogleApplicationCredentials)
	}
	if err != nil {
		gcpStackdriverLog.Error(err, "Failed to create stack driver client")
		return nil, err
	}

	return sdClient, nil
}

// IsActive reports whether scaling should be considered active, i.e. the
// Stackdriver metric value is strictly greater than zero.
func (s *stackdriverScaler) IsActive(ctx context.Context) (bool, error) {
	metricValue, err := s.getMetrics(ctx)
	if err != nil {
		gcpStackdriverLog.Error(err, "error getting metric value")
		return false, err
	}

	return metricValue > 0, nil
}

// Close releases the underlying Stackdriver metrics client, if one is
// held. It always returns nil; a close failure is only logged. Calling
// Close again after a successful close is a no-op.
func (s *stackdriverScaler) Close(context.Context) error {
	if s.client == nil {
		return nil
	}

	closeErr := s.client.metricsClient.Close()
	// Drop the client reference regardless of the close outcome so the
	// scaler cannot be used after Close.
	s.client = nil
	if closeErr != nil {
		gcpStackdriverLog.Error(closeErr, "error closing StackDriver client")
	}

	return nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *stackdriverScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
// Construct the target value as a quantity
targetValueQty := resource.NewQuantity(s.metadata.targetValue, resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetValueQty,
},
}

// Create the metric spec for the HPA
metricSpec := v2beta2.MetricSpec{
External: externalMetric,
Type: externalMetricType,
}

return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics connects to Stack Driver and retrieves the metric
func (s *stackdriverScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
value, err := s.getMetrics(ctx)
if err != nil {
gcpStackdriverLog.Error(err, "error getting metric value")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(value, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// getMetrics gets metric type value from stackdriver api
func (s *stackdriverScaler) getMetrics(ctx context.Context) (int64, error) {
val, err := s.client.GetMetrics(ctx, s.metadata.filter, s.metadata.projectID)
if err == nil {
gcpStackdriverLog.V(1).Info(
fmt.Sprintf("Getting metrics for project %s and filter %s. Result: %d", s.metadata.projectID, s.metadata.filter, val))
}

return val, err
}
77 changes: 77 additions & 0 deletions pkg/scalers/gcp_stackdriver_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package scalers

import (
"context"
"testing"
)

// testStackdriverResolvedEnv supplies a minimal credentials blob for test
// cases that reference credentials via "credentialsFromEnv".
var testStackdriverResolvedEnv = map[string]string{
	"SAMPLE_CREDS": "{}",
}

// parseStackdriverMetadataTestData describes one metadata-parsing case:
// the auth params and trigger metadata inputs, and whether parsing is
// expected to fail.
type parseStackdriverMetadataTestData struct {
	authParams map[string]string
	metadata   map[string]string
	isError    bool
}

// gcpStackdriverMetricIdentifier describes one metric-name generation
// case: the metadata to parse, the scaler index, and the expected
// external metric name.
type gcpStackdriverMetricIdentifier struct {
	metadataTestData *parseStackdriverMetadataTestData
	scalerIndex      int
	name             string
}

// sdFilter is a representative Stackdriver time series filter used across
// the test cases below.
var sdFilter = "metric.type=\"storage.googleapis.com/storage/object_count\" resource.type=\"gcs_bucket\""

// testStackdriverMetadata enumerates parsing scenarios for
// parseStackdriverMetadata, covering required keys, optional targetValue,
// and both credential sources.
var testStackdriverMetadata = []parseStackdriverMetadataTestData{
	// nothing passed at all
	{map[string]string{}, map[string]string{}, true},
	// all properly formed
	{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
	// all required properly formed
	{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS"}, false},
	// missing projectId
	{nil, map[string]string{"filter": sdFilter, "targetValue": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
	// missing filter
	{nil, map[string]string{"projectId": "myProject", "targetValue": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
	// missing credentials
	{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "7"}, true},
	// malformed targetValue
	{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "aa", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
	// Credentials from AuthParams
	{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"projectId": "myProject", "filter": sdFilter}, false},
	// Credentials from AuthParams with empty creds
	{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"projectId": "myProject", "filter": sdFilter}, true},
}

// gcpStackdriverMetricIdentifiers pins the expected external metric names
// generated for different scaler indexes from the same valid metadata.
var gcpStackdriverMetricIdentifiers = []gcpStackdriverMetricIdentifier{
	{&testStackdriverMetadata[1], 0, "s0-gcp-stackdriver-myProject"},
	{&testStackdriverMetadata[1], 1, "s1-gcp-stackdriver-myProject"},
}

// TestStackdriverParseMetadata verifies that parseStackdriverMetadata
// accepts well-formed trigger metadata and rejects malformed input.
func TestStackdriverParseMetadata(t *testing.T) {
	for _, testData := range testStackdriverMetadata {
		cfg := &ScalerConfig{
			AuthParams:      testData.authParams,
			TriggerMetadata: testData.metadata,
			ResolvedEnv:     testStackdriverResolvedEnv,
		}
		_, err := parseStackdriverMetadata(cfg)
		switch {
		case err != nil && !testData.isError:
			t.Error("Expected success but got error", err)
		case err == nil && testData.isError:
			t.Error("Expected error but got success")
		}
	}
}

// TestGcpStackdriverGetMetricSpecForScaling verifies that the generated
// external metric name includes the scaler index prefix.
func TestGcpStackdriverGetMetricSpecForScaling(t *testing.T) {
	for _, testData := range gcpStackdriverMetricIdentifiers {
		cfg := &ScalerConfig{
			TriggerMetadata: testData.metadataTestData.metadata,
			ResolvedEnv:     testStackdriverResolvedEnv,
			ScalerIndex:     testData.scalerIndex,
		}
		meta, err := parseStackdriverMetadata(cfg)
		if err != nil {
			t.Fatal("Could not parse metadata:", err)
		}
		// A nil client is fine here: GetMetricSpecForScaling only reads metadata.
		scaler := stackdriverScaler{nil, meta}

		specs := scaler.GetMetricSpecForScaling(context.Background())
		if got := specs[0].External.Metric.Name; got != testData.name {
			t.Error("Wrong External metric source name:", got)
		}
	}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
return scalers.NewExternalPushScaler(config)
case "gcp-pubsub":
return scalers.NewPubSubScaler(config)
case "gcp-stackdriver":
return scalers.NewStackdriverScaler(ctx, config)
case "gcp-storage":
return scalers.NewGcsScaler(config)
case "graphite":
Expand Down
Loading