Skip to content

Commit

Permalink
Update StoragePolicyUsage based on CnsVolumeInfo AggregatedSnapshotSize sum (#2974)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilbarge authored Jul 30, 2024
1 parent 71db576 commit 2bb14e7
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 47 deletions.
27 changes: 16 additions & 11 deletions pkg/syncer/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,27 @@ func CsiFullSync(ctx context.Context, metadataSyncer *metadataSyncInformer, vc s
migrationFeatureStateForFullSync = true
}
}
// Attempt to create StoragePolicyUsage CRs.
if metadataSyncer.clusterFlavor == cnstypes.CnsClusterFlavorWorkload {
if IsPodVMOnStretchSupervisorFSSEnabled {
createStoragePolicyUsageCRS(ctx, metadataSyncer)
}
}
// Sync VolumeInfo CRs for the below conditions:
// Either it is a Vanilla k8s deployment with Multi-VC configuration or, it's a StretchSupervisor cluster
if isMultiVCenterFssEnabled && len(metadataSyncer.configInfo.Cfg.VirtualCenter) > 1 ||
(metadataSyncer.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && IsPodVMOnStretchSupervisorFSSEnabled) {
volumeInfoCRFullSync(ctx, metadataSyncer, vc)
cleanUpVolumeInfoCrDeletionMap(ctx, metadataSyncer, vc)
}
// Attempt to create & patch StoragePolicyUsage CRs. For storagePolicyUsageCRSync to work,
// Attempt to patch StoragePolicyUsage CRs. For storagePolicyUsageCRSync to work,
// we need CNSVolumeInfo CRs to be present for all existing volumes.
if metadataSyncer.clusterFlavor == cnstypes.CnsClusterFlavorWorkload {
if IsPodVMOnStretchSupervisorFSSEnabled {
storagePolicyUsageCRSync(ctx, metadataSyncer)
}
}

