From 670d72460bef3ec2c7bef5f1abf5a84c30d81e1e Mon Sep 17 00:00:00 2001 From: Himanshu Sharma Date: Mon, 9 Jan 2023 14:12:19 +0530 Subject: [PATCH 1/5] fixNodeGroupSize logic silenced --- cluster-autoscaler/core/static_autoscaler.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 2b7a97738eb8..64ca0b958deb 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -347,15 +347,16 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError // Check if there has been a constant difference between the number of nodes in k8s and // the number of nodes on the cloud provider side. // TODO: andrewskim - add protection for ready AWS nodes. - fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime) - if err != nil { - klog.Errorf("Failed to fix node group sizes: %v", err) - return errors.ToAutoscalerError(errors.CloudProviderError, err) - } - if fixedSomething { - klog.V(0).Infof("Some node group target size was fixed, skipping the iteration") - return nil - } + // NOTE: Commented this code as it removes `Registered but long not Ready` nodes which causes issues like scaling below minimum size and removing ready nodes during meltdown scenario + //fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime) + //if err != nil { + // klog.Errorf("Failed to fix node group sizes: %v", err) + // return errors.ToAutoscalerError(errors.CloudProviderError, err) + //} + //if fixedSomething { + // klog.V(0).Infof("Some node group target size was fixed, skipping the iteration") + // return nil + //} metrics.UpdateLastTime(metrics.Autoscaling, time.Now()) From 985d8b57ee9ef427b68efaa5d1eb50ca47b54f65 Mon Sep 17 00:00:00 2001 From: Himanshu Sharma Date: Mon, 9 Jan 2023 14:13:03 +0530 Subject: [PATCH 2/5] no scale-down allowed during rolling update --- .../cloudprovider/mcm/mcm_cloud_provider.go | 4 + .../cloudprovider/mcm/mcm_manager.go | 85 +++++++++++++++---- 2 files changed, 72 insertions(+), 17 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go index ccccb6977509..ad3e07ed79fd 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go @@ -319,6 +319,10 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error if err != nil { return err } + if int(size+int64(delta)) < machinedeployment.minSize { + klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d , applied target size= %d", machinedeployment.minSize, machinedeployment.Name, size+int64(delta), machinedeployment.minSize) + return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, int64(machinedeployment.minSize)) + } return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, size+int64(delta)) } diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go index 79b7707cebbc..d2d8ba3c6d19 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go @@ -82,6 +82,13 @@ const ( machineGroup = "machine.sapcloud.io" // machineGroup is the API version used to identify machine API group objects machineVersion = "v1alpha1" + // machineDeploymentProgressing tells that deployment is progressing. Progress for a MachineDeployment is considered when a new machine set is created or adopted, and when new machines scale up or old machines scale down. + // Progress is not estimated for paused MachineDeployments. It is also updated if progressDeadlineSeconds is not specified(treated as infinite deadline), in which case it would never be updated to "false". + machineDeploymentProgressing v1alpha1.MachineDeploymentConditionType = "Progressing" + // newISAvailableReason is the reason in "Progressing" condition when machineDeployment rollout is complete + newISAvailableReason = "NewMachineSetAvailable" + // conditionTrue means the given condition status is true + conditionTrue v1alpha1.ConditionStatus = "True" ) var ( @@ -97,7 +104,7 @@ var ( machineDeploymentGVR = schema.GroupVersionResource{Group: machineGroup, Version: machineVersion, Resource: "machinedeployments"} ) -//McmManager manages the client communication for MachineDeployments. +// McmManager manages the client communication for MachineDeployments. type McmManager struct { namespace string interrupt chan struct{} @@ -248,7 +255,7 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti } // TODO: In general, any controller checking this needs to be dynamic so -// users don't have to restart their controller manager if they change the apiserver. +// users don't have to restart their controller manager if they change the apiserver. // Until we get there, the structure here needs to be exposed for the construction of a proper ControllerContext. func getAvailableResources(clientBuilder CoreClientBuilder) (map[schema.GroupVersionResource]bool, error) { var discoveryClient discovery.DiscoveryInterface @@ -279,7 +286,7 @@ func getAvailableResources(clientBuilder CoreClientBuilder) (map[schema.GroupVer return nil, fmt.Errorf("failed to get api versions from server: %v: %v", healthzContent, err) } - resourceMap, err := discoveryClient.ServerResources() + _, resourceMap, err := discoveryClient.ServerGroupsAndResources() if err != nil { utilruntime.HandleError(fmt.Errorf("unable to get all supported resources from server: %v", err)) } @@ -374,7 +381,6 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo } // Refresh does nothing at the moment. -// func (m *McmManager) Refresh() error { return nil } @@ -394,6 +400,25 @@ func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployme return int64(md.Spec.Replicas), nil } +func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool { + found := false + for _, cond := range md.Status.Conditions { + switch { + case cond.Type == machineDeploymentProgressing && cond.Status == conditionTrue && cond.Reason == newISAvailableReason: + return true + case cond.Type == machineDeploymentProgressing: + found = true + break + } + } + if !found { + // no "Progressing" condition means the deployment has not undergone any rolling update yet + return true + } + + return false +} + // SetMachineDeploymentSize sets the desired size for the Machinedeployment. func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployment, size int64) error { @@ -410,6 +435,11 @@ func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployme return err } + // don't scale down during rolling update, as that could remove ready node with workload + if md.Spec.Replicas >= int32(size) && !isRollingUpdateFinished(md) { + return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", md.Name) + } + clone := md.DeepCopy() clone.Spec.Replicas = int32(size) @@ -430,9 +460,23 @@ func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployme return nil } +func (m *McmManager) getMachineDeploymentObjUntilDeadline(mdName string, retryDeadline time.Time) (*v1alpha1.MachineDeployment, error) { + md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName) + if err != nil && time.Now().Before(retryDeadline) { + klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s", mdName, err) + time.Sleep(conflictRetryInterval) + return nil, nil + } else if err != nil { + // Timeout occurred + klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", mdName, err) + return nil, err + } + + return md, nil +} + // DeleteMachines deletes the Machines and also reduces the desired replicas of the Machinedeplyoment in parallel. func (m *McmManager) DeleteMachines(machines []*Ref) error { - var ( mdclone *v1alpha1.MachineDeployment terminatingMachines []*v1alpha1.Machine @@ -456,6 +500,23 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error { } } + var md *v1alpha1.MachineDeployment + retryDeadline := time.Now().Add(maxRetryDeadline) + for { + md, err = m.getMachineDeploymentObjUntilDeadline(commonMachineDeployment.Name, retryDeadline) + if err != nil { + return err + } else if md == nil { + continue + } + + if !isRollingUpdateFinished(md) { + return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", md.Name) + } else { + break + } + } + for _, machine := range machines { retryDeadline := time.Now().Add(maxRetryDeadline) @@ -501,21 +562,11 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error { // Break out of loop when update succeeds break } + + klog.Infof("Machine %s of md %s marked with priority 1 successfully", machine.Name, md.Name) } - retryDeadline := time.Now().Add(maxRetryDeadline) for { - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(commonMachineDeployment.Name) - if err != nil && time.Now().Before(retryDeadline) { - klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err) - time.Sleep(conflictRetryInterval) - continue - } else if err != nil { - // Timeout occurred - klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err) - return err - } - mdclone = md.DeepCopy() if (int(mdclone.Spec.Replicas) - len(machines)) < 0 { return fmt.Errorf("Unable to delete machine in MachineDeployment object %s , machine replicas are < 0 ", commonMachineDeployment.Name) From d52cd90c0f0df6ec75665ff1c1cbf86f8a9d2fc3 Mon Sep 17 00:00:00 2001 From: Himanshu Sharma Date: Mon, 9 Jan 2023 14:27:14 +0530 Subject: [PATCH 3/5] updated logs --- cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go | 2 +- cluster-autoscaler/cloudprovider/mcm/mcm_manager.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go index ad3e07ed79fd..c7684f6e3218 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go @@ -320,7 +320,7 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error return err } if int(size+int64(delta)) < machinedeployment.minSize { - klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d , applied target size= %d", machinedeployment.minSize, machinedeployment.Name, size+int64(delta), machinedeployment.minSize) + klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d . Setting target size to min size", machinedeployment.minSize, machinedeployment.Name, size+int64(delta)) return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, int64(machinedeployment.minSize)) } return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, size+int64(delta)) diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go index d2d8ba3c6d19..8f14da527384 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go @@ -563,7 +563,7 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error { break } - klog.Infof("Machine %s of md %s marked with priority 1 successfully", machine.Name, md.Name) + klog.Infof("Machine %s of machineDeployment %s marked with priority 1 successfully", machine.Name, md.Name) } for { From 2fe8b13ea6d59e49bd28a51b29dcbc334fb89ba8 Mon Sep 17 00:00:00 2001 From: Himanshu Sharma Date: Tue, 10 Jan 2023 11:21:05 +0530 Subject: [PATCH 4/5] removed useless found variable --- cluster-autoscaler/cloudprovider/mcm/mcm_manager.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go index 8f14da527384..aa91e25cbe4a 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go @@ -401,22 +401,16 @@ func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployme } func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool { - found := false for _, cond := range md.Status.Conditions { switch { case cond.Type == machineDeploymentProgressing && cond.Status == conditionTrue && cond.Reason == newISAvailableReason: return true case cond.Type == machineDeploymentProgressing: - found = true - break + return false } } - if !found { - // no "Progressing" condition means the deployment has not undergone any rolling update yet - return true - } - - return false + // no "Progressing" condition means the deployment has not undergone any rolling update yet + return true } // SetMachineDeploymentSize sets the desired size for the Machinedeployment. From afe5a90563aed37c0575fc4d7fc23a764e152e82 Mon Sep 17 00:00:00 2001 From: Himanshu Sharma Date: Mon, 23 Jan 2023 12:30:13 +0530 Subject: [PATCH 5/5] addressed review comments and some refactoring --- .../cloudprovider/mcm/mcm_cloud_provider.go | 2 +- .../cloudprovider/mcm/mcm_manager.go | 126 ++++++++---------- 2 files changed, 59 insertions(+), 69 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go index c7684f6e3218..b390a9831c6e 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go @@ -319,7 +319,7 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error if err != nil { return err } - if int(size+int64(delta)) < machinedeployment.minSize { + if int(size)+delta < machinedeployment.minSize { klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d . Setting target size to min size", machinedeployment.minSize, machinedeployment.Name, size+int64(delta)) return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, int64(machinedeployment.minSize)) } diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go index aa91e25cbe4a..b37f2ec86820 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go @@ -59,17 +59,14 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" kubeletapis "k8s.io/kubelet/pkg/apis" ) const ( - operationWaitTimeout = 5 * time.Second - operationPollInterval = 100 * time.Millisecond - maxRecordsReturnedByAPI = 100 - maxRetryDeadline = 1 * time.Minute - conflictRetryInterval = 5 * time.Second - minResyncPeriodDefault = 1 * time.Hour + maxRetryTimeout = 1 * time.Minute + conflictRetryInterval = 5 * time.Second + minResyncPeriodDefault = 1 * time.Hour // machinePriorityAnnotation is the annotation to set machine priority while deletion machinePriorityAnnotation = "machinepriority.machine.sapcloud.io" // kindMachineClass is the kind for generic machine class used by the OOT providers @@ -400,40 +397,21 @@ func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployme return int64(md.Spec.Replicas), nil } -func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool { - for _, cond := range md.Status.Conditions { - switch { - case cond.Type == machineDeploymentProgressing && cond.Status == conditionTrue && cond.Reason == newISAvailableReason: - return true - case cond.Type == machineDeploymentProgressing: - return false - } - } - // no "Progressing" condition means the deployment has not undergone any rolling update yet - return true -} - // SetMachineDeploymentSize sets the desired size for the Machinedeployment. func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployment, size int64) error { + md, err := m.getMachineDeploymentUntilTimeout(machinedeployment.Name, conflictRetryInterval, maxRetryTimeout) + if err != nil { + klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err) + return err + } - retryDeadline := time.Now().Add(maxRetryDeadline) - for { - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name) - if err != nil && time.Now().Before(retryDeadline) { - klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %+v", machinedeployment.Name, err) - time.Sleep(conflictRetryInterval) - continue - } else if err != nil { - // Timeout occurred - klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err) - return err - } - - // don't scale down during rolling update, as that could remove ready node with workload - if md.Spec.Replicas >= int32(size) && !isRollingUpdateFinished(md) { - return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", md.Name) - } + // don't scale down during rolling update, as that could remove ready node with workload + if md.Spec.Replicas >= int32(size) && !isRollingUpdateFinished(md) { + return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", md.Name) + } + retryDeadline := time.Now().Add(maxRetryTimeout) + for { clone := md.DeepCopy() clone.Spec.Replicas = int32(size) @@ -454,21 +432,6 @@ func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployme return nil } -func (m *McmManager) getMachineDeploymentObjUntilDeadline(mdName string, retryDeadline time.Time) (*v1alpha1.MachineDeployment, error) { - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName) - if err != nil && time.Now().Before(retryDeadline) { - klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s", mdName, err) - time.Sleep(conflictRetryInterval) - return nil, nil - } else if err != nil { - // Timeout occurred - klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", mdName, err) - return nil, err - } - - return md, nil -} - // DeleteMachines deletes the Machines and also reduces the desired replicas of the Machinedeplyoment in parallel. func (m *McmManager) DeleteMachines(machines []*Ref) error { var ( @@ -494,26 +457,18 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error { } } - var md *v1alpha1.MachineDeployment - retryDeadline := time.Now().Add(maxRetryDeadline) - for { - md, err = m.getMachineDeploymentObjUntilDeadline(commonMachineDeployment.Name, retryDeadline) - if err != nil { - return err - } else if md == nil { - continue - } - - if !isRollingUpdateFinished(md) { - return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", md.Name) - } else { - break - } + md, err := m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout) + if err != nil { + klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err) + return err + } + if !isRollingUpdateFinished(md) { + return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name) } for _, machine := range machines { - retryDeadline := time.Now().Add(maxRetryDeadline) + retryDeadline := time.Now().Add(maxRetryTimeout) for { machine, err := m.machineLister.Machines(m.namespace).Get(machine.Name) if err != nil && time.Now().Before(retryDeadline) { @@ -560,6 +515,12 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error { klog.Infof("Machine %s of machineDeployment %s marked with priority 1 successfully", machine.Name, md.Name) } + // getting fresh copy of machineDeployment before updating + md, err = m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout) + if err != nil { + klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err) + return err + } for { mdclone = md.DeepCopy() if (int(mdclone.Spec.Replicas) - len(machines)) < 0 { @@ -573,6 +534,7 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error { mdclone.Spec.Replicas = expectedReplicas + retryDeadline := time.Now().Add(maxRetryTimeout) _, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(context.TODO(), mdclone, metav1.UpdateOptions{}) if err != nil && time.Now().Before(retryDeadline) { klog.Warningf("Unable to update MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err) @@ -802,6 +764,34 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine return nodeTmpl, nil } +func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool { + for _, cond := range md.Status.Conditions { + switch { + case cond.Type == machineDeploymentProgressing && cond.Status == conditionTrue && cond.Reason == newISAvailableReason: + return true + case cond.Type == machineDeploymentProgressing: + return false + } + } + // no "Progressing" condition means the deployment has not undergone any rolling update yet + return true +} + +func (m *McmManager) getMachineDeploymentUntilTimeout(mdName string, retryInterval, timeout time.Duration) (*v1alpha1.MachineDeployment, error) { + var md *v1alpha1.MachineDeployment + err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { + var err error + md, err = m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName) + if err != nil { + klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s, will retry", mdName, err) + return false, nil + } + + return true, nil + }) + return md, err +} + func filterOutNodes(nodes []*v1.Node, instanceType string) []*v1.Node { var filteredNodes []*v1.Node for _, node := range nodes {