Skip to content

Commit

Permalink
Move Plugin level lock to Account level
Browse files Browse the repository at this point in the history
Signed-off-by: Rahul Jain <rahulj@vmware.com>
  • Loading branch information
reachjainrahul committed Jun 22, 2023
1 parent 826519f commit 085bef7
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 64 deletions.
8 changes: 0 additions & 8 deletions pkg/cloudprovider/cloudresource/cloudresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ var (
ControllerAppliedToPrefix string
)

var ProtocolNameNumMap = map[string]int{
"icmp": 1,
"igmp": 2,
"tcp": 6,
"udp": 17,
"icmpv6": 58,
}

// CloudResourceType specifies the type of cloud resource.
type CloudResourceType string

Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudprovider/plugins/aws/aws_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,6 @@ func convertFromIPPermissionProtocol(proto string) *int {
if strings.Compare(proto, awsAnyProtocolValue) == 0 {
return nil
}
protoNum := cloudresource.ProtocolNameNumMap[strings.ToLower(proto)]
protoNum := protocolNameNumMap[strings.ToLower(proto)]
return &protoNum
}
10 changes: 8 additions & 2 deletions pkg/cloudprovider/plugins/aws/aws_security.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,19 @@ const (
)

var (
mutex sync.Mutex

awsAnyProtocolValue = "-1"
tcpUDPPortStart = 0
tcpUDPPortEnd = 65535
)

var protocolNameNumMap = map[string]int{
"icmp": 1,
"igmp": 2,
"tcp": 6,
"udp": 17,
"icmpv6": 58,
}

var vpcIDToDefaultSecurityGroup = make(map[string]string)

