Extract criteria for removing unneeded nodes to a separate package #5147

Merged: 2 commits, Oct 17, 2022
cluster-autoscaler/core/scaledown/legacy/legacy.go (181 changes: 39 additions & 142 deletions)
@@ -27,15 +27,14 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
@@ -48,9 +47,8 @@ type ScaleDown struct {
context *context.AutoscalingContext
processors *processors.AutoscalingProcessors
clusterStateRegistry *clusterstate.ClusterStateRegistry
unneededNodes map[string]time.Time
unneededNodesList []*apiv1.Node
unremovableNodes *unremovable.Nodes
unneededNodes *unneeded.Nodes
podLocationHints map[string]string
nodeUtilizationMap map[string]utilization.Info
usageTracker *simulator.UsageTracker
@@ -65,20 +63,20 @@ func NewScaleDown(context *context.AutoscalingContext, processors *processors.Au
usageTracker := simulator.NewUsageTracker()
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, false)
unremovableNodes := unremovable.NewNodes()
resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
return &ScaleDown{
context: context,
processors: processors,
clusterStateRegistry: clusterStateRegistry,
unneededNodes: make(map[string]time.Time),
unremovableNodes: unremovableNodes,
unneededNodes: unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder),
podLocationHints: make(map[string]string),
nodeUtilizationMap: make(map[string]utilization.Info),
usageTracker: usageTracker,
unneededNodesList: make([]*apiv1.Node, 0),
nodeDeletionTracker: ndt,
removalSimulator: removalSimulator,
eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor),
resourceLimitsFinder: resource.NewLimitsFinder(processors.CustomResourcesProcessor),
resourceLimitsFinder: resourceLimitsFinder,
}
}
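
For orientation, here is the surface of the new unneeded.Nodes type as it can be inferred purely from the call sites in this diff. The authoritative definitions live in cluster-autoscaler/core/scaledown/unneeded; the interface name and exact signatures below are assumptions, reconstructed only from how the methods are called in this file (and relying on this file's imports):

// Hypothetical summary of unneeded.Nodes, inferred from this file's call
// sites only — not copied from the unneeded package.
type nodesAPI interface {
	// Update replaces the tracked set, keeping the original "unneeded
	// since" timestamp for nodes that were already tracked.
	Update(nodesToRemove []simulator.NodeToBeRemoved, ts time.Time)
	Contains(nodeName string) bool // is the node currently tracked as unneeded?
	AsList() []*apiv1.Node         // snapshot of all tracked nodes
	Clear()                        // forget all tracked nodes
	Drop(nodeName string)          // stop tracking a single node
	// RemovableAt applies per-node-group timing and resource-limit checks,
	// splitting candidates into empty and non-empty (drain-requiring) sets.
	RemovableAt(ctx *context.AutoscalingContext, ts time.Time,
		resourcesLeft resource.Limits, resourcesWithLimits []string,
		ndt *deletiontracker.NodeDeletionTracker,
	) (empty, nonEmpty []*apiv1.Node, unremovable []*simulator.UnremovableNode)
}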

@@ -91,17 +89,16 @@ func (sd *ScaleDown) CleanUp(timestamp time.Time) {

// CleanUpUnneededNodes clears the list of unneeded nodes.
func (sd *ScaleDown) CleanUpUnneededNodes() {
sd.unneededNodesList = make([]*apiv1.Node, 0)
sd.unneededNodes = make(map[string]time.Time)
sd.unneededNodes.Clear()
}

// UnneededNodes returns a list of nodes that can potentially be scaled down.
func (sd *ScaleDown) UnneededNodes() []*apiv1.Node {
return sd.unneededNodesList
return sd.unneededNodes.AsList()
}

// UpdateUnneededNodes calculates which nodes are not needed, i.e. all pods can be scheduled somewhere else,
// and updates unneededNodes map accordingly. It also computes information where pods can be rescheduled and
// and updates unneededNodes accordingly. It also computes information where pods can be rescheduled and
// node utilization level. The computations are made only for the nodes managed by CA.
// * destinationNodes are the nodes that can potentially take in any pods that are evicted because of a scale down.
// * scaleDownCandidates are the nodes that are being considered for scale down.
@@ -199,17 +196,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
}

// Update the timestamp map.
result := make(map[string]time.Time)
unneededNodesList := make([]*apiv1.Node, 0, len(nodesToRemove))
for _, node := range nodesToRemove {
name := node.Node.Name
unneededNodesList = append(unneededNodesList, node.Node)
if val, found := sd.unneededNodes[name]; !found {
result[name] = timestamp
} else {
result[name] = val
}
}
sd.unneededNodes.Update(nodesToRemove, timestamp)

// Add nodes to unremovable map
if len(unremovable) > 0 {
@@ -223,24 +210,18 @@
// This method won't always check all nodes, so let's give a generic reason for all nodes that weren't checked.
for _, node := range scaleDownCandidates {
unremovableReasonProvided := sd.unremovableNodes.HasReason(node.Name)
_, unneeded := result[node.Name]
unneeded := sd.unneededNodes.Contains(node.Name)
if !unneeded && !unremovableReasonProvided {
sd.unremovableNodes.AddReason(node, simulator.NotUnneededOtherReason)
}
}

// Update state and metrics
sd.unneededNodesList = unneededNodesList
sd.unneededNodes = result
sd.podLocationHints = newHints
sd.nodeUtilizationMap = utilizationMap
sd.clusterStateRegistry.UpdateScaleDownCandidates(sd.unneededNodesList, timestamp)
metrics.UpdateUnneededNodesCount(len(sd.unneededNodesList))
if klog.V(4).Enabled() {
for key, val := range sd.unneededNodes {
klog.Infof("%s is unneeded since %s duration %s", key, val.String(), timestamp.Sub(val).String())
}
}
unneededNodesList := sd.unneededNodes.AsList()
sd.clusterStateRegistry.UpdateScaleDownCandidates(unneededNodesList, timestamp)
metrics.UpdateUnneededNodesCount(len(unneededNodesList))
return nil
}
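
The timestamp bookkeeping deleted from UpdateUnneededNodes above is presumably what unneeded.Nodes.Update now encapsulates. A minimal sketch of that logic, assuming the package keeps an internal map keyed by node name (the field name unneededSince is invented for illustration):

// Sketch of the bookkeeping Update takes over from this file: nodes seen
// before keep their first "unneeded since" timestamp, new ones get stamped
// now, and nodes absent from nodesToRemove are implicitly dropped.
func (n *Nodes) Update(nodesToRemove []simulator.NodeToBeRemoved, ts time.Time) {
	updated := make(map[string]time.Time, len(nodesToRemove))
	for _, toRemove := range nodesToRemove {
		name := toRemove.Node.Name
		if since, found := n.unneededSince[name]; found {
			updated[name] = since // already unneeded; keep the original timestamp
		} else {
			updated[name] = ts // node just became unneeded
		}
	}
	n.unneededSince = updated
}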

@@ -260,10 +241,9 @@ func (sd *ScaleDown) UnremovableNodes() []*simulator.UnremovableNode {
func (sd *ScaleDown) markSimulationError(simulatorErr errors.AutoscalerError,
timestamp time.Time) errors.AutoscalerError {
klog.Errorf("Error while simulating node drains: %v", simulatorErr)
sd.unneededNodesList = make([]*apiv1.Node, 0)
sd.unneededNodes = make(map[string]time.Time)
sd.unneededNodes.Clear()
sd.nodeUtilizationMap = make(map[string]utilization.Info)
sd.clusterStateRegistry.UpdateScaleDownCandidates(sd.unneededNodesList, timestamp)
sd.clusterStateRegistry.UpdateScaleDownCandidates(nil, timestamp)
return simulatorErr.AddPrefix("error while simulating node drains: ")
}

@@ -277,7 +257,7 @@ func (sd *ScaleDown) chooseCandidates(nodes []string) (candidates []string, nonC
return nodes, nil
}
for _, node := range nodes {
if _, found := sd.unneededNodes[node]; found {
if sd.unneededNodes.Contains(node) {
candidates = append(candidates, node)
} else {
nonCandidates = append(nonCandidates, node)
@@ -322,116 +302,22 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
allNodeNames = append(allNodeNames, ni.Node().Name)
}

candidateNames := make([]string, 0)
readinessMap := make(map[string]bool)
candidateNodeGroups := make(map[string]cloudprovider.NodeGroup)

resourceLimiter, errCP := sd.context.CloudProvider.GetResourceLimiter()
if errCP != nil {
return nil, nil, status.ScaleDownError, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
}

scaleDownResourcesLeft := sd.resourceLimitsFinder.LimitsLeft(sd.context, allNodes, resourceLimiter, currentTime)

nodeGroupSize := utils.GetNodeGroupSizeMap(sd.context.CloudProvider)
resourcesWithLimits := resourceLimiter.GetResources()
for nodeName, unneededSince := range sd.unneededNodes {
klog.V(2).Infof("%s was unneeded for %s", nodeName, currentTime.Sub(unneededSince).String())

nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(nodeName)
if err != nil {
klog.Errorf("Can't retrieve unneeded node %s from snapshot, err: %v", nodeName, err)
continue
}

node := nodeInfo.Node()

// Check if node is marked with no scale down annotation.
if eligibility.HasNoScaleDownAnnotation(node) {
klog.V(4).Infof("Skipping %s - scale down disabled annotation found", node.Name)
sd.unremovableNodes.AddReason(node, simulator.ScaleDownDisabledAnnotation)
continue
}

ready, _, _ := kube_util.GetReadinessState(node)
readinessMap[node.Name] = ready

nodeGroup, err := sd.context.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Errorf("Error while checking node group for %s: %v", node.Name, err)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.V(4).Infof("Skipping %s - no node group config", node.Name)
sd.unremovableNodes.AddReason(node, simulator.NotAutoscaled)
continue
}

if ready {
// Check how long a ready node was underutilized.
unneededTime, err := sd.processors.NodeGroupConfigProcessor.GetScaleDownUnneededTime(sd.context, nodeGroup)
if err != nil {
klog.Errorf("Error trying to get ScaleDownUnneededTime for node %s (in group: %s)", node.Name, nodeGroup.Id())
continue
}
if !unneededSince.Add(unneededTime).Before(currentTime) {
sd.unremovableNodes.AddReason(node, simulator.NotUnneededLongEnough)
continue
}
} else {
// Unready nodes may be deleted after a different time than underutilized nodes.
unreadyTime, err := sd.processors.NodeGroupConfigProcessor.GetScaleDownUnreadyTime(sd.context, nodeGroup)
if err != nil {
klog.Errorf("Error trying to get ScaleDownUnreadyTime for node %s (in group: %s)", node.Name, nodeGroup.Id())
continue
}
if !unneededSince.Add(unreadyTime).Before(currentTime) {
sd.unremovableNodes.AddReason(node, simulator.NotUnreadyLongEnough)
continue
}
}

size, found := nodeGroupSize[nodeGroup.Id()]
if !found {
klog.Errorf("Error while checking node group size %s: group size not found in cache", nodeGroup.Id())
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}

deletionsInProgress := sd.nodeDeletionTracker.DeletionsCount(nodeGroup.Id())
if size-deletionsInProgress <= nodeGroup.MinSize() {
klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
sd.unremovableNodes.AddReason(node, simulator.NodeGroupMinSizeReached)
continue
}

scaleDownResourcesDelta, err := sd.resourceLimitsFinder.DeltaForNode(sd.context, node, nodeGroup, resourcesWithLimits)
if err != nil {
klog.Errorf("Error getting node resources: %v", err)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}

checkResult := scaleDownResourcesLeft.CheckDeltaWithinLimits(scaleDownResourcesDelta)
if checkResult.Exceeded() {
klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.ExceededResources)
sd.unremovableNodes.AddReason(node, simulator.MinimalResourceLimitExceeded)
for _, resource := range checkResult.ExceededResources {
switch resource {
case cloudprovider.ResourceNameCores:
metrics.RegisterSkippedScaleDownCPU()
case cloudprovider.ResourceNameMemory:
metrics.RegisterSkippedScaleDownMemory()
default:
continue
}
}
continue
}

candidateNames = append(candidateNames, node.Name)
candidateNodeGroups[node.Name] = nodeGroup
empty, nonEmpty, unremovable := sd.unneededNodes.RemovableAt(sd.context, currentTime, scaleDownResourcesLeft, resourceLimiter.GetResources(), sd.nodeDeletionTracker)
for _, u := range unremovable {
sd.unremovableNodes.Add(u)
}
candidateNames := make([]string, 0, len(empty)+len(nonEmpty))
for _, n := range empty {
candidateNames = append(candidateNames, n.Name)
}
for _, n := range nonEmpty {
candidateNames = append(candidateNames, n.Name)
}

if len(candidateNames) == 0 {
@@ -448,7 +334,7 @@
var nodes []*apiv1.Node
for _, node := range emptyNodesToRemove {
// Nothing super-bad should happen if the node is removed from tracker prematurely.
simulator.RemoveNodeFromTracker(sd.usageTracker, node.Node.Name, sd.unneededNodes)
sd.removeNodeFromTracker(node.Node.Name)
nodes = append(nodes, node.Node)
}
return nodes, nil, status.ScaleDownNodeDeleteStarted, nil
@@ -478,10 +364,21 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
}
toRemove := nodesToRemove[0]
// Nothing super-bad should happen if the node is removed from tracker prematurely.
simulator.RemoveNodeFromTracker(sd.usageTracker, toRemove.Node.Name, sd.unneededNodes)
sd.removeNodeFromTracker(toRemove.Node.Name)
return nil, []*apiv1.Node{toRemove.Node}, status.ScaleDownNodeDeleteStarted, nil
}

func (sd *ScaleDown) removeNodeFromTracker(node string) {
unneeded := make([]string, 0, len(sd.unneededNodes.AsList()))
for _, n := range sd.unneededNodes.AsList() {
unneeded = append(unneeded, n.Name)
}
toRemove := simulator.RemoveNodeFromTracker(sd.usageTracker, node, unneeded)
for _, n := range toRemove {
sd.unneededNodes.Drop(n)
Contributor:
nit: I'm guessing the strange split of responsibilities (identifying nodes to filter out in one place and filtering them out elsewhere) is to avoid an import cycle between unneeded and simulator? A local interface in tracker.go that exposes AsList() and Drop() and renaming the function to simulator.FilterOutUsedNodes() or similar would IMO be a more elegant solution, but I'm not sure if it's worth the effort.
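
A sketch of the alternative the reviewer describes: a small local interface in the simulator package so it never has to import unneeded. Everything here is the reviewer's suggestion or invented for illustration (nodesInvalidatedBy is a hypothetical helper, and Unregister is assumed to exist on UsageTracker), not code from this PR:

// In simulator: a narrow local view of the unneeded set keeps this package
// decoupled from the unneeded package, avoiding the import cycle.
type unneededNodes interface {
	AsList() []*apiv1.Node
	Drop(name string)
}

// FilterOutUsedNodes unregisters node from the tracker and drops every
// unneeded node whose simulation result depended on it.
func FilterOutUsedNodes(tracker *UsageTracker, node string, unneeded unneededNodes) {
	stale := nodesInvalidatedBy(tracker, node) // hypothetical dependency lookup
	tracker.Unregister(node)                   // assumed tracker method
	for _, name := range stale {
		unneeded.Drop(name)
	}
}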

Member (author):
I mainly wanted to get rid of the weird semantic RemoveNodeFromTracker had: silently modifying the map passed as an argument. Now this is explicitly done here, which I think makes reasoning about the code easier. I wouldn't invest too much into getting this part pretty though, since the tracker itself should go away together with legacy scale down implementation.

}
}
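
The semantic change the author describes, side by side; the old call is taken from the lines this PR deletes, the new pattern from removeNodeFromTracker above:

// Before: the helper silently deleted entries from the caller's map.
simulator.RemoveNodeFromTracker(sd.usageTracker, name, sd.unneededNodes) // map[string]time.Time

// After: the helper only reports which nodes to forget, and the caller
// mutates its own state explicitly.
toRemove := simulator.RemoveNodeFromTracker(sd.usageTracker, name, unneededNames)
for _, n := range toRemove {
	sd.unneededNodes.Drop(n)
}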

// updateScaleDownMetrics registers duration of different parts of scale down.
// Separates time spent on finding nodes to remove, deleting nodes and other operations.
func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration *time.Duration) {