From e3462f6cab24fbe426405d8e3b48f75067a3ca05 Mon Sep 17 00:00:00 2001 From: Ram Cohen Date: Wed, 23 Mar 2022 10:35:03 +0200 Subject: [PATCH] Add a scaler based on a GCP Stackdriver metric (#2758) Signed-off-by: Ram Cohen --- CHANGELOG.md | 1 + pkg/scalers/gcp_stackdriver_scaler.go | 189 +++++++++++++++++++++ pkg/scalers/gcp_stackdriver_scaler_test.go | 77 +++++++++ pkg/scaling/scale_handler.go | 2 + tests/scalers/gcp-stackdriver.test.ts | 184 ++++++++++++++++++++ 5 files changed, 453 insertions(+) create mode 100644 pkg/scalers/gcp_stackdriver_scaler.go create mode 100644 pkg/scalers/gcp_stackdriver_scaler_test.go create mode 100644 tests/scalers/gcp-stackdriver.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 71c55997c86..0b8fd0421de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ - **General:** Introduce new AWS DynamoDB Scaler ([#2486](https://github.com/kedacore/keda/issues/2482)) - **General:** Introduce new Azure Data Explorer Scaler ([#1488](https://github.com/kedacore/keda/issues/1488)) +- **General:** Introduce new GCP Stackdriver Scaler ([#2661](https://github.com/kedacore/keda/issues/2661)) - **General:** Introduce new GCP Storage Scaler ([#2628](https://github.com/kedacore/keda/issues/2628)) - **General:** Introduce ARM-based container image for KEDA ([#2263](https://github.com/kedacore/keda/issues/2263)|[#2262](https://github.com/kedacore/keda/issues/2262)) - **General:** Provide support for authentication via Azure Key Vault ([#900](https://github.com/kedacore/keda/issues/900)) diff --git a/pkg/scalers/gcp_stackdriver_scaler.go b/pkg/scalers/gcp_stackdriver_scaler.go new file mode 100644 index 00000000000..df2c7990a08 --- /dev/null +++ b/pkg/scalers/gcp_stackdriver_scaler.go @@ -0,0 +1,189 @@ +package scalers + +import ( + "context" + "fmt" + "strconv" + + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +const ( + defaultStackdriverTargetValue = 5 +) + +type stackdriverScaler struct { + client *StackDriverClient + metadata *stackdriverMetadata +} + +type stackdriverMetadata struct { + projectID string + filter string + targetValue int64 + metricName string + + gcpAuthorization *gcpAuthorizationMetadata +} + +var gcpStackdriverLog = logf.Log.WithName("gcp_stackdriver_scaler") + +// NewStackdriverScaler creates a new stackdriverScaler +func NewStackdriverScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) { + meta, err := parseStackdriverMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing Stackdriver metadata: %s", err) + } + + client, err := initializeStackdriverClient(ctx, meta.gcpAuthorization) + if err != nil { + gcpStackdriverLog.Error(err, "Failed to create stack driver client") + return nil, err + } + + return &stackdriverScaler{ + metadata: meta, + client: client, + }, nil +} + +func parseStackdriverMetadata(config *ScalerConfig) (*stackdriverMetadata, error) { + meta := stackdriverMetadata{} + meta.targetValue = defaultStackdriverTargetValue + + if val, ok := config.TriggerMetadata["projectId"]; ok { + if val == "" { + return nil, fmt.Errorf("no projectId name given") + } + + meta.projectID = val + } else { + return nil, fmt.Errorf("no projectId name given") + } + + if val, ok := config.TriggerMetadata["filter"]; ok { + if val == "" { + return nil, fmt.Errorf("no filter given") + } + + meta.filter = val + } else { + return nil, fmt.Errorf("no filter given") + } + + name := kedautil.NormalizeString(fmt.Sprintf("gcp-stackdriver-%s", meta.projectID)) + meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, name) + + if val, ok := config.TriggerMetadata["targetValue"]; ok { + targetValue, err := strconv.ParseInt(val, 10, 64) + if err != nil { + gcpStackdriverLog.Error(err, "Error parsing targetValue") + return nil, fmt.Errorf("error parsing targetValue: %s", err.Error()) + } + + meta.targetValue = targetValue + } + + auth, err := getGcpAuthorization(config, config.ResolvedEnv) + if err != nil { + return nil, err + } + meta.gcpAuthorization = auth + return &meta, nil +} + +func initializeStackdriverClient(ctx context.Context, gcpAuthorization *gcpAuthorizationMetadata) (*StackDriverClient, error) { + var client *StackDriverClient + var err error + if gcpAuthorization.podIdentityProviderEnabled { + client, err = NewStackDriverClientPodIdentity(ctx) + } else { + client, err = NewStackDriverClient(ctx, gcpAuthorization.GoogleApplicationCredentials) + } + + if err != nil { + gcpStackdriverLog.Error(err, "Failed to create stack driver client") + return nil, err + } + return client, nil +} + +func (s *stackdriverScaler) IsActive(ctx context.Context) (bool, error) { + value, err := s.getMetrics(ctx) + if err != nil { + gcpStackdriverLog.Error(err, "error getting metric value") + return false, err + } + return value > 0, nil +} + +func (s *stackdriverScaler) Close(context.Context) error { + if s.client != nil { + err := s.client.metricsClient.Close() + s.client = nil + if err != nil { + gcpStackdriverLog.Error(err, "error closing StackDriver client") + } + } + + return nil +} + +// GetMetricSpecForScaling returns the metric spec for the HPA +func (s *stackdriverScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { + // Construct the target value as a quantity + targetValueQty := resource.NewQuantity(s.metadata.targetValue, resource.DecimalSI) + + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: s.metadata.metricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetValueQty, + }, + } + + // Create the metric spec for the HPA + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, + Type: externalMetricType, + } + + return []v2beta2.MetricSpec{metricSpec} +} + +// GetMetrics connects to Stack Driver and retrieves the metric +func (s *stackdriverScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + value, err := s.getMetrics(ctx) + if err != nil { + gcpStackdriverLog.Error(err, "error getting metric value") + return []external_metrics.ExternalMetricValue{}, err + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(value, resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +// getMetrics gets metric type value from stackdriver api +func (s *stackdriverScaler) getMetrics(ctx context.Context) (int64, error) { + val, err := s.client.GetMetrics(ctx, s.metadata.filter, s.metadata.projectID) + if err == nil { + gcpStackdriverLog.V(1).Info( + fmt.Sprintf("Getting metrics for project %s and filter %s. Result: %d", s.metadata.projectID, s.metadata.filter, val)) + } + + return val, err +} diff --git a/pkg/scalers/gcp_stackdriver_scaler_test.go b/pkg/scalers/gcp_stackdriver_scaler_test.go new file mode 100644 index 00000000000..ed278bdf7c7 --- /dev/null +++ b/pkg/scalers/gcp_stackdriver_scaler_test.go @@ -0,0 +1,77 @@ +package scalers + +import ( + "context" + "testing" +) + +var testStackdriverResolvedEnv = map[string]string{ + "SAMPLE_CREDS": "{}", +} + +type parseStackdriverMetadataTestData struct { + authParams map[string]string + metadata map[string]string + isError bool +} + +type gcpStackdriverMetricIdentifier struct { + metadataTestData *parseStackdriverMetadataTestData + scalerIndex int + name string +} + +var sdFilter = "metric.type=\"storage.googleapis.com/storage/object_count\" resource.type=\"gcs_bucket\"" + +var testStackdriverMetadata = []parseStackdriverMetadataTestData{ + {map[string]string{}, map[string]string{}, true}, + // all properly formed + {nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false}, + // all required properly formed + {nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS"}, false}, + // missing projectId + {nil, map[string]string{"filter": sdFilter, "targetValue": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // missing filter + {nil, map[string]string{"projectId": "myProject", "targetValue": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // missing credentials + {nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "7"}, true}, + // malformed targetValue + {nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "aa", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // Credentials from AuthParams + {map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"projectId": "myProject", "filter": sdFilter}, false}, + // Credentials from AuthParams with empty creds + {map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"projectId": "myProject", "filter": sdFilter}, true}, +} + +var gcpStackdriverMetricIdentifiers = []gcpStackdriverMetricIdentifier{ + {&testStackdriverMetadata[1], 0, "s0-gcp-stackdriver-myProject"}, + {&testStackdriverMetadata[1], 1, "s1-gcp-stackdriver-myProject"}, +} + +func TestStackdriverParseMetadata(t *testing.T) { + for _, testData := range testStackdriverMetadata { + _, err := parseStackdriverMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testStackdriverResolvedEnv}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } +} + +func TestGcpStackdriverGetMetricSpecForScaling(t *testing.T) { + for _, testData := range gcpStackdriverMetricIdentifiers { + meta, err := parseStackdriverMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testStackdriverResolvedEnv, ScalerIndex: testData.scalerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockGcpStackdriverScaler := stackdriverScaler{nil, meta} + + metricSpec := mockGcpStackdriverScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Error("Wrong External metric source name:", metricName) + } + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index a87bbd7cda5..acb3e5bcef4 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -394,6 +394,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewExternalPushScaler(config) case "gcp-pubsub": return scalers.NewPubSubScaler(config) + case "gcp-stackdriver": + return scalers.NewStackdriverScaler(ctx, config) case "gcp-storage": return scalers.NewGcsScaler(config) case "graphite": diff --git a/tests/scalers/gcp-stackdriver.test.ts b/tests/scalers/gcp-stackdriver.test.ts new file mode 100644 index 00000000000..0146235f854 --- /dev/null +++ b/tests/scalers/gcp-stackdriver.test.ts @@ -0,0 +1,184 @@ +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' +import { createNamespace, waitForDeploymentReplicaCount } from './helpers'; + +const gcpKey = process.env['GCP_SP_KEY'] +const projectId = JSON.parse(gcpKey).project_id +const testNamespace = 'gcp-stackdriver-test' +const bucketName = 'keda-test-stackdriver-bucket' +const deploymentName = 'dummy-consumer' +const maxReplicaCount = '3' +const gsPrefix = `kubectl exec --namespace ${testNamespace} deploy/gcp-sdk -- ` + +test.before(t => { + createNamespace(testNamespace) + + // deploy dummy consumer app, scaled object etc. + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, deployYaml.replace("{{GCP_CREDS}}", Buffer.from(gcpKey).toString("base64"))) + + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment should work..' + ) + +}) + +test.serial('Deployment should have 0 replicas on start', async t => { + t.true(await waitForDeploymentReplicaCount(0, deploymentName, testNamespace, 30, 2000), 'replica count should start out as 0') +}) + +test.serial('creating the gcp-sdk pod should work..', async t => { + let tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, gcpSdkYaml) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating the gcp-sdk pod should work..' + ) + + // wait for the gcp-sdk pod to be ready + t.true(await waitForDeploymentReplicaCount(1, 'gcp-sdk', testNamespace, 30, 2000), 'GCP-SDK pod is not in a ready state') +}) + +test.serial('initializing the gcp-sdk pod should work..', t => { + sh.exec(`kubectl wait --for=condition=ready --namespace ${testNamespace} pod -l app=gcp-sdk --timeout=30s`) + sh.exec('sleep 5s') + + // Authenticate to GCP + const creds = JSON.parse(gcpKey) + t.is( + 0, + sh.exec(gsPrefix + `gcloud auth activate-service-account ${creds.client_email} --key-file /etc/secret-volume/creds.json --project=${creds.project_id}`).code, + 'Setting GCP authentication on gcp-sdk should work..' + ) + + // Create bucket + sh.exec(gsPrefix + `gsutil mb gs://${bucketName}`) +}) + +test.serial(`Deployment should scale to ${maxReplicaCount} (the max) then back to 0`, async t => { + // Wait for the number of replicas to be scaled up to maxReplicaCount + var haveAllReplicas = false + for (let i = 0; i < 60 && !haveAllReplicas; i++) { + // Upload a file to generate traffic + t.is( + 0, + sh.exec(gsPrefix + `gsutil cp /usr/lib/google-cloud-sdk/bin/gsutil gs://${bucketName}`).code, + 'Copying an object should work..' + ) + if (await waitForDeploymentReplicaCount(parseInt(maxReplicaCount, 10), deploymentName, testNamespace, 1, 2000)) { + haveAllReplicas = true + } + } + + t.true(haveAllReplicas, `Replica count should be ${maxReplicaCount} after 120 seconds`) + + // Wait for the number of replicas to be scaled down to 0 + t.true( + await waitForDeploymentReplicaCount(0, deploymentName, testNamespace, 30, 10000), + `Replica count should be 0 after 5 minutes`) +}) + +test.after.always.cb('clean up', t => { + // Cleanup the bucket + sh.exec(gsPrefix + `gsutil -m rm -r gs://${bucketName}`) + + sh.exec(`kubectl delete deployment.apps/${deploymentName} --namespace ${testNamespace}`) + sh.exec(`kubectl delete namespace ${testNamespace}`) + + t.end() +}) + + +const deployYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: ${deploymentName} + namespace: ${testNamespace} + labels: + app: ${deploymentName} +spec: + replicas: 0 + selector: + matchLabels: + app: ${deploymentName} + template: + metadata: + labels: + app: ${deploymentName} + spec: + containers: + - name: noop-processor + image: ubuntu:20.04 + command: ["/bin/bash", "-c", "--"] + args: ["sleep 10"] + env: + - name: GOOGLE_APPLICATION_CREDENTIALS_JSON + valueFrom: + secretKeyRef: + name: stackdriver-secrets + key: creds.json +--- +apiVersion: v1 +kind: Secret +metadata: + name: stackdriver-secrets +type: Opaque +data: + creds.json: {{GCP_CREDS}} +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: test-scaledobject +spec: + scaleTargetRef: + name: ${deploymentName} + pollingInterval: 5 + maxReplicaCount: ${maxReplicaCount} + cooldownPeriod: 10 + triggers: + - type: gcp-stackdriver + metadata: + projectId: ${projectId} + filter: 'metric.type="storage.googleapis.com/network/received_bytes_count" AND resource.type="gcs_bucket" AND metric.label.method="WriteObject" AND resource.label.bucket_name="${bucketName}"' + metricName: ${bucketName} + targetValue: '5' + credentialsFromEnv: GOOGLE_APPLICATION_CREDENTIALS_JSON +` + +const gcpSdkYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: gcp-sdk + namespace: ${testNamespace} + labels: + app: gcp-sdk +spec: + replicas: 1 + selector: + matchLabels: + app: gcp-sdk + template: + metadata: + labels: + app: gcp-sdk + spec: + containers: + - name: gcp-sdk-container + image: google/cloud-sdk:slim + # Just spin & wait forever + command: [ "/bin/bash", "-c", "--" ] + args: [ "ls /tmp && while true; do sleep 30; done;" ] + volumeMounts: + - name: secret-volume + mountPath: /etc/secret-volume + volumes: + - name: secret-volume + secret: + secretName: stackdriver-secrets +`