Skip to content

Commit

Permalink
fix: Use new SDK with proper errors for service bus with pod identity (
Browse files Browse the repository at this point in the history
…#4030)

Signed-off-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
Signed-off-by: Jorge Turrado <jorge_turrado@hotmail.es>
  • Loading branch information
JorTurFer authored Jan 13, 2023
1 parent be56ebe commit c161148
Show file tree
Hide file tree
Showing 31 changed files with 1,144 additions and 169 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Here is an overview of all new **experimental** features:
### Fixes

- **General**: Prevent a panic that might occur while refreshing a scaler cache ([#4092](https://github.com/kedacore/keda/issues/4092))
- **Azure Service Bus Scaler:** Use correct auth flows with pod identity ([#4026](https://github.com/kedacore/keda/issues/4026))
- **CPU Memory Scaler** Store forgotten logger ([#4022](https://github.com/kedacore/keda/issues/4022))

### Deprecations
Expand Down
10 changes: 8 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,15 @@ uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified

deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in ~/.kube/config.
cd config/manager && \
$(KUSTOMIZE) edit set image ghcr.io/kedacore/keda=${IMAGE_CONTROLLER}
$(KUSTOMIZE) edit set image ghcr.io/kedacore/keda=${IMAGE_CONTROLLER} && \
if [ "$(AZURE_RUN_AAD_POD_IDENTITY_TESTS)" = true ]; then \
$(KUSTOMIZE) edit add label --force aadpodidbinding:keda; \
fi
cd config/metrics-server && \
$(KUSTOMIZE) edit set image ghcr.io/kedacore/keda-metrics-apiserver=${IMAGE_ADAPTER}
$(KUSTOMIZE) edit set image ghcr.io/kedacore/keda-metrics-apiserver=${IMAGE_ADAPTER} && \
if [ "$(AZURE_RUN_AAD_POD_IDENTITY_TESTS)" = true ]; then \
$(KUSTOMIZE) edit add label --force aadpodidbinding:keda; \
fi
if [ "$(AZURE_RUN_WORKLOAD_IDENTITY_TESTS)" = true ]; then \
cd config/service_account && \
$(KUSTOMIZE) edit add label --force azure.workload.identity/use:true; \
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/Azure/azure-kusto-go v0.9.2
github.com/Azure/azure-sdk-for-go v67.1.0+incompatible
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0-beta.2
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1/go.mod h1:fBF9PQNqB8scdgpZ3
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 h1:sVW/AFBTGyJxDaMYlq0ct3jUXTtj12tQ6zE2GZUgVQw=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.11.0/go.mod h1:HcM1YX14R7CJcghJGOYCgdezslRSVzqwLf/q+4Y2r/0=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfRLzhtKpXhVUAN7Cd7KVbTyc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0-beta.2 h1:NsprcuNHEsCR48QYlLxx/gAi9OcCzcwX8VVTZVK8fdQ=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0-beta.2/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
Expand Down
4 changes: 2 additions & 2 deletions pkg/metricsservice/utils/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"os"
"path"

"google.golang.org/grpc/credentials"
Expand All @@ -29,7 +29,7 @@ import (
// LoadGrpcTLSCredentials reads the certificate from the given path and returns TLS transport credentials
func LoadGrpcTLSCredentials(certDir string, server bool) (credentials.TransportCredentials, error) {
// Load certificate of the CA who signed client's certificate
pemClientCA, err := ioutil.ReadFile(path.Join(certDir, "ca.crt"))
pemClientCA, err := os.ReadFile(path.Join(certDir, "ca.crt"))
if err != nil {
return nil, err
}
Expand Down
51 changes: 51 additions & 0 deletions pkg/scalers/azure/azure_aad_podidentity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import (
"io"
"net/http"
"net/url"
"os"
"strconv"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"

"github.com/kedacore/keda/v2/pkg/util"
)
Expand All @@ -17,6 +24,20 @@ const (
MSIURLWithClientID = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=%s&client_id=%s"
)

var globalHTTPTimeout time.Duration

func init() {
valueStr, found := os.LookupEnv("KEDA_HTTP_DEFAULT_TIMEOUT")
globalHTTPTimeoutMS := 3000
if found && valueStr != "" {
value, err := strconv.Atoi(valueStr)
if err == nil {
globalHTTPTimeoutMS = value
}
}
globalHTTPTimeout = time.Duration(globalHTTPTimeoutMS) * time.Millisecond
}

// GetAzureADPodIdentityToken returns the AADToken for resource
func GetAzureADPodIdentityToken(ctx context.Context, httpClient util.HTTPDoer, identityID, audience string) (AADToken, error) {
var token AADToken
Expand Down Expand Up @@ -54,3 +75,33 @@ func GetAzureADPodIdentityToken(ctx context.Context, httpClient util.HTTPDoer, i

return token, nil
}

type ManagedIdentityWrapper struct {
cred *azidentity.ManagedIdentityCredential
}

func ManagedIdentityWrapperCredential(clientID string) (*ManagedIdentityWrapper, error) {
opts := &azidentity.ManagedIdentityCredentialOptions{}
if clientID != "" {
opts.ID = azidentity.ClientID(clientID)
}

msiCred, err := azidentity.NewManagedIdentityCredential(opts)
if err != nil {
return nil, err
}
return &ManagedIdentityWrapper{
cred: msiCred,
}, nil
}

func (w *ManagedIdentityWrapper) GetToken(ctx context.Context, opts policy.TokenRequestOptions) (azcore.AccessToken, error) {
c, cancel := context.WithTimeout(ctx, globalHTTPTimeout)
defer cancel()
tk, err := w.cred.GetToken(c, opts)
if ctxErr := c.Err(); errors.Is(ctxErr, context.DeadlineExceeded) {
// timeout: signal the chain to try its next credential, if any
err = azidentity.NewCredentialUnavailableError("managed identity timed out")
}
return tk, err
}
69 changes: 21 additions & 48 deletions pkg/scalers/azure/azure_aad_workload_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import (
"time"

amqpAuth "github.com/Azure/azure-amqp-common-go/v3/auth"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/AzureAD/microsoft-authentication-library-for-go/apps/confidential"
Expand All @@ -45,18 +44,26 @@ const (
azureAuthrityHostEnv = "AZURE_AUTHORITY_HOST"
)

var DefaultClientID string
var TenantID string
var TokenFilePath string
var AuthorityHost string

func init() {
DefaultClientID = os.Getenv(azureClientIDEnv)
TenantID = os.Getenv(azureTenantIDEnv)
TokenFilePath = os.Getenv(azureFederatedTokenFileEnv)
AuthorityHost = os.Getenv(azureAuthrityHostEnv)
}

// GetAzureADWorkloadIdentityToken returns the AADToken for resource
func GetAzureADWorkloadIdentityToken(ctx context.Context, identityID, resource string) (AADToken, error) {
clientID := os.Getenv(azureClientIDEnv)
tenantID := os.Getenv(azureTenantIDEnv)
tokenFilePath := os.Getenv(azureFederatedTokenFileEnv)
authorityHost := os.Getenv(azureAuthrityHostEnv)

clientID := DefaultClientID
if identityID != "" {
clientID = identityID
}

signedAssertion, err := readJWTFromFileSystem(tokenFilePath)
signedAssertion, err := readJWTFromFileSystem(TokenFilePath)
if err != nil {
return AADToken{}, fmt.Errorf("error reading service account token - %w", err)
}
Expand All @@ -69,7 +76,7 @@ func GetAzureADWorkloadIdentityToken(ctx context.Context, identityID, resource s
return signedAssertion, nil
})

authorityOption := confidential.WithAuthority(fmt.Sprintf("%s%s/oauth2/token", authorityHost, tenantID))
authorityOption := confidential.WithAuthority(fmt.Sprintf("%s%s/oauth2/token", AuthorityHost, TenantID))
confidentialClient, err := confidential.New(
clientID,
cred,
Expand Down Expand Up @@ -126,46 +133,12 @@ func (aadWiConfig ADWorkloadIdentityConfig) Authorizer() (autorest.Authorizer, e
aadWiConfig.ctx, aadWiConfig.IdentityID, aadWiConfig.Resource)), nil
}

// ADWorkloadIdentityCredential is a type that implements the TokenCredential interface.
// Once azure-sdk-for-go supports Workload Identity we can remove this and use default implementation
// https://github.com/Azure/azure-sdk-for-go/issues/15615
type ADWorkloadIdentityCredential struct {
ctx context.Context
IdentityID string
Resource string
aadToken AADToken
}

func NewADWorkloadIdentityCredential(ctx context.Context, identityID, resource string) *ADWorkloadIdentityCredential {
return &ADWorkloadIdentityCredential{ctx: ctx, IdentityID: identityID, Resource: resource}
}

func (wiCredential *ADWorkloadIdentityCredential) refresh() error {
if time.Now().Before(wiCredential.aadToken.ExpiresOnTimeObject) {
return nil
}

aadToken, err := GetAzureADWorkloadIdentityToken(wiCredential.ctx, wiCredential.IdentityID, wiCredential.Resource)
if err != nil {
return err
}

wiCredential.aadToken = aadToken
return nil
}

// GetToken is for implementing the TokenCredential interface
func (wiCredential *ADWorkloadIdentityCredential) GetToken(ctx context.Context, opts policy.TokenRequestOptions) (azcore.AccessToken, error) {
accessToken := azcore.AccessToken{}
err := wiCredential.refresh()
if err != nil {
return accessToken, err
func NewADWorkloadIdentityCredential(identityID string) (*azidentity.WorkloadIdentityCredential, error) {
clientID := DefaultClientID
if identityID != "" {
clientID = identityID
}

accessToken.Token = wiCredential.aadToken.AccessToken
accessToken.ExpiresOn = wiCredential.aadToken.ExpiresOnTimeObject

return accessToken, nil
return azidentity.NewWorkloadIdentityCredential(TenantID, clientID, TokenFilePath, &azidentity.WorkloadIdentityCredentialOptions{})
}

// ADWorkloadIdentityTokenProvider is a type that implements the adal.OAuthTokenProvider and adal.Refresher interfaces.
Expand Down
35 changes: 35 additions & 0 deletions pkg/scalers/azure/azure_azidentity_chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package azure

import (
"os"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
)

func NewChainedCredential(identityID string) (*azidentity.ChainedTokenCredential, error) {
var creds []azcore.TokenCredential

// Used for local debug based on az-cli user
// As production images don't have shell, we can't register this provider always
if _, err := os.Stat("/bin/sh"); err == nil {
cliCred, err := azidentity.NewAzureCLICredential(&azidentity.AzureCLICredentialOptions{})
if err == nil {
creds = append(creds, cliCred)
}
}

// Used for aad-pod-identity
msiCred, err := ManagedIdentityWrapperCredential(identityID)
if err == nil {
creds = append(creds, msiCred)
}

wiCred, err := NewADWorkloadIdentityCredential(identityID)
if err == nil {
creds = append(creds, wiCred)
}

// Create the chained credential based on the previous 3
return azidentity.NewChainedTokenCredential(creds, nil)
}
52 changes: 6 additions & 46 deletions pkg/scalers/azure_servicebus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"regexp"
"strconv"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
az "github.com/Azure/go-autorest/autorest/azure"
"github.com/go-logr/logr"
Expand All @@ -44,8 +42,6 @@ const (
messageCountMetricName = "messageCount"
activationMessageCountMetricName = "activationMessageCount"
defaultTargetMessageCount = 5
// Service bus resource id is "https://servicebus.azure.net/" in all cloud environments
serviceBusResource = "https://servicebus.azure.net/"
)

type azureServiceBusScaler struct {
Expand Down Expand Up @@ -275,7 +271,7 @@ func (s *azureServiceBusScaler) GetMetricsAndActivity(ctx context.Context, metri
// Returns the length of the queue or subscription
func (s *azureServiceBusScaler) getAzureServiceBusLength(ctx context.Context) (int64, error) {
// get adminClient
adminClient, err := s.getServiceBusAdminClient(ctx)
adminClient, err := s.getServiceBusAdminClient()
if err != nil {
return -1, err
}
Expand All @@ -291,58 +287,22 @@ func (s *azureServiceBusScaler) getAzureServiceBusLength(ctx context.Context) (i
}

// Returns service bus namespace object
func (s *azureServiceBusScaler) getServiceBusAdminClient(ctx context.Context) (*admin.Client, error) {
func (s *azureServiceBusScaler) getServiceBusAdminClient() (*admin.Client, error) {
if s.client != nil {
return s.client, nil
}

var adminClient *admin.Client
var err error

switch s.podIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
adminClient, err = admin.NewClientFromConnectionString(s.metadata.connection, nil)
if err != nil {
return nil, err
}

return admin.NewClientFromConnectionString(s.metadata.connection, nil)
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
var creds []azcore.TokenCredential
options := &azidentity.DefaultAzureCredentialOptions{}

// Used for local debug based on az-cli user
cliCred, err := azidentity.NewAzureCLICredential(&azidentity.AzureCLICredentialOptions{TenantID: options.TenantID})
if err == nil {
creds = append(creds, cliCred)
}

// Once azure-sdk-for-go supports Workload Identity we can remove this and use default implementation
// https://github.com/Azure/azure-sdk-for-go/issues/15615
wiCred := azure.NewADWorkloadIdentityCredential(ctx, s.podIdentity.IdentityID, serviceBusResource)
creds = append(creds, wiCred)

// Used for aad-pod-identity
o := &azidentity.ManagedIdentityCredentialOptions{ClientOptions: options.ClientOptions}
if s.podIdentity.IdentityID != "" {
o.ID = azidentity.ClientID(s.podIdentity.IdentityID)
}
msiCred, err := azidentity.NewManagedIdentityCredential(o)
if err == nil {
creds = append(creds, msiCred)
}

// Create the chained credential based on the previous 3
chain, err := azidentity.NewChainedTokenCredential(creds, nil)
if err != nil {
return nil, err
}
adminClient, err = admin.NewClient(s.metadata.fullyQualifiedNamespace, chain, nil)
creds, err := azure.NewChainedCredential(s.podIdentity.IdentityID)
if err != nil {
return nil, err
}
return admin.NewClient(s.metadata.fullyQualifiedNamespace, creds, nil)
}

return adminClient, nil
return nil, fmt.Errorf("incorrect podIdentity type")
}

func getQueueLength(ctx context.Context, adminClient *admin.Client, meta *azureServiceBusMetadata) (int64, error) {
Expand Down
4 changes: 4 additions & 0 deletions tests/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
)

const (
AzureAdPodIdentityNamespace = "azure-ad-identity-system"
AzureWorkloadIdentityNamespace = "azure-workload-identity-system"
AwsIdentityNamespace = "aws-identity-system"
GcpIdentityNamespace = "gcp-identity-system"
Expand All @@ -54,7 +55,10 @@ var random = rand.New(rand.NewSource(time.Now().UnixNano()))

// Env variables required for setup and cleanup.
var (
AzureADMsiID = os.Getenv("TF_AZURE_IDENTITY_1_APP_FULL_ID")
AzureADMsiClientID = os.Getenv("TF_AZURE_IDENTITY_1_APP_ID")
AzureADTenantID = os.Getenv("TF_AZURE_SP_TENANT")
AzureRunAadPodIdentityTests = os.Getenv("AZURE_RUN_AAD_POD_IDENTITY_TESTS")
AzureRunWorkloadIdentityTests = os.Getenv("AZURE_RUN_WORKLOAD_IDENTITY_TESTS")
AwsIdentityTests = os.Getenv("AWS_RUN_IDENTITY_TESTS")
GcpIdentityTests = os.Getenv("GCP_RUN_IDENTITY_TESTS")
Expand Down
Loading

0 comments on commit c161148

Please sign in to comment.