defer func() {
fullSyncStatus := prometheus.PrometheusPassStatus
if err != nil {
Expand Down Expand Up @@ -245,22 +252,20 @@ func CsiFullSync(ctx context.Context, metadataSyncer *metadataSyncInformer, vc s
log.Errorf("FullSync for VC %s: QueryVolume failed with err=%+v", vc, err.Error())
return err
}

}
if metadataSyncer.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && isStorageQuotaM2FSSEnabled {
cnsVolumeMap := make(map[string]cnstypes.CnsVolume)
for _, vol := range queryAllResult.Volumes {
cnsVolumeMap[vol.VolumeId.Id] = vol
}
if isStorageQuotaM2FSSEnabled {
log.Infof("calling validateAndCorrectVolumeInfoSnapshotDetails with %d volumes", len(cnsVolumeMap))
err = validateAndCorrectVolumeInfoSnapshotDetails(ctx, cnsVolumeMap)
if err != nil {
log.Errorf("FullSync for VC %s: Error while sync CNSVolumeinfo snapshot details, failed with err=%+v",
vc, err.Error())
return err
}
log.Infof("calling validateAndCorrectVolumeInfoSnapshotDetails with %d volumes", len(cnsVolumeMap))
err = validateAndCorrectVolumeInfoSnapshotDetails(ctx, cnsVolumeMap)
if err != nil {
log.Errorf("FullSync for VC %s: Error while sync CNSVolumeinfo snapshot details, failed with err=%+v",
vc, err.Error())
return err
}
}

vcHostObj, vcHostObjFound := metadataSyncer.configInfo.Cfg.VirtualCenter[vc]
if !vcHostObjFound {
log.Errorf("FullSync for VC %s: Failed to get VC host object.", vc)
Expand Down
115 changes: 79 additions & 36 deletions pkg/syncer/metadatasyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3101,7 +3101,7 @@ func startStoragePolicyQuotaCRInformer(ctx context.Context, cfg *restclient.Conf

// storagePolicyUsageCRSync patches StoragePolicyUsage CRs for k8s PVs in Bound state.
// This method also creates StoragePolicyUsage CRs when the CR is not found for PVCs in Pending state.
func storagePolicyUsageCRSync(ctx context.Context, metadataSyncer *metadataSyncInformer) {
func createStoragePolicyUsageCRS(ctx context.Context, metadataSyncer *metadataSyncInformer) {
log := logger.GetLogger(ctx)
log.Infof("storagePolicyUsageCRSync: Starting storage policy usage CR sync")

Expand Down Expand Up @@ -3224,7 +3224,22 @@ func storagePolicyUsageCRSync(ctx context.Context, metadataSyncer *metadataSyncI
}
}
}
}

func storagePolicyUsageCRSync(ctx context.Context, metadataSyncer *metadataSyncInformer) {
log := logger.GetLogger(ctx)
log.Infof("storagePolicyUsageCRSync: Starting storage policy usage CR sync")
// Prepare Config and NewClientForGroup for cnsOperatorClient
restConfig, err := config.GetConfig()
if err != nil {
log.Errorf("storagePolicyUsageCRSync: Failed to get Kubernetes k8sconfig. Err: %+v", err)
return
}
cnsOperatorClient, err := k8s.NewClientForGroup(ctx, restConfig, cnsoperatorv1alpha1.GroupName)
if err != nil {
log.Errorf("storagePolicyUsageCRSync: Failed to create CnsOperator client. Err: %+v", err)
return
}
// Get K8s PVs in "Bound" State.
k8sPVsInBoundState, err := getBoundPVs(ctx, metadataSyncer)
if err != nil {
Expand All @@ -3246,6 +3261,7 @@ func storagePolicyUsageCRSync(ctx context.Context, metadataSyncer *metadataSyncI
}
volumeInfoCRList := volumeInfoService.ListAllVolumeInfos()
cnsVolumeInfoMap := make(map[string]*cnsvolumeinfov1alpha1.CNSVolumeInfo)
spuAggregatedSumMap := make(map[string]*resource.Quantity)
for _, volumeInfo := range volumeInfoCRList {
cnsVolumeInfoObj := &cnsvolumeinfov1alpha1.CNSVolumeInfo{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(volumeInfo.(*unstructured.Unstructured).Object,
Expand All @@ -3256,13 +3272,22 @@ func storagePolicyUsageCRSync(ctx context.Context, metadataSyncer *metadataSyncI
continue
}
cnsVolumeInfoMap[cnsVolumeInfoObj.Name] = cnsVolumeInfoObj.DeepCopy()
if isStorageQuotaM2FSSEnabled && cnsVolumeInfoObj.Spec.AggregatedSnapshotSize != nil {
spuKey := generateSPUKey(cnsVolumeInfoObj)
if usedQty := spuAggregatedSumMap[spuKey]; usedQty == nil {
spuAggregatedSumMap[spuKey] = cnsVolumeInfoObj.Spec.AggregatedSnapshotSize
} else {
spuAggregatedSumMap[spuKey].Add(*cnsVolumeInfoObj.Spec.AggregatedSnapshotSize)
}
}
}
// Check if volumeInfoCRList is not empty
if len(volumeInfoCRList) > 0 {
// Iterate through storagePolicyUsageList
for _, storagePolicyUsage := range storagePolicyUsageList.Items {
totalUsedQty := resource.NewQuantity(int64(0), resource.BinarySI)
updateSpu := false
if storagePolicyUsage.Spec.ResourceKind == ResourceKindPVC {
totalUsedQty := resource.NewQuantity(int64(0), resource.BinarySI)
// For every storagePolicyUsage, fetch "Bound" PVs in that namespace from k8sVolumesToNamespaceMap
if volumes, ok := namespaceToK8sVolumesMap[storagePolicyUsage.Namespace]; ok {
for _, pv := range volumes {
Expand All @@ -3279,53 +3304,71 @@ func storagePolicyUsageCRSync(ctx context.Context, metadataSyncer *metadataSyncI
}
}
}
patchedStoragePolicyUsage := *storagePolicyUsage.DeepCopy()
if patchedStoragePolicyUsage.Status.ResourceTypeLevelQuotaUsage != nil {
// Compare the expected total used capacity vs the actual used capacity value in storagePolicyUsage CR
patchedStoragePolicyUsage.Status.ResourceTypeLevelQuotaUsage.Used = totalUsedQty
currentUsedCapacity := storagePolicyUsage.Status.ResourceTypeLevelQuotaUsage.Used
if !reflect.DeepEqual(currentUsedCapacity.Value(), totalUsedQty.Value()) {
log.Infof("storagePolicyUsageCRSync: The used capacity field for StoragepolicyUsage CR: %q in namespace: %q "+
"is not matching with the total capacity of all the k8s volumes in Bound state. Current: %v . "+
"Expected: %v", storagePolicyUsage.Name, storagePolicyUsage.Namespace,
currentUsedCapacity.Value(), totalUsedQty.Value())
err := PatchStoragePolicyUsage(ctx, cnsOperatorClient, &storagePolicyUsage, &patchedStoragePolicyUsage)
if err != nil {
log.Errorf("storagePolicyUsageCRSync: Patching operation failed for StoragePolicyUsage CR: %q in "+
"namespace: %q. err: %v", storagePolicyUsage.Name, patchedStoragePolicyUsage.Namespace, err)
return
}
log.Infof("storagePolicyUsageCRSync: Successfully updated the used field from %v to %v for StoragepolicyUsage "+
"CR: %q in namespace: %q", currentUsedCapacity.Value(),
totalUsedQty.Value(), patchedStoragePolicyUsage.Name, patchedStoragePolicyUsage.Namespace)
} else {
log.Infof("storagePolicyUsageCRSync: The used capacity field for StoragepolicyUsage CR: %q in namespace: %q "+
"field is matching with the total capacity. Used: %v Skipping the Patch operation",
storagePolicyUsage.Name, storagePolicyUsage.Namespace,
storagePolicyUsage.Status.ResourceTypeLevelQuotaUsage.Used.Value())
}
} else {
patchedStoragePolicyUsage.Status = storagepolicyv1alpha1.StoragePolicyUsageStatus{
ResourceTypeLevelQuotaUsage: &storagepolicyv1alpha1.QuotaUsageDetails{
Used: totalUsedQty,
},
}
updateSpu = true
}
} else if isStorageQuotaM2FSSEnabled && storagePolicyUsage.Spec.ResourceKind == ResourceKindSnapshot {
spuKey := strings.Join([]string{storagePolicyUsage.Spec.StorageClassName,
storagePolicyUsage.Spec.StoragePolicyId, storagePolicyUsage.Namespace}, "-")
if usedQty, ok := spuAggregatedSumMap[spuKey]; ok {
log.Infof("storagePolicyUsageCRSync: The used capacity field for StoragepolicyUsage CR: %q "+
"in namespace: %q Total AggregatedSnapshotSize Sum In MB is: %v", storagePolicyUsage.Name,
storagePolicyUsage.Namespace, usedQty.Value())
totalUsedQty = usedQty
updateSpu = true
}
}
if updateSpu {
patchedStoragePolicyUsage := *storagePolicyUsage.DeepCopy()
if patchedStoragePolicyUsage.Status.ResourceTypeLevelQuotaUsage != nil {
// Compare the expected total used capacity vs the actual used capacity value in storagePolicyUsage CR
patchedStoragePolicyUsage.Status.ResourceTypeLevelQuotaUsage.Used = totalUsedQty
currentUsedCapacity := storagePolicyUsage.Status.ResourceTypeLevelQuotaUsage.Used
if !reflect.DeepEqual(currentUsedCapacity.Value(), totalUsedQty.Value()) {
log.Infof("storagePolicyUsageCRSync: The used capacity field for StoragepolicyUsage CR: %q in namespace: %q "+
"is not matching with the total capacity of all the k8s volumes in Bound state. Current: %v . "+
"Expected: %v", storagePolicyUsage.Name, storagePolicyUsage.Namespace,
currentUsedCapacity.Value(), totalUsedQty.Value())
err := PatchStoragePolicyUsage(ctx, cnsOperatorClient, &storagePolicyUsage, &patchedStoragePolicyUsage)
if err != nil {
log.Errorf("storagePolicyUsageCRSync: Patching operation failed for StoragePolicyUsage CR: %q in "+
"namespace: %q. err: %v", storagePolicyUsage.Name, patchedStoragePolicyUsage.Namespace, err)
return
}
log.Infof("storagePolicyUsageCRSync: Successfully updated the used field to %v for StoragepolicyUsage "+
"CR: %q in namespace: %q", totalUsedQty.Value(), patchedStoragePolicyUsage.Name,
patchedStoragePolicyUsage.Namespace)
log.Infof("storagePolicyUsageCRSync: Successfully updated the used field from %v to %v for StoragepolicyUsage "+
"CR: %q in namespace: %q", currentUsedCapacity.Value(),
totalUsedQty.Value(), patchedStoragePolicyUsage.Name, patchedStoragePolicyUsage.Namespace)
} else {
log.Infof("storagePolicyUsageCRSync: The used capacity field for StoragepolicyUsage CR: %q in namespace: %q "+
"field is matching with the total capacity. Used: %v Skipping the Patch operation",
storagePolicyUsage.Name, storagePolicyUsage.Namespace,
storagePolicyUsage.Status.ResourceTypeLevelQuotaUsage.Used.Value())
}
} else {
patchedStoragePolicyUsage.Status = storagepolicyv1alpha1.StoragePolicyUsageStatus{
ResourceTypeLevelQuotaUsage: &storagepolicyv1alpha1.QuotaUsageDetails{
Used: totalUsedQty,
},
}
err := PatchStoragePolicyUsage(ctx, cnsOperatorClient, &storagePolicyUsage, &patchedStoragePolicyUsage)
if err != nil {
log.Errorf("storagePolicyUsageCRSync: Patching operation failed for StoragePolicyUsage CR: %q in "+
"namespace: %q. err: %v", storagePolicyUsage.Name, patchedStoragePolicyUsage.Namespace, err)
return
}
log.Infof("storagePolicyUsageCRSync: Successfully updated the used field to %v for StoragepolicyUsage "+
"CR: %q in namespace: %q", totalUsedQty.Value(), patchedStoragePolicyUsage.Name,
patchedStoragePolicyUsage.Namespace)
}
}
}
}
}

// generateSPUKey derives the StoragePolicyUsage map key for the given
// CNSVolumeInfo CR by joining its storage class name, storage policy ID,
// and namespace with "-" separators. The key must match the one built for
// snapshot-kind StoragePolicyUsage CRs in storagePolicyUsageCRSync.
// NOTE(review): "-" can legally occur inside the joined fields, so distinct
// CRs could in theory collide on the same key — confirm field values keep
// this unambiguous in practice.
func generateSPUKey(cnsVolumeInfoObj *cnsvolumeinfov1alpha1.CNSVolumeInfo) string {
	keyParts := []string{
		cnsVolumeInfoObj.Spec.StorageClassName,
		cnsVolumeInfoObj.Spec.StoragePolicyID,
		cnsVolumeInfoObj.Spec.Namespace,
	}
	return strings.Join(keyParts, "-")
}

// startCnsVolumeInfoCRInformer creates and starts an informer for CnsVolumeInfo custom resource.
func startCnsVolumeInfoCRInformer(ctx context.Context, cfg *restclient.Config,
metadataSyncer *metadataSyncInformer) error {
Expand Down

0 comments on commit 2bb14e7

Please sign in to comment.