diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
index ccccb6977509..b390a9831c6e 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
@@ -319,6 +319,10 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
 	if err != nil {
 		return err
 	}
+	if int(size)+delta < machinedeployment.minSize {
+		klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d . Setting target size to min size", machinedeployment.minSize, machinedeployment.Name, size+int64(delta))
+		return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, int64(machinedeployment.minSize))
+	}
 	return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, size+int64(delta))
 }
 
diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
index 79b7707cebbc..b37f2ec86820 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
@@ -59,17 +59,14 @@ import (
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
-	klog "k8s.io/klog/v2"
+	"k8s.io/klog/v2"
 	kubeletapis "k8s.io/kubelet/pkg/apis"
 )
 
 const (
-	operationWaitTimeout    = 5 * time.Second
-	operationPollInterval   = 100 * time.Millisecond
-	maxRecordsReturnedByAPI = 100
-	maxRetryDeadline        = 1 * time.Minute
-	conflictRetryInterval   = 5 * time.Second
-	minResyncPeriodDefault  = 1 * time.Hour
+	maxRetryTimeout        = 1 * time.Minute
+	conflictRetryInterval  = 5 * time.Second
+	minResyncPeriodDefault = 1 * time.Hour
 	// machinePriorityAnnotation is the annotation to set machine priority while deletion
 	machinePriorityAnnotation = "machinepriority.machine.sapcloud.io"
 	// kindMachineClass is the kind for generic machine class used by the OOT providers
@@ -82,6 +79,13 @@ const (
 	machineGroup = "machine.sapcloud.io"
 	// machineGroup is the API version used to identify machine API group objects
 	machineVersion = "v1alpha1"
+	// machineDeploymentProgressing tells that deployment is progressing. Progress for a MachineDeployment is considered when a new machine set is created or adopted, and when new machines scale up or old machines scale down.
+	// Progress is not estimated for paused MachineDeployments. It is also updated if progressDeadlineSeconds is not specified(treated as infinite deadline), in which case it would never be updated to "false".
+	machineDeploymentProgressing v1alpha1.MachineDeploymentConditionType = "Progressing"
+	// newISAvailableReason is the reason in "Progressing" condition when machineDeployment rollout is complete
+	newISAvailableReason = "NewMachineSetAvailable"
+	// conditionTrue means the given condition status is true
+	conditionTrue v1alpha1.ConditionStatus = "True"
 )
 
 var (
@@ -97,7 +101,7 @@ var (
 	machineDeploymentGVR = schema.GroupVersionResource{Group: machineGroup, Version: machineVersion, Resource: "machinedeployments"}
 )
 
-//McmManager manages the client communication for MachineDeployments.
+// McmManager manages the client communication for MachineDeployments.
 type McmManager struct {
 	namespace   string
 	interrupt   chan struct{}
@@ -248,7 +252,7 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti
 }
 
 // TODO: In general, any controller checking this needs to be dynamic so
-// users don't have to restart their controller manager if they change the apiserver. 
+// users don't have to restart their controller manager if they change the apiserver.
 // Until we get there, the structure here needs to be exposed for the construction of a proper ControllerContext.
 func getAvailableResources(clientBuilder CoreClientBuilder) (map[schema.GroupVersionResource]bool, error) {
 	var discoveryClient discovery.DiscoveryInterface
@@ -279,7 +283,7 @@ func getAvailableResources(clientBuilder CoreClientBuilder) (map[schema.GroupVer
 		return nil, fmt.Errorf("failed to get api versions from server: %v: %v", healthzContent, err)
 	}
 
-	resourceMap, err := discoveryClient.ServerResources()
+	_, resourceMap, err := discoveryClient.ServerGroupsAndResources()
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("unable to get all supported resources from server: %v", err))
 	}
@@ -374,7 +378,6 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo
 }
 
 // Refresh does nothing at the moment.
-//
 func (m *McmManager) Refresh() error {
 	return nil
 }
@@ -396,20 +399,19 @@ func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployme
 
 // SetMachineDeploymentSize sets the desired size for the Machinedeployment.
 func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployment, size int64) error {
+	md, err := m.getMachineDeploymentUntilTimeout(machinedeployment.Name, conflictRetryInterval, maxRetryTimeout)
+	if err != nil {
+		klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
+		return err
+	}
 
-	retryDeadline := time.Now().Add(maxRetryDeadline)
-	for {
-		md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name)
-		if err != nil && time.Now().Before(retryDeadline) {
-			klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %+v", machinedeployment.Name, err)
-			time.Sleep(conflictRetryInterval)
-			continue
-		} else if err != nil {
-			// Timeout occurred
-			klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
-			return err
-		}
+	// don't scale down during rolling update, as that could remove ready node with workload
+	if md.Spec.Replicas >= int32(size) && !isRollingUpdateFinished(md) {
+		return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", md.Name)
+	}
 
+	retryDeadline := time.Now().Add(maxRetryTimeout)
+	for {
 		clone := md.DeepCopy()
 		clone.Spec.Replicas = int32(size)
 
@@ -432,7 +434,6 @@ func (m *McmManager) SetMachineDeploymentSize(machinedeployme
 
 // DeleteMachines deletes the Machines and also reduces the desired replicas of the Machinedeplyoment in parallel.
 func (m *McmManager) DeleteMachines(machines []*Ref) error {
-
 	var (
 		mdclone             *v1alpha1.MachineDeployment
 		terminatingMachines []*v1alpha1.Machine
@@ -456,9 +457,18 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
 		}
 	}
 
+	md, err := m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
+	if err != nil {
+		klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
+		return err
+	}
+	if !isRollingUpdateFinished(md) {
+		return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
+	}
+
 	for _, machine := range machines {
-		retryDeadline := time.Now().Add(maxRetryDeadline)
+		retryDeadline := time.Now().Add(maxRetryTimeout)
 		for {
 			machine, err := m.machineLister.Machines(m.namespace).Get(machine.Name)
 			if err != nil && time.Now().Before(retryDeadline) {
@@ -501,21 +511,17 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
 			// Break out of loop when update succeeds
 			break
 		}
+
+		klog.Infof("Machine %s of machineDeployment %s marked with priority 1 successfully", machine.Name, md.Name)
 	}
 
-	retryDeadline := time.Now().Add(maxRetryDeadline)
+	// getting fresh copy of machineDeployment before updating
+	md, err = m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
+	if err != nil {
+		klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
+		return err
+	}
 	for {
-		md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(commonMachineDeployment.Name)
-		if err != nil && time.Now().Before(retryDeadline) {
-			klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
-			time.Sleep(conflictRetryInterval)
-			continue
-		} else if err != nil {
-			// Timeout occurred
-			klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
-			return err
-		}
-
 		mdclone = md.DeepCopy()
 		if (int(mdclone.Spec.Replicas) - len(machines)) < 0 {
 			return fmt.Errorf("Unable to delete machine in MachineDeployment object %s , machine replicas are < 0 ", commonMachineDeployment.Name)
@@ -528,6 +534,7 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
 
 		mdclone.Spec.Replicas = expectedReplicas
 
+		retryDeadline := time.Now().Add(maxRetryTimeout)
 		_, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(context.TODO(), mdclone, metav1.UpdateOptions{})
 		if err != nil && time.Now().Before(retryDeadline) {
 			klog.Warningf("Unable to update MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
@@ -757,6 +764,34 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine
 	return nodeTmpl, nil
 }
 
+func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool {
+	for _, cond := range md.Status.Conditions {
+		switch {
+		case cond.Type == machineDeploymentProgressing && cond.Status == conditionTrue && cond.Reason == newISAvailableReason:
+			return true
+		case cond.Type == machineDeploymentProgressing:
+			return false
+		}
+	}
+	// no "Progressing" condition means the deployment has not undergone any rolling update yet
+	return true
+}
+
+func (m *McmManager) getMachineDeploymentUntilTimeout(mdName string, retryInterval, timeout time.Duration) (*v1alpha1.MachineDeployment, error) {
+	var md *v1alpha1.MachineDeployment
+	err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
+		var err error
+		md, err = m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
+		if err != nil {
+			klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s, will retry", mdName, err)
+			return false, nil
+		}
+
+		return true, nil
+	})
+	return md, err
+}
+
 func filterOutNodes(nodes []*v1.Node, instanceType string) []*v1.Node {
 	var filteredNodes []*v1.Node
 	for _, node := range nodes {
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index 2b7a97738eb8..64ca0b958deb 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -347,15 +347,16 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 	// Check if there has been a constant difference between the number of nodes in k8s and
 	// the number of nodes on the cloud provider side.
 	// TODO: andrewskim - add protection for ready AWS nodes.
-	fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
-	if err != nil {
-		klog.Errorf("Failed to fix node group sizes: %v", err)
-		return errors.ToAutoscalerError(errors.CloudProviderError, err)
-	}
-	if fixedSomething {
-		klog.V(0).Infof("Some node group target size was fixed, skipping the iteration")
-		return nil
-	}
+	// NOTE: Commented this code as it removes `Registered but long not Ready` nodes which causes issues like scaling below minimum size and removing ready nodes during meltdown scenario
+	//fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
+	//if err != nil {
+	//	klog.Errorf("Failed to fix node group sizes: %v", err)
+	//	return errors.ToAutoscalerError(errors.CloudProviderError, err)
+	//}
+	//if fixedSomething {
+	//	klog.V(0).Infof("Some node group target size was fixed, skipping the iteration")
+	//	return nil
+	//}
 
 	metrics.UpdateLastTime(metrics.Autoscaling, time.Now())
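For reviewers, below is a minimal sketch (not part of the patch) of how the new isRollingUpdateFinished helper is expected to behave for the three states it distinguishes: rollout complete, rollout still progressing, and no "Progressing" condition at all. It assumes a hypothetical table-driven test placed in the mcm package next to the helper, that the v1alpha1 condition type from the machine-controller-manager API exposes Type, Status and Reason fields, and that "MachineSetUpdated" is only an illustrative stand-in for any non-complete reason.

package mcm

import (
	"testing"

	"github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1"
)

// TestIsRollingUpdateFinished is a sketch: it builds MachineDeployments with
// hand-written conditions and checks the value returned by isRollingUpdateFinished.
func TestIsRollingUpdateFinished(t *testing.T) {
	cases := []struct {
		name       string
		conditions []v1alpha1.MachineDeploymentCondition
		want       bool
	}{
		{
			// Rollout complete: Progressing=True with reason NewMachineSetAvailable.
			name: "rollout finished",
			conditions: []v1alpha1.MachineDeploymentCondition{
				{Type: machineDeploymentProgressing, Status: conditionTrue, Reason: newISAvailableReason},
			},
			want: true,
		},
		{
			// Rollout in progress: Progressing condition present with any other reason
			// ("MachineSetUpdated" is a placeholder, not a value taken from the patch).
			name: "rollout in progress",
			conditions: []v1alpha1.MachineDeploymentCondition{
				{Type: machineDeploymentProgressing, Status: conditionTrue, Reason: "MachineSetUpdated"},
			},
			want: false,
		},
		{
			// No Progressing condition: treated as "no rolling update has happened yet".
			name:       "no Progressing condition",
			conditions: nil,
			want:       true,
		},
	}

	for _, tc := range cases {
		md := &v1alpha1.MachineDeployment{
			Status: v1alpha1.MachineDeploymentStatus{Conditions: tc.conditions},
		}
		if got := isRollingUpdateFinished(md); got != tc.want {
			t.Errorf("%s: isRollingUpdateFinished() = %v, want %v", tc.name, got, tc.want)
		}
	}
}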