diff --git a/pkg/accountmanager/manager.go b/pkg/accountmanager/manager.go index 58b5bc75..a704c217 100644 --- a/pkg/accountmanager/manager.go +++ b/pkg/accountmanager/manager.go @@ -60,6 +60,8 @@ type AccountManager struct { Inventory inventory.Interface accPollers map[types.NamespacedName]*accountPoller accountConfigMap map[types.NamespacedName]*accountConfig + // accountToNumSelectors stores number of configured CloudEntitySelector CRs for a given account, required for handling nephe restart case. + accountToNumSelectors map[types.NamespacedName]int } type accountConfig struct { @@ -80,6 +82,7 @@ func (a *AccountManager) ConfigureAccountManager() { // Init maps. a.accPollers = make(map[types.NamespacedName]*accountPoller) a.accountConfigMap = make(map[types.NamespacedName]*accountConfig) + a.accountToNumSelectors = make(map[types.NamespacedName]int) } // AddAccount consumes CloudProviderAccount CR and calls cloud plugin to add account. It also creates and starts account @@ -102,11 +105,14 @@ func (a *AccountManager) AddAccount(namespacedName *types.NamespacedName, accoun // Create an account poller for polling cloud inventory. accPoller, exists := a.addAccountPoller(cloudInterface, namespacedName, account) if !exists { - if !util.DoesCesCrExistsForAccount(a.Client, namespacedName) { + numCes := util.GetNumberOfCesCrForAccount(a.Client, namespacedName) + a.Log.Info("Test:", "number of ces", numCes) + if numCes == 0 { a.Log.Info("Starting account poller", "account", namespacedName) go wait.Until(accPoller.doAccountPolling, time.Duration(accPoller.pollIntvInSeconds)*time.Second, accPoller.ch) } else { a.Log.V(1).Info("Ignoring start of account poller", "account", namespacedName) + a.accountToNumSelectors[*namespacedName] = numCes if ctrlsync.GetControllerSyncStatusInstance().IsControllerSynced(ctrlsync.ControllerTypeCPA) && !config.initialized { // Replay CES CR only when account init state is changed from failure to success. a.replaySelectorsForAccount(namespacedName, config) @@ -159,22 +165,43 @@ func (a *AccountManager) RemoveAccount(namespacedName *types.NamespacedName) err // restart poller once done. func (a *AccountManager) AddResourceFiltersToAccount(accNamespacedName *types.NamespacedName, selectorNamespacedName *types.NamespacedName, selector *crdv1alpha1.CloudEntitySelector, replay bool) (bool, error) { + var retError error = nil + + defer func() { + // Upon restart, wait for all CES to be added before starting cloud inventory poll. + // accountToSelectorMap holds number of CES CRs only in restart case if any CES CR is present in etcd. + pendingCes, found := a.accountToNumSelectors[*accNamespacedName] + if found { + pendingCes-- + a.accountToNumSelectors[*accNamespacedName] = pendingCes + } + if !found || pendingCes == 0 { + delete(a.accountToNumSelectors, *accNamespacedName) + accPoller, exists := a.getAccountPoller(accNamespacedName) + if exists { + accPoller.restartPoller(accNamespacedName) + // wait for polling to complete after restart. + retError = accPoller.waitForPollDone(accNamespacedName) + } + } + }() + cloudProviderType, ok := a.getAccountProviderType(accNamespacedName) if !ok { - return true, fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: "+ + retError = fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: "+ "provider type not found", selectorNamespacedName, accNamespacedName)) } cloudInterface, _ := cloud.GetCloudInterface(cloudProviderType) if !replay { // Update the selector config only when the reconciler processes the CR request. if err := a.addSelectorToAccountConfig(accNamespacedName, selectorNamespacedName, selector); err != nil { - return false, fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, %v: %v", + retError = fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, %v: %v", selectorNamespacedName, accNamespacedName, err)) } } if err := cloudInterface.AddAccountResourceSelector(accNamespacedName, selector); err != nil { - return false, fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: %v", + retError = fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: %v", selectorNamespacedName, accNamespacedName, err)) } @@ -182,7 +209,7 @@ func (a *AccountManager) AddResourceFiltersToAccount(accNamespacedName *types.Na accPoller, exists := a.getAccountPoller(accNamespacedName) if !exists { // Account poller may not exist when account is not successfully initialized. - return false, fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: %v", + retError = fmt.Errorf(fmt.Sprintf("failed to add or update selector %v, account %v: %v", selectorNamespacedName, accNamespacedName, errorMsgAccountPollerNotFound)) } @@ -192,9 +219,7 @@ func (a *AccountManager) AddResourceFiltersToAccount(accNamespacedName *types.Na accPoller.addOrUpdateSelector(selectorCopy) } - accPoller.restartPoller(accNamespacedName) - // wait for polling to complete after restart. - return false, accPoller.waitForPollDone(accNamespacedName) + return false, retError } // RemoveResourceFiltersFromAccount removes selector from cloud plugin and restart the poller. diff --git a/pkg/apiserver/webhook/cloudentityselector_webhook.go b/pkg/apiserver/webhook/cloudentityselector_webhook.go index b5e193da..2ba36c17 100644 --- a/pkg/apiserver/webhook/cloudentityselector_webhook.go +++ b/pkg/apiserver/webhook/cloudentityselector_webhook.go @@ -83,10 +83,10 @@ func (v *CESMutator) Handle(_ context.Context, req admission.Request) admission. referencedAccount := &v1alpha1.CloudProviderAccount{} err = v.Client.Get(context.TODO(), *accountNameSpacedName, referencedAccount) if err != nil { - v.Log.Error(err, errorMsgReferencedAccountNotFound, "CloudEntitySelector", selector, "Account name", selector.Spec.AccountName, - "Account namespace", selector.Spec.AccountNamespace) - return admission.Errored(http.StatusBadRequest, fmt.Errorf("%s, account name : %s, account namespace: %s", - errorMsgReferencedAccountNotFound, selector.Spec.AccountName, selector.Spec.AccountNamespace)) + errorMsg := fmt.Sprintf("%s, account name : %s, account namespace: %s", + errorMsgReferencedAccountNotFound, selector.Spec.AccountName, selector.Spec.AccountNamespace) + v.Log.Error(err, errorMsg, "CloudEntitySelector", selector) + return admission.Errored(http.StatusBadRequest, fmt.Errorf(errorMsg)) } cloudProviderType, err := util.GetAccountProviderType(referencedAccount) if err != nil { diff --git a/pkg/cloudprovider/plugins/aws/aws_ec2.go b/pkg/cloudprovider/plugins/aws/aws_ec2.go index 07475494..6d21e44d 100644 --- a/pkg/cloudprovider/plugins/aws/aws_ec2.go +++ b/pkg/cloudprovider/plugins/aws/aws_ec2.go @@ -46,7 +46,7 @@ type ec2ServiceConfig struct { type ec2ResourcesCacheSnapshot struct { vms map[types.NamespacedName][]*ec2.Instance vpcs []*ec2.Vpc - managedVpcIDs map[string]struct{} + managedVpcIds map[string]struct{} vpcNameToID map[string]string vpcPeers map[string][]string } @@ -127,7 +127,7 @@ func (ec2Cfg *ec2ServiceConfig) getManagedVpcIDs() map[string]struct{} { awsPluginLogger().V(4).Info("Cache snapshot nil", "type", providerType, "account", ec2Cfg.accountNamespacedName) return vpcIDsCopy } - vpcIDsSet := snapshot.(*ec2ResourcesCacheSnapshot).managedVpcIDs + vpcIDsSet := snapshot.(*ec2ResourcesCacheSnapshot).managedVpcIds for vpcID := range vpcIDsSet { vpcIDsCopy[vpcID] = struct{}{} diff --git a/pkg/cloudprovider/plugins/azure/azure_compute.go b/pkg/cloudprovider/plugins/azure/azure_compute.go index 33d30409..a9757ed5 100644 --- a/pkg/cloudprovider/plugins/azure/azure_compute.go +++ b/pkg/cloudprovider/plugins/azure/azure_compute.go @@ -50,7 +50,7 @@ type computeResourcesCacheSnapshot struct { // vm resources for each CloudEntitySelector CR. vms map[types.NamespacedName][]*virtualMachineTable vnets []armnetwork.VirtualNetwork - managedVnetIDs map[string]struct{} + managedVnetIds map[string]struct{} vnetPeers map[string][][]string } @@ -166,7 +166,7 @@ func (computeCfg *computeServiceConfig) getManagedVnetIDs() map[string]struct{} "type", providerType, "account", computeCfg.accountNamespacedName) return vnetIDsCopy } - vnetIDsSet := snapshot.(*computeResourcesCacheSnapshot).managedVnetIDs + vnetIDsSet := snapshot.(*computeResourcesCacheSnapshot).managedVnetIds for vnetID := range vnetIDsSet { vnetIDsCopy[vnetID] = struct{}{} diff --git a/pkg/controllers/cloudentityselector/controller.go b/pkg/controllers/cloudentityselector/controller.go index 3154ea9f..9267546c 100644 --- a/pkg/controllers/cloudentityselector/controller.go +++ b/pkg/controllers/cloudentityselector/controller.go @@ -138,11 +138,9 @@ func (r *CloudEntitySelectorReconciler) processCreateOrUpdate(selector *crdv1alp Name: selector.Spec.AccountName, } r.selectorToAccountMap[*selectorNamespacedName] = *accountNamespacedName - retry, err := r.AccManager.AddResourceFiltersToAccount(accountNamespacedName, selectorNamespacedName, + // No errors can be retried for a CES. + _, err := r.AccManager.AddResourceFiltersToAccount(accountNamespacedName, selectorNamespacedName, selector, false) - if err != nil && retry { - return err - } r.updateStatus(selectorNamespacedName, err) return nil } diff --git a/pkg/util/helpers.go b/pkg/util/helpers.go index d1ae45b4..a11e82e5 100644 --- a/pkg/util/helpers.go +++ b/pkg/util/helpers.go @@ -54,20 +54,18 @@ func GetAccountProviderType(account *crdv1alpha1.CloudProviderAccount) (runtimev } } -// DoesCesCrExistsForAccount returns true if there is a CloudEntitySelector CR for a given account. -func DoesCesCrExistsForAccount(k8sClient client.Client, namespacedName *types.NamespacedName) bool { +// GetNumberOfCesCrForAccount returns number of CloudEntitySelector CR for a given account. +func GetNumberOfCesCrForAccount(k8sClient client.Client, namespacedName *types.NamespacedName) int { + var numCes int cesList := &crdv1alpha1.CloudEntitySelectorList{} - listOptions := &client.ListOptions{ - Namespace: namespacedName.Namespace, - } - if err := k8sClient.List(context.TODO(), cesList, listOptions); err != nil { - return false + if err := k8sClient.List(context.TODO(), cesList); err != nil { + return numCes } for _, ces := range cesList.Items { - if ces.Spec.AccountName == namespacedName.Name { - return true + if ces.Spec.AccountName == namespacedName.Name && ces.Spec.AccountNamespace == namespacedName.Namespace { + numCes++ } } - return false + return numCes }