Skip to content

Commit

Permalink
Restart account poller after all CES are added after nephe restart.
Browse files Browse the repository at this point in the history
- On nephe restart, In order to avoid poller restart on CPA and every CES add,
postpone restart till all CES attached to the CPA are reconciled.
- Fix logging in CES webhook.

Signed-off-by: Archana Holla <harchana@vmware.com>
  • Loading branch information
archanapholla committed Jul 6, 2023
1 parent b4f9763 commit 54c66b3
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 30 deletions.
41 changes: 33 additions & 8 deletions pkg/accountmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type AccountManager struct {
Inventory inventory.Interface
accPollers map[types.NamespacedName]*accountPoller
accountConfigMap map[types.NamespacedName]*accountConfig
// accountToNumSelectors stores number of configured CloudEntitySelector CRs for a given account, required for handling nephe restart case.
accountToNumSelectors map[types.NamespacedName]int
}

type accountConfig struct {
Expand All @@ -80,6 +82,7 @@ func (a *AccountManager) ConfigureAccountManager() {
// Init maps.
a.accPollers = make(map[types.NamespacedName]*accountPoller)
a.accountConfigMap = make(map[types.NamespacedName]*accountConfig)
a.accountToNumSelectors = make(map[types.NamespacedName]int)
}

// AddAccount consumes CloudProviderAccount CR and calls cloud plugin to add account. It also creates and starts account
Expand All @@ -102,11 +105,14 @@ func (a *AccountManager) AddAccount(namespacedName *types.NamespacedName, accoun
// Create an account poller for polling cloud inventory.
accPoller, exists := a.addAccountPoller(cloudInterface, namespacedName, account)
if !exists {
if !util.DoesCesCrExistsForAccount(a.Client, namespacedName) {
numCes := util.GetNumberOfCesCrForAccount(a.Client, namespacedName)
a.Log.Info("Test:", "number of ces", numCes)
if numCes == 0 {
a.Log.Info("Starting account poller", "account", namespacedName)
go wait.Until(accPoller.doAccountPolling, time.Duration(accPoller.pollIntvInSeconds)*time.Second, accPoller.ch)
} else {
a.Log.V(1).Info("Ignoring start of account poller", "account", namespacedName)
a.accountToNumSelectors[*namespacedName] = numCes
if ctrlsync.GetControllerSyncStatusInstance().IsControllerSynced(ctrlsync.ControllerTypeCPA) && !config.initialized {
// Replay CES CR only when account init state is changed from failure to success.
a.replaySelectorsForAccount(namespacedName, config)
Expand Down Expand Up @@ -159,30 +165,51 @@ func (a *AccountManager) RemoveAccount(namespacedName *types.NamespacedName) err
// restart poller once done.
func (a *AccountManager) AddResourceFiltersToAccount(accNamespacedName *types.NamespacedName,
selectorNamespacedName *types.NamespacedName, selector *crdv1alpha1.CloudEntitySelector, replay bool) (bool, error) {
var retError error = nil

defer func() {
// Upon restart, wait for all CES to be added before starting cloud inventory poll.
// accountToSelectorMap holds number of CES CRs only in restart case if any CES CR is present in etcd.
pendingCes, found := a.accountToNumSelectors[*accNamespacedName]
if found {
pendingCes--
a.accountToNumSelectors[*accNamespacedName] = pendingCes
}
if !found || pendingCes == 0 {
delete(a.accountToNumSelectors, *accNamespacedName)
accPoller, exists := a.getAccountPoller(accNamespacedName)
if exists {
accPoller.restartPoller(accNamespacedName)
// wait for polling to complete after restart.
retError = accPoller.waitForPollDone(accNamespacedName)
}
}
}()

cloudProviderType, ok := a.getAccountProviderType(accNamespacedName)
if !ok {
return true, fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: "+
retError = fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: "+
"provider type not found", selectorNamespacedName, accNamespacedName))
}
cloudInterface, _ := cloud.GetCloudInterface(cloudProviderType)
if !replay {
// Update the selector config only when the reconciler processes the CR request.
if err := a.addSelectorToAccountConfig(accNamespacedName, selectorNamespacedName, selector); err != nil {
return false, fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, %v: %v",
retError = fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, %v: %v",
selectorNamespacedName, accNamespacedName, err))
}
}

if err := cloudInterface.AddAccountResourceSelector(accNamespacedName, selector); err != nil {
return false, fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: %v",
retError = fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: %v",
selectorNamespacedName, accNamespacedName, err))
}

// Fetch and restart account poller as selector has changed.
accPoller, exists := a.getAccountPoller(accNamespacedName)
if !exists {
// Account poller may not exist when account is not successfully initialized.
return false, fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: %v",
retError = fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: %v",
selectorNamespacedName, accNamespacedName, errorMsgAccountPollerNotFound))
}

Expand All @@ -192,9 +219,7 @@ func (a *AccountManager) AddResourceFiltersToAccount(accNamespacedName *types.Na
accPoller.addOrUpdateSelector(selectorCopy)
}

accPoller.restartPoller(accNamespacedName)
// wait for polling to complete after restart.
return false, accPoller.waitForPollDone(accNamespacedName)
return false, retError
}