func buildEc2UserIDGroupPairs(addressGroupIdentifiers []*cloudresource.CloudResourceID,
Expand Down
31 changes: 13 additions & 18 deletions pkg/cloudprovider/plugins/aws/aws_security_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,24 @@ package aws

import (
"fmt"
"sync"
"time"

"github.com/aws/aws-sdk-go/service/ec2"
"k8s.io/apimachinery/pkg/types"
"sync"

"antrea.io/nephe/pkg/cloudprovider/cloudresource"
"antrea.io/nephe/pkg/cloudprovider/plugins/internal"
"antrea.io/nephe/pkg/cloudprovider/utils"
)

// CreateSecurityGroup invokes cloud api and creates the cloud security group based on securityGroupIdentifier.
func (c *awsCloud) CreateSecurityGroup(securityGroupIdentifier *cloudresource.CloudResource, membershipOnly bool) (*string, error) {
mutex.Lock()
defer mutex.Unlock()

vpcID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
return nil, fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID)
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

cloudSgName := securityGroupIdentifier.GetCloudName(membershipOnly)
ec2Service := accCfg.GetServiceConfig().(*ec2ServiceConfig)
Expand All @@ -51,9 +49,6 @@ func (c *awsCloud) CreateSecurityGroup(securityGroupIdentifier *cloudresource.Cl
// UpdateSecurityGroupRules invokes cloud api and updates cloud security group with addRules and rmRules.
func (c *awsCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudresource.CloudResource,
addRules, rmRules []*cloudresource.CloudRule) error {
mutex.Lock()
defer mutex.Unlock()

addIRule, addERule := utils.SplitCloudRulesByDirection(addRules)
rmIRule, rmERule := utils.SplitCloudRulesByDirection(rmRules)

Expand All @@ -62,6 +57,8 @@ func (c *awsCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudresou
if !found {
return fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID)
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

// build from addressGroups, cloudSgNames from rules
cloudSgNames := buildEc2CloudSgNamesFromRules(&appliedToGroupIdentifier.CloudResourceID, append(addIRule, rmIRule...),
Expand Down Expand Up @@ -124,14 +121,13 @@ func (c *awsCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudresou
// UpdateSecurityGroupMembers invokes cloud api and attaches/detaches nics to/from the cloud security group.
func (c *awsCloud) UpdateSecurityGroupMembers(securityGroupIdentifier *cloudresource.CloudResource,
cloudResourceIdentifiers []*cloudresource.CloudResource, membershipOnly bool) error {
mutex.Lock()
defer mutex.Unlock()

vpcID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
return fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID)
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

// get addressGroup cloudSgID
cloudSgName := securityGroupIdentifier.GetCloudName(membershipOnly)
Expand All @@ -158,14 +154,13 @@ func (c *awsCloud) UpdateSecurityGroupMembers(securityGroupIdentifier *cloudreso

// DeleteSecurityGroup invokes cloud api and deletes the cloud security group. Any attached resource will be moved to default sg.
func (c *awsCloud) DeleteSecurityGroup(securityGroupIdentifier *cloudresource.CloudResource, membershipOnly bool) error {
mutex.Lock()
defer mutex.Unlock()

vpcID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
return fmt.Errorf("aws account not found managing virtual private cloud [%v]", vpcID)
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

// check if sg exists in cloud and get its cloud sg id to delete
vpcIDs := []string{vpcID}
Expand Down Expand Up @@ -196,8 +191,6 @@ func (c *awsCloud) DeleteSecurityGroup(securityGroupIdentifier *cloudresource.Cl
}

func (c *awsCloud) GetEnforcedSecurity() []cloudresource.SynchronizationContent {
inventoryInitWaitDuration := 30 * time.Second

var accNamespacedNames []types.NamespacedName
accountConfigs := c.cloudCommon.GetCloudAccounts()
for _, accCfg := range accountConfigs {
Expand Down Expand Up @@ -227,9 +220,11 @@ func (c *awsCloud) GetEnforcedSecurity() []cloudresource.SynchronizationContent
awsPluginLogger().Info("Enforced-security-cloud-view GET for account skipped (account no longer exists)", "account", name)
return
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

ec2Service := accCfg.GetServiceConfig().(*ec2ServiceConfig)
if err := ec2Service.waitForInventoryInit(inventoryInitWaitDuration); err != nil {
if err := ec2Service.waitForInventoryInit(internal.InventoryInitWaitDuration); err != nil {
awsPluginLogger().Error(err, "Enforced-security-cloud-view GET for account skipped", "account", accCfg.GetNamespacedName())
return
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/cloudprovider/plugins/azure/azure_security.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ import (
"antrea.io/nephe/pkg/cloudprovider/utils"
)

var (
mutex sync.Mutex
)

type networkInterfaceInternal struct {
armnetwork.Interface
vnetID string
Expand Down
37 changes: 15 additions & 22 deletions pkg/cloudprovider/plugins/azure/azure_security_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,29 @@ package azure
import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork"
"github.com/Azure/go-autorest/autorest/to"
"k8s.io/apimachinery/pkg/types"
"strings"
"sync"

"antrea.io/nephe/pkg/cloudprovider/cloudresource"
"antrea.io/nephe/pkg/cloudprovider/plugins/internal"
)

// CreateSecurityGroup invokes cloud api and creates the cloud security group based on securityGroupIdentifier.
// For addressGroup it will attempt to create an asg, for appliedTo groups it will attempt to create both nsg and asg.
func (c *azureCloud) CreateSecurityGroup(securityGroupIdentifier *cloudresource.CloudResource, membershipOnly bool) (*string, error) {
mutex.Lock()
defer mutex.Unlock()
var cloudSecurityGroupID string

// find account managing the vnet
vnetID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
azurePluginLogger().Info("Azure account not found managing virtual network", vnetID, "vnetID")
return nil, fmt.Errorf("azure account not found managing virtual network [%v]", vnetID)
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

// extract resource-group-name from vnet ID
_, rgName, _, err := extractFieldsFromAzureResourceID(securityGroupIdentifier.Vpc)
Expand Down Expand Up @@ -84,15 +82,15 @@ func (c *azureCloud) CreateSecurityGroup(securityGroupIdentifier *cloudresource.
// UpdateSecurityGroupRules invokes cloud api and updates cloud security group with allRules.
func (c *azureCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudresource.CloudResource,
addRules, rmRules []*cloudresource.CloudRule) error {
mutex.Lock()
defer mutex.Unlock()

// find account managing the vnet and get compute service config
vnetID := appliedToGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&appliedToGroupIdentifier.AccountID)
if !found {
return fmt.Errorf("azure account not found managing virtual network [%v]", vnetID)
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

computeService := accCfg.GetServiceConfig().(*computeServiceConfig)
location := computeService.credentials.region

Expand Down Expand Up @@ -151,28 +149,28 @@ func (c *azureCloud) UpdateSecurityGroupRules(appliedToGroupIdentifier *cloudres
// UpdateSecurityGroupMembers invokes cloud api and attaches/detaches nics to/from the cloud security group.
func (c *azureCloud) UpdateSecurityGroupMembers(securityGroupIdentifier *cloudresource.CloudResource,
computeResourceIdentifier []*cloudresource.CloudResource, membershipOnly bool) error {
mutex.Lock()
defer mutex.Unlock()

vnetID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
return fmt.Errorf("azure account not found managing virtual network [%v]", vnetID)
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

computeService := accCfg.GetServiceConfig().(*computeServiceConfig)
return computeService.updateSecurityGroupMembers(&securityGroupIdentifier.CloudResourceID, computeResourceIdentifier, membershipOnly)
}

// DeleteSecurityGroup invokes cloud api and deletes the cloud application security group.
func (c *azureCloud) DeleteSecurityGroup(securityGroupIdentifier *cloudresource.CloudResource, membershipOnly bool) error {
mutex.Lock()
defer mutex.Unlock()

vnetID := securityGroupIdentifier.Vpc
accCfg, found := c.cloudCommon.GetCloudAccountByAccountId(&securityGroupIdentifier.AccountID)
if !found {
return fmt.Errorf("azure account not found managing virtual network [%v]", vnetID)
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

computeService := accCfg.GetServiceConfig().(*computeServiceConfig)
location := computeService.credentials.region

Expand Down Expand Up @@ -203,11 +201,6 @@ func (c *azureCloud) DeleteSecurityGroup(securityGroupIdentifier *cloudresource.
}

func (c *azureCloud) GetEnforcedSecurity() []cloudresource.SynchronizationContent {
mutex.Lock()
defer mutex.Unlock()

inventoryInitWaitDuration := 30 * time.Second

var accNamespacedNames []types.NamespacedName
accountConfigs := c.cloudCommon.GetCloudAccounts()
for _, accCfg := range accountConfigs {
Expand Down Expand Up @@ -239,7 +232,7 @@ func (c *azureCloud) GetEnforcedSecurity() []cloudresource.SynchronizationConten
}

computeService := accCfg.GetServiceConfig().(*computeServiceConfig)
if err := computeService.waitForInventoryInit(inventoryInitWaitDuration); err != nil {
if err := computeService.waitForInventoryInit(internal.InventoryInitWaitDuration); err != nil {
azurePluginLogger().Error(err, "enforced-security-cloud-view GET for account skipped", "account", accCfg.GetNamespacedName())
return
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/cloudprovider/plugins/internal/accounts_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type CloudAccountInterface interface {
GetNamespacedName() *types.NamespacedName
GetServiceConfig() CloudServiceInterface
GetStatus() *crdv1alpha1.CloudProviderAccountStatus

LockMutex()
UnlockMutex()
performInventorySync() error
resetInventoryCache()
}
Expand Down Expand Up @@ -116,9 +117,6 @@ func (c *cloudCommon) updateCloudAccountConfig(client client.Client, credentials
}

func (accCfg *cloudAccountConfig) performInventorySync() error {
accCfg.mutex.Lock()
defer accCfg.mutex.Unlock()

err := accCfg.serviceConfig.DoResourceInventory()
if err != nil {
// set the error status to be used later in `CloudProviderAccount` CR.
Expand All @@ -143,3 +141,11 @@ func (accCfg *cloudAccountConfig) GetStatus() *crdv1alpha1.CloudProviderAccountS
func (accCfg *cloudAccountConfig) resetInventoryCache() {
accCfg.serviceConfig.ResetInventoryCache()
}

func (accCfg *cloudAccountConfig) LockMutex() {
accCfg.mutex.Lock()
}

func (accCfg *cloudAccountConfig) UnlockMutex() {
accCfg.mutex.Unlock()
}
20 changes: 15 additions & 5 deletions pkg/cloudprovider/plugins/internal/cloud_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"reflect"
"strings"
"sync"
"time"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -33,6 +34,7 @@ var (
VirtualMachineRuntimeObjectKind = reflect.TypeOf(runtimev1alpha1.VirtualMachine{}).Name()

MaxCloudResourceResponse int64 = 100
InventoryInitWaitDuration = time.Second * 30
AccountCredentialsDefault = "default"
)

Expand Down Expand Up @@ -171,6 +173,8 @@ func (c *cloudCommon) GetCloudAccountComputeInternalResourceObjects(accountNames
if !found {
return nil, fmt.Errorf("unable to find cloud account config")
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()
return accCfg.GetServiceConfig().GetInternalResourceObjects(accCfg.GetNamespacedName().Namespace, accCfg.GetNamespacedName()), nil
}

Expand All @@ -179,6 +183,8 @@ func (c *cloudCommon) AddResourceFilters(accountNamespacedName *types.Namespaced
if !found {
return fmt.Errorf("unable to find cloud account config")
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()
return accCfg.GetServiceConfig().AddResourceFilters(selector)
}

Expand All @@ -188,6 +194,8 @@ func (c *cloudCommon) RemoveResourceFilters(accNamespacedName, selectorNamespace
c.logger().Info("Cloud account config not found", "account", *accNamespacedName, "selector", selectorNamespacedName)
return
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()
accCfg.GetServiceConfig().RemoveResourceFilters(selectorNamespacedName)
}

Expand All @@ -206,12 +214,10 @@ func (c *cloudCommon) DoInventoryPoll(accountNamespacedName *types.NamespacedNam
if !found {
return fmt.Errorf("unable to find cloud account config: %v", *accountNamespacedName)
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

if err := accCfg.performInventorySync(); err != nil {
return err
}

return nil
return accCfg.performInventorySync()
}

// ResetInventoryCache resets cloud snapshot and poll stats to nil.
Expand All @@ -220,6 +226,8 @@ func (c *cloudCommon) ResetInventoryCache(accountNamespacedName *types.Namespace
if !found {
return fmt.Errorf("unable to find cloud account config %v", *accountNamespacedName)
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

accCfg.resetInventoryCache()
return nil
Expand All @@ -231,6 +239,8 @@ func (c *cloudCommon) GetVpcInventory(accountNamespacedName *types.NamespacedNam
if !found {
return nil, fmt.Errorf("unable to find cloud account config")
}
accCfg.LockMutex()
defer accCfg.UnlockMutex()

return accCfg.GetServiceConfig().GetVpcInventory(), nil
}

0 comments on commit 085bef7

Please sign in to comment.