From 51f73bbc71221756ddd7b4925c87054ed5d2061a Mon Sep 17 00:00:00 2001 From: amirschw <24677563+amirschw@users.noreply.github.com> Date: Thu, 5 Aug 2021 22:20:03 +0300 Subject: [PATCH] Support non-public cloud environments in the Azure Service Bus scaler (#1907) Signed-off-by: amirschw <24677563+amirschw@users.noreply.github.com> Signed-off-by: Tsuyoshi Ushio --- CHANGELOG.md | 1 + pkg/scalers/azure/azure_cloud_environment.go | 33 +++++++++++ .../azure/azure_cloud_environment_test.go | 52 +++++++++++++++++ pkg/scalers/azure/azure_storage.go | 16 +----- pkg/scalers/azure/azure_storage_test.go | 6 +- pkg/scalers/azure_servicebus_scaler.go | 16 +++++- pkg/scalers/azure_servicebus_scaler_test.go | 57 ++++++++++++------- 7 files changed, 143 insertions(+), 38 deletions(-) create mode 100644 pkg/scalers/azure/azure_cloud_environment.go create mode 100644 pkg/scalers/azure/azure_cloud_environment_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e9019efc84..002160748d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ ### New - Extend Azure Monitor scaler to support custom metrics ([#1883](https://github.com/kedacore/keda/pull/1883)) +- Support non-public cloud environments in the Azure Service Bus scaler ([#1907](https://github.com/kedacore/keda/pull/1907)) - Support non-public cloud environments in the Azure Storage Queue and Azure Storage Blob scalers ([#1863](https://github.com/kedacore/keda/pull/1863)) - Show HashiCorp Vault Address when using `kubectl get ta` or `kubectl get cta` ([#1862](https://github.com/kedacore/keda/pull/1862)) - Add Solace PubSub+ Event Broker Scaler ([#1945](https://github.com/kedacore/keda/pull/1945)) diff --git a/pkg/scalers/azure/azure_cloud_environment.go b/pkg/scalers/azure/azure_cloud_environment.go new file mode 100644 index 00000000000..a16f8bea314 --- /dev/null +++ b/pkg/scalers/azure/azure_cloud_environment.go @@ -0,0 +1,33 @@ +package azure + +import ( + "fmt" + "strings" + + az "github.com/Azure/go-autorest/autorest/azure" +) + +// EnvironmentSuffixProvider for different types of Azure scalers +type EnvironmentSuffixProvider func(env az.Environment) (string, error) + +// ParseEndpointSuffix parses cloud and endpointSuffix metadata and returns the resolved endpoint suffix +func ParseEndpointSuffix(metadata map[string]string, suffixProvider EnvironmentSuffixProvider) (string, error) { + if val, ok := metadata["cloud"]; ok && val != "" { + if strings.EqualFold(val, PrivateCloud) { + if val, ok := metadata["endpointSuffix"]; ok && val != "" { + return val, nil + } + return "", fmt.Errorf("endpointSuffix must be provided for %s cloud type", PrivateCloud) + } + + env, err := az.EnvironmentFromName(val) + if err != nil { + return "", fmt.Errorf("invalid cloud environment %s", val) + } + + return suffixProvider(env) + } + + // Use public cloud suffix if `cloud` isn't specified + return suffixProvider(az.PublicCloud) +} diff --git a/pkg/scalers/azure/azure_cloud_environment_test.go b/pkg/scalers/azure/azure_cloud_environment_test.go new file mode 100644 index 00000000000..3f56e0f4a47 --- /dev/null +++ b/pkg/scalers/azure/azure_cloud_environment_test.go @@ -0,0 +1,52 @@ +package azure + +import ( + "fmt" + "testing" + + az "github.com/Azure/go-autorest/autorest/azure" +) + +type parseEndpointSuffixTestData struct { + metadata map[string]string + endpointSuffix string + suffixProvider EnvironmentSuffixProvider + isError bool +} + +var testSuffixProvider EnvironmentSuffixProvider = func(env az.Environment) (string, error) { + if env == az.USGovernmentCloud { + return "", fmt.Errorf("test endpoint is not available in %s", env.Name) + } + return fmt.Sprintf("%s.suffix", env.Name), nil +} + +var parseEndpointSuffixTestDataset = []parseEndpointSuffixTestData{ + {map[string]string{}, "AzurePublicCloud.suffix", testSuffixProvider, false}, + {map[string]string{"cloud": "Invalid"}, "", testSuffixProvider, true}, + {map[string]string{"cloud": "AzureUSGovernmentCloud"}, "", testSuffixProvider, true}, + {map[string]string{"cloud": "AzureGermanCloud"}, "AzureGermanCloud.suffix", testSuffixProvider, false}, + {map[string]string{"cloud": "Private"}, "", testSuffixProvider, true}, + {map[string]string{"cloud": "Private", "endpointSuffix": "suffix.private.cloud"}, "suffix.private.cloud", testSuffixProvider, false}, + {map[string]string{"endpointSuffix": "ignored"}, "AzurePublicCloud.suffix", testSuffixProvider, false}, +} + +func TestParseEndpointSuffix(t *testing.T) { + for _, testData := range parseEndpointSuffixTestDataset { + endpointSuffix, err := ParseEndpointSuffix(testData.metadata, testData.suffixProvider) + if !testData.isError && err != nil { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + if err == nil { + if endpointSuffix != testData.endpointSuffix { + t.Error( + "For", testData.metadata, + "expected endpointSuffix=", testData.endpointSuffix, + "but got", endpointSuffix) + } + } + } +} diff --git a/pkg/scalers/azure/azure_storage.go b/pkg/scalers/azure/azure_storage.go index 856153a6c29..5166e8a7ef8 100644 --- a/pkg/scalers/azure/azure_storage.go +++ b/pkg/scalers/azure/azure_storage.go @@ -53,23 +53,11 @@ func (e StorageEndpointType) GetEndpointSuffix(environment az.Environment) strin // ParseAzureStorageEndpointSuffix parses cloud and endpointSuffix metadata and returns endpoint suffix func ParseAzureStorageEndpointSuffix(metadata map[string]string, endpointType StorageEndpointType) (string, error) { - if val, ok := metadata["cloud"]; ok && val != "" { - if strings.EqualFold(val, PrivateCloud) { - if val, ok := metadata["endpointSuffix"]; ok && val != "" { - return val, nil - } - return "", fmt.Errorf("endpointSuffix must be provided for %s cloud type", PrivateCloud) - } - - env, err := az.EnvironmentFromName(val) - if err != nil { - return "", fmt.Errorf("invalid cloud environment %s", val) - } + envSuffixProvider := func(env az.Environment) (string, error) { return endpointType.GetEndpointSuffix(env), nil } - // Use the default public cloud endpoint suffix if `cloud` isn't specified - return endpointType.GetEndpointSuffix(az.PublicCloud), nil + return ParseEndpointSuffix(metadata, envSuffixProvider) } // ParseAzureStorageQueueConnection parses queue connection string and returns credential and resource url diff --git a/pkg/scalers/azure/azure_storage_test.go b/pkg/scalers/azure/azure_storage_test.go index f5657bf700f..82cdc8f8dcf 100644 --- a/pkg/scalers/azure/azure_storage_test.go +++ b/pkg/scalers/azure/azure_storage_test.go @@ -61,14 +61,14 @@ func TestParseStorageConnectionString(t *testing.T) { } } -type parseEndpointSuffixTestData struct { +type parseAzureStorageEndpointSuffixTestData struct { metadata map[string]string endpointSuffix string endpointType StorageEndpointType isError bool } -var parseEndpointSuffixTestDataset = []parseEndpointSuffixTestData{ +var parseAzureStorageEndpointSuffixTestDataset = []parseAzureStorageEndpointSuffixTestData{ {map[string]string{}, "queue.core.windows.net", QueueEndpoint, false}, {map[string]string{"cloud": "InvalidCloud"}, "", QueueEndpoint, true}, {map[string]string{"cloud": "AzureUSGovernmentCloud"}, "queue.core.usgovcloudapi.net", QueueEndpoint, false}, @@ -78,7 +78,7 @@ var parseEndpointSuffixTestDataset = []parseEndpointSuffixTestData{ } func TestParseAzureStorageEndpointSuffix(t *testing.T) { - for _, testData := range parseEndpointSuffixTestDataset { + for _, testData := range parseAzureStorageEndpointSuffixTestDataset { endpointSuffix, err := ParseAzureStorageEndpointSuffix(testData.metadata, testData.endpointType) if !testData.isError && err != nil { t.Error("Expected success but got error", err) diff --git a/pkg/scalers/azure_servicebus_scaler.go b/pkg/scalers/azure_servicebus_scaler.go index cf3e61aa2e8..ffbe4d5995a 100755 --- a/pkg/scalers/azure_servicebus_scaler.go +++ b/pkg/scalers/azure_servicebus_scaler.go @@ -8,6 +8,7 @@ import ( "github.com/Azure/azure-amqp-common-go/v3/auth" servicebus "github.com/Azure/azure-service-bus-go" + az "github.com/Azure/go-autorest/autorest/azure" "github.com/kedacore/keda/v2/pkg/scalers/azure" v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" @@ -46,6 +47,7 @@ type azureServiceBusMetadata struct { connection string entityType entityType namespace string + endpointSuffix string } // NewAzureServiceBusScaler creates a new AzureServiceBusScaler @@ -102,6 +104,16 @@ func parseAzureServiceBusMetadata(config *ScalerConfig) (*azureServiceBusMetadat } } + envSuffixProvider := func(env az.Environment) (string, error) { + return env.ServiceBusEndpointSuffix, nil + } + + endpointSuffix, err := azure.ParseEndpointSuffix(config.TriggerMetadata, envSuffixProvider) + if err != nil { + return nil, err + } + meta.endpointSuffix = endpointSuffix + if meta.entityType == none { return nil, fmt.Errorf("no service bus entity type set") } @@ -198,7 +210,8 @@ type azureTokenProvider struct { // GetToken implements TokenProvider interface for azureTokenProvider func (a azureTokenProvider) GetToken(uri string) (*auth.Token, error) { - token, err := azure.GetAzureADPodIdentityToken(a.httpClient, "https://servicebus.azure.net") + // Service bus resource id is "https://servicebus.azure.net/" in all cloud environments + token, err := azure.GetAzureADPodIdentityToken(a.httpClient, "https://servicebus.azure.net/") if err != nil { return nil, err } @@ -249,6 +262,7 @@ func (s *azureServiceBusScaler) getServiceBusNamespace() (*servicebus.Namespace, namespace.Name = s.metadata.namespace } + namespace.Suffix = s.metadata.endpointSuffix return namespace, nil } diff --git a/pkg/scalers/azure_servicebus_scaler_test.go b/pkg/scalers/azure_servicebus_scaler_test.go index d41704386dc..8b14fa22d1d 100755 --- a/pkg/scalers/azure_servicebus_scaler_test.go +++ b/pkg/scalers/azure_servicebus_scaler_test.go @@ -17,14 +17,16 @@ const ( connectionSetting = "none" namespaceName = "ns" messageCount = "1000" + defaultSuffix = "servicebus.windows.net" ) type parseServiceBusMetadataTestData struct { - metadata map[string]string - isError bool - entityType entityType - authParams map[string]string - podIdentity kedav1alpha1.PodIdentityProvider + metadata map[string]string + isError bool + entityType entityType + endpointSuffix string + authParams map[string]string + podIdentity kedav1alpha1.PodIdentityProvider } type azServiceBusMetricIdentifier struct { @@ -43,31 +45,41 @@ var connectionResolvedEnv = map[string]string{ } var parseServiceBusMetadataDataset = []parseServiceBusMetadataTestData{ - {map[string]string{}, true, none, map[string]string{}, ""}, + {map[string]string{}, true, none, "", map[string]string{}, ""}, // properly formed queue - {map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting}, false, queue, map[string]string{}, ""}, + {map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting}, false, queue, defaultSuffix, map[string]string{}, ""}, // properly formed queue with message count - {map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, queue, map[string]string{}, ""}, + {map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, queue, defaultSuffix, map[string]string{}, ""}, // properly formed topic & subscription - {map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, false, subscription, map[string]string{}, ""}, + {map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, false, subscription, defaultSuffix, map[string]string{}, ""}, // properly formed topic & subscription with message count - {map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, subscription, map[string]string{}, ""}, + {map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, subscription, defaultSuffix, map[string]string{}, ""}, // queue and topic specified - {map[string]string{"queueName": queueName, "topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""}, + {map[string]string{"queueName": queueName, "topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""}, // queue and subscription specified - {map[string]string{"queueName": queueName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""}, + {map[string]string{"queueName": queueName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""}, // topic but no subscription specified - {map[string]string{"topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""}, + {map[string]string{"topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""}, // subscription but no topic specified - {map[string]string{"subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""}, + {map[string]string{"subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""}, + // valid cloud + {map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "AzureChinaCloud"}, false, queue, "servicebus.chinacloudapi.cn", map[string]string{}, ""}, + // invalid cloud + {map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "InvalidCloud"}, true, none, "", map[string]string{}, ""}, + // private cloud with endpoint suffix + {map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "Private", "endpointSuffix": "servicebus.private.cloud"}, false, queue, "servicebus.private.cloud", map[string]string{}, ""}, + // private cloud without endpoint suffix + {map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "Private"}, true, none, "", map[string]string{}, ""}, + // endpoint suffix without cloud + {map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "endpointSuffix": "ignored"}, false, queue, defaultSuffix, map[string]string{}, ""}, // connection not set - {map[string]string{"queueName": queueName}, true, queue, map[string]string{}, ""}, + {map[string]string{"queueName": queueName}, true, queue, "", map[string]string{}, ""}, // connection set in auth params - {map[string]string{"queueName": queueName}, false, queue, map[string]string{"connection": connectionSetting}, ""}, + {map[string]string{"queueName": queueName}, false, queue, defaultSuffix, map[string]string{"connection": connectionSetting}, ""}, // pod identity but missing namespace - {map[string]string{"queueName": queueName}, true, queue, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, + {map[string]string{"queueName": queueName}, true, queue, "", map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, // correct pod identity - {map[string]string{"queueName": queueName, "namespace": namespaceName}, false, queue, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, + {map[string]string{"queueName": queueName, "namespace": namespaceName}, false, queue, defaultSuffix, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, } var azServiceBusMetricIdentifiers = []azServiceBusMetricIdentifier{ @@ -116,8 +128,13 @@ func TestParseServiceBusMetadata(t *testing.T) { if testData.isError && err == nil { t.Error("Expected error but got success") } - if meta != nil && meta.entityType != testData.entityType { - t.Errorf("Expected entity type %v but got %v\n", testData.entityType, meta.entityType) + if meta != nil { + if meta.entityType != testData.entityType { + t.Errorf("Expected entity type %v but got %v\n", testData.entityType, meta.entityType) + } + if meta.endpointSuffix != testData.endpointSuffix { + t.Errorf("Expected endpoint suffix %v but got %v\n", testData.endpointSuffix, meta.endpointSuffix) + } } } }