Automated cherry pick of #216: retry deadline respected (#217)
* retry deadline respected

* reverted retryTimeout change made for testing

* updated getMachineDeploymentUntilDeadline fn implementation and docstring

* fixed unwanted rebase changes

* Modified taint removal logic on startup to consider all nodes instead of only ready nodes.

---------

Co-authored-by: Clint Fooken <clfooken@ea.com>
himanshu-kun and fookenc authored May 9, 2023
1 parent 5d1feb7 commit 1292c60
Showing 3 changed files with 64 additions and 35 deletions.
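
The core of this cherry-pick is a switch from per-call timeouts to a single absolute deadline that every retry attempt checks before sleeping again. The sketch below is a minimal, generic illustration of that pattern, not the MCM code itself (which follows in the diff); the names fetchWithDeadline and getObject are placeholders.

package main

import (
	"errors"
	"fmt"
	"time"
)

// fetchWithDeadline retries getObject until it succeeds or the absolute
// deadline passes. Because the caller computes the deadline once, the total
// time spent across all retries is bounded; a per-call timeout would restart
// the clock on every invocation.
func fetchWithDeadline(getObject func() (string, error), retryInterval time.Duration, deadline time.Time) (string, error) {
	for {
		obj, err := getObject()
		if err == nil {
			return obj, nil
		}
		if time.Now().Before(deadline) {
			fmt.Printf("fetch failed (%v), retrying in %s\n", err, retryInterval)
			time.Sleep(retryInterval)
			continue
		}
		// Deadline crossed: give up and surface the last error.
		return "", fmt.Errorf("deadline exceeded: %w", err)
	}
}

func main() {
	attempts := 0
	flaky := func() (string, error) {
		attempts++
		if attempts < 3 {
			return "", errors.New("transient conflict")
		}
		return "machine-deployment", nil
	}
	obj, err := fetchWithDeadline(flaky, 100*time.Millisecond, time.Now().Add(2*time.Second))
	fmt.Println(obj, err)
}

In the diff below, getMachineDeploymentUntilTimeout is renamed to getMachineDeploymentUntilDeadline and its callers pass time.Now().Add(maxRetryTimeout) once, so the initial fetch and the subsequent update retries share the same time budget.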
85 changes: 53 additions & 32 deletions cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
@@ -131,6 +131,11 @@ type nodeTemplate struct {
Taints []apiv1.Taint
}

type machineNameNodeNamePair struct {
machineName string
nodeName string
}

func init() {
controlBurst = flag.Int("control-apiserver-burst", rest.DefaultBurst, "Throttling burst configuration for the client to control cluster's apiserver.")
controlQPS = flag.Float64("control-apiserver-qps", float64(rest.DefaultQPS), "Throttling QPS configuration for the client to control cluster's apiserver.")
@@ -313,7 +318,7 @@ func CreateMcmManager(discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*M
// GetMachineDeploymentForMachine returns the MachineDeployment for the Machine object.
func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeployment, error) {
if machine.Name == "" {
//Considering the possibility when Machine has been deleted but due to cached Node object it appears here.
// Considering the possibility when Machine has been deleted but due to cached Node object it appears here.
return nil, fmt.Errorf("Node does not Exists")
}

@@ -342,7 +347,7 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo
if len(machineSetObject.OwnerReferences) > 0 {
machineDeploymentName = machineSetObject.OwnerReferences[0].Name
} else {
return nil, fmt.Errorf("Unable to find parent MachineDeployment of given MachineSet object %s %+v", machineSetName, err)
return nil, fmt.Errorf("unable to find parent MachineDeployment of given MachineSet object %s %+v", machineSetName, err)
}

mcmRef := Ref{
@@ -399,7 +404,7 @@ func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployme

// SetMachineDeploymentSize sets the desired size for the Machinedeployment.
func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployment, size int64) error {
md, err := m.getMachineDeploymentUntilTimeout(machinedeployment.Name, conflictRetryInterval, maxRetryTimeout)
md, err := m.getMachineDeploymentUntilDeadline(machinedeployment.Name, conflictRetryInterval, time.Now().Add(maxRetryTimeout))
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
return err
@@ -412,12 +417,19 @@ func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployme

retryDeadline := time.Now().Add(maxRetryTimeout)
for {
// fetching fresh copy of machineDeployment
md, err := m.getMachineDeploymentUntilDeadline(machinedeployment.Name, conflictRetryInterval, retryDeadline)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
return err
}

clone := md.DeepCopy()
clone.Spec.Replicas = int32(size)

_, err = m.machineClient.MachineDeployments(machinedeployment.Namespace).Update(context.TODO(), clone, metav1.UpdateOptions{})
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to update MachineDeployment object %s, Error: %+v", machinedeployment.Name, err)
klog.Warningf("Unable to update MachineDeployment object %s, Error: %+v , will retry in %s", machinedeployment.Name, err, conflictRetryInterval)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
@@ -435,8 +447,9 @@ func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployme
// DeleteMachines deletes the Machines and also reduces the desired replicas of the Machinedeplyoment in parallel.
func (m *McmManager) DeleteMachines(machines []*Ref) error {
var (
mdclone *v1alpha1.MachineDeployment
terminatingMachines []*v1alpha1.Machine
mdclone *v1alpha1.MachineDeployment
terminatingMachines []*v1alpha1.Machine
expectedToTerminateMachineNodePairs []machineNameNodeNamePair
)

if len(machines) == 0 {
@@ -453,11 +466,11 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
return err
}
if machinedeployment.Name != commonMachineDeployment.Name {
return fmt.Errorf("Cannot delete machines which don't belong to the same MachineDeployment")
return fmt.Errorf("cannot delete machines which don't belong to the same MachineDeployment")
}
}

md, err := m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
md, err := m.getMachineDeploymentUntilDeadline(commonMachineDeployment.Name, conflictRetryInterval, time.Now().Add(maxRetryTimeout))
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
@@ -468,11 +481,12 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {

for _, machine := range machines {

// Trying to update the priority of machine till retryDeadline
retryDeadline := time.Now().Add(maxRetryTimeout)
for {
machine, err := m.machineLister.Machines(m.namespace).Get(machine.Name)
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to fetch Machine object %s, Error: %s", machine.Name, err)
klog.Warningf("Unable to fetch Machine object %s, Error: %s , will retry in %s", machine.Name, err, conflictRetryInterval)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
@@ -485,7 +499,10 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {

if isMachineTerminating(mclone) {
terminatingMachines = append(terminatingMachines, mclone)
} else {
expectedToTerminateMachineNodePairs = append(expectedToTerminateMachineNodePairs, machineNameNodeNamePair{mclone.Name, mclone.Labels["node"]})
}

if mclone.Annotations != nil {
if mclone.Annotations[machinePriorityAnnotation] == "1" {
klog.Infof("Machine %q priority is already set to 1, hence skipping the update", machine.Name)
@@ -499,7 +516,7 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {

_, err = m.machineClient.Machines(machine.Namespace).Update(context.TODO(), mclone, metav1.UpdateOptions{})
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to update Machine object %s, Error: %s", machine.Name, err)
klog.Warningf("Unable to update Machine object %s, Error: %s , will retry in %s", machine.Name, err, conflictRetryInterval)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
@@ -515,13 +532,16 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
klog.Infof("Machine %s of machineDeployment %s marked with priority 1 successfully", machine.Name, md.Name)
}

// getting fresh copy of machineDeployment before updating
md, err = m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}
// Trying to update the machineDeployment till the retryDeadline
retryDeadline := time.Now().Add(maxRetryTimeout)
for {
// fetch fresh copy of machineDeployment
md, err = m.getMachineDeploymentUntilDeadline(commonMachineDeployment.Name, conflictRetryInterval, retryDeadline)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}

mdclone = md.DeepCopy()
if (int(mdclone.Spec.Replicas) - len(machines)) < 0 {
return fmt.Errorf("Unable to delete machine in MachineDeployment object %s , machine replicas are < 0 ", commonMachineDeployment.Name)
@@ -534,23 +554,22 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {

mdclone.Spec.Replicas = expectedReplicas

retryDeadline := time.Now().Add(maxRetryTimeout)
_, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(context.TODO(), mdclone, metav1.UpdateOptions{})
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to update MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
klog.Warningf("Unable to update MachineDeployment object %s, Error: %s , will retry in %s", commonMachineDeployment.Name, err, conflictRetryInterval)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
// Timeout occurred
klog.Errorf("Unable to update MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
klog.Errorf("Unable to update MachineDeployment object %s, Error: %s , timeout occurred", commonMachineDeployment.Name, err)
return err
}

// Break out of loop when update succeeds
break
}

klog.V(2).Infof("MachineDeployment %s size decreased to %d", commonMachineDeployment.Name, mdclone.Spec.Replicas)
klog.V(2).Infof("MachineDeployment %s size decreased to %d , should remove following {machine, corresponding node} pairs %s ", commonMachineDeployment.Name, mdclone.Spec.Replicas, expectedToTerminateMachineNodePairs)

return nil
}
@@ -777,19 +796,21 @@ func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool {
return true
}

func (m *McmManager) getMachineDeploymentUntilTimeout(mdName string, retryInterval, timeout time.Duration) (*v1alpha1.MachineDeployment, error) {
var md *v1alpha1.MachineDeployment
err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
var err error
md, err = m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
if err != nil {
klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s, will retry", mdName, err)
return false, nil
// getMachineDeploymentUntilDeadline returns an error only when fetching the machineDeployment has been failing consecutively and the deadline is crossed
func (m *McmManager) getMachineDeploymentUntilDeadline(mdName string, retryInterval time.Duration, deadline time.Time) (*v1alpha1.MachineDeployment, error) {
for {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
if err != nil && time.Now().Before(deadline) {
klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s, will retry in %s", mdName, err, retryInterval)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
// Timeout occurred
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s, timeout occurred", mdName, err)
return nil, err
}

return true, nil
})
return md, err
return md, nil
}
}

func filterOutNodes(nodes []*v1.Node, instanceType string) []*v1.Node {
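SetMachineDeploymentSize and DeleteMachines above follow the same fetch-modify-update loop: re-fetch a fresh copy of the MachineDeployment inside the loop, apply the change, and retry failed updates only while the shared deadline has not passed. A simplified, self-contained sketch of that loop follows, with fetch and update standing in for the lister Get and client Update calls used in mcm_manager.go (in the real code the fetch is itself retried until the same deadline).

package main

import (
	"errors"
	"fmt"
	"time"
)

// scaleWithRetry re-fetches the current replica count before every update
// attempt, so a conflict never leaves it retrying with a stale object, and
// every attempt is bounded by one absolute deadline.
func scaleWithRetry(
	fetch func() (int32, error),
	update func(replicas int32) error,
	delta int32,
	retryInterval time.Duration,
	deadline time.Time,
) error {
	for {
		current, err := fetch()
		if err != nil {
			return fmt.Errorf("fetching fresh copy failed: %w", err)
		}
		if err := update(current + delta); err != nil {
			if time.Now().Before(deadline) {
				fmt.Printf("update failed (%v), retrying in %s\n", err, retryInterval)
				time.Sleep(retryInterval)
				continue
			}
			return fmt.Errorf("deadline exceeded: %w", err)
		}
		return nil
	}
}

func main() {
	replicas := int32(3)
	conflicts := 2
	fetch := func() (int32, error) { return replicas, nil }
	update := func(r int32) error {
		if conflicts > 0 {
			conflicts--
			return errors.New("conflict")
		}
		replicas = r
		return nil
	}
	err := scaleWithRetry(fetch, update, -1, 50*time.Millisecond, time.Now().Add(time.Second))
	fmt.Println("replicas:", replicas, "err:", err)
}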
6 changes: 3 additions & 3 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -177,14 +177,14 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
}

// CA can die at any time. Removing taints that might have been left from the previous run.
if readyNodes, err := a.ReadyNodeLister().List(); err != nil {
if allNodes, err := a.AllNodeLister().List(); err != nil {
klog.Errorf("Failed to list ready nodes, not cleaning up taints: %v", err)
} else {
deletetaint.CleanAllToBeDeleted(readyNodes,
deletetaint.CleanAllToBeDeleted(allNodes,
a.AutoscalingContext.ClientSet, a.Recorder, a.CordonNodeBeforeTerminate)
if a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount == 0 {
// Clean old taints if soft taints handling is disabled
deletetaint.CleanAllDeletionCandidates(readyNodes,
deletetaint.CleanAllDeletionCandidates(allNodes,
a.AutoscalingContext.ClientSet, a.Recorder)
}
}
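The reasoning for the static_autoscaler.go change is in the comment above the hunk: the autoscaler can die at any time, so taints left over from a previous run must be cleaned on the next start, and a node that happens to be NotReady at that moment would be skipped by the ready-node lister. The toy example below only illustrates that difference; the node type here is a placeholder, not the Kubernetes API object.

package main

import "fmt"

// node is a stand-in for apiv1.Node with only the fields this example needs.
type node struct {
	name        string
	ready       bool
	toBeDeleted bool // carries a leftover ToBeDeleted taint from a previous run
}

func main() {
	nodes := []node{
		{name: "node-a", ready: true, toBeDeleted: true},
		{name: "node-b", ready: false, toBeDeleted: true}, // NotReady at startup
		{name: "node-c", ready: true},
	}

	// Previous behaviour: only ready nodes are cleaned, so node-b keeps its
	// stale taint after the startup cleanup pass.
	for _, n := range nodes {
		if n.ready && n.toBeDeleted {
			fmt.Println("ready-node cleanup untaints:", n.name)
		}
	}

	// New behaviour: all nodes are considered, so node-b is cleaned as well.
	for _, n := range nodes {
		if n.toBeDeleted {
			fmt.Println("all-node cleanup untaints:", n.name)
		}
	}
}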
8 changes: 8 additions & 0 deletions cluster-autoscaler/main.go
@@ -397,6 +397,14 @@ func main() {

klog.V(1).Infof("Cluster Autoscaler %s", version.ClusterAutoscalerVersion)

// FORK-CHANGE: logging version of g/autoscaler as well
gardenerversion, err := os.ReadFile("../VERSION")
if err != nil {
klog.Warningf("Error reading gardener autoscaler version, err: %s", err)
} else {
klog.V(1).Infof("Gardener Cluster Autoscaler %s", gardenerversion)
}

go func() {
pathRecorderMux := mux.NewPathRecorderMux("cluster-autoscaler")
defaultMetricsHandler := legacyregistry.Handler().ServeHTTP
