Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #160 on rel-v1.25: No scale down during rolling update #205

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
if err != nil {
return err
}
if int(size)+delta < machinedeployment.minSize {
klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d . Setting target size to min size", machinedeployment.minSize, machinedeployment.Name, size+int64(delta))
return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, int64(machinedeployment.minSize))
}
return machinedeployment.mcmManager.SetMachineDeploymentSize(machinedeployment, size+int64(delta))
}

Expand Down
106 changes: 70 additions & 36 deletions cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,14 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
kubeletapis "k8s.io/kubelet/pkg/apis"
)

const (
operationWaitTimeout = 5 * time.Second
operationPollInterval = 100 * time.Millisecond
maxRecordsReturnedByAPI = 100
maxRetryDeadline = 1 * time.Minute
conflictRetryInterval = 5 * time.Second
minResyncPeriodDefault = 1 * time.Hour
maxRetryTimeout = 1 * time.Minute
conflictRetryInterval = 5 * time.Second
minResyncPeriodDefault = 1 * time.Hour
// machinePriorityAnnotation is the annotation to set machine priority while deletion
machinePriorityAnnotation = "machinepriority.machine.sapcloud.io"
// kindMachineClass is the kind for generic machine class used by the OOT providers
Expand All @@ -82,6 +79,13 @@ const (
machineGroup = "machine.sapcloud.io"
// machineGroup is the API version used to identify machine API group objects
machineVersion = "v1alpha1"
// machineDeploymentProgressing tells that deployment is progressing. Progress for a MachineDeployment is considered when a new machine set is created or adopted, and when new machines scale up or old machines scale down.
// Progress is not estimated for paused MachineDeployments. It is also updated if progressDeadlineSeconds is not specified(treated as infinite deadline), in which case it would never be updated to "false".
machineDeploymentProgressing v1alpha1.MachineDeploymentConditionType = "Progressing"
// newISAvailableReason is the reason in "Progressing" condition when machineDeployment rollout is complete
newISAvailableReason = "NewMachineSetAvailable"
// conditionTrue means the given condition status is true
conditionTrue v1alpha1.ConditionStatus = "True"
)

var (
Expand Down Expand Up @@ -248,9 +252,7 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti
}

// TODO: In general, any controller checking this needs to be dynamic so
//
// users don't have to restart their controller manager if they change the apiserver.
//
// users don't have to restart their controller manager if they change the apiserver.
// Until we get there, the structure here needs to be exposed for the construction of a proper ControllerContext.
func getAvailableResources(clientBuilder CoreClientBuilder) (map[schema.GroupVersionResource]bool, error) {
var discoveryClient discovery.DiscoveryInterface
Expand Down Expand Up @@ -397,20 +399,19 @@ func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployme

// SetMachineDeploymentSize sets the desired size for the Machinedeployment.
func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployment, size int64) error {
md, err := m.getMachineDeploymentUntilTimeout(machinedeployment.Name, conflictRetryInterval, maxRetryTimeout)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
return err
}

retryDeadline := time.Now().Add(maxRetryDeadline)
for {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name)
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %+v", machinedeployment.Name, err)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
// Timeout occurred
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
return err
}
// don't scale down during rolling update, as that could remove ready node with workload
if md.Spec.Replicas >= int32(size) && !isRollingUpdateFinished(md) {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", md.Name)
}

retryDeadline := time.Now().Add(maxRetryTimeout)
for {
clone := md.DeepCopy()
clone.Spec.Replicas = int32(size)

Expand All @@ -433,7 +434,6 @@ func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployme

// DeleteMachines deletes the Machines and also reduces the desired replicas of the Machinedeplyoment in parallel.
func (m *McmManager) DeleteMachines(machines []*Ref) error {

var (
mdclone *v1alpha1.MachineDeployment
terminatingMachines []*v1alpha1.Machine
Expand All @@ -457,9 +457,18 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
}
}

md, err := m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}
if !isRollingUpdateFinished(md) {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
}

for _, machine := range machines {

retryDeadline := time.Now().Add(maxRetryDeadline)
retryDeadline := time.Now().Add(maxRetryTimeout)
for {
machine, err := m.machineLister.Machines(m.namespace).Get(machine.Name)
if err != nil && time.Now().Before(retryDeadline) {
Expand Down Expand Up @@ -502,21 +511,17 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
// Break out of loop when update succeeds
break
}

klog.Infof("Machine %s of machineDeployment %s marked with priority 1 successfully", machine.Name, md.Name)
}

retryDeadline := time.Now().Add(maxRetryDeadline)
// getting fresh copy of machineDeployment before updating
md, err = m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}
for {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(commonMachineDeployment.Name)
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
// Timeout occurred
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}

mdclone = md.DeepCopy()
if (int(mdclone.Spec.Replicas) - len(machines)) < 0 {
return fmt.Errorf("Unable to delete machine in MachineDeployment object %s , machine replicas are < 0 ", commonMachineDeployment.Name)
Expand All @@ -529,6 +534,7 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {

mdclone.Spec.Replicas = expectedReplicas

retryDeadline := time.Now().Add(maxRetryTimeout)
_, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(context.TODO(), mdclone, metav1.UpdateOptions{})
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to update MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
Expand Down Expand Up @@ -758,6 +764,34 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine
return nodeTmpl, nil
}

func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool {
for _, cond := range md.Status.Conditions {
switch {
case cond.Type == machineDeploymentProgressing && cond.Status == conditionTrue && cond.Reason == newISAvailableReason:
return true
case cond.Type == machineDeploymentProgressing:
return false
}
}
// no "Progressing" condition means the deployment has not undergone any rolling update yet
return true
}

func (m *McmManager) getMachineDeploymentUntilTimeout(mdName string, retryInterval, timeout time.Duration) (*v1alpha1.MachineDeployment, error) {
var md *v1alpha1.MachineDeployment
err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
var err error
md, err = m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
if err != nil {
klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s, will retry", mdName, err)
return false, nil
}

return true, nil
})
return md, err
}

func filterOutNodes(nodes []*v1.Node, instanceType string) []*v1.Node {
var filteredNodes []*v1.Node
for _, node := range nodes {
Expand Down
19 changes: 10 additions & 9 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,15 +378,16 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
// Check if there has been a constant difference between the number of nodes in k8s and
// the number of nodes on the cloud provider side.
// TODO: andrewskim - add protection for ready AWS nodes.
fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("Failed to fix node group sizes: %v", err)
return errors.ToAutoscalerError(errors.CloudProviderError, err)
}
if fixedSomething {
klog.V(0).Infof("Some node group target size was fixed, skipping the iteration")
return nil
}
// NOTE: Commented this code as it removes `Registered but long not Ready` nodes which causes issues like scaling below minimum size and removing ready nodes during meltdown scenario
//fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
//if err != nil {
// klog.Errorf("Failed to fix node group sizes: %v", err)
// return errors.ToAutoscalerError(errors.CloudProviderError, err)
//}
//if fixedSomething {
// klog.V(0).Infof("Some node group target size was fixed, skipping the iteration")
// return nil
//}

metrics.UpdateLastTime(metrics.Autoscaling, time.Now())

Expand Down