Skip to content

Commit

Permalink
Add a scaler based on a GCP Stackdriver metric (#2758)
Browse files Browse the repository at this point in the history
Signed-off-by: Ram Cohen <ram.cohen@gmail.com>
  • Loading branch information
RamCohen authored Mar 23, 2022
1 parent 6c9af03 commit e3462f6
Show file tree
Hide file tree
Showing 5 changed files with 453 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

- **General:** Introduce new AWS DynamoDB Scaler ([#2486](https://github.com/kedacore/keda/issues/2482))
- **General:** Introduce new Azure Data Explorer Scaler ([#1488](https://github.com/kedacore/keda/issues/1488))
- **General:** Introduce new GCP Stackdriver Scaler ([#2661](https://github.com/kedacore/keda/issues/2661))
- **General:** Introduce new GCP Storage Scaler ([#2628](https://github.com/kedacore/keda/issues/2628))
- **General:** Introduce ARM-based container image for KEDA ([#2263](https://github.com/kedacore/keda/issues/2263)|[#2262](https://github.com/kedacore/keda/issues/2262))
- **General:** Provide support for authentication via Azure Key Vault ([#900](https://github.com/kedacore/keda/issues/900))
Expand Down
189 changes: 189 additions & 0 deletions pkg/scalers/gcp_stackdriver_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package scalers

import (
"context"
"fmt"
"strconv"

"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
defaultStackdriverTargetValue = 5
)

type stackdriverScaler struct {
client *StackDriverClient
metadata *stackdriverMetadata
}

type stackdriverMetadata struct {
projectID string
filter string
targetValue int64
metricName string

gcpAuthorization *gcpAuthorizationMetadata
}

var gcpStackdriverLog = logf.Log.WithName("gcp_stackdriver_scaler")

// NewStackdriverScaler creates a new stackdriverScaler
func NewStackdriverScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) {
meta, err := parseStackdriverMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing Stackdriver metadata: %s", err)
}

client, err := initializeStackdriverClient(ctx, meta.gcpAuthorization)
if err != nil {
gcpStackdriverLog.Error(err, "Failed to create stack driver client")
return nil, err
}

return &stackdriverScaler{
metadata: meta,
client: client,
}, nil
}

func parseStackdriverMetadata(config *ScalerConfig) (*stackdriverMetadata, error) {
meta := stackdriverMetadata{}
meta.targetValue = defaultStackdriverTargetValue

if val, ok := config.TriggerMetadata["projectId"]; ok {
if val == "" {
return nil, fmt.Errorf("no projectId name given")
}

meta.projectID = val
} else {
return nil, fmt.Errorf("no projectId name given")
}

if val, ok := config.TriggerMetadata["filter"]; ok {
if val == "" {
return nil, fmt.Errorf("no filter given")
}

meta.filter = val
} else {
return nil, fmt.Errorf("no filter given")
}

name := kedautil.NormalizeString(fmt.Sprintf("gcp-stackdriver-%s", meta.projectID))
meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, name)

if val, ok := config.TriggerMetadata["targetValue"]; ok {
targetValue, err := strconv.ParseInt(val, 10, 64)
if err != nil {
gcpStackdriverLog.Error(err, "Error parsing targetValue")
return nil, fmt.Errorf("error parsing targetValue: %s", err.Error())
}

meta.targetValue = targetValue
}

auth, err := getGcpAuthorization(config, config.ResolvedEnv)
if err != nil {
return nil, err
}
meta.gcpAuthorization = auth
return &meta, nil
}

func initializeStackdriverClient(ctx context.Context, gcpAuthorization *gcpAuthorizationMetadata) (*StackDriverClient, error) {
var client *StackDriverClient
var err error
if gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, gcpAuthorization.GoogleApplicationCredentials)
}

if err != nil {
gcpStackdriverLog.Error(err, "Failed to create stack driver client")
return nil, err
}
return client, nil
}

func (s *stackdriverScaler) IsActive(ctx context.Context) (bool, error) {
value, err := s.getMetrics(ctx)
if err != nil {
gcpStackdriverLog.Error(err, "error getting metric value")
return false, err
}
return value > 0, nil
}

func (s *stackdriverScaler) Close(context.Context) error {
if s.client != nil {
err := s.client.metricsClient.Close()
s.client = nil
if err != nil {
gcpStackdriverLog.Error(err, "error closing StackDriver client")
}
}

return nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *stackdriverScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
// Construct the target value as a quantity
targetValueQty := resource.NewQuantity(s.metadata.targetValue, resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetValueQty,
},
}

// Create the metric spec for the HPA
metricSpec := v2beta2.MetricSpec{
External: externalMetric,
Type: externalMetricType,
}

return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics connects to Stack Driver and retrieves the metric
func (s *stackdriverScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
value, err := s.getMetrics(ctx)
if err != nil {
gcpStackdriverLog.Error(err, "error getting metric value")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(value, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// getMetrics gets metric type value from stackdriver api
func (s *stackdriverScaler) getMetrics(ctx context.Context) (int64, error) {
val, err := s.client.GetMetrics(ctx, s.metadata.filter, s.metadata.projectID)
if err == nil {
gcpStackdriverLog.V(1).Info(
fmt.Sprintf("Getting metrics for project %s and filter %s. Result: %d", s.metadata.projectID, s.metadata.filter, val))
}

return val, err
}
77 changes: 77 additions & 0 deletions pkg/scalers/gcp_stackdriver_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package scalers

import (
"context"
"testing"
)

var testStackdriverResolvedEnv = map[string]string{
"SAMPLE_CREDS": "{}",
}

type parseStackdriverMetadataTestData struct {
authParams map[string]string
metadata map[string]string
isError bool
}

type gcpStackdriverMetricIdentifier struct {
metadataTestData *parseStackdriverMetadataTestData
scalerIndex int
name string
}

var sdFilter = "metric.type=\"storage.googleapis.com/storage/object_count\" resource.type=\"gcs_bucket\""

var testStackdriverMetadata = []parseStackdriverMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// all required properly formed
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// missing projectId
{nil, map[string]string{"filter": sdFilter, "targetValue": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// missing filter
{nil, map[string]string{"projectId": "myProject", "targetValue": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// missing credentials
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "7"}, true},
// malformed targetValue
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "aa", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"projectId": "myProject", "filter": sdFilter}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"projectId": "myProject", "filter": sdFilter}, true},
}

var gcpStackdriverMetricIdentifiers = []gcpStackdriverMetricIdentifier{
{&testStackdriverMetadata[1], 0, "s0-gcp-stackdriver-myProject"},
{&testStackdriverMetadata[1], 1, "s1-gcp-stackdriver-myProject"},
}

func TestStackdriverParseMetadata(t *testing.T) {
for _, testData := range testStackdriverMetadata {
_, err := parseStackdriverMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testStackdriverResolvedEnv})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestGcpStackdriverGetMetricSpecForScaling(t *testing.T) {
for _, testData := range gcpStackdriverMetricIdentifiers {
meta, err := parseStackdriverMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testStackdriverResolvedEnv, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockGcpStackdriverScaler := stackdriverScaler{nil, meta}

metricSpec := mockGcpStackdriverScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
return scalers.NewExternalPushScaler(config)
case "gcp-pubsub":
return scalers.NewPubSubScaler(config)
case "gcp-stackdriver":
return scalers.NewStackdriverScaler(ctx, config)
case "gcp-storage":
return scalers.NewGcsScaler(config)
case "graphite":
Expand Down
Loading

0 comments on commit e3462f6

Please sign in to comment.