From 085bef7edd6f410fb3228d2a8c0e7071fa6558e4 Mon Sep 17 00:00:00 2001 From: Rahul Jain Date: Thu, 22 Jun 2023 12:33:16 -0700 Subject: [PATCH] Move Plugin level lock to Account level Signed-off-by: Rahul Jain --- .../cloudresource/cloudresource.go | 8 ---- .../plugins/aws/aws_converters.go | 2 +- pkg/cloudprovider/plugins/aws/aws_security.go | 10 ++++- .../plugins/aws/aws_security_impl.go | 31 +++++++--------- .../plugins/azure/azure_security.go | 4 -- .../plugins/azure/azure_security_impl.go | 37 ++++++++----------- .../plugins/internal/accounts_common.go | 14 +++++-- .../plugins/internal/cloud_common.go | 20 +++++++--- 8 files changed, 62 insertions(+), 64 deletions(-) diff --git a/pkg/cloudprovider/cloudresource/cloudresource.go b/pkg/cloudprovider/cloudresource/cloudresource.go index 0702ed85..7c134cfe 100644 --- a/pkg/cloudprovider/cloudresource/cloudresource.go +++ b/pkg/cloudprovider/cloudresource/cloudresource.go @@ -32,14 +32,6 @@ var ( ControllerAppliedToPrefix string ) -var ProtocolNameNumMap = map[string]int{ - "icmp": 1, - "igmp": 2, - "tcp": 6, - "udp": 17, - "icmpv6": 58, -} - // CloudResourceType specifies the type of cloud resource. type CloudResourceType string diff --git a/pkg/cloudprovider/plugins/aws/aws_converters.go b/pkg/cloudprovider/plugins/aws/aws_converters.go index 21b8278b..e3678506 100644 --- a/pkg/cloudprovider/plugins/aws/aws_converters.go +++ b/pkg/cloudprovider/plugins/aws/aws_converters.go @@ -252,6 +252,6 @@ func convertFromIPPermissionProtocol(proto string) *int { if strings.Compare(proto, awsAnyProtocolValue) == 0 { return nil } - protoNum := cloudresource.ProtocolNameNumMap[strings.ToLower(proto)] + protoNum := protocolNameNumMap[strings.ToLower(proto)] return &protoNum } diff --git a/pkg/cloudprovider/plugins/aws/aws_security.go b/pkg/cloudprovider/plugins/aws/aws_security.go index c81feed5..923ef375 100644 --- a/pkg/cloudprovider/plugins/aws/aws_security.go +++ b/pkg/cloudprovider/plugins/aws/aws_security.go @@ -36,13 +36,19 @@ const ( ) var ( - mutex sync.Mutex - awsAnyProtocolValue = "-1" tcpUDPPortStart = 0 tcpUDPPortEnd = 65535 ) +var protocolNameNumMap = map[string]int{ + "icmp": 1, + "igmp": 2, + "tcp": 6, + "udp": 17, + "icmpv6": 58, +} + var vpcIDToDefaultSecurityGroup = make(map[string]string) func buildEc2UserIDGroupPairs(addressGroupIdentifiers []*cloudresource.CloudResourceID, diff --git a/pkg/cloudprovider/plugins/aws/aws_security_impl.go b/pkg/cloudprovider/plugins/aws/aws_security_impl.go index a87d93ad..e37317eb 100644 --- a/pkg/cloudprovider/plugins/aws/aws_security_impl.go +++ b/pkg/cloudprovider/plugins/aws/aws_security_impl.go @@ -16,26 +16,24 @@ package aws import ( "fmt" - "sync" - "time" - "github.com/aws/aws-sdk-go/service/ec2" "k8s.io/apimachinery/pkg/types" + "sync" "antrea.io/nephe/pkg/cloudprovider/cloudresource" + "antrea.io/nephe/pkg/cloudprovider/plugins/internal" "antrea.io/nephe/pkg/cloudprovider/utils" ) // CreateSecurityGroup invokes cloud api and creates the cloud security group based on securityGroupIdentifier. func (c *awsCloud) CreateSecurityGroup(securityGroupIdentifier *cloudresource.CloudResource, membershipOnly bool) (*string, error) { - mutex.Lock() - defer mutex.Unlock() - vpcID := securityGroupIdentifier.Vpc accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID) if !found { return nil, fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID) } + accCfg.LockMutex() + defer accCfg.UnlockMutex() cloudSgName := securityGroupIdentifier.GetCloudName(membershipOnly) ec2Service := accCfg.GetServiceConfig().(*ec2ServiceConfig) @@ -51,9 +49,6 @@ func (c *awsCloud) CreateSecurityGroup(securityGroupIdentifier *cloudresource.Cl // UpdateSecurityGroupRules invokes cloud api and updates cloud security group with addRules and rmRules. func (c *awsCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudresource.CloudResource, addRules, rmRules []*cloudresource.CloudRule) error { - mutex.Lock() - defer mutex.Unlock() - addIRule, addERule := utils.SplitCloudRulesByDirection(addRules) rmIRule, rmERule := utils.SplitCloudRulesByDirection(rmRules) @@ -62,6 +57,8 @@ func (c *awsCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudresou if !found { return fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID) } + accCfg.LockMutex() + defer accCfg.UnlockMutex() // build from addressGroups, cloudSgNames from rules cloudSgNames := buildEc2CloudSgNamesFromRules(&appliedToGroupIdentifier.CloudResourceID, append(addIRule, rmIRule...), @@ -124,14 +121,13 @@ func (c *awsCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudresou // UpdateSecurityGroupMembers invokes cloud api and attaches/detaches nics to/from the cloud security group. func (c *awsCloud) UpdateSecurityGroupMembers(securityGroupIdentifier *cloudresource.CloudResource, cloudResourceIdentifiers []*cloudresource.CloudResource, membershipOnly bool) error { - mutex.Lock() - defer mutex.Unlock() - vpcID := securityGroupIdentifier.Vpc accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID) if !found { return fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID) } + accCfg.LockMutex() + defer accCfg.UnlockMutex() // get addressGroup cloudSgID cloudSgName := securityGroupIdentifier.GetCloudName(membershipOnly) @@ -158,14 +154,13 @@ func (c *awsCloud) UpdateSecurityGroupMembers(securityGroupIdentifier *cloudreso // DeleteSecurityGroup invokes cloud api and deletes the cloud security group. Any attached resource will be moved to default sg. func (c *awsCloud) DeleteSecurityGroup(securityGroupIdentifier *cloudresource.CloudResource, membershipOnly bool) error { - mutex.Lock() - defer mutex.Unlock() - vpcID := securityGroupIdentifier.Vpc accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID) if !found { return fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID) } + accCfg.LockMutex() + defer accCfg.UnlockMutex() // check if sg exists in cloud and get its cloud sg id to delete vpcIDs := []string{vpcID} @@ -196,8 +191,6 @@ func (c *awsCloud) DeleteSecurityGroup(securityGroupIdentifier *cloudresource.Cl } func (c *awsCloud) GetEnforcedSecurity() []cloudresource.SynchronizationContent { - inventoryInitWaitDuration := 30 * time.Second - var accNamespacedNames []types.NamespacedName accountConfigs := c.cloudCommon.GetCloudAccounts() for _, accCfg := range accountConfigs { @@ -227,9 +220,11 @@ func (c *awsCloud) GetEnforcedSecurity() []cloudresource.SynchronizationContent awsPluginLogger().Info("Enforced-security-cloud-view GET for account skipped (account no longer exists)", "account", name) return } + accCfg.LockMutex() + defer accCfg.UnlockMutex() ec2Service := accCfg.GetServiceConfig().(*ec2ServiceConfig) - if err := ec2Service.waitForInventoryInit(inventoryInitWaitDuration); err != nil { + if err := ec2Service.waitForInventoryInit(internal.InventoryInitWaitDuration); err != nil { awsPluginLogger().Error(err, "Enforced-security-cloud-view GET for account skipped", "account", accCfg.GetNamespacedName()) return } diff --git a/pkg/cloudprovider/plugins/azure/azure_security.go b/pkg/cloudprovider/plugins/azure/azure_security.go index 6d3b6b03..92a90e5f 100644 --- a/pkg/cloudprovider/plugins/azure/azure_security.go +++ b/pkg/cloudprovider/plugins/azure/azure_security.go @@ -28,10 +28,6 @@ import ( "antrea.io/nephe/pkg/cloudprovider/utils" ) -var ( - mutex sync.Mutex -) - type networkInterfaceInternal struct { armnetwork.Interface vnetID string diff --git a/pkg/cloudprovider/plugins/azure/azure_security_impl.go b/pkg/cloudprovider/plugins/azure/azure_security_impl.go index e96e9fce..43e91a07 100644 --- a/pkg/cloudprovider/plugins/azure/azure_security_impl.go +++ b/pkg/cloudprovider/plugins/azure/azure_security_impl.go @@ -17,24 +17,20 @@ package azure import ( "context" "fmt" - "strings" - "sync" - "time" - "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork" "github.com/Azure/go-autorest/autorest/to" "k8s.io/apimachinery/pkg/types" + "strings" + "sync" "antrea.io/nephe/pkg/cloudprovider/cloudresource" + "antrea.io/nephe/pkg/cloudprovider/plugins/internal" ) // CreateSecurityGroup invokes cloud api and creates the cloud security group based on securityGroupIdentifier. // For addressGroup it will attempt to create an asg, for appliedTo groups it will attempt to create both nsg and asg. func (c *azureCloud) CreateSecurityGroup(securityGroupIdentifier *cloudresource.CloudResource, membershipOnly bool) (*string, error) { - mutex.Lock() - defer mutex.Unlock() var cloudSecurityGroupID string - // find account managing the vnet vnetID := securityGroupIdentifier.Vpc accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID) @@ -42,6 +38,8 @@ func (c *azureCloud) CreateSecurityGroup(securityGroupIdentifier *cloudresource. azurePluginLogger().Info("Azure account not found managing virtual network", vnetID, "vnetID") return nil, fmt.Errorf("azure account not found managing virtual network [%v]", vnetID) } + accCfg.LockMutex() + defer accCfg.UnlockMutex() // extract resource-group-name from vnet ID _, rgName, _, err := extractFieldsFromAzureResourceID(securityGroupIdentifier.Vpc) @@ -84,15 +82,15 @@ func (c *azureCloud) CreateSecurityGroup(securityGroupIdentifier *cloudresource. // UpdateSecurityGroupRules invokes cloud api and updates cloud security group with allRules. func (c *azureCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudresource.CloudResource, addRules, rmRules []*cloudresource.CloudRule) error { - mutex.Lock() - defer mutex.Unlock() - // find account managing the vnet and get compute service config vnetID := appliedToGroupIdentifier.Vpc accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&appliedToGroupIdentifier.AccountID) if !found { return fmt.Errorf("azure account not found managing virtual network [%v]", vnetID) } + accCfg.LockMutex() + defer accCfg.UnlockMutex() + computeService := accCfg.GetServiceConfig().(*computeServiceConfig) location := computeService.credentials.region @@ -151,28 +149,28 @@ func (c *azureCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudres // UpdateSecurityGroupMembers invokes cloud api and attaches/detaches nics to/from the cloud security group. func (c *azureCloud) UpdateSecurityGroupMembers(securityGroupIdentifier *cloudresource.CloudResource, computeResourceIdentifier []*cloudresource.CloudResource, membershipOnly bool) error { - mutex.Lock() - defer mutex.Unlock() - vnetID := securityGroupIdentifier.Vpc accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID) if !found { return fmt.Errorf("azure account not found managing virtual network [%v]", vnetID) } + accCfg.LockMutex() + defer accCfg.UnlockMutex() + computeService := accCfg.GetServiceConfig().(*computeServiceConfig) return computeService.updateSecurityGroupMembers(&securityGroupIdentifier.CloudResourceID, computeResourceIdentifier, membershipOnly) } // DeleteSecurityGroup invokes cloud api and deletes the cloud application security group. func (c *azureCloud) DeleteSecurityGroup(securityGroupIdentifier *cloudresource.CloudResource, membershipOnly bool) error { - mutex.Lock() - defer mutex.Unlock() - vnetID := securityGroupIdentifier.Vpc accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID) if !found { return fmt.Errorf("azure account not found managing virtual network [%v]", vnetID) } + accCfg.LockMutex() + defer accCfg.UnlockMutex() + computeService := accCfg.GetServiceConfig().(*computeServiceConfig) location := computeService.credentials.region @@ -203,11 +201,6 @@ func (c *azureCloud) DeleteSecurityGroup(securityGroupIdentifier *cloudresource. } func (c *azureCloud) GetEnforcedSecurity() []cloudresource.SynchronizationContent { - mutex.Lock() - defer mutex.Unlock() - - inventoryInitWaitDuration := 30 * time.Second - var accNamespacedNames []types.NamespacedName accountConfigs := c.cloudCommon.GetCloudAccounts() for _, accCfg := range accountConfigs { @@ -239,7 +232,7 @@ func (c *azureCloud) GetEnforcedSecurity() []cloudresource.SynchronizationConten } computeService := accCfg.GetServiceConfig().(*computeServiceConfig) - if err := computeService.waitForInventoryInit(inventoryInitWaitDuration); err != nil { + if err := computeService.waitForInventoryInit(internal.InventoryInitWaitDuration); err != nil { azurePluginLogger().Error(err, "enforced-security-cloud-view GET for account skipped", "account", accCfg.GetNamespacedName()) return } diff --git a/pkg/cloudprovider/plugins/internal/accounts_common.go b/pkg/cloudprovider/plugins/internal/accounts_common.go index 8beae092..12ba8061 100644 --- a/pkg/cloudprovider/plugins/internal/accounts_common.go +++ b/pkg/cloudprovider/plugins/internal/accounts_common.go @@ -29,7 +29,8 @@ type CloudAccountInterface interface { GetNamespacedName() *types.NamespacedName GetServiceConfig() CloudServiceInterface GetStatus() *crdv1alpha1.CloudProviderAccountStatus - + LockMutex() + UnlockMutex() performInventorySync() error resetInventoryCache() } @@ -116,9 +117,6 @@ func (c *cloudCommon) updateCloudAccountConfig(client client.Client, credentials } func (accCfg *cloudAccountConfig) performInventorySync() error { - accCfg.mutex.Lock() - defer accCfg.mutex.Unlock() - err := accCfg.serviceConfig.DoResourceInventory() if err != nil { // set the error status to be used later in `CloudProviderAccount` CR. @@ -143,3 +141,11 @@ func (accCfg *cloudAccountConfig) GetStatus() *crdv1alpha1.CloudProviderAccountS func (accCfg *cloudAccountConfig) resetInventoryCache() { accCfg.serviceConfig.ResetInventoryCache() } + +func (accCfg *cloudAccountConfig) LockMutex() { + accCfg.mutex.Lock() +} + +func (accCfg *cloudAccountConfig) UnlockMutex() { + accCfg.mutex.Unlock() +} diff --git a/pkg/cloudprovider/plugins/internal/cloud_common.go b/pkg/cloudprovider/plugins/internal/cloud_common.go index 0a6cdca6..47837ced 100644 --- a/pkg/cloudprovider/plugins/internal/cloud_common.go +++ b/pkg/cloudprovider/plugins/internal/cloud_common.go @@ -19,6 +19,7 @@ import ( "reflect" "strings" "sync" + "time" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -33,6 +34,7 @@ var ( VirtualMachineRuntimeObjectKind = reflect.TypeOf(runtimev1alpha1.VirtualMachine{}).Name() MaxCloudResourceResponse int64 = 100 + InventoryInitWaitDuration = time.Second * 30 AccountCredentialsDefault = "default" ) @@ -171,6 +173,8 @@ func (c *cloudCommon) GetCloudAccountComputeInternalResourceObjects(accountNames if !found { return nil, fmt.Errorf("unable to find cloud account config") } + accCfg.LockMutex() + defer accCfg.UnlockMutex() return accCfg.GetServiceConfig().GetInternalResourceObjects(accCfg.GetNamespacedName().Namespace, accCfg.GetNamespacedName()), nil } @@ -179,6 +183,8 @@ func (c *cloudCommon) AddResourceFilters(accountNamespacedName *types.Namespaced if !found { return fmt.Errorf("unable to find cloud account config") } + accCfg.LockMutex() + defer accCfg.UnlockMutex() return accCfg.GetServiceConfig().AddResourceFilters(selector) } @@ -188,6 +194,8 @@ func (c *cloudCommon) RemoveResourceFilters(accNamespacedName, selectorNamespace c.logger().Info("Cloud account config not found", "account", *accNamespacedName, "selector", selectorNamespacedName) return } + accCfg.LockMutex() + defer accCfg.UnlockMutex() accCfg.GetServiceConfig().RemoveResourceFilters(selectorNamespacedName) } @@ -206,12 +214,10 @@ func (c *cloudCommon) DoInventoryPoll(accountNamespacedName *types.NamespacedNam if !found { return fmt.Errorf("unable to find cloud account config: %v", *accountNamespacedName) } + accCfg.LockMutex() + defer accCfg.UnlockMutex() - if err := accCfg.performInventorySync(); err != nil { - return err - } - - return nil + return accCfg.performInventorySync() } // ResetInventoryCache resets cloud snapshot and poll stats to nil. @@ -220,6 +226,8 @@ func (c *cloudCommon) ResetInventoryCache(accountNamespacedName *types.Namespace if !found { return fmt.Errorf("unable to find cloud account config %v", *accountNamespacedName) } + accCfg.LockMutex() + defer accCfg.UnlockMutex() accCfg.resetInventoryCache() return nil @@ -231,6 +239,8 @@ func (c *cloudCommon) GetVpcInventory(accountNamespacedName *types.NamespacedNam if !found { return nil, fmt.Errorf("unable to find cloud account config") } + accCfg.LockMutex() + defer accCfg.UnlockMutex() return accCfg.GetServiceConfig().GetVpcInventory(), nil }