Skip to content

Commit

Permalink
Support non-public cloud environments in the Azure Service Bus scaler (
Browse files Browse the repository at this point in the history
…#1907)

Signed-off-by: amirschw <24677563+amirschw@users.noreply.github.com>
  • Loading branch information
amirschw authored Aug 5, 2021
1 parent eacf6d8 commit d7f7f09
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
### New

- Extend Azure Monitor scaler to support custom metrics ([#1883](https://github.com/kedacore/keda/pull/1883))
- Support non-public cloud environments in the Azure Service Bus scaler ([#1907](https://github.com/kedacore/keda/pull/1907))
- Support non-public cloud environments in the Azure Storage Queue and Azure Storage Blob scalers ([#1863](https://github.com/kedacore/keda/pull/1863))
- Show HashiCorp Vault Address when using `kubectl get ta` or `kubectl get cta` ([#1862](https://github.com/kedacore/keda/pull/1862))
- Add Solace PubSub+ Event Broker Scaler ([#1945](https://github.com/kedacore/keda/pull/1945))
Expand Down
33 changes: 33 additions & 0 deletions pkg/scalers/azure/azure_cloud_environment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package azure

import (
"fmt"
"strings"

az "github.com/Azure/go-autorest/autorest/azure"
)

// EnvironmentSuffixProvider for different types of Azure scalers
type EnvironmentSuffixProvider func(env az.Environment) (string, error)

// ParseEndpointSuffix parses cloud and endpointSuffix metadata and returns the resolved endpoint suffix
func ParseEndpointSuffix(metadata map[string]string, suffixProvider EnvironmentSuffixProvider) (string, error) {
if val, ok := metadata["cloud"]; ok && val != "" {
if strings.EqualFold(val, PrivateCloud) {
if val, ok := metadata["endpointSuffix"]; ok && val != "" {
return val, nil
}
return "", fmt.Errorf("endpointSuffix must be provided for %s cloud type", PrivateCloud)
}

env, err := az.EnvironmentFromName(val)
if err != nil {
return "", fmt.Errorf("invalid cloud environment %s", val)
}

return suffixProvider(env)
}

// Use public cloud suffix if `cloud` isn't specified
return suffixProvider(az.PublicCloud)
}
52 changes: 52 additions & 0 deletions pkg/scalers/azure/azure_cloud_environment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package azure

import (
"fmt"
"testing"

az "github.com/Azure/go-autorest/autorest/azure"
)

type parseEndpointSuffixTestData struct {
metadata map[string]string
endpointSuffix string
suffixProvider EnvironmentSuffixProvider
isError bool
}

var testSuffixProvider EnvironmentSuffixProvider = func(env az.Environment) (string, error) {
if env == az.USGovernmentCloud {
return "", fmt.Errorf("test endpoint is not available in %s", env.Name)
}
return fmt.Sprintf("%s.suffix", env.Name), nil
}

var parseEndpointSuffixTestDataset = []parseEndpointSuffixTestData{
{map[string]string{}, "AzurePublicCloud.suffix", testSuffixProvider, false},
{map[string]string{"cloud": "Invalid"}, "", testSuffixProvider, true},
{map[string]string{"cloud": "AzureUSGovernmentCloud"}, "", testSuffixProvider, true},
{map[string]string{"cloud": "AzureGermanCloud"}, "AzureGermanCloud.suffix", testSuffixProvider, false},
{map[string]string{"cloud": "Private"}, "", testSuffixProvider, true},
{map[string]string{"cloud": "Private", "endpointSuffix": "suffix.private.cloud"}, "suffix.private.cloud", testSuffixProvider, false},
{map[string]string{"endpointSuffix": "ignored"}, "AzurePublicCloud.suffix", testSuffixProvider, false},
}

func TestParseEndpointSuffix(t *testing.T) {
for _, testData := range parseEndpointSuffixTestDataset {
endpointSuffix, err := ParseEndpointSuffix(testData.metadata, testData.suffixProvider)
if !testData.isError && err != nil {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if err == nil {
if endpointSuffix != testData.endpointSuffix {
t.Error(
"For", testData.metadata,
"expected endpointSuffix=", testData.endpointSuffix,
"but got", endpointSuffix)
}
}
}
}
16 changes: 2 additions & 14 deletions pkg/scalers/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,11 @@ func (e StorageEndpointType) GetEndpointSuffix(environment az.Environment) strin

// ParseAzureStorageEndpointSuffix parses cloud and endpointSuffix metadata and returns endpoint suffix
func ParseAzureStorageEndpointSuffix(metadata map[string]string, endpointType StorageEndpointType) (string, error) {
if val, ok := metadata["cloud"]; ok && val != "" {
if strings.EqualFold(val, PrivateCloud) {
if val, ok := metadata["endpointSuffix"]; ok && val != "" {
return val, nil
}
return "", fmt.Errorf("endpointSuffix must be provided for %s cloud type", PrivateCloud)
}

env, err := az.EnvironmentFromName(val)
if err != nil {
return "", fmt.Errorf("invalid cloud environment %s", val)
}
envSuffixProvider := func(env az.Environment) (string, error) {
return endpointType.GetEndpointSuffix(env), nil
}

// Use the default public cloud endpoint suffix if `cloud` isn't specified
return endpointType.GetEndpointSuffix(az.PublicCloud), nil
return ParseEndpointSuffix(metadata, envSuffixProvider)
}

// ParseAzureStorageQueueConnection parses queue connection string and returns credential and resource url
Expand Down
6 changes: 3 additions & 3 deletions pkg/scalers/azure/azure_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ func TestParseStorageConnectionString(t *testing.T) {
}
}

type parseEndpointSuffixTestData struct {
type parseAzureStorageEndpointSuffixTestData struct {
metadata map[string]string
endpointSuffix string
endpointType StorageEndpointType
isError bool
}

var parseEndpointSuffixTestDataset = []parseEndpointSuffixTestData{
var parseAzureStorageEndpointSuffixTestDataset = []parseAzureStorageEndpointSuffixTestData{
{map[string]string{}, "queue.core.windows.net", QueueEndpoint, false},
{map[string]string{"cloud": "InvalidCloud"}, "", QueueEndpoint, true},
{map[string]string{"cloud": "AzureUSGovernmentCloud"}, "queue.core.usgovcloudapi.net", QueueEndpoint, false},
Expand All @@ -78,7 +78,7 @@ var parseEndpointSuffixTestDataset = []parseEndpointSuffixTestData{
}

func TestParseAzureStorageEndpointSuffix(t *testing.T) {
for _, testData := range parseEndpointSuffixTestDataset {
for _, testData := range parseAzureStorageEndpointSuffixTestDataset {
endpointSuffix, err := ParseAzureStorageEndpointSuffix(testData.metadata, testData.endpointType)
if !testData.isError && err != nil {
t.Error("Expected success but got error", err)
Expand Down
16 changes: 15 additions & 1 deletion pkg/scalers/azure_servicebus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/Azure/azure-amqp-common-go/v3/auth"
servicebus "github.com/Azure/azure-service-bus-go"
az "github.com/Azure/go-autorest/autorest/azure"
"github.com/kedacore/keda/v2/pkg/scalers/azure"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -46,6 +47,7 @@ type azureServiceBusMetadata struct {
connection string
entityType entityType
namespace string
endpointSuffix string
}

// NewAzureServiceBusScaler creates a new AzureServiceBusScaler
Expand Down Expand Up @@ -102,6 +104,16 @@ func parseAzureServiceBusMetadata(config *ScalerConfig) (*azureServiceBusMetadat
}
}

envSuffixProvider := func(env az.Environment) (string, error) {
return env.ServiceBusEndpointSuffix, nil
}

endpointSuffix, err := azure.ParseEndpointSuffix(config.TriggerMetadata, envSuffixProvider)
if err != nil {
return nil, err
}
meta.endpointSuffix = endpointSuffix

if meta.entityType == none {
return nil, fmt.Errorf("no service bus entity type set")
}
Expand Down Expand Up @@ -198,7 +210,8 @@ type azureTokenProvider struct {

// GetToken implements TokenProvider interface for azureTokenProvider
func (a azureTokenProvider) GetToken(uri string) (*auth.Token, error) {
token, err := azure.GetAzureADPodIdentityToken(a.httpClient, "https://servicebus.azure.net")
// Service bus resource id is "https://servicebus.azure.net/" in all cloud environments
token, err := azure.GetAzureADPodIdentityToken(a.httpClient, "https://servicebus.azure.net/")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -249,6 +262,7 @@ func (s *azureServiceBusScaler) getServiceBusNamespace() (*servicebus.Namespace,
namespace.Name = s.metadata.namespace
}

namespace.Suffix = s.metadata.endpointSuffix
return namespace, nil
}

Expand Down
57 changes: 37 additions & 20 deletions pkg/scalers/azure_servicebus_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ const (
connectionSetting = "none"
namespaceName = "ns"
messageCount = "1000"
defaultSuffix = "servicebus.windows.net"
)

type parseServiceBusMetadataTestData struct {
metadata map[string]string
isError bool
entityType entityType
authParams map[string]string
podIdentity kedav1alpha1.PodIdentityProvider
metadata map[string]string
isError bool
entityType entityType
endpointSuffix string
authParams map[string]string
podIdentity kedav1alpha1.PodIdentityProvider
}

type azServiceBusMetricIdentifier struct {
Expand All @@ -43,31 +45,41 @@ var connectionResolvedEnv = map[string]string{
}

var parseServiceBusMetadataDataset = []parseServiceBusMetadataTestData{
{map[string]string{}, true, none, map[string]string{}, ""},
{map[string]string{}, true, none, "", map[string]string{}, ""},
// properly formed queue
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting}, false, queue, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting}, false, queue, defaultSuffix, map[string]string{}, ""},
// properly formed queue with message count
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, queue, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, queue, defaultSuffix, map[string]string{}, ""},
// properly formed topic & subscription
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, false, subscription, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, false, subscription, defaultSuffix, map[string]string{}, ""},
// properly formed topic & subscription with message count
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, subscription, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, subscription, defaultSuffix, map[string]string{}, ""},
// queue and topic specified
{map[string]string{"queueName": queueName, "topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""},
// queue and subscription specified
{map[string]string{"queueName": queueName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""},
// topic but no subscription specified
{map[string]string{"topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""},
// subscription but no topic specified
{map[string]string{"subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""},
{map[string]string{"subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""},
// valid cloud
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "AzureChinaCloud"}, false, queue, "servicebus.chinacloudapi.cn", map[string]string{}, ""},
// invalid cloud
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "InvalidCloud"}, true, none, "", map[string]string{}, ""},
// private cloud with endpoint suffix
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "Private", "endpointSuffix": "servicebus.private.cloud"}, false, queue, "servicebus.private.cloud", map[string]string{}, ""},
// private cloud without endpoint suffix
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "Private"}, true, none, "", map[string]string{}, ""},
// endpoint suffix without cloud
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "endpointSuffix": "ignored"}, false, queue, defaultSuffix, map[string]string{}, ""},
// connection not set
{map[string]string{"queueName": queueName}, true, queue, map[string]string{}, ""},
{map[string]string{"queueName": queueName}, true, queue, "", map[string]string{}, ""},
// connection set in auth params
{map[string]string{"queueName": queueName}, false, queue, map[string]string{"connection": connectionSetting}, ""},
{map[string]string{"queueName": queueName}, false, queue, defaultSuffix, map[string]string{"connection": connectionSetting}, ""},
// pod identity but missing namespace
{map[string]string{"queueName": queueName}, true, queue, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure},
{map[string]string{"queueName": queueName}, true, queue, "", map[string]string{}, kedav1alpha1.PodIdentityProviderAzure},
// correct pod identity
{map[string]string{"queueName": queueName, "namespace": namespaceName}, false, queue, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure},
{map[string]string{"queueName": queueName, "namespace": namespaceName}, false, queue, defaultSuffix, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure},
}

var azServiceBusMetricIdentifiers = []azServiceBusMetricIdentifier{
Expand Down Expand Up @@ -116,8 +128,13 @@ func TestParseServiceBusMetadata(t *testing.T) {
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if meta != nil && meta.entityType != testData.entityType {
t.Errorf("Expected entity type %v but got %v\n", testData.entityType, meta.entityType)
if meta != nil {
if meta.entityType != testData.entityType {
t.Errorf("Expected entity type %v but got %v\n", testData.entityType, meta.entityType)
}
if meta.endpointSuffix != testData.endpointSuffix {
t.Errorf("Expected endpoint suffix %v but got %v\n", testData.endpointSuffix, meta.endpointSuffix)
}
}
}
}
Expand Down

0 comments on commit d7f7f09

Please sign in to comment.