diff --git a/CHANGELOG.md b/CHANGELOG.md index dbeee8100bc..4c86e4a2e7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,7 +36,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md ### New -- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX)) +- **General:** Support for Azure AD Workload Identity as a pod identity provider. ([2487](https://github.com/kedacore/keda/issues/2487)) ### Improvements diff --git a/Makefile b/Makefile index edb4e001550..ce585ef46e1 100644 --- a/Makefile +++ b/Makefile @@ -229,6 +229,11 @@ deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in $(KUSTOMIZE) edit set image ghcr.io/kedacore/keda=${IMAGE_CONTROLLER} cd config/metrics-server && \ $(KUSTOMIZE) edit set image ghcr.io/kedacore/keda-metrics-apiserver=${IMAGE_ADAPTER} + if [ "$(AZURE_RUN_WORKLOAD_IDENTITY_TESTS)" = true ]; then \ + cd config/service_account && \ + $(KUSTOMIZE) edit add label --force azure.workload.identity/use:true; \ + $(KUSTOMIZE) edit add annotation --force azure.workload.identity/client-id:${AZURE_SP_APP_ID} azure.workload.identity/tenant-id:${AZURE_SP_TENANT}; \ + fi # Need this workaround to mitigate a problem with inserting labels into selectors, # until this issue is solved: https://github.com/kubernetes-sigs/kustomize/issues/1009 @sed -i".out" -e 's@version:[ ].*@version: $(VERSION)@g' config/default/kustomize-config/metadataLabelTransformer.yaml diff --git a/apis/keda/v1alpha1/triggerauthentication_types.go b/apis/keda/v1alpha1/triggerauthentication_types.go index a04466f669d..24f4ea44c02 100644 --- a/apis/keda/v1alpha1/triggerauthentication_types.go +++ b/apis/keda/v1alpha1/triggerauthentication_types.go @@ -95,12 +95,13 @@ type PodIdentityProvider string // PodIdentityProviderNone specifies the default state when there is no Identity Provider // PodIdentityProvider specifies other available Identity providers const ( - PodIdentityProviderNone PodIdentityProvider = "none" - PodIdentityProviderAzure PodIdentityProvider = "azure" - PodIdentityProviderGCP PodIdentityProvider = "gcp" - PodIdentityProviderSpiffe PodIdentityProvider = "spiffe" - PodIdentityProviderAwsEKS PodIdentityProvider = "aws-eks" - PodIdentityProviderAwsKiam PodIdentityProvider = "aws-kiam" + PodIdentityProviderNone PodIdentityProvider = "none" + PodIdentityProviderAzure PodIdentityProvider = "azure" + PodIdentityProviderAzureWorkload PodIdentityProvider = "azure-workload" + PodIdentityProviderGCP PodIdentityProvider = "gcp" + PodIdentityProviderSpiffe PodIdentityProvider = "spiffe" + PodIdentityProviderAwsEKS PodIdentityProvider = "aws-eks" + PodIdentityProviderAwsKiam PodIdentityProvider = "aws-kiam" ) // PodIdentityAnnotationEKS specifies aws role arn for aws-eks Identity Provider @@ -180,9 +181,10 @@ type VaultSecret struct { // AzureKeyVault is used to authenticate using Azure Key Vault type AzureKeyVault struct { - VaultURI string `json:"vaultUri"` + VaultURI string `json:"vaultUri"` + Secrets []AzureKeyVaultSecret `json:"secrets"` + // +optional Credentials *AzureKeyVaultCredentials `json:"credentials"` - Secrets []AzureKeyVaultSecret `json:"secrets"` // +optional Cloud *AzureKeyVaultCloudInfo `json:"cloud"` } diff --git a/apis/keda/v1alpha1/zz_generated.deepcopy.go b/apis/keda/v1alpha1/zz_generated.deepcopy.go index fc63e23d1f9..a7d7732de17 100644 --- a/apis/keda/v1alpha1/zz_generated.deepcopy.go +++ b/apis/keda/v1alpha1/zz_generated.deepcopy.go @@ -95,16 +95,16 @@ func (in *AuthSecretTargetRef) DeepCopy() *AuthSecretTargetRef { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AzureKeyVault) DeepCopyInto(out *AzureKeyVault) { *out = *in - if in.Credentials != nil { - in, out := &in.Credentials, &out.Credentials - *out = new(AzureKeyVaultCredentials) - (*in).DeepCopyInto(*out) - } if in.Secrets != nil { in, out := &in.Secrets, &out.Secrets *out = make([]AzureKeyVaultSecret, len(*in)) copy(*out, *in) } + if in.Credentials != nil { + in, out := &in.Credentials, &out.Credentials + *out = new(AzureKeyVaultCredentials) + (*in).DeepCopyInto(*out) + } if in.Cloud != nil { in, out := &in.Cloud, &out.Cloud *out = new(AzureKeyVaultCloudInfo) diff --git a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml index a07a808c67b..0aed26b43a7 100644 --- a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml +++ b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml @@ -116,7 +116,6 @@ spec: vaultUri: type: string required: - - credentials - secrets - vaultUri type: object diff --git a/config/crd/bases/keda.sh_triggerauthentications.yaml b/config/crd/bases/keda.sh_triggerauthentications.yaml index a8ad47f2279..f54cbd72e69 100644 --- a/config/crd/bases/keda.sh_triggerauthentications.yaml +++ b/config/crd/bases/keda.sh_triggerauthentications.yaml @@ -115,7 +115,6 @@ spec: vaultUri: type: string required: - - credentials - secrets - vaultUri type: object diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml index cdf3a30e51c..a9a8390e9ea 100644 --- a/config/default/kustomization.yaml +++ b/config/default/kustomization.yaml @@ -28,3 +28,4 @@ resources: - ../rbac - ../manager - ../metrics-server +- ../service_account diff --git a/config/general/kustomization.yaml b/config/general/kustomization.yaml index 29f4bdc8d83..bf20f4df68b 100644 --- a/config/general/kustomization.yaml +++ b/config/general/kustomization.yaml @@ -1,3 +1,2 @@ resources: - namespace.yaml -- service_account.yaml diff --git a/config/service_account/kustomization.yaml b/config/service_account/kustomization.yaml new file mode 100644 index 00000000000..4256b2be3c7 --- /dev/null +++ b/config/service_account/kustomization.yaml @@ -0,0 +1,5 @@ +resources: +- service_account.yaml + +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization diff --git a/config/general/service_account.yaml b/config/service_account/service_account.yaml similarity index 100% rename from config/general/service_account.yaml rename to config/service_account/service_account.yaml diff --git a/go.mod b/go.mod index 74fbe4208fd..d43b7389f7b 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd github.com/Azure/go-autorest/autorest v0.11.27 github.com/Azure/go-autorest/autorest/azure/auth v0.5.11 + github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 github.com/DataDog/datadog-api-client-go v1.13.0 github.com/Huawei/gophercloud v1.0.21 github.com/Shopify/sarama v1.32.0 @@ -140,6 +141,7 @@ require ( github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-stack/stack v1.8.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.2.0 // indirect github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect github.com/golang-sql/sqlexp v0.0.0-20170517235910-f1bb20e5a188 // indirect @@ -185,6 +187,7 @@ require ( github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.15.0 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.8 // indirect diff --git a/go.sum b/go.sum index 288b9e5890e..a1e6db32933 100644 --- a/go.sum +++ b/go.sum @@ -137,6 +137,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= +github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= +github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -420,6 +422,9 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= +github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.2.0 h1:besgBTC8w8HjP6NzQdxwKH9Z5oQMZ24ThTrHp3cZ8eU= @@ -795,6 +800,7 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= @@ -850,6 +856,7 @@ github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= +github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/pkg/scalers/azure/azure_aad_auth.go b/pkg/scalers/azure/azure_aad_auth.go new file mode 100644 index 00000000000..a96084c08b7 --- /dev/null +++ b/pkg/scalers/azure/azure_aad_auth.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import "time" + +// AADToken is the token from Azure AD +type AADToken struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + ExpiresIn string `json:"expires_in"` + ExpiresOn string `json:"expires_on"` + ExpiresOnTimeObject time.Time `json:"expires_on_object"` + NotBefore string `json:"not_before"` + Resource string `json:"resource"` + TokenType string `json:"token_type"` + GrantedScopes []string `json:"grantedScopes"` + DeclinedScopes []string `json:"DeclinedScopes"` +} diff --git a/pkg/scalers/azure/azure_aad_podidentity.go b/pkg/scalers/azure/azure_aad_podidentity.go index 8a4f2566b8b..aa67d0e4899 100644 --- a/pkg/scalers/azure/azure_aad_podidentity.go +++ b/pkg/scalers/azure/azure_aad_podidentity.go @@ -47,14 +47,3 @@ func GetAzureADPodIdentityToken(ctx context.Context, httpClient util.HTTPDoer, a return token, nil } - -// AADToken is the token from Azure AD -type AADToken struct { - AccessToken string `json:"access_token"` - RefreshToken string `json:"refresh_token"` - ExpiresIn string `json:"expires_in"` - ExpiresOn string `json:"expires_on"` - NotBefore string `json:"not_before"` - Resource string `json:"resource"` - TokenType string `json:"token_type"` -} diff --git a/pkg/scalers/azure/azure_aad_workload_identity.go b/pkg/scalers/azure/azure_aad_workload_identity.go new file mode 100644 index 00000000000..d9bfd321cdf --- /dev/null +++ b/pkg/scalers/azure/azure_aad_workload_identity.go @@ -0,0 +1,171 @@ +/* +Copyright 2022 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "context" + "fmt" + "os" + "strconv" + "strings" + "time" + + amqpAuth "github.com/Azure/azure-amqp-common-go/v3/auth" + "github.com/Azure/go-autorest/autorest" + "github.com/Azure/go-autorest/autorest/azure/auth" + "github.com/AzureAD/microsoft-authentication-library-for-go/apps/confidential" +) + +// Azure AD Workload Identity Webhook will inject the following environment variables. +// * AZURE_CLIENT_ID - Client id set in the service account annotation +// * AZURE_TENANT_ID - Tenant id set in the service account annotation. If not defined, then tenant id provided via +// azure-wi-webhook-config will be used. +// * AZURE_FEDERATED_TOKEN_FILE - Service account token file path +// * AZURE_AUTHORITY_HOST - Azure Active Directory (AAD) endpoint. +const ( + azureClientIDEnv = "AZURE_CLIENT_ID" + azureTenantIDEnv = "AZURE_TENANT_ID" + azureFederatedTokenFileEnv = "AZURE_FEDERATED_TOKEN_FILE" + azureAuthrityHostEnv = "AZURE_AUTHORITY_HOST" +) + +// GetAzureADWorkloadIdentityToken returns the AADToken for resource +func GetAzureADWorkloadIdentityToken(ctx context.Context, resource string) (AADToken, error) { + clientID := os.Getenv(azureClientIDEnv) + tenantID := os.Getenv(azureTenantIDEnv) + tokenFilePath := os.Getenv(azureFederatedTokenFileEnv) + authorityHost := os.Getenv(azureAuthrityHostEnv) + + signedAssertion, err := readJWTFromFileSystem(tokenFilePath) + if err != nil { + return AADToken{}, fmt.Errorf("error reading service account token - %w", err) + } + + cred, err := confidential.NewCredFromAssertion(signedAssertion) + if err != nil { + return AADToken{}, fmt.Errorf("error getting credentials from service account token - %w", err) + } + + authorityOption := confidential.WithAuthority(fmt.Sprintf("%s%s/oauth2/token", authorityHost, tenantID)) + confidentialClient, err := confidential.New( + clientID, + cred, + authorityOption, + ) + if err != nil { + return AADToken{}, fmt.Errorf("error creating confidential client - %w", err) + } + + result, err := confidentialClient.AcquireTokenByCredential(ctx, []string{getScopedResource(resource)}) + if err != nil { + return AADToken{}, fmt.Errorf("error acquiring aad token - %w", err) + } + + return AADToken{ + AccessToken: result.AccessToken, + ExpiresOn: strconv.FormatInt(result.ExpiresOn.Unix(), 10), + ExpiresOnTimeObject: result.ExpiresOn, + GrantedScopes: result.GrantedScopes, + DeclinedScopes: result.DeclinedScopes, + }, nil +} + +func readJWTFromFileSystem(tokenFilePath string) (string, error) { + token, err := os.ReadFile(tokenFilePath) + if err != nil { + return "", err + } + return string(token), nil +} + +func getScopedResource(resource string) string { + resource = strings.TrimSuffix(resource, "/") + if !strings.HasSuffix(resource, ".default") { + resource += "/.default" + } + + return resource +} + +type ADWorkloadIdentityConfig struct { + ctx context.Context + Resource string +} + +func NewAzureADWorkloadIdentityConfig(ctx context.Context, resource string) auth.AuthorizerConfig { + return ADWorkloadIdentityConfig{ctx: ctx, Resource: resource} +} + +// Authorizer implements the auth.AuthorizerConfig interface +func (aadWiConfig ADWorkloadIdentityConfig) Authorizer() (autorest.Authorizer, error) { + return autorest.NewBearerAuthorizer(&ADWorkloadIdentityTokenProvider{ctx: aadWiConfig.ctx, Resource: aadWiConfig.Resource}), nil +} + +// ADWorkloadIdentityTokenProvider is a type that implements the adal.OAuthTokenProvider and adal.Refresher interfaces. +// The OAuthTokenProvider interface is used by the BearerAuthorizer to get the token when preparing the HTTP Header. +// The Refresher interface is used by the BearerAuthorizer to refresh the token. +type ADWorkloadIdentityTokenProvider struct { + ctx context.Context + Resource string + aadToken AADToken +} + +func NewADWorkloadIdentityTokenProvider(ctx context.Context, resource string) *ADWorkloadIdentityTokenProvider { + return &ADWorkloadIdentityTokenProvider{ctx: ctx, Resource: resource} +} + +// OAuthToken is for implementing the adal.OAuthTokenProvider interface. It returns the current access token. +func (wiTokenProvider *ADWorkloadIdentityTokenProvider) OAuthToken() string { + return wiTokenProvider.aadToken.AccessToken +} + +// Refresh is for implementing the adal.Refresher interface +func (wiTokenProvider *ADWorkloadIdentityTokenProvider) Refresh() error { + if time.Now().Before(wiTokenProvider.aadToken.ExpiresOnTimeObject) { + return nil + } + + aadToken, err := GetAzureADWorkloadIdentityToken(wiTokenProvider.ctx, wiTokenProvider.Resource) + if err != nil { + return err + } + + wiTokenProvider.aadToken = aadToken + return nil +} + +// RefreshExchange is for implementing the adal.Refresher interface +func (wiTokenProvider *ADWorkloadIdentityTokenProvider) RefreshExchange(resource string) error { + wiTokenProvider.Resource = resource + return wiTokenProvider.Refresh() +} + +// EnsureFresh is for implementing the adal.Refresher interface +func (wiTokenProvider *ADWorkloadIdentityTokenProvider) EnsureFresh() error { + return wiTokenProvider.Refresh() +} + +// GetToken is for implementing the auth.TokenProvider interface +func (wiTokenProvider *ADWorkloadIdentityTokenProvider) GetToken(uri string) (*amqpAuth.Token, error) { + err := wiTokenProvider.Refresh() + if err != nil { + return nil, err + } + + return amqpAuth.NewToken(amqpAuth.CBSTokenTypeJWT, wiTokenProvider.aadToken.AccessToken, + wiTokenProvider.aadToken.ExpiresOn), nil +} diff --git a/pkg/scalers/azure/azure_app_insights.go b/pkg/scalers/azure/azure_app_insights.go index 5a8de634013..22a0371f97e 100644 --- a/pkg/scalers/azure/azure_app_insights.go +++ b/pkg/scalers/azure/azure_app_insights.go @@ -60,17 +60,21 @@ func toISO8601(time string) (string, error) { return fmt.Sprintf("PT%02dH%02dM", hours, minutes), nil } -func getAuthConfig(info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityProvider) auth.AuthorizerConfig { - if podIdentity == "" || podIdentity == kedav1alpha1.PodIdentityProviderNone { +func getAuthConfig(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityProvider) auth.AuthorizerConfig { + switch podIdentity { + case "", kedav1alpha1.PodIdentityProviderNone: config := auth.NewClientCredentialsConfig(info.ClientID, info.ClientPassword, info.TenantID) config.Resource = info.AppInsightsResourceURL config.AADEndpoint = info.ActiveDirectoryEndpoint return config + case kedav1alpha1.PodIdentityProviderAzure: + config := auth.NewMSIConfig() + config.Resource = info.AppInsightsResourceURL + return config + case kedav1alpha1.PodIdentityProviderAzureWorkload: + return NewAzureADWorkloadIdentityConfig(ctx, info.AppInsightsResourceURL) } - - config := auth.NewMSIConfig() - config.Resource = info.AppInsightsResourceURL - return config + return nil } func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetric) (int64, error) { @@ -112,7 +116,7 @@ func queryParamsForAppInsightsRequest(info AppInsightsInfo) (map[string]interfac // GetAzureAppInsightsMetricValue returns the value of an Azure App Insights metric, rounded to the nearest int func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int64, error) { - config := getAuthConfig(info, podIdentity) + config := getAuthConfig(ctx, info, podIdentity) authorizer, err := config.Authorizer() if err != nil { return -1, err diff --git a/pkg/scalers/azure/azure_app_insights_test.go b/pkg/scalers/azure/azure_app_insights_test.go index 36b953d1ae1..b7006f13a7e 100644 --- a/pkg/scalers/azure/azure_app_insights_test.go +++ b/pkg/scalers/azure/azure_app_insights_test.go @@ -1,6 +1,7 @@ package azure import ( + "context" "testing" "github.com/Azure/go-autorest/autorest/azure/auth" @@ -72,28 +73,40 @@ func TestAzGetAzureAppInsightsMetricValue(t *testing.T) { type testAppInsightsAuthConfigTestData struct { testName string - expectMSI bool + config string info AppInsightsInfo podIdentity kedav1alpha1.PodIdentityProvider } +const ( + msiConfig = "msiConfig" + clientCredentialsConfig = "clientCredentialsConfig" + workloadIdentityConfig = "workloadIdentityConfig" +) + var testAppInsightsAuthConfigData = []testAppInsightsAuthConfigTestData{ - {"client credentials", false, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, ""}, - {"client credentials - pod id none", false, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, kedav1alpha1.PodIdentityProviderNone}, - {"azure pod identity", true, AppInsightsInfo{}, kedav1alpha1.PodIdentityProviderAzure}, + {"client credentials", clientCredentialsConfig, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, ""}, + {"client credentials - pod id none", clientCredentialsConfig, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, kedav1alpha1.PodIdentityProviderNone}, + {"azure pod identity", msiConfig, AppInsightsInfo{}, kedav1alpha1.PodIdentityProviderAzure}, + {"azure workload identity", workloadIdentityConfig, AppInsightsInfo{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, } func TestAzAppInfoGetAuthConfig(t *testing.T) { for _, testData := range testAppInsightsAuthConfigData { - authConfig := getAuthConfig(testData.info, testData.podIdentity) - if testData.expectMSI { + authConfig := getAuthConfig(context.TODO(), testData.info, testData.podIdentity) + switch testData.config { + case msiConfig: if _, ok := authConfig.(auth.MSIConfig); !ok { t.Errorf("Test %v; incorrect auth config. expected MSI config", testData.testName) } - } else { + case clientCredentialsConfig: if _, ok := authConfig.(auth.ClientCredentialsConfig); !ok { t.Errorf("Test: %v; incorrect auth config. expected client credentials config", testData.testName) } + case workloadIdentityConfig: + if _, ok := authConfig.(ADWorkloadIdentityConfig); !ok { + t.Errorf("Test: %v; incorrect auth config. expected ad workload identity config", testData.testName) + } } } } diff --git a/pkg/scalers/azure/azure_data_explorer.go b/pkg/scalers/azure/azure_data_explorer.go index 9d1e0e8f1b2..3bc5d1bc840 100644 --- a/pkg/scalers/azure/azure_data_explorer.go +++ b/pkg/scalers/azure/azure_data_explorer.go @@ -27,6 +27,8 @@ import ( "github.com/Azure/azure-kusto-go/kusto/unsafe" "github.com/Azure/go-autorest/autorest/azure/auth" logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) type DataExplorerMetadata struct { @@ -35,7 +37,7 @@ type DataExplorerMetadata struct { DatabaseName string Endpoint string MetricName string - PodIdentity string + PodIdentity kedav1alpha1.PodIdentityProvider Query string TenantID string Threshold int64 @@ -44,13 +46,18 @@ type DataExplorerMetadata struct { var azureDataExplorerLogger = logf.Log.WithName("azure_data_explorer_scaler") -func CreateAzureDataExplorerClient(metadata *DataExplorerMetadata) (*kusto.Client, error) { - authConfig, err := getDataExplorerAuthConfig(metadata) +func CreateAzureDataExplorerClient(ctx context.Context, metadata *DataExplorerMetadata) (*kusto.Client, error) { + authConfig, err := getDataExplorerAuthConfig(ctx, metadata) if err != nil { return nil, fmt.Errorf("failed to get data explorer auth config: %v", err) } - client, err := kusto.New(metadata.Endpoint, kusto.Authorization{Config: *authConfig}) + authorizer, err := authConfig.Authorizer() + if err != nil { + return nil, fmt.Errorf("failed to get authorizer: %v", err) + } + + client, err := kusto.New(metadata.Endpoint, kusto.Authorization{Authorizer: authorizer}) if err != nil { return nil, fmt.Errorf("failed to create kusto client: %v", err) } @@ -58,26 +65,31 @@ func CreateAzureDataExplorerClient(metadata *DataExplorerMetadata) (*kusto.Clien return client, nil } -func getDataExplorerAuthConfig(metadata *DataExplorerMetadata) (*auth.AuthorizerConfig, error) { +func getDataExplorerAuthConfig(ctx context.Context, metadata *DataExplorerMetadata) (auth.AuthorizerConfig, error) { var authConfig auth.AuthorizerConfig - if metadata.PodIdentity != "" { + switch metadata.PodIdentity { + case "", kedav1alpha1.PodIdentityProviderNone: + if metadata.ClientID != "" && metadata.ClientSecret != "" && metadata.TenantID != "" { + config := auth.NewClientCredentialsConfig(metadata.ClientID, metadata.ClientSecret, metadata.TenantID) + config.Resource = metadata.Endpoint + config.AADEndpoint = metadata.ActiveDirectoryEndpoint + azureDataExplorerLogger.V(1).Info("Creating Azure Data Explorer Client using clientID, clientSecret and tenantID") + + authConfig = config + return authConfig, nil + } + case kedav1alpha1.PodIdentityProviderAzure: config := auth.NewMSIConfig() config.Resource = metadata.Endpoint azureDataExplorerLogger.V(1).Info("Creating Azure Data Explorer Client using Pod Identity") authConfig = config - return &authConfig, nil - } - - if metadata.ClientID != "" && metadata.ClientSecret != "" && metadata.TenantID != "" { - config := auth.NewClientCredentialsConfig(metadata.ClientID, metadata.ClientSecret, metadata.TenantID) - config.Resource = metadata.Endpoint - config.AADEndpoint = metadata.ActiveDirectoryEndpoint - azureDataExplorerLogger.V(1).Info("Creating Azure Data Explorer Client using clientID, clientSecret and tenantID") - - authConfig = config - return &authConfig, nil + return authConfig, nil + case kedav1alpha1.PodIdentityProviderAzureWorkload: + azureDataExplorerLogger.V(1).Info("Creating Azure Data Explorer Client using Workload Identity") + authConfig = NewAzureADWorkloadIdentityConfig(ctx, metadata.Endpoint) + return authConfig, nil } return nil, fmt.Errorf("missing credentials. please reconfigure your scaled object metadata") diff --git a/pkg/scalers/azure/azure_data_explorer_test.go b/pkg/scalers/azure/azure_data_explorer_test.go index 8f467f6605c..a18918927d7 100644 --- a/pkg/scalers/azure/azure_data_explorer_test.go +++ b/pkg/scalers/azure/azure_data_explorer_test.go @@ -17,12 +17,15 @@ limitations under the License. package azure import ( + "context" "testing" "github.com/Azure/azure-kusto-go/kusto/data/errors" "github.com/Azure/azure-kusto-go/kusto/data/table" "github.com/Azure/azure-kusto-go/kusto/data/types" "github.com/Azure/azure-kusto-go/kusto/data/value" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) type testExtractDataExplorerMetricValue struct { @@ -36,13 +39,12 @@ type testGetDataExplorerAuthConfig struct { } var ( - clientID = "test_client_id" - rowName = "result" - rowType types.Column = "long" - rowValue int64 = 3 - podIdentity = "Azure" - secret = "test_secret" - tenantID = "test_tenant_id" + clientID = "test_client_id" + rowName = "result" + rowType types.Column = "long" + rowValue int64 = 3 + secret = "test_secret" + tenantID = "test_tenant_id" ) var testExtractDataExplorerMetricValues = []testExtractDataExplorerMetricValue{ @@ -64,7 +66,9 @@ var testGetDataExplorerAuthConfigs = []testGetDataExplorerAuthConfig{ // Auth with aad app - pass {testMetadata: &DataExplorerMetadata{ClientID: clientID, ClientSecret: secret, TenantID: tenantID}, isError: false}, // Auth with podIdentity - pass - {testMetadata: &DataExplorerMetadata{PodIdentity: podIdentity}, isError: false}, + {testMetadata: &DataExplorerMetadata{PodIdentity: kedav1alpha1.PodIdentityProviderAzure}, isError: false}, + // Auth with workload identity - pass + {testMetadata: &DataExplorerMetadata{PodIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload}, isError: false}, // Empty metadata - fail {testMetadata: &DataExplorerMetadata{}, isError: true}, // Empty tenantID - fail @@ -89,7 +93,7 @@ func TestExtractDataExplorerMetricValue(t *testing.T) { func TestGetDataExplorerAuthConfig(t *testing.T) { for _, testData := range testGetDataExplorerAuthConfigs { - _, err := getDataExplorerAuthConfig(testData.testMetadata) + _, err := getDataExplorerAuthConfig(context.TODO(), testData.testMetadata) if err != nil && !testData.isError { t.Error("Expected success but got error", err) } diff --git a/pkg/scalers/azure/azure_eventhub.go b/pkg/scalers/azure/azure_eventhub.go index 624ab47075d..a9deab48835 100644 --- a/pkg/scalers/azure/azure_eventhub.go +++ b/pkg/scalers/azure/azure_eventhub.go @@ -1,6 +1,7 @@ package azure import ( + "context" "errors" "fmt" "strings" @@ -8,6 +9,8 @@ import ( "github.com/Azure/azure-amqp-common-go/v3/aad" eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/go-autorest/autorest/azure" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) // EventHubInfo to keep event hub connection and resources @@ -19,40 +22,50 @@ type EventHubInfo struct { Namespace string EventHubName string CheckpointStrategy string - Cloud string ServiceBusEndpointSuffix string ActiveDirectoryEndpoint string + EventHubResourceURL string + PodIdentity kedav1alpha1.PodIdentityProvider } +const ( + DefaultEventhubResourceURL = "https://eventhubs.azure.net/" +) + // GetEventHubClient returns eventhub client -func GetEventHubClient(info EventHubInfo) (*eventhub.Hub, error) { - // The user wants to use a connectionstring, not a pod identity - if info.EventHubConnection != "" { +func GetEventHubClient(ctx context.Context, info EventHubInfo) (*eventhub.Hub, error) { + switch info.PodIdentity { + case "", kedav1alpha1.PodIdentityProviderNone: + // The user wants to use a connectionstring, not a pod identity hub, err := eventhub.NewHubFromConnectionString(info.EventHubConnection) if err != nil { return nil, fmt.Errorf("failed to create hub client: %s", err) } return hub, nil - } - - env := azure.Environment{ActiveDirectoryEndpoint: info.ActiveDirectoryEndpoint, ServiceBusEndpointSuffix: info.ServiceBusEndpointSuffix} + case kedav1alpha1.PodIdentityProviderAzure: + env := azure.Environment{ActiveDirectoryEndpoint: info.ActiveDirectoryEndpoint, ServiceBusEndpointSuffix: info.ServiceBusEndpointSuffix} + hubEnvOptions := eventhub.HubWithEnvironment(env) + // Since there is no connectionstring, then user wants to use AAD Pod identity + // Internally, the JWTProvider will use Managed Service Identity to authenticate if no Service Principal info supplied + envJWTProviderOption := aad.JWTProviderWithAzureEnvironment(&env) + resourceURLJWTProviderOption := aad.JWTProviderWithResourceURI(info.EventHubResourceURL) + provider, aadErr := aad.NewJWTProvider(envJWTProviderOption, resourceURLJWTProviderOption) - // Since there is no connectionstring, then user wants to use pod identity - // Internally, the JWTProvider will use Managed Service Identity to authenticate if no Service Principal info supplied - provider, aadErr := aad.NewJWTProvider(func(config *aad.TokenProviderConfiguration) error { - if config.Env == nil { - config.Env = &env + if aadErr == nil { + return eventhub.NewHub(info.Namespace, info.EventHubName, provider, hubEnvOptions) } - return nil - }) - hubEnvOptions := eventhub.HubWithEnvironment(env) + return nil, aadErr + case kedav1alpha1.PodIdentityProviderAzureWorkload: + // User wants to use AAD Workload Identity + env := azure.Environment{ActiveDirectoryEndpoint: info.ActiveDirectoryEndpoint, ServiceBusEndpointSuffix: info.ServiceBusEndpointSuffix} + hubEnvOptions := eventhub.HubWithEnvironment(env) + provider := NewADWorkloadIdentityTokenProvider(ctx, info.EventHubResourceURL) - if aadErr == nil { return eventhub.NewHub(info.Namespace, info.EventHubName, provider, hubEnvOptions) } - return nil, aadErr + return nil, fmt.Errorf("event hub does not support pod identity %v", info.PodIdentity) } // ParseAzureEventHubConnectionString parses Event Hub connection string into (namespace, name) diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go index d58e60ea269..2f7f2f30dd3 100644 --- a/pkg/scalers/azure/azure_eventhub_test.go +++ b/pkg/scalers/azure/azure_eventhub_test.go @@ -12,6 +12,8 @@ import ( "github.com/Azure/azure-storage-blob-go/azblob" "github.com/go-playground/assert/v2" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) // Add a valid Storage account connection string here @@ -256,12 +258,19 @@ func TestShouldParseCheckpointForFunctionWithPodIdentity(t *testing.T) { EventHubName: "hub-test", EventHubConsumerGroup: "$Default", ServiceBusEndpointSuffix: "servicebus.windows.net", + PodIdentity: kedav1alpha1.PodIdentityProviderAzure, } cp := newCheckpointer(eventHubInfo, "0") url, _ := cp.resolvePath(eventHubInfo) assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") + + eventHubInfo.PodIdentity = kedav1alpha1.PodIdentityProviderAzureWorkload + cp = newCheckpointer(eventHubInfo, "0") + url, _ = cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") } func TestShouldParseCheckpointForFunctionWithCheckpointStrategyAndPodIdentity(t *testing.T) { @@ -271,12 +280,19 @@ func TestShouldParseCheckpointForFunctionWithCheckpointStrategyAndPodIdentity(t EventHubConsumerGroup: "$Default", ServiceBusEndpointSuffix: "servicebus.windows.net", CheckpointStrategy: "azureFunction", + PodIdentity: kedav1alpha1.PodIdentityProviderAzure, } cp := newCheckpointer(eventHubInfo, "0") url, _ := cp.resolvePath(eventHubInfo) assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") + + eventHubInfo.PodIdentity = kedav1alpha1.PodIdentityProviderAzureWorkload + cp = newCheckpointer(eventHubInfo, "0") + url, _ = cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") } func TestShouldParseCheckpointForDefault(t *testing.T) { diff --git a/pkg/scalers/azure/azure_monitor.go b/pkg/scalers/azure/azure_monitor.go index 2a9aebef653..c4fd65b9f75 100644 --- a/pkg/scalers/azure/azure_monitor.go +++ b/pkg/scalers/azure/azure_monitor.go @@ -68,13 +68,7 @@ var azureMonitorLog = logf.Log.WithName("azure_monitor_scaler") // GetAzureMetricValue returns the value of an Azure Monitor metric, rounded to the nearest int func GetAzureMetricValue(ctx context.Context, info MonitorInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int64, error) { - var podIdentityEnabled = true - - if podIdentity == "" || podIdentity == kedav1alpha1.PodIdentityProviderNone { - podIdentityEnabled = false - } - - client := createMetricsClient(info, podIdentityEnabled) + client := createMetricsClient(ctx, info, podIdentity) requestPtr, err := createMetricsRequest(info) if err != nil { return -1, err @@ -83,21 +77,25 @@ func GetAzureMetricValue(ctx context.Context, info MonitorInfo, podIdentity keda return executeRequest(ctx, client, requestPtr) } -func createMetricsClient(info MonitorInfo, podIdentityEnabled bool) insights.MetricsClient { +func createMetricsClient(ctx context.Context, info MonitorInfo, podIdentity kedav1alpha1.PodIdentityProvider) insights.MetricsClient { client := insights.NewMetricsClientWithBaseURI(info.AzureResourceManagerEndpoint, info.SubscriptionID) var authConfig auth.AuthorizerConfig - if podIdentityEnabled { - config := auth.NewMSIConfig() + switch podIdentity { + case "", kedav1alpha1.PodIdentityProviderNone: + config := auth.NewClientCredentialsConfig(info.ClientID, info.ClientPassword, info.TenantID) config.Resource = info.AzureResourceManagerEndpoint + config.AADEndpoint = info.ActiveDirectoryEndpoint authConfig = config - } else { - config := auth.NewClientCredentialsConfig(info.ClientID, info.ClientPassword, info.TenantID) + case kedav1alpha1.PodIdentityProviderAzure: + config := auth.NewMSIConfig() config.Resource = info.AzureResourceManagerEndpoint - config.AADEndpoint = info.ActiveDirectoryEndpoint authConfig = config + case kedav1alpha1.PodIdentityProviderAzureWorkload: + authConfig = NewAzureADWorkloadIdentityConfig(ctx, info.AzureResourceManagerEndpoint) } + authorizer, _ := authConfig.Authorizer() client.Authorizer = authorizer diff --git a/pkg/scalers/azure/azure_storage.go b/pkg/scalers/azure/azure_storage.go index dd5d7f28296..b649baf9172 100644 --- a/pkg/scalers/azure/azure_storage.go +++ b/pkg/scalers/azure/azure_storage.go @@ -50,6 +50,11 @@ const ( FileEndpoint ) +const ( + // Azure storage resource is "https://storage.azure.com/" in all cloud environments + storageResource = "https://storage.azure.com/" +) + // Prefix returns prefix for a StorageEndpointType func (e StorageEndpointType) Prefix() string { return [...]string{"BlobEndpoint", "QueueEndpoint", "TableEndpoint", "FileEndpoint"}[e] @@ -77,8 +82,8 @@ func ParseAzureStorageEndpointSuffix(metadata map[string]string, endpointType St // ParseAzureStorageQueueConnection parses queue connection string and returns credential and resource url func ParseAzureStorageQueueConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, connectionString, accountName, endpointSuffix string) (azqueue.Credential, *url.URL, error) { switch podIdentity { - case kedav1alpha1.PodIdentityProviderAzure: - token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix) + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: + token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity) if err != nil { return nil, nil, err } @@ -105,8 +110,8 @@ func ParseAzureStorageQueueConnection(ctx context.Context, httpClient util.HTTPD // ParseAzureStorageBlobConnection parses blob connection string and returns credential and resource url func ParseAzureStorageBlobConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, connectionString, accountName, endpointSuffix string) (azblob.Credential, *url.URL, error) { switch podIdentity { - case kedav1alpha1.PodIdentityProviderAzure: - token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix) + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: + token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity) if err != nil { return nil, nil, err } @@ -187,9 +192,18 @@ func parseAzureStorageConnectionString(connectionString string, endpointType Sto return u, name, key, nil } -func parseAcessTokenAndEndpoint(ctx context.Context, httpClient util.HTTPDoer, accountName string, endpointSuffix string) (string, *url.URL, error) { - // Azure storage resource is "https://storage.azure.com/" in all cloud environments - token, err := GetAzureADPodIdentityToken(ctx, httpClient, "https://storage.azure.com/") +func parseAcessTokenAndEndpoint(ctx context.Context, httpClient util.HTTPDoer, accountName string, endpointSuffix string, + podIdentity kedav1alpha1.PodIdentityProvider) (string, *url.URL, error) { + var token AADToken + var err error + + switch podIdentity { + case kedav1alpha1.PodIdentityProviderAzure: + token, err = GetAzureADPodIdentityToken(ctx, httpClient, storageResource) + case kedav1alpha1.PodIdentityProviderAzureWorkload: + token, err = GetAzureADWorkloadIdentityToken(ctx, storageResource) + } + if err != nil { return "", nil, err } diff --git a/pkg/scalers/azure_app_insights_scaler_test.go b/pkg/scalers/azure_app_insights_scaler_test.go index 345b01b8b9c..6d6c63235e5 100644 --- a/pkg/scalers/azure_app_insights_scaler_test.go +++ b/pkg/scalers/azure_app_insights_scaler_test.go @@ -104,6 +104,30 @@ var azureAppInsightsScalerData = []azureAppInsightsScalerTestData{ "activeDirectoryClientId": "5678", "activeDirectoryClientPassword": "pw", }, }}, + {name: "correct pod identity", isError: false, config: ScalerConfig{ + TriggerMetadata: map[string]string{ + "targetValue": "11", "applicationInsightsId": "1234", "metricId": "unittest/test", "metricAggregationTimespan": "01:02", "metricAggregationType": "max", "metricFilter": "cloud/roleName eq 'test'", "tenantId": "1234", + }, + PodIdentity: kedav1alpha1.PodIdentityProviderAzure, + }}, + {name: "invalid pod Identity", isError: true, config: ScalerConfig{ + TriggerMetadata: map[string]string{ + "targetValue": "11", "applicationInsightsId": "1234", "metricId": "unittest/test", "metricAggregationTimespan": "01:02", "metricAggregationType": "max", "metricFilter": "cloud/roleName eq 'test'", "tenantId": "1234", + }, + PodIdentity: kedav1alpha1.PodIdentityProvider("notAzure"), + }}, + {name: "correct workload identity", isError: false, config: ScalerConfig{ + TriggerMetadata: map[string]string{ + "targetValue": "11", "applicationInsightsId": "1234", "metricId": "unittest/test", "metricAggregationTimespan": "01:02", "metricAggregationType": "max", "metricFilter": "cloud/roleName eq 'test'", "tenantId": "1234", + }, + PodIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload, + }}, + {name: "invalid workload Identity", isError: true, config: ScalerConfig{ + TriggerMetadata: map[string]string{ + "targetValue": "11", "applicationInsightsId": "1234", "metricId": "unittest/test", "metricAggregationTimespan": "01:02", "metricAggregationType": "max", "metricFilter": "cloud/roleName eq 'test'", "tenantId": "1234", + }, + PodIdentity: kedav1alpha1.PodIdentityProvider("notAzureWorkload"), + }}, {name: "app insights id in auth", isError: false, config: ScalerConfig{ TriggerMetadata: map[string]string{ "targetValue": "11", "metricId": "unittest/test", "metricAggregationTimespan": "01:02", "metricAggregationType": "max", "metricFilter": "cloud/roleName eq 'test'", "tenantId": "1234", diff --git a/pkg/scalers/azure_blob_scaler.go b/pkg/scalers/azure_blob_scaler.go index 89382c21e37..6f46e2f5b8f 100644 --- a/pkg/scalers/azure_blob_scaler.go +++ b/pkg/scalers/azure_blob_scaler.go @@ -153,8 +153,8 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azure.BlobMetadata, kedav1al if len(meta.Connection) == 0 { return nil, "", fmt.Errorf("no connection setting given") } - case kedav1alpha1.PodIdentityProviderAzure: - // If the Use AAD Pod Identity is present then check account name + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: + // If the Use AAD Pod Identity / Workload Identity is present then check account name if val, ok := config.TriggerMetadata["accountName"]; ok && val != "" { meta.AccountName = val } else { diff --git a/pkg/scalers/azure_blob_scaler_test.go b/pkg/scalers/azure_blob_scaler_test.go index 33bf3b6871d..5191f53f078 100644 --- a/pkg/scalers/azure_blob_scaler_test.go +++ b/pkg/scalers/azure_blob_scaler_test.go @@ -69,6 +69,22 @@ var testAzBlobMetadata = []parseAzBlobMetadataTestData{ {map[string]string{"accountName": "sample_acc", "blobContainerName": "sample_container", "cloud": "Private", "endpointSuffix": ""}, true, testAzBlobResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, // podIdentity = azure with endpoint suffix and no cloud {map[string]string{"accountName": "sample_acc", "blobContainerName": "sample_container", "cloud": "", "endpointSuffix": "ignored"}, false, testAzBlobResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, + // podIdentity = azure-workload with account name + {map[string]string{"accountName": "sample_acc", "blobContainerName": "sample_container"}, false, testAzBlobResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload without account name + {map[string]string{"accountName": "", "blobContainerName": "sample_container"}, true, testAzBlobResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload without blob container name + {map[string]string{"accountName": "sample_acc", "blobContainerName": ""}, true, testAzBlobResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload with cloud + {map[string]string{"accountName": "sample_acc", "blobContainerName": "sample_container", "cloud": "AzureGermanCloud"}, false, testAzBlobResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload with invalid cloud + {map[string]string{"accountName": "sample_acc", "blobContainerName": "sample_container", "cloud": "InvalidCloud"}, true, testAzBlobResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload with private cloud and endpoint suffix + {map[string]string{"accountName": "sample_acc", "blobContainerName": "sample_container", "cloud": "Private", "endpointSuffix": "queue.core.private.cloud"}, false, testAzBlobResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload with private cloud and no endpoint suffix + {map[string]string{"accountName": "sample_acc", "blobContainerName": "sample_container", "cloud": "Private", "endpointSuffix": ""}, true, testAzBlobResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload with endpoint suffix and no cloud + {map[string]string{"accountName": "sample_acc", "blobContainerName": "sample_container", "cloud": "", "endpointSuffix": "ignored"}, false, testAzBlobResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, // connection from authParams {map[string]string{"blobContainerName": "sample_container", "blobCount": "5"}, false, testAzBlobResolvedEnv, map[string]string{"connection": "value"}, kedav1alpha1.PodIdentityProviderNone}, // with globPattern diff --git a/pkg/scalers/azure_data_explorer_scaler.go b/pkg/scalers/azure_data_explorer_scaler.go index 940beebc364..38952bfab3a 100644 --- a/pkg/scalers/azure_data_explorer_scaler.go +++ b/pkg/scalers/azure_data_explorer_scaler.go @@ -46,7 +46,7 @@ const adxName = "azure-data-explorer" var dataExplorerLogger = logf.Log.WithName("azure_data_explorer_scaler") -func NewAzureDataExplorerScaler(config *ScalerConfig) (Scaler, error) { +func NewAzureDataExplorerScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) { metricType, err := GetMetricTargetType(config) if err != nil { return nil, fmt.Errorf("error getting scaler metric type: %s", err) @@ -57,7 +57,7 @@ func NewAzureDataExplorerScaler(config *ScalerConfig) (Scaler, error) { return nil, fmt.Errorf("failed to parse azure data explorer metadata: %s", err) } - client, err := azure.CreateAzureDataExplorerClient(metadata) + client, err := azure.CreateAzureDataExplorerClient(ctx, metadata) if err != nil { return nil, fmt.Errorf("failed to create azure data explorer client: %s", err) } @@ -132,8 +132,8 @@ func parseAzureDataExplorerAuthParams(config *ScalerConfig) (*azure.DataExplorer metadata := azure.DataExplorerMetadata{} switch config.PodIdentity { - case kedav1alpha1.PodIdentityProviderAzure: - metadata.PodIdentity = string(config.PodIdentity) + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: + metadata.PodIdentity = config.PodIdentity case "", kedav1alpha1.PodIdentityProviderNone: dataExplorerLogger.V(1).Info("Pod Identity is not provided. Trying to resolve clientId, clientSecret and tenantId.") diff --git a/pkg/scalers/azure_data_explorer_scaler_test.go b/pkg/scalers/azure_data_explorer_scaler_test.go index 432ce71561e..7c1879764c9 100644 --- a/pkg/scalers/azure_data_explorer_scaler_test.go +++ b/pkg/scalers/azure_data_explorer_scaler_test.go @@ -135,6 +135,23 @@ func TestDataExplorerParseMetadata(t *testing.T) { t.Error("Expected error but got success") } } + + // Auth through Workload Identity + for _, testData := range testDataExplorerMetadataWithPodIdentity { + _, err := parseAzureDataExplorerMetadata( + &ScalerConfig{ + ResolvedEnv: dataExplorerResolvedEnv, + TriggerMetadata: testData.metadata, + AuthParams: map[string]string{}, + PodIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload}) + + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } } func TestDataExplorerGetMetricSpecForScaling(t *testing.T) { diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index a3f2875d147..a94717f8042 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -23,6 +23,7 @@ import ( "math" "net/http" "strconv" + "strings" eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/azure-storage-blob-go/azblob" @@ -64,7 +65,7 @@ type eventHubMetadata struct { } // NewAzureEventHubScaler creates a new scaler for eventHub -func NewAzureEventHubScaler(config *ScalerConfig) (Scaler, error) { +func NewAzureEventHubScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) { metricType, err := GetMetricTargetType(config) if err != nil { return nil, fmt.Errorf("error getting scaler metric type: %s", err) @@ -75,7 +76,7 @@ func NewAzureEventHubScaler(config *ScalerConfig) (Scaler, error) { return nil, fmt.Errorf("unable to get eventhub metadata: %s", err) } - hub, err := azure.GetEventHubClient(parsedMetadata.eventHubInfo) + hub, err := azure.GetEventHubClient(ctx, parsedMetadata.eventHubInfo) if err != nil { return nil, fmt.Errorf("unable to get eventhub client: %s", err) } @@ -129,9 +130,15 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) meta.eventHubInfo.BlobContainer = val } - meta.eventHubInfo.Cloud = azure.DefaultCloud + meta.eventHubInfo.EventHubResourceURL = azure.DefaultEventhubResourceURL if val, ok := config.TriggerMetadata["cloud"]; ok { - meta.eventHubInfo.Cloud = val + if strings.EqualFold(val, azure.PrivateCloud) { + if resourceURL, ok := config.TriggerMetadata["eventHubResourceURL"]; ok { + meta.eventHubInfo.EventHubResourceURL = resourceURL + } else { + return nil, fmt.Errorf("eventHubResourceURL must be provided for %s cloud type", azure.PrivateCloud) + } + } } serviceBusEndpointSuffixProvider := func(env az.Environment) (string, error) { @@ -149,7 +156,9 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) } meta.eventHubInfo.ActiveDirectoryEndpoint = activeDirectoryEndpoint - if config.PodIdentity == "" || config.PodIdentity == v1alpha1.PodIdentityProviderNone { + meta.eventHubInfo.PodIdentity = config.PodIdentity + switch config.PodIdentity { + case "", v1alpha1.PodIdentityProviderNone: if config.AuthParams["connection"] != "" { meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"] } else if config.TriggerMetadata["connectionFromEnv"] != "" { @@ -159,7 +168,7 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) if len(meta.eventHubInfo.EventHubConnection) == 0 { return nil, fmt.Errorf("no event hub connection string given") } - } else { + case v1alpha1.PodIdentityProviderAzure, v1alpha1.PodIdentityProviderAzureWorkload: if config.TriggerMetadata["eventHubNamespace"] != "" { meta.eventHubInfo.Namespace = config.TriggerMetadata["eventHubNamespace"] } else if config.TriggerMetadata["eventHubNamespaceFromEnv"] != "" { diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index 7e2eda66f87..74399ee8c0b 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -11,6 +11,7 @@ import ( eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/azure-storage-blob-go/azblob" + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/scalers/azure" ) @@ -20,6 +21,7 @@ const ( storageConnectionSetting = "testStorageConnectionSetting" serviceBusEndpointSuffix = "serviceBusEndpointSuffix" activeDirectoryEndpoint = "activeDirectoryEndpoint" + eventHubResourceURL = "eventHubResourceURL" testEventHubNamespace = "kedatesteventhub" testEventHubName = "eventhub1" checkpointFormat = "{\"SequenceNumber\":%d,\"PartitionId\":\"%s\"}" @@ -68,18 +70,21 @@ var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestDat // metadata with cloud specified {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace, "cloud": "azurePublicCloud"}, false}, - // metadata with private cloud missing service bus endpoint suffix and active directory endpoint + // metadata with private cloud missing service bus endpoint suffix and active directory endpoint and eventHubResourceURL {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace, "cloud": "private"}, true}, - // metadata with private cloud missing active directory endpoint + // metadata with private cloud missing active directory endpoint and resourceURL {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace, "cloud": "private", "endpointSuffix": serviceBusEndpointSuffix}, true}, - // metadata with private cloud missing service bus endpoint suffix + // metadata with private cloud missing service bus endpoint suffix and resource URL {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace, "cloud": "private", "activeDirectoryEndpoint": activeDirectoryEndpoint}, true}, + // metadata with private cloud missing service bus endpoint suffix and active directory endpoint + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, + "eventHubNamespace": testEventHubNamespace, "cloud": "private", "eventHubResourceURL": eventHubResourceURL}, true}, // properly formed metadata with private cloud {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, - "eventHubNamespace": testEventHubNamespace, "cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint}, false}, + "eventHubNamespace": testEventHubNamespace, "cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint, "eventHubResourceURL": eventHubResourceURL}, false}, } var eventHubMetricIdentifiers = []eventHubMetricIdentifier{ @@ -110,7 +115,18 @@ func TestParseEventHubMetadata(t *testing.T) { } for _, testData := range parseEventHubMetadataDatasetWithPodIdentity { - _, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, PodIdentity: "Azure"}) + _, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, PodIdentity: kedav1alpha1.PodIdentityProviderAzure}) + + if err != nil && !testData.isError { + t.Errorf("Expected success but got error: %s", err) + } + if testData.isError && err == nil { + t.Error("Expected error and got success") + } + } + + for _, testData := range parseEventHubMetadataDatasetWithPodIdentity { + _, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, PodIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload}) if err != nil && !testData.isError { t.Errorf("Expected success but got error: %s", err) diff --git a/pkg/scalers/azure_log_analytics_scaler.go b/pkg/scalers/azure_log_analytics_scaler.go index 083f19bdc5b..96b9243b6f2 100644 --- a/pkg/scalers/azure_log_analytics_scaler.go +++ b/pkg/scalers/azure_log_analytics_scaler.go @@ -31,6 +31,7 @@ import ( "sync" "time" + "github.com/Azure/azure-amqp-common-go/v3/auth" v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -64,7 +65,7 @@ type azureLogAnalyticsMetadata struct { clientID string clientSecret string workspaceID string - podIdentity string + podIdentity kedav1alpha1.PodIdentityProvider query string threshold int64 metricName string // Custom metric name for trigger @@ -79,13 +80,14 @@ type sessionCache struct { } type tokenData struct { - TokenType string `json:"token_type"` - ExpiresIn int `json:"expires_in,string"` - ExtExpiresIn int `json:"ext_expires_in,string"` - ExpiresOn int64 `json:"expires_on,string"` - NotBefore int64 `json:"not_before,string"` - Resource string `json:"resource"` - AccessToken string `json:"access_token"` + TokenType string `json:"token_type"` + ExpiresIn int `json:"expires_in,string"` + ExtExpiresIn int `json:"ext_expires_in,string"` + ExpiresOn int64 `json:"expires_on,string"` + NotBefore int64 `json:"not_before,string"` + Resource string `json:"resource"` + AccessToken string `json:"access_token"` + IsWorkloadIdentityToken bool `json:"isWorkloadIdentityToken"` } type metricsData struct { @@ -165,8 +167,8 @@ func parseAzureLogAnalyticsMetadata(config *ScalerConfig) (*azureLogAnalyticsMet meta.clientSecret = clientSecret meta.podIdentity = "" - case kedav1alpha1.PodIdentityProviderAzure: - meta.podIdentity = string(config.PodIdentity) + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: + meta.podIdentity = config.PodIdentity default: return nil, fmt.Errorf("error parsing metadata. Details: Log Analytics Scaler doesn't support pod identity %s", config.PodIdentity) } @@ -333,10 +335,11 @@ func (s *azureLogAnalyticsScaler) getAccessToken(ctx context.Context) (tokenData currentTimeSec := time.Now().Unix() tokenInfo := tokenData{} - if s.metadata.podIdentity == "" { + switch s.metadata.podIdentity { + case "", kedav1alpha1.PodIdentityProviderNone: tokenInfo, _ = getTokenFromCache(s.metadata.clientID, s.metadata.clientSecret) - } else { - tokenInfo, _ = getTokenFromCache(s.metadata.podIdentity, s.metadata.podIdentity) + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: + tokenInfo, _ = getTokenFromCache(string(s.metadata.podIdentity), string(s.metadata.podIdentity)) } if currentTimeSec+30 > tokenInfo.ExpiresOn { @@ -345,12 +348,13 @@ func (s *azureLogAnalyticsScaler) getAccessToken(ctx context.Context) (tokenData return tokenData{}, err } - if s.metadata.podIdentity == "" { + switch s.metadata.podIdentity { + case "", kedav1alpha1.PodIdentityProviderNone: logAnalyticsLog.V(1).Info("Token for Service Principal has been refreshed", "clientID", s.metadata.clientID, "scaler name", s.name, "namespace", s.namespace) _ = setTokenInCache(s.metadata.clientID, s.metadata.clientSecret, newTokenInfo) - } else { + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: logAnalyticsLog.V(1).Info("Token for Pod Identity has been refreshed", "type", s.metadata.podIdentity, "scaler name", s.name, "namespace", s.namespace) - _ = setTokenInCache(s.metadata.podIdentity, s.metadata.podIdentity, newTokenInfo) + _ = setTokenInCache(string(s.metadata.podIdentity), string(s.metadata.podIdentity), newTokenInfo) } return newTokenInfo, nil @@ -373,12 +377,13 @@ func (s *azureLogAnalyticsScaler) executeQuery(ctx context.Context, query string return metricsData{}, err } - if s.metadata.podIdentity == "" { + switch s.metadata.podIdentity { + case "", kedav1alpha1.PodIdentityProviderNone: logAnalyticsLog.V(1).Info("Token for Service Principal has been refreshed", "clientID", s.metadata.clientID, "scaler name", s.name, "namespace", s.namespace) _ = setTokenInCache(s.metadata.clientID, s.metadata.clientSecret, tokenInfo) - } else { + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: logAnalyticsLog.V(1).Info("Token for Pod Identity has been refreshed", "type", s.metadata.podIdentity, "scaler name", s.name, "namespace", s.namespace) - _ = setTokenInCache(s.metadata.podIdentity, s.metadata.podIdentity, tokenInfo) + _ = setTokenInCache(string(s.metadata.podIdentity), string(s.metadata.podIdentity), tokenInfo) } if err == nil { @@ -473,6 +478,10 @@ func (s *azureLogAnalyticsScaler) refreshAccessToken(ctx context.Context) (token return tokenData{}, err } + if tokenInfo.IsWorkloadIdentityToken { + return tokenInfo, nil + } + // Now, let's check we can use this token. If no, wait until we can use it currentTimeSec := time.Now().Unix() if currentTimeSec < tokenInfo.NotBefore { @@ -494,9 +503,30 @@ func (s *azureLogAnalyticsScaler) getAuthorizationToken(ctx context.Context) (to var err error var tokenInfo tokenData - if s.metadata.podIdentity == "" { + switch s.metadata.podIdentity { + case kedav1alpha1.PodIdentityProviderAzureWorkload: + aadToken, err := azure.GetAzureADWorkloadIdentityToken(ctx, s.metadata.logAnalyticsResourceURL) + if err != nil { + return tokenData{}, nil + } + + expiresOn := aadToken.ExpiresOnTimeObject.Unix() + if err != nil { + return tokenData{}, nil + } + + tokenInfo = tokenData{ + TokenType: string(auth.CBSTokenTypeJWT), + AccessToken: aadToken.AccessToken, + ExpiresOn: expiresOn, + Resource: s.metadata.logAnalyticsResourceURL, + IsWorkloadIdentityToken: true, + } + + return tokenInfo, nil + case "", kedav1alpha1.PodIdentityProviderNone: body, statusCode, err = s.executeAADApicall(ctx) - } else { + case kedav1alpha1.PodIdentityProviderAzure: body, statusCode, err = s.executeIMDSApicall(ctx) } diff --git a/pkg/scalers/azure_log_analytics_scaler_test.go b/pkg/scalers/azure_log_analytics_scaler_test.go index 824af19fee7..29ae365523b 100644 --- a/pkg/scalers/azure_log_analytics_scaler_test.go +++ b/pkg/scalers/azure_log_analytics_scaler_test.go @@ -168,6 +168,17 @@ func TestLogAnalyticsParseMetadata(t *testing.T) { t.Error("Expected error but got success") } } + + // test with workload identity params should not fail + for _, testData := range testLogAnalyticsMetadataWithPodIdentity { + _, err := parseAzureLogAnalyticsMetadata(&ScalerConfig{ResolvedEnv: sampleLogAnalyticsResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: LogAnalyticsAuthParams, PodIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } } func TestLogAnalyticsGetMetricSpecForScaling(t *testing.T) { diff --git a/pkg/scalers/azure_monitor_scaler.go b/pkg/scalers/azure_monitor_scaler.go index 21da3b06293..37f1f3fcc69 100644 --- a/pkg/scalers/azure_monitor_scaler.go +++ b/pkg/scalers/azure_monitor_scaler.go @@ -176,7 +176,8 @@ func parseAzureMonitorMetadata(config *ScalerConfig) (*azureMonitorMetadata, err // parseAzurePodIdentityParams gets the activeDirectory clientID and password func parseAzurePodIdentityParams(config *ScalerConfig) (clientID string, clientPassword string, err error) { - if config.PodIdentity == "" || config.PodIdentity == kedav1alpha1.PodIdentityProviderNone { + switch config.PodIdentity { + case "", kedav1alpha1.PodIdentityProviderNone: clientID, err = getParameterFromConfig(config, "activeDirectoryClientId", true) if err != nil || clientID == "" { return "", "", fmt.Errorf("no activeDirectoryClientId given") @@ -191,7 +192,9 @@ func parseAzurePodIdentityParams(config *ScalerConfig) (clientID string, clientP if len(clientPassword) == 0 { return "", "", fmt.Errorf("no activeDirectoryClientPassword given") } - } else if config.PodIdentity != kedav1alpha1.PodIdentityProviderAzure { + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: + // no params required to be parsed + default: return "", "", fmt.Errorf("azure Monitor doesn't support pod identity %s", config.PodIdentity) } diff --git a/pkg/scalers/azure_monitor_scaler_test.go b/pkg/scalers/azure_monitor_scaler_test.go index d787d1889fd..4109e563b2a 100644 --- a/pkg/scalers/azure_monitor_scaler_test.go +++ b/pkg/scalers/azure_monitor_scaler_test.go @@ -83,6 +83,10 @@ var testParseAzMonitorMetadata = []parseAzMonitorMetadataTestData{ {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "targetValue": "5"}, false, map[string]string{}, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, // wrong podIdentity {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "targetValue": "5"}, true, map[string]string{}, map[string]string{}, kedav1alpha1.PodIdentityProvider("notAzure")}, + // connection with workload Identity + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "targetValue": "5"}, false, map[string]string{}, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // wrong workload Identity + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "targetValue": "5"}, true, map[string]string{}, map[string]string{}, kedav1alpha1.PodIdentityProvider("notAzureWorkload")}, // known azure cloud {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPasswordFromEnv": "CLIENT_PASSWORD", "targetValue": "5", "metricNamespace": "namespace", "cloud": "azureChinaCloud"}, false, testAzMonitorResolvedEnv, map[string]string{}, ""}, // private cloud diff --git a/pkg/scalers/azure_queue_scaler.go b/pkg/scalers/azure_queue_scaler.go index a1c66d54ed3..4189da95b97 100644 --- a/pkg/scalers/azure_queue_scaler.go +++ b/pkg/scalers/azure_queue_scaler.go @@ -128,7 +128,7 @@ func parseAzureQueueMetadata(config *ScalerConfig) (*azureQueueMetadata, kedav1a if len(meta.connection) == 0 { return nil, "", fmt.Errorf("no connection setting given") } - case kedav1alpha1.PodIdentityProviderAzure: + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: // If the Use AAD Pod Identity is present then check account name if val, ok := config.TriggerMetadata["accountName"]; ok && val != "" { meta.accountName = val diff --git a/pkg/scalers/azure_queue_scaler_test.go b/pkg/scalers/azure_queue_scaler_test.go index 45a8cca6fcf..d8aa2fe5ff4 100644 --- a/pkg/scalers/azure_queue_scaler_test.go +++ b/pkg/scalers/azure_queue_scaler_test.go @@ -73,6 +73,22 @@ var testAzQueueMetadata = []parseAzQueueMetadataTestData{ {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "Private", "endpointSuffix": ""}, true, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, // podIdentity = azure with endpoint suffix and no cloud {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "", "endpointSuffix": "ignored"}, false, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, + // podIdentity = azure-workload with account name + {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue"}, false, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload without account name + {map[string]string{"accountName": "", "queueName": "sample_queue"}, true, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload without queue name + {map[string]string{"accountName": "sample_acc", "queueName": ""}, true, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload with cloud + {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "AzurePublicCloud"}, false, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload with invalid cloud + {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "InvalidCloud"}, true, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload with private cloud and endpoint suffix + {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "Private", "endpointSuffix": "queue.core.private.cloud"}, false, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload with private cloud and no endpoint suffix + {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "Private", "endpointSuffix": ""}, true, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // podIdentity = azure-workload with endpoint suffix and no cloud + {map[string]string{"accountName": "sample_acc", "queueName": "sample_queue", "cloud": "", "endpointSuffix": "ignored"}, false, testAzQueueResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, // connection from authParams {map[string]string{"queueName": "sample", "queueLength": "5"}, false, testAzQueueResolvedEnv, map[string]string{"connection": "value"}, kedav1alpha1.PodIdentityProviderNone}, } diff --git a/pkg/scalers/azure_servicebus_scaler.go b/pkg/scalers/azure_servicebus_scaler.go index 210bf1f96d8..6f434be3bba 100755 --- a/pkg/scalers/azure_servicebus_scaler.go +++ b/pkg/scalers/azure_servicebus_scaler.go @@ -45,6 +45,8 @@ const ( subscription entityType = 2 messageCountMetricName = "messageCount" defaultTargetMessageCount = 5 + // Service bus resource id is "https://servicebus.azure.net/" in all cloud environments + serviceBusResource = "https://servicebus.azure.net/" ) var azureServiceBusLog = logf.Log.WithName("azure_servicebus_scaler") @@ -155,7 +157,7 @@ func parseAzureServiceBusMetadata(config *ScalerConfig) (*azureServiceBusMetadat if len(meta.connection) == 0 { return nil, fmt.Errorf("no connection setting given") } - case kedav1alpha1.PodIdentityProviderAzure: + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: if val, ok := config.TriggerMetadata["namespace"]; ok { meta.namespace = val } else { @@ -224,24 +226,31 @@ func (s *azureServiceBusScaler) GetMetrics(ctx context.Context, metricName strin } type azureTokenProvider struct { - httpClient *http.Client - ctx context.Context + httpClient *http.Client + ctx context.Context + podIdentity kedav1alpha1.PodIdentityProvider } // GetToken implements TokenProvider interface for azureTokenProvider func (a azureTokenProvider) GetToken(uri string) (*auth.Token, error) { ctx := a.ctx - // Service bus resource id is "https://servicebus.azure.net/" in all cloud environments - token, err := azure.GetAzureADPodIdentityToken(ctx, a.httpClient, "https://servicebus.azure.net/") + + var token azure.AADToken + var err error + + switch a.podIdentity { + case kedav1alpha1.PodIdentityProviderAzure: + token, err = azure.GetAzureADPodIdentityToken(ctx, a.httpClient, serviceBusResource) + case kedav1alpha1.PodIdentityProviderAzureWorkload: + token, err = azure.GetAzureADWorkloadIdentityToken(ctx, serviceBusResource) + default: + err = fmt.Errorf("unknown pod identity provider") + } if err != nil { return nil, err } - return &auth.Token{ - TokenType: auth.CBSTokenTypeJWT, - Token: token.AccessToken, - Expiry: token.ExpiresOn, - }, nil + return auth.NewToken(auth.CBSTokenTypeJWT, token.AccessToken, token.ExpiresOn), nil } // Returns the length of the queue or subscription @@ -267,19 +276,21 @@ func (s *azureServiceBusScaler) getServiceBusNamespace(ctx context.Context) (*se var namespace *servicebus.Namespace var err error - if s.podIdentity == "" || s.podIdentity == kedav1alpha1.PodIdentityProviderNone { + switch s.podIdentity { + case "", kedav1alpha1.PodIdentityProviderNone: namespace, err = servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(s.metadata.connection)) if err != nil { return namespace, err } - } else if s.podIdentity == kedav1alpha1.PodIdentityProviderAzure { + case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: namespace, err = servicebus.NewNamespace() if err != nil { return namespace, err } namespace.TokenProvider = azureTokenProvider{ - ctx: ctx, - httpClient: s.httpClient, + ctx: ctx, + httpClient: s.httpClient, + podIdentity: s.podIdentity, } namespace.Name = s.metadata.namespace } diff --git a/pkg/scalers/azure_servicebus_scaler_test.go b/pkg/scalers/azure_servicebus_scaler_test.go index dcf783c9272..9f7c877f394 100755 --- a/pkg/scalers/azure_servicebus_scaler_test.go +++ b/pkg/scalers/azure_servicebus_scaler_test.go @@ -97,6 +97,10 @@ var parseServiceBusMetadataDataset = []parseServiceBusMetadataTestData{ {map[string]string{"queueName": queueName}, true, queue, "", map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, // correct pod identity {map[string]string{"queueName": queueName, "namespace": namespaceName}, false, queue, defaultSuffix, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, + // workload identity but missing namespace + {map[string]string{"queueName": queueName}, true, queue, "", map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, + // correct workload identity + {map[string]string{"queueName": queueName, "namespace": namespaceName}, false, queue, defaultSuffix, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload}, } var azServiceBusMetricIdentifiers = []azServiceBusMetricIdentifier{ @@ -133,6 +137,15 @@ var getServiceBusLengthTestScalers = []azureServiceBusScaler{ podIdentity: kedav1alpha1.PodIdentityProviderAzure, httpClient: commonHTTPClient, }, + { + metadata: &azureServiceBusMetadata{ + entityType: subscription, + topicName: topicName, + subscriptionName: subscriptionName, + }, + podIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload, + httpClient: commonHTTPClient, + }, } func TestParseServiceBusMetadata(t *testing.T) { diff --git a/pkg/scaling/resolver/azure_keyvault_handler.go b/pkg/scaling/resolver/azure_keyvault_handler.go index 68784728091..3dde2437834 100644 --- a/pkg/scaling/resolver/azure_keyvault_handler.go +++ b/pkg/scaling/resolver/azure_keyvault_handler.go @@ -34,32 +34,28 @@ import ( type AzureKeyVaultHandler struct { vault *kedav1alpha1.AzureKeyVault keyvaultClient *keyvault.BaseClient + podIdentity kedav1alpha1.PodIdentityProvider } -func NewAzureKeyVaultHandler(v *kedav1alpha1.AzureKeyVault) *AzureKeyVaultHandler { +func NewAzureKeyVaultHandler(v *kedav1alpha1.AzureKeyVault, podIdentity kedav1alpha1.PodIdentityProvider) *AzureKeyVaultHandler { return &AzureKeyVaultHandler{ - vault: v, + vault: v, + podIdentity: podIdentity, } } func (vh *AzureKeyVaultHandler) Initialize(ctx context.Context, client client.Client, logger logr.Logger, triggerNamespace string) error { - clientID := vh.vault.Credentials.ClientID - tenantID := vh.vault.Credentials.TenantID - - clientSecretName := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Name - clientSecretKey := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Key - clientSecret := resolveAuthSecret(ctx, client, logger, clientSecretName, triggerNamespace, clientSecretKey) - - clientCredentialsConfig := auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID) - keyvaultResourceURL, activeDirectoryEndpoint, err := vh.getPropertiesForCloud() if err != nil { return err } - clientCredentialsConfig.Resource = keyvaultResourceURL - clientCredentialsConfig.AADEndpoint = activeDirectoryEndpoint - authorizer, err := clientCredentialsConfig.Authorizer() + authConfig, err := vh.getAuthConfig(ctx, client, logger, triggerNamespace, keyvaultResourceURL, activeDirectoryEndpoint) + if err != nil { + return err + } + + authorizer, err := authConfig.Authorizer() if err != nil { return err } @@ -105,3 +101,35 @@ func (vh *AzureKeyVaultHandler) getPropertiesForCloud() (string, string, error) return env.ResourceIdentifiers.KeyVault, env.ActiveDirectoryEndpoint, nil } + +func (vh *AzureKeyVaultHandler) getAuthConfig(ctx context.Context, client client.Client, logger logr.Logger, + triggerNamespace, keyVaultResourceURL, activeDirectoryEndpoint string) (auth.AuthorizerConfig, error) { + switch vh.podIdentity { + case "", kedav1alpha1.PodIdentityProviderNone: + clientID := vh.vault.Credentials.ClientID + tenantID := vh.vault.Credentials.TenantID + + clientSecretName := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Name + clientSecretKey := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Key + clientSecret := resolveAuthSecret(ctx, client, logger, clientSecretName, triggerNamespace, clientSecretKey) + + if clientID == "" || tenantID == "" || clientSecret == "" { + return nil, fmt.Errorf("clientID, tenantID and clientSecret are expected when not using a pod identity provider") + } + + config := auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID) + config.Resource = keyVaultResourceURL + config.AADEndpoint = activeDirectoryEndpoint + + return config, nil + case kedav1alpha1.PodIdentityProviderAzure: + config := auth.NewMSIConfig() + config.Resource = keyVaultResourceURL + + return config, nil + case kedav1alpha1.PodIdentityProviderAzureWorkload: + return azure.NewAzureADWorkloadIdentityConfig(ctx, keyVaultResourceURL), nil + default: + return nil, fmt.Errorf("key vault does not support pod identity provider - %s", vh.podIdentity) + } +} diff --git a/pkg/scaling/resolver/azure_keyvault_handler_test.go b/pkg/scaling/resolver/azure_keyvault_handler_test.go index 4a6740f567d..2f0e1e1bc6a 100644 --- a/pkg/scaling/resolver/azure_keyvault_handler_test.go +++ b/pkg/scaling/resolver/azure_keyvault_handler_test.go @@ -110,7 +110,7 @@ var testDataset = []testData{ func TestGetPropertiesForCloud(t *testing.T) { for _, testData := range testDataset { - vh := NewAzureKeyVaultHandler(&testData.vault) + vh := NewAzureKeyVaultHandler(&testData.vault, kedav1alpha1.PodIdentityProviderNone) kvResourceURL, adEndpoint, err := vh.getPropertiesForCloud() diff --git a/pkg/scaling/resolver/scale_resolvers.go b/pkg/scaling/resolver/scale_resolvers.go index a0015a117a1..d93a7c24280 100644 --- a/pkg/scaling/resolver/scale_resolvers.go +++ b/pkg/scaling/resolver/scale_resolvers.go @@ -211,7 +211,7 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge } } if triggerAuthSpec.AzureKeyVault != nil && len(triggerAuthSpec.AzureKeyVault.Secrets) > 0 { - vaultHandler := NewAzureKeyVaultHandler(triggerAuthSpec.AzureKeyVault) + vaultHandler := NewAzureKeyVaultHandler(triggerAuthSpec.AzureKeyVault, podIdentity) err := vaultHandler.Initialize(ctx, client, logger, triggerNamespace) if err != nil { logger.Error(err, "Error authenticating to Azure Key Vault", "triggerAuthRef.Name", triggerAuthRef.Name) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index faba46a1449..3d5c39ff8ad 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -366,9 +366,9 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, case "azure-blob": return scalers.NewAzureBlobScaler(config) case "azure-data-explorer": - return scalers.NewAzureDataExplorerScaler(config) + return scalers.NewAzureDataExplorerScaler(ctx, config) case "azure-eventhub": - return scalers.NewAzureEventHubScaler(config) + return scalers.NewAzureEventHubScaler(ctx, config) case "azure-log-analytics": return scalers.NewAzureLogAnalyticsScaler(config) case "azure-monitor": diff --git a/tests/cleanup.test.ts b/tests/cleanup.test.ts index b779649d6da..a991ab6bfd5 100644 --- a/tests/cleanup.test.ts +++ b/tests/cleanup.test.ts @@ -1,14 +1,31 @@ import * as sh from 'shelljs' import test from 'ava' +const workloadIdentityNamespace = "azure-workload-identity-system" +const RUN_WORKLOAD_IDENTITY_TESTS = process.env['AZURE_RUN_WORKLOAD_IDENTITY_TESTS'] + test.before('setup shelljs', () => { sh.config.silent = true }) -test('Remove KEDA', t => { +test.serial('Remove KEDA', t => { let result = sh.exec('(cd .. && make undeploy)') if (result.code !== 0) { t.fail('error removing keda. ' + result) } t.pass('KEDA undeployed successfully using make undeploy command') }) + +test.serial('remove azure workload identity kubernetes components', t => { + if (!RUN_WORKLOAD_IDENTITY_TESTS || RUN_WORKLOAD_IDENTITY_TESTS == 'false') { + t.pass('skipping as workload identity tests are disabled') + return + } + + t.is(0, + sh.exec(`helm uninstall workload-identity-webhook --namespace ${workloadIdentityNamespace}`).code, + 'should be able to uninstall workload identity webhook' + ) + + sh.exec(`kubectl delete ns ${workloadIdentityNamespace}`) +}) diff --git a/tests/scalers/azure-app-insights.test.ts b/tests/scalers/azure-app-insights.test.ts index 4dbe1ade8f4..dc69e91091e 100644 --- a/tests/scalers/azure-app-insights.test.ts +++ b/tests/scalers/azure-app-insights.test.ts @@ -12,6 +12,7 @@ const namespacePrefix = 'azure-ai-test-' const app_insights_app_id = process.env['AZURE_APP_INSIGHTS_APP_ID'] const app_insights_instrumentation_key = process.env['AZURE_APP_INSIGHTS_INSTRUMENTATION_KEY'] const sp_id = process.env['AZURE_SP_APP_ID'] +const app_insights_connection_string = process.env['AZURE_APP_INSIGHTS_CONNECTION_STRING'] const sp_key = process.env['AZURE_SP_KEY'] const sp_tenant = process.env['AZURE_SP_TENANT'] const test_pod_id = process.env['TEST_POD_ID'] == "true" @@ -48,13 +49,15 @@ function sleep(sec: number) { } function set_metric(metric_value, t, test_callback) { - appinsights.setup(app_insights_instrumentation_key).setUseDiskRetryCaching(true) + appinsights.setup(app_insights_connection_string).setUseDiskRetryCaching(true) appinsights.defaultClient.context.tags[appinsights.defaultClient.context.keys.cloudRole] = test_app_insights_role appinsights.defaultClient.trackMetric({name: test_app_insights_metric, value: metric_value}); appinsights.defaultClient.flush({ callback: function(response: string) { - let resp = JSON.parse(response) - t.is(0, resp['errors'].length, `failed to set metric: ${response['errors']}`) + let resp_errors = JSON.parse(response)['errors'] + if (resp_errors != null && resp_errors != undefined) { + t.is(0, resp_errors.length, `failed to set metric: ${JSON.stringify(resp_errors)}`) + } test_callback() } }) @@ -76,7 +79,8 @@ function assert_replicas(t, namespace: string, name: string, replicas: number, w } test.before(t => { - if (!app_insights_app_id || !app_insights_instrumentation_key || !sp_id || !sp_key || !sp_tenant) { + if (!app_insights_app_id || !app_insights_instrumentation_key + || !app_insights_connection_string || !sp_id || !sp_key || !sp_tenant) { t.fail('A required parameters app insights scaler was not resolved') } diff --git a/tests/scalers/azure-service-bus-queue-workload-identity.test.ts b/tests/scalers/azure-service-bus-queue-workload-identity.test.ts new file mode 100644 index 00000000000..656e6ee66ca --- /dev/null +++ b/tests/scalers/azure-service-bus-queue-workload-identity.test.ts @@ -0,0 +1,187 @@ +import * as sh from "shelljs" +import * as azure from "@azure/service-bus" +import test from "ava" +import { createNamespace, createYamlFile, waitForDeploymentReplicaCount } from "./helpers" + +const connectionString = process.env["AZURE_SERVICE_BUS_CONNECTION_STRING"] +// Format for connection string - +// Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=" +const serviceBusNameSpace = connectionString.split("//")[1].split(".")[0] +const queueName = "sb-queue-workload" + +const testName = "test-azure-service-bus-queue-workload-identity" +const testNamespace = `${testName}-ns` +const deploymentName = `${testName}-deployment` +const triggerAuthName = `${testName}-trigger-auth` +const scaledObjectName = `${testName}-scaled-object` + +test.before(async t => { + if (!connectionString) { + t.fail("AZURE_SERVICE_BUS_CONNECTION_STRING environment variable is required for service bus tests") + } + + sh.config.silent = true + + // Create queue within the Service Bus Namespace + const serviceBusAdminClient = new azure.ServiceBusAdministrationClient(connectionString) + const queueExists = await serviceBusAdminClient.queueExists(queueName) + // Clean up (delete) queue if already exists and create again + if (queueExists) { + await serviceBusAdminClient.deleteQueue(queueName) + } + await serviceBusAdminClient.createQueue(queueName) + + // Create Kubernetes Namespace + createNamespace(testNamespace) + + // Create deployment + t.is( + sh.exec(`kubectl apply -f ${createYamlFile(deploymentYaml)} -n ${testNamespace}`).code, + 0, + "Creating a deployment should work" + ) + + // Create trigger auth resource + t.is( + sh.exec(`kubectl apply -f ${createYamlFile(triggerAuthYaml)} -n ${testNamespace}`).code, + 0, + "Creating a trigger authentication resource should work" + ) + + // Create scaled object + t.is( + sh.exec(`kubectl apply -f ${createYamlFile(scaledObjectYaml)} -n ${testNamespace}`).code, + 0, + "Creating a scaled object should work" + ) + + t.true(await waitForDeploymentReplicaCount(0, deploymentName, testNamespace, 60, 1000), "Replica count should be 0 after 1 minute") +}) + +test.serial("Deployment should scale up with messages on service bus queue", async t => { + // Send messages to service bus queue + const serviceBusClient = new azure.ServiceBusClient(connectionString) + const sender = serviceBusClient.createSender(queueName) + + const messages: azure.ServiceBusMessage[] = [ + {"body": "1"}, + {"body": "2"}, + {"body": "3"}, + {"body": "4"}, + {"body": "5"}, + ] + + await sender.sendMessages(messages) + + await serviceBusClient.close() + + // Scale out when messages available + t.true(await waitForDeploymentReplicaCount(1, deploymentName, testNamespace, 60, 1000), "Replica count should be 1 after 1 minute") +}) + +test.serial("Deployment should scale down with messages on service bus queue", async t => { + // Receive messages from service bus queue + const serviceBusClient = new azure.ServiceBusClient(connectionString) + const receiver = serviceBusClient.createReceiver(queueName) + + var numOfReceivedMessages = 0 + + while (numOfReceivedMessages < 5) { + const messages = await receiver.receiveMessages(10, { + maxWaitTimeInMs: 60 * 1000, + }) + + for (const message of messages) { + await receiver.completeMessage(message) + numOfReceivedMessages += 1 + } + } + + await serviceBusClient.close() + + // Scale down when messages unavailable + t.true(await waitForDeploymentReplicaCount(0, deploymentName, testNamespace, 60, 1000), "Replica count should be 0 after 1 minute") +}) + +test.after.always("Clean up E2E K8s objects", async t => { + const resources = [ + `scaledobject.keda.sh/${scaledObjectName}`, + `triggerauthentications.keda.sh/${triggerAuthName}`, + `deployments.apps/${deploymentName}`, + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} -n ${testNamespace}`) + } + + sh.exec(`kubectl delete ns ${testNamespace}`) + + // Delete queue + const serviceBusAdminClient = new azure.ServiceBusAdministrationClient(connectionString) + const response = await serviceBusAdminClient.deleteQueue(queueName) + t.is( + response._response.status, + 200, + "Queue deletion must succeed" + ) +}) + +// YAML Definitions for Kubernetes resources +// Deployment +const deploymentYaml = +`apiVersion: apps/v1 +kind: Deployment +metadata: + name: ${deploymentName} + namespace: ${testNamespace} +spec: + replicas: 0 + selector: + matchLabels: + app: ${deploymentName} + template: + metadata: + labels: + app: ${deploymentName} + spec: + containers: + - name: nginx + image: nginx:1.16.1 +` + +// Trigger Authentication +const triggerAuthYaml = +`apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: ${triggerAuthName} + namespace: ${testNamespace} +spec: + podIdentity: + provider: azure-workload +` + +// Scaled Object +const scaledObjectYaml = +`apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: ${scaledObjectName} + namespace: ${testNamespace} + labels: + deploymentName: ${deploymentName} +spec: + scaleTargetRef: + name: ${deploymentName} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 0 + maxReplicaCount: 1 + triggers: + - type: azure-servicebus + metadata: + namespace: ${serviceBusNameSpace} + queueName: ${queueName} + authenticationRef: + name: ${triggerAuthName} +` diff --git a/tests/setup.test.ts b/tests/setup.test.ts index 650d4e5dbcd..5de723e2415 100644 --- a/tests/setup.test.ts +++ b/tests/setup.test.ts @@ -5,6 +5,10 @@ import test from 'ava' const kc = new k8s.KubeConfig() kc.loadFromDefault() +const AZURE_AD_TENANT_ID = process.env['AZURE_SP_TENANT'] +const RUN_WORKLOAD_IDENTITY_TESTS = process.env['AZURE_RUN_WORKLOAD_IDENTITY_TESTS'] +const workloadIdentityNamespace = "azure-workload-identity-system" + test.before('configure shelljs', () => { sh.config.silent = true }) @@ -33,6 +37,68 @@ test.serial('Get Kubernetes version', t => { } }) +test.serial('setup helm', t => { + // check if helm is already installed. + let result = sh.exec('helm version') + if(result.code == 0) { + t.pass('helm is already installed. skipping setup') + return + } + t.is(0, sh.exec(`curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3`).code, 'should be able to download helm script') + t.is(0, sh.exec(`chmod 700 get_helm.sh`).code, 'should be able to change helm script permissions') + t.is(0, sh.exec(`./get_helm.sh`).code, 'should be able to download helm') + t.is(0, sh.exec(`helm version`).code, 'should be able to get helm version') +}) + +test.serial('setup and verify azure workload identity kubernetes components', t => { + if (!RUN_WORKLOAD_IDENTITY_TESTS || RUN_WORKLOAD_IDENTITY_TESTS == 'false') { + t.pass('skipping as workload identity tests are disabled') + return + } + + // check if helm is already installed. + let result = sh.exec('helm version') + if (result.code != 0) { + t.fail('helm is not installed') + return + } + + // Add Azure AD Workload Identity Helm Repo + t.is(0, + sh.exec('helm repo add azure-workload-identity https://azure.github.io/azure-workload-identity/charts').code, + 'should be able to add Azure AD workload identity helm repo' + ) + t.is(0, + sh.exec(`helm repo update azure-workload-identity`).code, + "should be able to update" + ) + + // Install Workload Identity Webhook if not present + t.is(0, + sh.exec(`helm upgrade --install workload-identity-webhook azure-workload-identity/workload-identity-webhook --namespace ${workloadIdentityNamespace} --create-namespace --set azureTenantID="${AZURE_AD_TENANT_ID}"`).code, + 'should be able to install workload identity webhook' + ) + + let success = false + for (let i = 0; i < 20; i++) { + result = sh.exec( + `kubectl get deployment.apps/azure-wi-webhook-controller-manager -n ${workloadIdentityNamespace} -o jsonpath="{.status.readyReplicas}"` + ) + const parsedPods = parseInt(result.stdout, 10) + if (isNaN(parsedPods) || parsedPods != 2) { + t.log('Workload Identity webhook is not ready. sleeping') + sh.exec('sleep 5s') + } else if (parsedPods == 2) { + t.log('Workload Identity webhook is ready') + success = true + sh.exec('sleep 120s') // Sleep for some time for webhook to setup properly + break + } + } + + t.true(success, 'expected workload identity deployments to start 2 pods successfully') +}) + test.serial('Deploy KEDA', t => { let result = sh.exec('(cd .. && make deploy)') if (result.code !== 0) { @@ -64,16 +130,3 @@ test.serial('verifyKeda', t => { t.true(success, 'expected keda deployments to start 2 pods successfully') }) - -test.serial('setup helm', t => { - // check if helm is already installed. - let result = sh.exec('helm version') - if(result.code == 0) { - t.pass('helm is already installed. skipping setup') - return - } - t.is(0, sh.exec(`curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3`).code, 'should be able to download helm script') - t.is(0, sh.exec(`chmod 700 get_helm.sh`).code, 'should be able to change helm script permissions') - t.is(0, sh.exec(`./get_helm.sh`).code, 'should be able to download helm') - t.is(0, sh.exec(`helm version`).code, 'should be able to get helm version') -})