// RemoveResourceFiltersFromAccount removes selector from cloud plugin and restart the poller.
Expand Down
8 changes: 4 additions & 4 deletions pkg/apiserver/webhook/cloudentityselector_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ func (v *CESMutator) Handle(_ context.Context, req admission.Request) admission.
referencedAccount := &v1alpha1.CloudProviderAccount{}
err = v.Client.Get(context.TODO(), *accountNameSpacedName, referencedAccount)
if err != nil {
v.Log.Error(err, errorMsgReferencedAccountNotFound, "CloudEntitySelector", selector, "Account name", selector.Spec.AccountName,
"Account namespace", selector.Spec.AccountNamespace)
return admission.Errored(http.StatusBadRequest, fmt.Errorf("%s, account name : %s, account namespace: %s",
errorMsgReferencedAccountNotFound, selector.Spec.AccountName, selector.Spec.AccountNamespace))
errorMsg := fmt.Sprintf("%s, account name : %s, account namespace: %s",
errorMsgReferencedAccountNotFound, selector.Spec.AccountName, selector.Spec.AccountNamespace)
v.Log.Error(err, errorMsg, "CloudEntitySelector", selector)
return admission.Errored(http.StatusBadRequest, fmt.Errorf(errorMsg))
}
cloudProviderType, err := util.GetAccountProviderType(referencedAccount)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudprovider/plugins/aws/aws_ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type ec2ServiceConfig struct {
type ec2ResourcesCacheSnapshot struct {
vms map[types.NamespacedName][]*ec2.Instance
vpcs []*ec2.Vpc
managedVpcIDs map[string]struct{}
managedVpcIds map[string]struct{}
vpcNameToID map[string]string
vpcPeers map[string][]string
}
Expand Down Expand Up @@ -127,7 +127,7 @@ func (ec2Cfg *ec2ServiceConfig) getManagedVpcIDs() map[string]struct{} {
awsPluginLogger().V(4).Info("Cache snapshot nil", "type", providerType, "account", ec2Cfg.accountNamespacedName)
return vpcIDsCopy
}
vpcIDsSet := snapshot.(*ec2ResourcesCacheSnapshot).managedVpcIDs
vpcIDsSet := snapshot.(*ec2ResourcesCacheSnapshot).managedVpcIds

for vpcID := range vpcIDsSet {
vpcIDsCopy[vpcID] = struct{}{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudprovider/plugins/azure/azure_compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type computeResourcesCacheSnapshot struct {
// vm resources for each CloudEntitySelector CR.
vms map[types.NamespacedName][]*virtualMachineTable
vnets []armnetwork.VirtualNetwork
managedVnetIDs map[string]struct{}
managedVnetIds map[string]struct{}
vnetPeers map[string][][]string
}

Expand Down Expand Up @@ -166,7 +166,7 @@ func (computeCfg *computeServiceConfig) getManagedVnetIDs() map[string]struct{}
"type", providerType, "account", computeCfg.accountNamespacedName)
return vnetIDsCopy
}
vnetIDsSet := snapshot.(*computeResourcesCacheSnapshot).managedVnetIDs
vnetIDsSet := snapshot.(*computeResourcesCacheSnapshot).managedVnetIds

for vnetID := range vnetIDsSet {
vnetIDsCopy[vnetID] = struct{}{}
Expand Down
6 changes: 2 additions & 4 deletions pkg/controllers/cloudentityselector/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,9 @@ func (r *CloudEntitySelectorReconciler) processCreateOrUpdate(selector *crdv1alp
Name: selector.Spec.AccountName,
}
r.selectorToAccountMap[*selectorNamespacedName] = *accountNamespacedName
retry, err := r.AccManager.AddResourceFiltersToAccount(accountNamespacedName, selectorNamespacedName,
// No errors can be retried for a CES.
_, err := r.AccManager.AddResourceFiltersToAccount(accountNamespacedName, selectorNamespacedName,
selector, false)
if err != nil && retry {
return err
}
r.updateStatus(selectorNamespacedName, err)
return nil
}
Expand Down
18 changes: 8 additions & 10 deletions pkg/util/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,18 @@ func GetAccountProviderType(account *crdv1alpha1.CloudProviderAccount) (runtimev
}
}

// DoesCesCrExistsForAccount returns true if there is a CloudEntitySelector CR for a given account.
func DoesCesCrExistsForAccount(k8sClient client.Client, namespacedName *types.NamespacedName) bool {
// GetNumberOfCesCrForAccount returns number of CloudEntitySelector CR for a given account.
func GetNumberOfCesCrForAccount(k8sClient client.Client, namespacedName *types.NamespacedName) int {
var numCes int
cesList := &crdv1alpha1.CloudEntitySelectorList{}
listOptions := &client.ListOptions{
Namespace: namespacedName.Namespace,
}
if err := k8sClient.List(context.TODO(), cesList, listOptions); err != nil {
return false
if err := k8sClient.List(context.TODO(), cesList); err != nil {
return numCes
}

for _, ces := range cesList.Items {
if ces.Spec.AccountName == namespacedName.Name {
return true
if ces.Spec.AccountName == namespacedName.Name && ces.Spec.AccountNamespace == namespacedName.Namespace {
numCes++
}
}
return false
return numCes
}

0 comments on commit 54c66b3

Please sign in to comment.