-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Race conditions in Targeted Deletion of machines by CA #341
base: machine-controller-manager-provider
Are you sure you want to change the base?
Fix Race conditions in Targeted Deletion of machines by CA #341
Conversation
klog.Infof("machine deployment %s is under rolling update, skipping", machineDeployment.Name) | ||
return nil | ||
} | ||
markedMachines := sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move this to a small function mcm.getMachinesMarkedByCAForDeletion(mcd) (machineNames sets.Set[string])
which is unit testable and can be consumed by both the mcm_cloud_provider.go
and mcm_manager.go
klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error()) | ||
return err | ||
} | ||
var incorrectlyMarkedMachines []*Ref |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we can omit the Ref
struct and just used types.NamespacedName
which is already being used anyways in this file
@@ -539,12 +567,17 @@ func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*Mach | |||
return machinedeployment, nil | |||
} | |||
|
|||
func getMachinesMarkedByCAForDeletion(mcd *v1alpha1.MachineDeployment) sets.Set[string] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps using the "comma, ok" idiom here for mcd.Annotations[machinesMarkedByCAForDeletion]
would be good for clarity to avoid unnecessary split of empty string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it allowed to read from a nil map? Only writes will cause panic, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getMachinesMarkedByCAForDeletion
->getMachineNamesMarkedByCAForDeletion
// ignore the machine deployment if it is in rolling update | ||
if !isRollingUpdateFinished(mcd) { | ||
klog.Infof("machine deployment %s is under rolling update, skipping", machineDeployment.Name) | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we return an error
here ? We are doing it for other cases of !isRollingUpdateFinished
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should remove this check. Even if a machineDeployment is under rolling update, we should allow the annotation update if needed. wdyt?
// addNodeGroup adds node group defined in string spec. Format: | ||
// minNodes:maxNodes:namespace.machineDeploymentName | ||
func (m *McmManager) addNodeGroup(spec string) error { | ||
machineDeployment, err := buildMachineDeploymentFromSpec(spec, m) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
buildMachineDeploymentFromSpec
should also be moved to mcm_manager.go
.
func (machineDeployment *MachineDeployment) Refresh() error { | ||
machineDeployment.scalingMutex.Lock() | ||
defer machineDeployment.scalingMutex.Unlock() | ||
mcd, err := machineDeployment.mcmManager.machineDeploymentLister.MachineDeployments(machineDeployment.Namespace).Get(machineDeployment.Name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is repeated nearly a dozen times everywhere including common error handling: machineDeployment.mcmManager.machineDeploymentLister.MachineDeployments(machineDeployment.Namespace).Get(machineDeployment.Name)
. Move to a method in mcmManager
called GetMachineDeploymentResource
which returns a formatted error that can simply be returned, so that error message is fully consistent with all uses. we are already having methods like mcmManager.getMachinesForMachineDeployment
so this matches the existing convention.
if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed { | ||
continue | ||
} | ||
if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, Why is this called priorityValueForCandidateMachines
? Shouldn't it be defined as const PriorityDeletionValueForCandidateMachine = 1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can rename it to PriorityValueForDeletionCandidateMachine
. wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should also add a comment here to explain what we are doing so next guy making a patch doesn't scratch his head.
} | ||
} | ||
clone := mcd.DeepCopy() | ||
clone.Annotations[machinesMarkedByCAForDeletion] = strings.Join(updatedMarkedMachines, ",") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This strings.Join
to construct the annotation repeated elsewhere. Make a utility method called getMarkedForDeletionAnnotationValue(machineNames []string) string
klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error()) | ||
return err | ||
} | ||
var incorrectlyMarkedMachines []*types.NamespacedName |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor (need not correct): types.NamespacedName
is a simple struct - a value object. Modern programming practices discourages use of pointers to value objects for keeping data inline for processor cache performance. Pointers should be used only for structs holding active resource (like locks/files/etc) or class/service objects. Using a simple []types.NamespacedName
and using types.NamespacedName
instead of *types.NamespacedName
should be followed for newer code.
@@ -539,12 +567,17 @@ func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*Mach | |||
return machinedeployment, nil | |||
} | |||
|
|||
func getMachinesMarkedByCAForDeletion(mcd *v1alpha1.MachineDeployment) sets.Set[string] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name function to getMachineNamesMarkedByCAForDeletion
@@ -508,27 +474,32 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error { | |||
if !isRollingUpdateFinished(md) { | |||
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name) | |||
} | |||
markedMachines := getMachinesMarkedByCAForDeletion(md) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
markedMachines
-> machineNamesMarkedByCA
incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &types.NamespacedName{Name: machine.Name, Namespace: machine.Namespace}) | ||
} | ||
} | ||
var updatedMarkedMachines []string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updatedMarkedMachines
-> updatedMarkedMachineNames
. Also please add a comment explaining that we do to ensure that annotation do not have non-existent machine names.
} | ||
} | ||
var updatedMarkedMachines []string | ||
for machineName := range markedMachines { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: Technically, you can create allMachineNames
and use Set.Intersection
here - which is cheaper than nested for loop, but its OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK k8s.io/apimachinery/pkg/util/sets
does not have an Intersection method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes but it is not part of the apimachinery version we currently use. Anyways, we do not need sets, I have replaced them with slices.
collectiveError = errors.Join(collectiveError, m.resetPriorityForMachines(targetRefs)) | ||
} | ||
for _, machineDeployment := range m.machineDeployments { | ||
collectiveError = errors.Join(collectiveError, machineDeployment.Refresh()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
non-optimal use of errors.Join
which will create repeated struct instances. errors.Join
creates a new error object by combining multiple errors. If used in a loop, it allocates memory each time it's called. The right usage is first make a var errs []error
outside the loop, append individual errors into this slice within the loop and then use errors.Join
outside the loop to create the final error.
// update priorities of machines to be deleted except the ones already in termination to 1 | ||
scaleDownAmount, err := m.prioritizeMachinesForDeletion(targetMachineRefs) | ||
machinesWithPrio1, err := m.prioritizeMachinesForDeletion(targetMachineRefs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
machinesWithPrio1
->machineNamesWithPrio1
var incorrectlyMarkedMachines []*types.NamespacedName | ||
for _, machine := range machines { | ||
// no need to reset priority for machines already in termination or failed phase | ||
if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we already have a utiltiy method isMachineFailedOrTerminating
. Is that not suitable or make it suitable ?
if err != nil { | ||
return err | ||
} | ||
markedMachines.Insert(machinesWithPrio1...) | ||
// Trying to update the machineDeployment till the deadline | ||
err = m.retry(func(ctx context.Context) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please consider whether one of the apimachinery wait.Poll*
utility functions can be used or whether retry
can delete to one of the apimachinery wait.Poll*
functions. wait.PollUntilContextTimeout
seems suitable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are using our retry since wait.PollUntilContextTimeout
does not give logs. We were able to debug the issue thanks to these logs so I think we should still keep it
var expectedToTerminateMachineNodePairs = make(map[string]string) | ||
var machinesMarkedWithPrio1 []string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
machinesMarkedWithPrio1
-> prio1MarkedMachineNames
. (optionally, move this variable to a named return value on this method)
MINOR: Since you are already having a map expectedToTerminateMachineNodePairs
which should be called prio1MarkedMachineNodeNamePairs
, you don't need another slice. You can use Go's maps.Keys(prio1MarkedMachineNodeNamePairs).Collect()
as return value instead of separate heap allocation for duplicate slice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maps.Keys was introduced in go 1.23 - https://pkg.go.dev/maps@go1.22.10 does not have Keys function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm...my bad this function was added later to the package which was newly introduced in Go 1.21
if kube_errors.IsNotFound(err) { | ||
klog.Warningf("Machine %s not found, skipping resetting priority annotation", mcRef.Name) | ||
continue | ||
} | ||
if err != nil { | ||
collectiveError = errors.Join(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same point regarding errors.Join
- should be invoked just once outside loop to avoid repeated mem alloc inside for loop.
@rishabh-11 thanks for PR. Added review comments. Can you make a list of all test-cases that you can think of - including success, failures, crashes etc ? I will test them out and attach test log. |
This PR is currently under review/testing. will update with further details later. |
} | ||
|
||
// MinSize returns minimum size of the node group. | ||
func (machinedeployment *MachineDeployment) MinSize() int { | ||
return machinedeployment.minSize | ||
func (machineDeployment *MachineDeployment) MinSize() int { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is usually a practice to use short names. In this case you can perhaps just use:
func (md *MachineDeployment) MinSize {}
Reference: https://google.github.io/styleguide/go/decisions.html#receiver-names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I actually mentioned this to rishabh that our machineDeployment
receiver and variable names and CA nodeimpl struct names are all overloaded. But he wanted to minimize the changes. I will make this change anyways.
if delta >= 0 { | ||
return fmt.Errorf("size decrease size must be negative") | ||
} | ||
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment) | ||
machineDeployment.scalingMutex.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When i looked at the code, this method is called from fixNodeGroupSize
which in our fork is not getting called from anywhere. So having a mutex here is kind of useless, right? So while the issue stated that the issue is with duplicate scale down. I am unable to relate this code change with the actual issue.
|
||
// Refresh resets the priority annotation for the machines that are not present in machines-marked-by-ca-for-deletion annotation on the machineDeployment | ||
func (machineDeployment *MachineDeployment) Refresh() error { | ||
machineDeployment.scalingMutex.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why should a scaling mutex be used in Refresh
?
Scaling mutex by its very name signifies mutex meant to be taken for any scaling event and not when its trying to read/get machine deployments.
Also can this result in the next reconcile of RunOnce
to be stuck because it cannot go beyond CloudProvider.Refresh
call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need a mutex when running scaling operations or modifying attributes that directly affect the scaling operation like the newly introduced annotation. There is not any doubt about this as we HAVE to prevent concurrent scaleups and scsledowns due to the fact that our MCM scaling is async and that multiple Go routines can perform scale downs, leading to non-deterministic behaviour.
You have a point that RunOnce
should return an error and not block if the mutex is already acquired. Will change the code to check if mutex is acquired and if so return an informative error. Will also test this change.
// Trying to update the machineDeployment till the deadline | ||
err = m.retry(func(ctx context.Context) (bool, error) { | ||
return m.scaleDownMachineDeployment(ctx, commonMachineDeployment.Name, scaleDownAmount) | ||
return m.scaleDownAndAnnotateMachineDeployment(ctx, commonMachineDeployment.Name, len(machineNamesWithPrio1), createMachinesMarkedForDeletionAnnotationValue(machineNamesMarkedByCA)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now we make one call to KAPI to update the priorities and another call to change the replicas + add scale down annotation. Why can't these two be combined to reduce the number of calls to KAPI and further reduce the complexity of handling failure cases between the two calls?
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName) | ||
// scaleDownAndAnnotateMachineDeployment scales down the machine deployment by the provided scaleDownAmount and returns the updated spec.Replicas after scale down. | ||
// It also updates the machines-marked-by-ca-for-deletion annotation on the machine deployment with the list of existing machines marked for deletion. | ||
func (m *McmManager) scaleDownAndAnnotateMachineDeployment(ctx context.Context, mdName string, scaleDownAmount int, markedMachines string) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Should lock the mutex before updating the mcd
- There is a check in this function to skip mcd update if there is no change to the number or replicas. Ig this should also be updated to consider the annotation as well
@@ -98,6 +100,9 @@ const ( | |||
machineDeploymentPausedReason = "DeploymentPaused" | |||
// machineDeploymentNameLabel key for Machine Deployment name in machine labels | |||
machineDeploymentNameLabel = "name" | |||
// machinesMarkedByCAForDeletion is the annotation set by CA on machine deployment. Its value denotes the machines that | |||
// CA marked for deletion by updating the priority annotation to 1 and scaling down the machine deployment. | |||
machinesMarkedByCAForDeletion = "cluster-autoscaler.kubernetes.io/machines-marked-by-ca-for-deletion" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
machinesMarkedByCAForDeletion = "cluster-autoscaler.kubernetes.io/machines-marked-by-ca-for-deletion" | |
machinesMarkedByCAForDeletionAnnotation = "cluster-autoscaler.kubernetes.io/machines-marked-by-ca-for-deletion" |
Need not be considered, but this is usually how annotations are defined
min, | ||
max, | ||
}, nil | ||
return machineDeployment, nil | ||
} | ||
|
||
// Refresh method, for each machine deployment, will reset the priority of the machines if the number of annotated machines is more than desired. | ||
// It will select the machines to reset the priority based on the descending order of creation timestamp. | ||
func (m *McmManager) Refresh() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docstring should be updated to be more accurate
@@ -916,6 +899,16 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine | |||
return nodeTmpl, nil | |||
} | |||
|
|||
// GetMachineDeploymentResource returns the MachineDeployment object for the provided machine deployment name | |||
func (m *McmManager) GetMachineDeploymentResource(mdName string) (*v1alpha1.MachineDeployment, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (m *McmManager) GetMachineDeploymentResource(mdName string) (*v1alpha1.MachineDeployment, error) { | |
func (m *McmManager) GetMachineDeploymentObject(mdName string) (*v1alpha1.MachineDeployment, error) { |
Ig Object
is more accurate then Resource
?
What this PR does / why we need it:
This PR fixes the issues noticed in live issues 6120 and 6101. We introduce a mutex in the machineDeployment struct (node group implementation) which must be acquired before performing a scale-down or scale-up operation.
We also introduce an annotation on the machine deployment
cluster-autoscaler.kubernetes.io/machines-marked-by-ca-for-deletion
whose value denotes the machines CA wants to remove. This will help recognize machines for which CA has already reduced the replicas of machine deployment and prevent it from being duplicated.Which issue(s) this PR fixes:
Fixes #342
Special notes for your reviewer:
Release note: