diff --git a/CHANGELOG.md b/CHANGELOG.md index 959f4f87ffa..768ed737aef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ - GCP PubSub scaler may be used in SubscriptionSize and OldestUnackedMessageAge modes - Cleanup metric names inside scalers ([#2260](https://github.com/kedacore/keda/pull/2260)) - Validating values length in prometheus query response ([#2264](https://github.com/kedacore/keda/pull/2264)) +- Add possibility to reference a GCP PubSub subscription by full link, including project ID ([#2269](https://github.com/kedacore/keda/pull/2269)) - Add `unsafeSsl` parameter in SeleniumGrid scaler ([#2157](https://github.com/kedacore/keda/pull/2157)) - Improve logs of Azure Pipelines Scaler. ([#2297](https://github.com/kedacore/keda/pull/2297)) diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index 07461076f36..64a5e558cb0 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "regexp" "strconv" + "strings" "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" @@ -18,6 +20,7 @@ import ( ) const ( + compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/subscriptions/[a-zA-Z][a-zA-Z0-9-_~%\\+\\.]*" defaultTargetSubscriptionSize = 5 defaultTargetOldestUnackedMessageAge = 10 pubSubStackDriverSubscriptionSizeMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages" @@ -232,10 +235,23 @@ func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (int64 return -1, err } } + subscriptionID, projectID := getSubscriptionData(s) + filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + subscriptionID + `"` - filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"` + return s.client.GetMetrics(ctx, filter, projectID) +} - return s.client.GetMetrics(ctx, filter) +func getSubscriptionData(s *pubsubScaler) (string, string) { + var subscriptionID string + var projectID string + regexpExpression, _ := regexp.Compile(compositeSubscriptionIDPrefix) + if regexpExpression.MatchString(s.metadata.subscriptionName) { + subscriptionID = strings.Split(s.metadata.subscriptionName, "/")[3] + projectID = strings.Split(s.metadata.subscriptionName, "/")[1] + } else { + subscriptionID = s.metadata.subscriptionName + } + return subscriptionID, projectID } func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) { diff --git a/pkg/scalers/gcp_pubsub_scaler_test.go b/pkg/scalers/gcp_pubsub_scaler_test.go index 85da975ce6c..aba69eb0c2e 100644 --- a/pkg/scalers/gcp_pubsub_scaler_test.go +++ b/pkg/scalers/gcp_pubsub_scaler_test.go @@ -21,6 +21,13 @@ type gcpPubSubMetricIdentifier struct { name string } +type gcpPubSubSubscription struct { + metadataTestData *parsePubSubMetadataTestData + scalerIndex int + name string + projectID string +} + var testPubSubMetadata = []parsePubSubMetadataTestData{ {map[string]string{}, map[string]string{}, true}, // all properly formed with deprecated field @@ -40,7 +47,11 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{ // Credentials from AuthParams {map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, false}, // Credentials from AuthParams with empty creds - {map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, true}, + {map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, true}, + // with full link to subscription + {nil, map[string]string{"subscriptionName": "projects/myproject/subscriptions/mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false}, + // with full (bad) link to subscription + {nil, map[string]string{"subscriptionName": "projects/myproject/mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false}, } var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{ @@ -48,6 +59,11 @@ var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{ {&testPubSubMetadata[1], 1, "s1-gcp-ps-mysubscription"}, } +var gcpSubscriptionNameTests = []gcpPubSubSubscription{ + {&testPubSubMetadata[10], 1, "mysubscription", "myproject"}, + {&testPubSubMetadata[11], 1, "projects/myproject/mysubscription", ""}, +} + func TestPubSubParseMetadata(t *testing.T) { for _, testData := range testPubSubMetadata { _, err := parsePubSubMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testPubSubResolvedEnv}) @@ -75,3 +91,18 @@ func TestGcpPubSubGetMetricSpecForScaling(t *testing.T) { } } } + +func TestGcpPubSubSubscriptionName(t *testing.T) { + for _, testData := range gcpSubscriptionNameTests { + meta, err := parsePubSubMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testPubSubResolvedEnv, ScalerIndex: testData.scalerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockGcpPubSubScaler := pubsubScaler{nil, meta} + subscriptionID, projectID := getSubscriptionData(&mockGcpPubSubScaler) + + if subscriptionID != testData.name || projectID != testData.projectID { + t.Error("Wrong Subscription parsing:", subscriptionID, projectID) + } + } +} diff --git a/pkg/scalers/stackdriver_client.go b/pkg/scalers/stackdriver_client.go index cb72037629f..40f1b48b60b 100644 --- a/pkg/scalers/stackdriver_client.go +++ b/pkg/scalers/stackdriver_client.go @@ -62,7 +62,7 @@ func NewStackDriverClientPodIdentity(ctx context.Context) (*StackDriverClient, e } // GetMetrics fetches metrics from stackdriver for a specific filter for the last minute -func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64, error) { +func (s StackDriverClient) GetMetrics(ctx context.Context, filter string, projectID string) (int64, error) { // Set the start time to 1 minute ago startTime := time.Now().UTC().Add(time.Minute * -2) @@ -70,34 +70,25 @@ func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64 endTime := time.Now().UTC() // Create a request with the filter and the GCP project ID - var req *monitoringpb.ListTimeSeriesRequest - if len(s.projectID) > 0 { - req = &monitoringpb.ListTimeSeriesRequest{ - Name: "projects/" + s.projectID, - Filter: filter, - Interval: &monitoringpb.TimeInterval{ - StartTime: ×tamp.Timestamp{ - Seconds: startTime.Unix(), - }, - EndTime: ×tamp.Timestamp{ - Seconds: endTime.Unix(), - }, - }, - } - } else { - req = &monitoringpb.ListTimeSeriesRequest{ - Name: "projects/" + s.credentials.ProjectID, - Filter: filter, - Interval: &monitoringpb.TimeInterval{ - StartTime: ×tamp.Timestamp{ - Seconds: startTime.Unix(), - }, - EndTime: ×tamp.Timestamp{ - Seconds: endTime.Unix(), - }, - }, + var req = &monitoringpb.ListTimeSeriesRequest{ + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamp.Timestamp{Seconds: startTime.Unix()}, + EndTime: ×tamp.Timestamp{Seconds: endTime.Unix()}, + }, + } + + switch projectID { + case "": + if len(s.projectID) > 0 { + req.Name = "projects/" + s.projectID + } else { + req.Name = "projects/" + s.credentials.ProjectID } + default: + req.Name = "projects/" + projectID } + // Get an iterator with the list of time series it := s.metricsClient.ListTimeSeries(ctx, req)