Skip to content

Commit

Permalink
addressed review comments and some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
himanshu-kun committed Jan 23, 2023
1 parent 92972a3 commit efa357c
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 69 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
if err != nil {
return err
}
if int(size+int64(delta)) < machinedeployment.minSize {
if int(size)+delta < machinedeployment.minSize {
klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d . Setting target size to min size", machinedeployment.minSize, machinedeployment.Name, size+int64(delta))
return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, int64(machinedeployment.minSize))
}
Expand Down
126 changes: 58 additions & 68 deletions cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,14 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
kubeletapis "k8s.io/kubelet/pkg/apis"
)

const (
operationWaitTimeout = 5 * time.Second
operationPollInterval = 100 * time.Millisecond
maxRecordsReturnedByAPI = 100
maxRetryDeadline = 1 * time.Minute
conflictRetryInterval = 5 * time.Second
minResyncPeriodDefault = 1 * time.Hour
maxRetryTimeout = 1 * time.Minute
conflictRetryInterval = 5 * time.Second
minResyncPeriodDefault = 1 * time.Hour
// machinePriorityAnnotation is the annotation to set machine priority while deletion
machinePriorityAnnotation = "machinepriority.machine.sapcloud.io"
// kindMachineClass is the kind for generic machine class used by the OOT providers
Expand Down Expand Up @@ -400,40 +397,21 @@ func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployme
return int64(md.Spec.Replicas), nil
}

// isRollingUpdateFinished reports whether the given MachineDeployment is not
// in the middle of a rolling update.
//
// The decision is taken from the first condition whose type is
// machineDeploymentProgressing: the update counts as finished only when that
// condition has status conditionTrue and reason newISAvailableReason.
func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool {
	for _, c := range md.Status.Conditions {
		if c.Type != machineDeploymentProgressing {
			// only the "Progressing" condition is relevant here
			continue
		}
		return c.Status == conditionTrue && c.Reason == newISAvailableReason
	}
	// no "Progressing" condition means the deployment has not undergone any rolling update yet
	return true
}

// SetMachineDeploymentSize sets the desired size for the Machinedeployment.
func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployment, size int64) error {
md, err := m.getMachineDeploymentUntilTimeout(machinedeployment.Name, conflictRetryInterval, maxRetryTimeout)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
return err
}

retryDeadline := time.Now().Add(maxRetryDeadline)
for {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name)
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %+v", machinedeployment.Name, err)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
// Timeout occurred
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
return err
}

// don't scale down during rolling update, as that could remove ready node with workload
if md.Spec.Replicas >= int32(size) && !isRollingUpdateFinished(md) {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", md.Name)
}
// don't scale down during rolling update, as that could remove ready node with workload
if md.Spec.Replicas >= int32(size) && !isRollingUpdateFinished(md) {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", md.Name)
}

retryDeadline := time.Now().Add(maxRetryTimeout)
for {
clone := md.DeepCopy()
clone.Spec.Replicas = int32(size)

Expand All @@ -454,21 +432,6 @@ func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployme
return nil
}

// getMachineDeploymentObjUntilDeadline makes a single attempt to read the
// MachineDeployment named mdName from the lister.
//
// Return values:
//   - (md, nil) when the object was fetched successfully.
//   - (nil, nil) when the fetch failed but retryDeadline has not passed yet; the
//     function sleeps for conflictRetryInterval before returning, and the caller
//     is expected to call it again.
//   - (nil, err) when the fetch failed and retryDeadline has already passed.
func (m *McmManager) getMachineDeploymentObjUntilDeadline(mdName string, retryDeadline time.Time) (*v1alpha1.MachineDeployment, error) {
	deployment, getErr := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
	if getErr == nil {
		return deployment, nil
	}

	if time.Now().Before(retryDeadline) {
		// still within the retry window: back off and signal "try again"
		klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s", mdName, getErr)
		time.Sleep(conflictRetryInterval)
		return nil, nil
	}

	// Timeout occurred
	klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", mdName, getErr)
	return nil, getErr
}

// DeleteMachines deletes the Machines and also reduces the desired replicas of the MachineDeployment in parallel.
func (m *McmManager) DeleteMachines(machines []*Ref) error {
var (
Expand All @@ -494,26 +457,18 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
}
}

var md *v1alpha1.MachineDeployment
retryDeadline := time.Now().Add(maxRetryDeadline)
for {
md, err = m.getMachineDeploymentObjUntilDeadline(commonMachineDeployment.Name, retryDeadline)
if err != nil {
return err
} else if md == nil {
continue
}

if !isRollingUpdateFinished(md) {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", md.Name)
} else {
break
}
md, err := m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}
if !isRollingUpdateFinished(md) {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
}

for _, machine := range machines {

retryDeadline := time.Now().Add(maxRetryDeadline)
retryDeadline := time.Now().Add(maxRetryTimeout)
for {
machine, err := m.machineLister.Machines(m.namespace).Get(machine.Name)
if err != nil && time.Now().Before(retryDeadline) {
Expand Down Expand Up @@ -560,6 +515,12 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
klog.Infof("Machine %s of machineDeployment %s marked with priority 1 successfully", machine.Name, md.Name)
}

// getting fresh copy of machineDeployment before updating
md, err = m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}
for {
mdclone = md.DeepCopy()
if (int(mdclone.Spec.Replicas) - len(machines)) < 0 {
Expand All @@ -573,6 +534,7 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {

mdclone.Spec.Replicas = expectedReplicas

retryDeadline := time.Now().Add(maxRetryTimeout)
_, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(context.TODO(), mdclone, metav1.UpdateOptions{})
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to update MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
Expand Down Expand Up @@ -802,6 +764,34 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine
return nodeTmpl, nil
}

// isRollingUpdateFinished reports whether the given MachineDeployment is not
// in the middle of a rolling update.
//
// The decision is taken from the first condition whose type is
// machineDeploymentProgressing: the update counts as finished only when that
// condition has status conditionTrue and reason newISAvailableReason.
func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool {
	for _, c := range md.Status.Conditions {
		if c.Type != machineDeploymentProgressing {
			// only the "Progressing" condition is relevant here
			continue
		}
		return c.Status == conditionTrue && c.Reason == newISAvailableReason
	}
	// no "Progressing" condition means the deployment has not undergone any rolling update yet
	return true
}

// getMachineDeploymentUntilTimeout fetches the MachineDeployment named mdName
// from the lister, retrying through wait.PollImmediate with the given
// retryInterval and timeout.
//
// A failed fetch is logged as a warning and treated as "not done yet", so the
// lister error itself is never returned; the error returned is whatever
// wait.PollImmediate reports once polling stops without a successful fetch.
func (m *McmManager) getMachineDeploymentUntilTimeout(mdName string, retryInterval, timeout time.Duration) (*v1alpha1.MachineDeployment, error) {
	var md *v1alpha1.MachineDeployment

	// tryFetch is one polling attempt; it stores its result in md.
	tryFetch := func() (bool, error) {
		var getErr error
		md, getErr = m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
		if getErr == nil {
			return true, nil
		}
		klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s, will retry", mdName, getErr)
		// swallow the error so that polling continues
		return false, nil
	}

	pollErr := wait.PollImmediate(retryInterval, timeout, tryFetch)
	return md, pollErr
}

func filterOutNodes(nodes []*v1.Node, instanceType string) []*v1.Node {
var filteredNodes []*v1.Node
for _, node := range nodes {
Expand Down

0 comments on commit efa357c

Please sign in to comment.