Commit
Automated cherry pick of #160 on rel-v1.24: No scale down during rolling update (#206)

* fixNodeGroupSize logic silenced

* no scale-down allowed during rolling update

* updated logs

* removed useless found variable

* addressed review comments and some refactoring
himanshu-kun authored Apr 10, 2023
1 parent 554a12a commit fd54771
Showing 3 changed files with 85 additions and 45 deletions.
4 changes: 4 additions & 0 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
@@ -319,6 +319,10 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
if err != nil {
return err
}
if int(size)+delta < machinedeployment.minSize {
klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d . Setting target size to min size", machinedeployment.minSize, machinedeployment.Name, size+int64(delta))
return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, int64(machinedeployment.minSize))
}
return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, size+int64(delta))
}
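A quick worked example of the clamp added above: if the current target size is 3, delta is -2, and minSize is 2, then size+delta (1) falls below minSize, so the MachineDeployment is set to 2 rather than 1; whenever size+delta stays at or above minSize, the requested size+delta is applied unchanged.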

107 changes: 71 additions & 36 deletions cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
@@ -59,17 +59,14 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
kubeletapis "k8s.io/kubelet/pkg/apis"
)

const (
operationWaitTimeout = 5 * time.Second
operationPollInterval = 100 * time.Millisecond
maxRecordsReturnedByAPI = 100
maxRetryDeadline = 1 * time.Minute
conflictRetryInterval = 5 * time.Second
minResyncPeriodDefault = 1 * time.Hour
maxRetryTimeout = 1 * time.Minute
conflictRetryInterval = 5 * time.Second
minResyncPeriodDefault = 1 * time.Hour
// machinePriorityAnnotation is the annotation to set machine priority while deletion
machinePriorityAnnotation = "machinepriority.machine.sapcloud.io"
// kindMachineClass is the kind for generic machine class used by the OOT providers
@@ -82,6 +79,13 @@ const (
machineGroup = "machine.sapcloud.io"
// machineGroup is the API version used to identify machine API group objects
machineVersion = "v1alpha1"
// machineDeploymentProgressing indicates that the deployment is progressing. Progress for a MachineDeployment is considered when a new machine set is created or adopted, and when new machines scale up or old machines scale down.
// Progress is not estimated for paused MachineDeployments. It is also updated if progressDeadlineSeconds is not specified (treated as an infinite deadline), in which case it would never be updated to "false".
machineDeploymentProgressing v1alpha1.MachineDeploymentConditionType = "Progressing"
// newISAvailableReason is the reason in "Progressing" condition when machineDeployment rollout is complete
newISAvailableReason = "NewMachineSetAvailable"
// conditionTrue means the given condition status is true
conditionTrue v1alpha1.ConditionStatus = "True"
)

var (
@@ -97,7 +101,7 @@ var (
machineDeploymentGVR = schema.GroupVersionResource{Group: machineGroup, Version: machineVersion, Resource: "machinedeployments"}
)

//McmManager manages the client communication for MachineDeployments.
// McmManager manages the client communication for MachineDeployments.
type McmManager struct {
namespace string
interrupt chan struct{}
@@ -248,7 +252,7 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti
}

// TODO: In general, any controller checking this needs to be dynamic so
// users don't have to restart their controller manager if they change the apiserver.
// users don't have to restart their controller manager if they change the apiserver.
// Until we get there, the structure here needs to be exposed for the construction of a proper ControllerContext.
func getAvailableResources(clientBuilder CoreClientBuilder) (map[schema.GroupVersionResource]bool, error) {
var discoveryClient discovery.DiscoveryInterface
@@ -374,7 +378,6 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo
}

// Refresh does nothing at the moment.
//
func (m *McmManager) Refresh() error {
return nil
}
@@ -396,20 +399,19 @@ func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployme

// SetMachineDeploymentSize sets the desired size for the Machinedeployment.
func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployment, size int64) error {
md, err := m.getMachineDeploymentUntilTimeout(machinedeployment.Name, conflictRetryInterval, maxRetryTimeout)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
return err
}

retryDeadline := time.Now().Add(maxRetryDeadline)
for {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name)
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %+v", machinedeployment.Name, err)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
// Timeout occurred
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
return err
}
// don't scale down during a rolling update, as that could remove a ready node with workload
if md.Spec.Replicas >= int32(size) && !isRollingUpdateFinished(md) {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", md.Name)
}

retryDeadline := time.Now().Add(maxRetryTimeout)
for {
clone := md.DeepCopy()
clone.Spec.Replicas = int32(size)

@@ -432,7 +434,6 @@

// DeleteMachines deletes the Machines and also reduces the desired replicas of the MachineDeployment in parallel.
func (m *McmManager) DeleteMachines(machines []*Ref) error {

var (
mdclone *v1alpha1.MachineDeployment
terminatingMachines []*v1alpha1.Machine
@@ -456,9 +457,18 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
}
}

md, err := m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}
if !isRollingUpdateFinished(md) {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
}

for _, machine := range machines {

retryDeadline := time.Now().Add(maxRetryDeadline)
retryDeadline := time.Now().Add(maxRetryTimeout)
for {
machine, err := m.machineLister.Machines(m.namespace).Get(machine.Name)
if err != nil && time.Now().Before(retryDeadline) {
@@ -501,21 +511,17 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
// Break out of loop when update succeeds
break
}

klog.Infof("Machine %s of machineDeployment %s marked with priority 1 successfully", machine.Name, md.Name)
}

retryDeadline := time.Now().Add(maxRetryDeadline)
// getting fresh copy of machineDeployment before updating
md, err = m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}
for {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(commonMachineDeployment.Name)
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
// Timeout occurred
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}

mdclone = md.DeepCopy()
if (int(mdclone.Spec.Replicas) - len(machines)) < 0 {
return fmt.Errorf("Unable to delete machine in MachineDeployment object %s , machine replicas are < 0 ", commonMachineDeployment.Name)
@@ -528,6 +534,7 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {

mdclone.Spec.Replicas = expectedReplicas

retryDeadline := time.Now().Add(maxRetryTimeout)
_, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(context.TODO(), mdclone, metav1.UpdateOptions{})
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to update MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
@@ -757,6 +764,34 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine
return nodeTmpl, nil
}

func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool {
for _, cond := range md.Status.Conditions {
switch {
case cond.Type == machineDeploymentProgressing && cond.Status == conditionTrue && cond.Reason == newISAvailableReason:
return true
case cond.Type == machineDeploymentProgressing:
return false
}
}
// no "Progressing" condition means the deployment has not undergone any rolling update yet
return true
}
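To make the behavior of the new isRollingUpdateFinished check concrete, here is a minimal, self-contained sketch that mirrors its decision logic over stand-in types; the struct deploymentCondition and the reason value "MachineSetUpdated" are illustrative stand-ins, not part of the MCM v1alpha1 API. Note also that the guard in SetMachineDeploymentSize only rejects requests that do not increase md.Spec.Replicas, so scale-ups still proceed while a rollout is in flight.

package main

import "fmt"

// deploymentCondition is a stand-in mirroring the fields of
// v1alpha1.MachineDeploymentCondition that the check inspects.
type deploymentCondition struct {
	Type   string
	Status string
	Reason string
}

// rollingUpdateFinished reproduces the decision logic of isRollingUpdateFinished
// over the stand-in type.
func rollingUpdateFinished(conds []deploymentCondition) bool {
	for _, cond := range conds {
		switch {
		case cond.Type == "Progressing" && cond.Status == "True" && cond.Reason == "NewMachineSetAvailable":
			return true
		case cond.Type == "Progressing":
			return false
		}
	}
	// No "Progressing" condition: the deployment has never been rolled, so treat it as finished.
	return true
}

func main() {
	// Rollout complete: Progressing=True with reason NewMachineSetAvailable.
	fmt.Println(rollingUpdateFinished([]deploymentCondition{
		{Type: "Progressing", Status: "True", Reason: "NewMachineSetAvailable"},
	})) // true

	// Rollout still in flight: Progressing present with some other reason (illustrative value).
	fmt.Println(rollingUpdateFinished([]deploymentCondition{
		{Type: "Progressing", Status: "True", Reason: "MachineSetUpdated"},
	})) // false

	// No Progressing condition at all: treated as finished.
	fmt.Println(rollingUpdateFinished(nil)) // true
}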

func (m *McmManager) getMachineDeploymentUntilTimeout(mdName string, retryInterval, timeout time.Duration) (*v1alpha1.MachineDeployment, error) {
var md *v1alpha1.MachineDeployment
err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
var err error
md, err = m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
if err != nil {
klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s, will retry", mdName, err)
return false, nil
}

return true, nil
})
return md, err
}
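A note on the retry helper above: wait.PollImmediate runs the condition function once immediately and then every retryInterval until it returns (true, nil), returns a non-nil error, or the timeout elapses. Because lister failures are swallowed by returning (false, nil), a transient error simply triggers another attempt; only when the full timeout expires does the caller receive an error (the poll's timeout error, with md still nil), which SetMachineDeploymentSize and DeleteMachines then log and propagate.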

func filterOutNodes(nodes []*v1.Node, instanceType string) []*v1.Node {
var filteredNodes []*v1.Node
for _, node := range nodes {
19 changes: 10 additions & 9 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -365,15 +365,16 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
// Check if there has been a constant difference between the number of nodes in k8s and
// the number of nodes on the cloud provider side.
// TODO: andrewskim - add protection for ready AWS nodes.
fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("Failed to fix node group sizes: %v", err)
return errors.ToAutoscalerError(errors.CloudProviderError, err)
}
if fixedSomething {
klog.V(0).Infof("Some node group target size was fixed, skipping the iteration")
return nil
}
// NOTE: Commented out this code as it removes `Registered but long not Ready` nodes, which causes issues like scaling below the minimum size and removing ready nodes during a meltdown scenario
//fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
//if err != nil {
// klog.Errorf("Failed to fix node group sizes: %v", err)
// return errors.ToAutoscalerError(errors.CloudProviderError, err)
//}
//if fixedSomething {
// klog.V(0).Infof("Some node group target size was fixed, skipping the iteration")
// return nil
//}

metrics.UpdateLastTime(metrics.Autoscaling, time.Now())

