Skip to content

Commit

Permalink
handle edge case as described in issue #328
Browse files Browse the repository at this point in the history
  • Loading branch information
Rishabh Patel committed Nov 14, 2024
1 parent 0838a61 commit b10e0d9
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 52 deletions.
11 changes: 1 addition & 10 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,11 @@ func (mcm *mcmCloudProvider) checkMCMAvailableReplicas() error {
// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
func (mcm *mcmCloudProvider) Refresh() error {

err := mcm.checkMCMAvailableReplicas()
if err != nil {
return err
}

for _, machineDeployment := range mcm.machinedeployments {
err := mcm.mcmManager.resetPriorityForNotToBeDeletedMachines(machineDeployment.Name)
if err != nil {
klog.Errorf("failed to reset priority for machines in MachineDeployment %s, err: %v", machineDeployment.Name, err.Error())
return err
}
}
return nil
return mcm.mcmManager.Refresh()
}

// GPULabel returns the label added to nodes with GPU resource.
Expand Down
53 changes: 41 additions & 12 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestDeleteNodes(t *testing.T) {
},
},
{
"should not scale down when machine deployment update call times out",
"should not scale down when machine deployment update call times out and should reset priority of the corresponding machine",
setup{
nodes: newNodes(2, "fakeID", []bool{true, false}),
machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}),
Expand All @@ -168,10 +168,10 @@ func TestDeleteNodes(t *testing.T) {
},
action{node: newNodes(1, "fakeID", []bool{true})[0]},
expect{
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}),
mdName: "machinedeployment-1",
mdReplicas: 2,
err: fmt.Errorf("unable to scale in machine deployment machinedeployment-1, Error: %v", mdUpdateErrorMsg),
err: errors.Join(nil, fmt.Errorf("unable to scale in machine deployment machinedeployment-1, Error: %w", errors.New(mdUpdateErrorMsg))),
},
},
{
Expand Down Expand Up @@ -332,13 +332,13 @@ func TestDeleteNodes(t *testing.T) {
flag := false
for _, entryMachineItem := range entry.expect.machines {
if entryMachineItem.Name == machine.Name {
g.Expect(machine.Annotations[priorityAnnotationKey]).To(Equal(entryMachineItem.Annotations[priorityAnnotationKey]))
g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal(entryMachineItem.Annotations[machinePriorityAnnotation]))
flag = true
break
}
}
if !flag {
g.Expect(machine.Annotations[priorityAnnotationKey]).To(Equal("3"))
g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal("3"))
}
}
})
Expand All @@ -357,7 +357,6 @@ func TestRefresh(t *testing.T) {
}
table := []data{
{

"should return an error if MCM has zero available replicas",
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
Expand All @@ -371,7 +370,6 @@ func TestRefresh(t *testing.T) {
},
},
{

"should return an error if MCM deployment is not found",
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
Expand All @@ -384,8 +382,7 @@ func TestRefresh(t *testing.T) {
},
},
{

"should reset priority of a machine with node without ToBeDeletedTaint to 3",
"should reset priority of a machine to 3 if machine deployment is not scaled in",
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
Expand All @@ -399,14 +396,46 @@ func TestRefresh(t *testing.T) {
},
},
{
"should not reset priority of a machine to 3 if the node has ToBeDeleted taint",
"should reset priority of a machine to 3 if machine deployment is not scaled in even if ToBeDeletedTaint is present on the corresponding node",
setup{
nodes: newNodes(1, "fakeID", []bool{true}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
mcmDeployment: newMCMDeployment(1),
},
expect{
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}),
err: nil,
},
},
{
"should ignore terminating/failed machines in checking if number of annotated machines is more than desired",
setup{
nodes: newNodes(1, "fakeID", []bool{true}),
machines: newMachines(1, "fakeID", &v1alpha1.MachineStatus{
CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed},
}, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
mcmDeployment: newMCMDeployment(1),
},
expect{
machines: newMachines(1, "fakeID", &v1alpha1.MachineStatus{
CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed},
}, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
err: nil,
},
},
{
"should not reset priority of a machine to 3 if machine deployment is scaled in",
setup{
nodes: newNodes(1, "fakeID", []bool{true}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 0, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
mcmDeployment: newMCMDeployment(1),
},
expect{
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
err: nil,
Expand All @@ -428,7 +457,7 @@ func TestRefresh(t *testing.T) {
},
expect{
machines: []*v1alpha1.Machine{newMachine("machine-1", "fakeID-1", nil, "machinedeployment-1", "machineset-1", "1", false, true)},
err: errors.Join(fmt.Errorf("could not reset priority annotation on machine machine-1, Error: %v", mcUpdateErrorMsg)),
err: errors.Join(nil, errors.Join(fmt.Errorf("could not reset priority annotation on machine machine-1, Error: %v", mcUpdateErrorMsg))),
},
},
}
Expand Down Expand Up @@ -461,7 +490,7 @@ func TestRefresh(t *testing.T) {
for _, mc := range entry.expect.machines {
machine, err := m.machineClient.Machines(m.namespace).Get(context.TODO(), mc.Name, metav1.GetOptions{})
g.Expect(err).To(BeNil())
g.Expect(mc.Annotations[priorityAnnotationKey]).To(Equal(machine.Annotations[priorityAnnotationKey]))
g.Expect(mc.Annotations[machinePriorityAnnotation]).To(Equal(machine.Annotations[machinePriorityAnnotation]))
}
})
}
Expand Down
88 changes: 62 additions & 26 deletions cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"math/rand"
"net/http"
"os"
"slices"
"strconv"
"strings"
"time"
Expand All @@ -57,7 +58,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/client-go/discovery"
appsinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers"
Expand Down Expand Up @@ -402,9 +402,52 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo
}, nil
}

// Refresh does nothing at the moment.
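// Refresh checks every machine deployment that is not under rolling update and, when the machines left without a deletion-priority annotation are fewer than the desired replicas, resets the priority annotation on just enough annotated machines to make up the difference.
// Machines whose priority is reset are picked in descending order of creation timestamp, i.e. the most recently created machines first.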
func (m *McmManager) Refresh() error {
return nil
machineDeployments, err := m.machineDeploymentLister.MachineDeployments(m.namespace).List(labels.Everything())
if err != nil {
klog.Errorf("[Refresh] unable to list machine deployments")
return err
}
var collectiveError error
for _, machineDeployment := range machineDeployments {
// ignore the machine deployment if it is in rolling update
if !isRollingUpdateFinished(machineDeployment) {
klog.Infof("[Refresh] machine deployment %s is under rolling update, skipping", machineDeployment.Name)
continue
}
replicas := machineDeployment.Spec.Replicas
// check if number of annotated machine objects is more than desired and correspondingly reset the priority annotation value if needed.
machines, err := m.getMachinesForMachineDeployment(machineDeployment.Name)
if err != nil {
klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
collectiveError = errors.Join(collectiveError, err)
continue
}
var annotatedMachines []*v1alpha1.Machine
for _, machine := range machines {
// no need to reset priority for machines already in termination or failed phase
if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
continue
}
if machine.Annotations != nil && machine.Annotations[machinePriorityAnnotation] != defaultPriorityValue {
annotatedMachines = append(annotatedMachines, machine)
}
}
if int(replicas) > len(machines)-len(annotatedMachines) {
slices.SortStableFunc(annotatedMachines, func(m1, m2 *v1alpha1.Machine) int {
return -m1.CreationTimestamp.Compare(m2.CreationTimestamp.Time)
})
diff := int(replicas) - len(machines) + len(annotatedMachines)
targetRefs := make([]*Ref, 0, diff)
for i := 0; i < min(diff, len(annotatedMachines)); i++ {
targetRefs = append(targetRefs, &Ref{Name: annotatedMachines[i].Name, Namespace: annotatedMachines[i].Namespace})
}
collectiveError = errors.Join(collectiveError, m.resetPriorityForMachines(targetRefs))
}
}
return collectiveError
}

// Cleanup does nothing at the moment.
Expand Down Expand Up @@ -440,7 +483,7 @@ func (m *McmManager) SetMachineDeploymentSize(ctx context.Context, machinedeploy
return true, err
}

// DeleteMachines deletes the Machines and also reduces the desired replicas of the MachineDeployment in parallel.
// DeleteMachines annotates the target machines and also reduces the desired replicas of the MachineDeployment.
func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
if len(targetMachineRefs) == 0 {
return nil
Expand All @@ -459,7 +502,7 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
}
// set the priority of the machines to be deleted to 1, except for those that are already terminating
scaleDownAmount, err := m.prioritizeMachinesForDeletion(targetMachineRefs, commonMachineDeployment.Name)
scaleDownAmount, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
if err != nil {
return err
}
Expand All @@ -468,33 +511,26 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
return m.scaleDownMachineDeployment(ctx, commonMachineDeployment.Name, scaleDownAmount)
}, "MachineDeployment", "update", commonMachineDeployment.Name)
if err != nil {
klog.Errorf("unable to scale in machine deployment %s, Error: %v", commonMachineDeployment.Name, err)
return fmt.Errorf("unable to scale in machine deployment %s, Error: %v", commonMachineDeployment.Name, err)
klog.Errorf("unable to scale in machine deployment %s, will reset priority of target machines, Error: %v", commonMachineDeployment.Name, err)
return errors.Join(err, m.resetPriorityForMachines(targetMachineRefs))
}
return nil
}

// resetPriorityForNotToBeDeletedMachines resets the priority of machines with nodes without ToBeDeleted taint to 3
func (m *McmManager) resetPriorityForNotToBeDeletedMachines(mdName string) error {
allMachinesForMachineDeployment, err := m.getMachinesForMachineDeployment(mdName)
if err != nil {
return fmt.Errorf("unable to list all machines for node group %s, Error: %v", mdName, err)
}
// resetPriorityForMachines resets the priority of machines passed in the argument to defaultPriorityValue
func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error {
var collectiveError error
for _, machine := range allMachinesForMachineDeployment {
for _, mcRef := range mcRefs {
machine, err := m.machineLister.Machines(m.namespace).Get(mcRef.Name)
if err != nil {
collectiveError = errors.Join(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err))
continue
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(defaultResetAnnotationTimeout))
err := func() error {
err = func() error {
defer cancelFn()
val, ok := machine.Annotations[machinePriorityAnnotation]
if ok && val != defaultPriorityValue {
nodeName := machine.Labels[v1alpha1.NodeLabelKey]
node, err := m.nodeLister.Get(nodeName)
if err != nil && !kube_errors.IsNotFound(err) {
return fmt.Errorf("unable to get Node object %s for machine %s, Error: %v", nodeName, machine.Name, err)
} else if err == nil && taints.HasToBeDeletedTaint(node) {
// Don't update priority annotation if the taint is present on the node
return nil
}
_, err = m.updateAnnotationOnMachine(ctx, machine.Name, machinePriorityAnnotation, defaultPriorityValue)
return err
}
Expand All @@ -509,7 +545,7 @@ func (m *McmManager) resetPriorityForNotToBeDeletedMachines(mdName string) error
}

// prioritizeMachinesForDeletion prioritizes the targeted machines by updating their priority annotation to 1
func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref, mdName string) (int, error) {
func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) (int, error) {
var expectedToTerminateMachineNodePairs = make(map[string]string)
for _, machineRef := range targetMachineRefs {
// Trying to update the priority of machineRef till m.maxRetryTimeout
Expand Down Expand Up @@ -561,7 +597,7 @@ func (m *McmManager) updateAnnotationOnMachine(ctx context.Context, mcName strin
}
_, err = m.machineClient.Machines(machine.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err == nil {
klog.Infof("Machine %s marked with priority 1 successfully", mcName)
klog.Infof("Machine %s marked with priority %s successfully", mcName, val)
}
return true, err
}
Expand All @@ -585,7 +621,7 @@ func (m *McmManager) scaleDownMachineDeployment(ctx context.Context, mdName stri
mdclone.Spec.Replicas = expectedReplicas
_, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(ctx, mdclone, metav1.UpdateOptions{})
if err != nil {
return true, err
return true, fmt.Errorf("unable to scale in machine deployment %s, Error: %w", mdName, err)
}
klog.V(2).Infof("MachineDeployment %s size decreased to %d ", mdclone.Name, mdclone.Spec.Replicas)
return false, nil
Expand Down
7 changes: 3 additions & 4 deletions cluster-autoscaler/cloudprovider/mcm/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ import (
)

var (
testNamespace = "test-namespace"
priorityAnnotationKey = "machinepriority.machine.sapcloud.io"
testTaintValue = fmt.Sprint(time.Now().Unix())
testNamespace = "test-namespace"
testTaintValue = fmt.Sprint(time.Now().Unix())
)

func newMachineDeployments(
Expand Down Expand Up @@ -135,7 +134,7 @@ func newMachines(
{Name: msName},
},
Labels: map[string]string{machineDeploymentNameLabel: mdName},
Annotations: map[string]string{priorityAnnotationKey: priorityAnnotationValues[i]},
Annotations: map[string]string{machinePriorityAnnotation: priorityAnnotationValues[i]},
CreationTimestamp: metav1.Now(),
},
}
Expand Down

0 comments on commit b10e0d9

Please sign in to comment.