Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #216: retry deadline respected #219

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 53 additions & 32 deletions cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ type nodeTemplate struct {
Taints []apiv1.Taint
}

// machineNameNodeNamePair associates a Machine's name with the name of the
// Node backing it (taken from the machine's "node" label where this pair is
// constructed). Used to log which {machine, node} pairs are expected to be
// terminated after a scale-down.
type machineNameNodeNamePair struct {
	machineName string
	nodeName    string
}

func init() {
controlBurst = flag.Int("control-apiserver-burst", rest.DefaultBurst, "Throttling burst configuration for the client to control cluster's apiserver.")
controlQPS = flag.Float64("control-apiserver-qps", float64(rest.DefaultQPS), "Throttling QPS configuration for the client to control cluster's apiserver.")
Expand Down Expand Up @@ -313,7 +318,7 @@ func CreateMcmManager(discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*M
// GetMachineDeploymentForMachine returns the MachineDeployment for the Machine object.
func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeployment, error) {
if machine.Name == "" {
//Considering the possibility when Machine has been deleted but due to cached Node object it appears here.
// Considering the possibility when Machine has been deleted but due to cached Node object it appears here.
return nil, fmt.Errorf("Node does not Exists")
}

Expand Down Expand Up @@ -342,7 +347,7 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo
if len(machineSetObject.OwnerReferences) > 0 {
machineDeploymentName = machineSetObject.OwnerReferences[0].Name
} else {
return nil, fmt.Errorf("Unable to find parent MachineDeployment of given MachineSet object %s %+v", machineSetName, err)
return nil, fmt.Errorf("unable to find parent MachineDeployment of given MachineSet object %s %+v", machineSetName, err)
}

mcmRef := Ref{
Expand Down Expand Up @@ -399,7 +404,7 @@ func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployme

// SetMachineDeploymentSize sets the desired size for the Machinedeployment.
func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployment, size int64) error {
md, err := m.getMachineDeploymentUntilTimeout(machinedeployment.Name, conflictRetryInterval, maxRetryTimeout)
md, err := m.getMachineDeploymentUntilDeadline(machinedeployment.Name, conflictRetryInterval, time.Now().Add(maxRetryTimeout))
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
return err
Expand All @@ -412,12 +417,19 @@ func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployme

retryDeadline := time.Now().Add(maxRetryTimeout)
for {
// fetching fresh copy of machineDeployment
md, err := m.getMachineDeploymentUntilDeadline(machinedeployment.Name, conflictRetryInterval, retryDeadline)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", machinedeployment.Name, err)
return err
}

clone := md.DeepCopy()
clone.Spec.Replicas = int32(size)

_, err = m.machineClient.MachineDeployments(machinedeployment.Namespace).Update(context.TODO(), clone, metav1.UpdateOptions{})
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to update MachineDeployment object %s, Error: %+v", machinedeployment.Name, err)
klog.Warningf("Unable to update MachineDeployment object %s, Error: %+v , will retry in %s", machinedeployment.Name, err, conflictRetryInterval)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
Expand All @@ -435,8 +447,9 @@ func (m *McmManager) SetMachineDeploymentSize(machinedeployment *MachineDeployme
// DeleteMachines deletes the Machines and also reduces the desired replicas of the MachineDeployment in parallel.
func (m *McmManager) DeleteMachines(machines []*Ref) error {
var (
mdclone *v1alpha1.MachineDeployment
terminatingMachines []*v1alpha1.Machine
mdclone *v1alpha1.MachineDeployment
terminatingMachines []*v1alpha1.Machine
expectedToTerminateMachineNodePairs []machineNameNodeNamePair
)

if len(machines) == 0 {
Expand All @@ -453,11 +466,11 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
return err
}
if machinedeployment.Name != commonMachineDeployment.Name {
return fmt.Errorf("Cannot delete machines which don't belong to the same MachineDeployment")
return fmt.Errorf("cannot delete machines which don't belong to the same MachineDeployment")
}
}

md, err := m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
md, err := m.getMachineDeploymentUntilDeadline(commonMachineDeployment.Name, conflictRetryInterval, time.Now().Add(maxRetryTimeout))
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
Expand All @@ -468,11 +481,12 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {

for _, machine := range machines {

// Trying to update the priority of machine till retryDeadline
retryDeadline := time.Now().Add(maxRetryTimeout)
for {
machine, err := m.machineLister.Machines(m.namespace).Get(machine.Name)
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to fetch Machine object %s, Error: %s", machine.Name, err)
klog.Warningf("Unable to fetch Machine object %s, Error: %s , will retry in %s", machine.Name, err, conflictRetryInterval)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
Expand All @@ -485,7 +499,10 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {

if isMachineTerminating(mclone) {
terminatingMachines = append(terminatingMachines, mclone)
} else {
expectedToTerminateMachineNodePairs = append(expectedToTerminateMachineNodePairs, machineNameNodeNamePair{mclone.Name, mclone.Labels["node"]})
}

if mclone.Annotations != nil {
if mclone.Annotations[machinePriorityAnnotation] == "1" {
klog.Infof("Machine %q priority is already set to 1, hence skipping the update", machine.Name)
Expand All @@ -499,7 +516,7 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {

_, err = m.machineClient.Machines(machine.Namespace).Update(context.TODO(), mclone, metav1.UpdateOptions{})
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to update Machine object %s, Error: %s", machine.Name, err)
klog.Warningf("Unable to update Machine object %s, Error: %s , will retry in %s", machine.Name, err, conflictRetryInterval)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
Expand All @@ -515,13 +532,16 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {
klog.Infof("Machine %s of machineDeployment %s marked with priority 1 successfully", machine.Name, md.Name)
}

// getting fresh copy of machineDeployment before updating
md, err = m.getMachineDeploymentUntilTimeout(commonMachineDeployment.Name, conflictRetryInterval, maxRetryTimeout)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}
// Trying to update the machineDeployment till the retryDeadline
retryDeadline := time.Now().Add(maxRetryTimeout)
for {
// fetch fresh copy of machineDeployment
md, err = m.getMachineDeploymentUntilDeadline(commonMachineDeployment.Name, conflictRetryInterval, retryDeadline)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
return err
}

mdclone = md.DeepCopy()
if (int(mdclone.Spec.Replicas) - len(machines)) < 0 {
return fmt.Errorf("Unable to delete machine in MachineDeployment object %s , machine replicas are < 0 ", commonMachineDeployment.Name)
Expand All @@ -534,23 +554,22 @@ func (m *McmManager) DeleteMachines(machines []*Ref) error {

mdclone.Spec.Replicas = expectedReplicas

retryDeadline := time.Now().Add(maxRetryTimeout)
_, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(context.TODO(), mdclone, metav1.UpdateOptions{})
if err != nil && time.Now().Before(retryDeadline) {
klog.Warningf("Unable to update MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
klog.Warningf("Unable to update MachineDeployment object %s, Error: %s , will retry in %s", commonMachineDeployment.Name, err, conflictRetryInterval)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
// Timeout occurred
klog.Errorf("Unable to update MachineDeployment object %s, Error: %s", commonMachineDeployment.Name, err)
klog.Errorf("Unable to update MachineDeployment object %s, Error: %s , timeout occurred", commonMachineDeployment.Name, err)
return err
}

// Break out of loop when update succeeds
break
}

klog.V(2).Infof("MachineDeployment %s size decreased to %d", commonMachineDeployment.Name, mdclone.Spec.Replicas)
klog.V(2).Infof("MachineDeployment %s size decreased to %d , should remove following {machine, corresponding node} pairs %s ", commonMachineDeployment.Name, mdclone.Spec.Replicas, expectedToTerminateMachineNodePairs)

return nil
}
Expand Down Expand Up @@ -777,19 +796,21 @@ func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool {
return true
}

func (m *McmManager) getMachineDeploymentUntilTimeout(mdName string, retryInterval, timeout time.Duration) (*v1alpha1.MachineDeployment, error) {
var md *v1alpha1.MachineDeployment
err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
var err error
md, err = m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
if err != nil {
klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s, will retry", mdName, err)
return false, nil
// getMachineDeploymentUntilDeadline returns an error only when fetching the machineDeployment has been failing consecutively and the deadline is crossed
func (m *McmManager) getMachineDeploymentUntilDeadline(mdName string, retryInterval time.Duration, deadline time.Time) (*v1alpha1.MachineDeployment, error) {
for {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
if err != nil && time.Now().Before(deadline) {
klog.Warningf("Unable to fetch MachineDeployment object %s, Error: %s, will retry in %s", mdName, err, retryInterval)
time.Sleep(conflictRetryInterval)
continue
} else if err != nil {
// Timeout occurred
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %s, timeout occurred", mdName, err)
return nil, err
}

return true, nil
})
return md, err
return md, nil
}
}

func filterOutNodes(nodes []*v1.Node, instanceType string) []*v1.Node {
Expand Down
6 changes: 3 additions & 3 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,14 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
}

// CA can die at any time. Removing taints that might have been left from the previous run.
if readyNodes, err := a.ReadyNodeLister().List(); err != nil {
if allNodes, err := a.AllNodeLister().List(); err != nil {
klog.Errorf("Failed to list ready nodes, not cleaning up taints: %v", err)
} else {
deletetaint.CleanAllToBeDeleted(readyNodes,
deletetaint.CleanAllToBeDeleted(allNodes,
a.AutoscalingContext.ClientSet, a.Recorder, a.CordonNodeBeforeTerminate)
if a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount == 0 {
// Clean old taints if soft taints handling is disabled
deletetaint.CleanAllDeletionCandidates(readyNodes,
deletetaint.CleanAllDeletionCandidates(allNodes,
a.AutoscalingContext.ClientSet, a.Recorder)
}
}
Expand Down
8 changes: 8 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,14 @@ func main() {

klog.V(1).Infof("Cluster Autoscaler %s", version.ClusterAutoscalerVersion)

// FORK-CHANGE: logging version of g/autoscaler as well
gardenerversion, err := os.ReadFile("../VERSION")
if err != nil {
klog.Warningf("Error reading gardener autoscaler version, err: %s", err)
} else {
klog.V(1).Infof("Gardener Cluster Autoscaler %s", gardenerversion)
}

go func() {
pathRecorderMux := mux.NewPathRecorderMux("cluster-autoscaler")
defaultMetricsHandler := legacyregistry.Handler().ServeHTTP
Expand Down