diff --git a/cluster-autoscaler/core/scaleup/equivalence/groups.go b/cluster-autoscaler/core/scaleup/equivalence/groups.go
index c805b9f01bcb..5fcefd620162 100644
--- a/cluster-autoscaler/core/scaleup/equivalence/groups.go
+++ b/cluster-autoscaler/core/scaleup/equivalence/groups.go
@@ -30,9 +30,10 @@ import (
 
 // PodGroup contains a group of pods that are equivalent in terms of schedulability.
 type PodGroup struct {
-    Pods             []*apiv1.Pod
-    SchedulingErrors map[string]status.Reasons
-    Schedulable      bool
+    Pods              []*apiv1.Pod
+    SchedulingErrors  map[string]status.Reasons
+    SchedulableGroups []string
+    Schedulable       bool
 }
 
 // BuildPodGroups prepares pod groups with equivalent scheduling properties.
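For orientation, a minimal sketch of how the new SchedulableGroups field is intended to be filled in next to the existing fields. The PodGroup below is a simplified local copy, and podFitsGroup is a hypothetical stand-in for the real predicate check, not part of the autoscaler code.

package main

import "fmt"

// PodGroup is a simplified local copy of the equivalence.PodGroup struct above.
type PodGroup struct {
    Pods              []string
    SchedulingErrors  map[string]string
    SchedulableGroups []string
    Schedulable       bool
}

// podFitsGroup is a hypothetical stand-in for the scheduler predicate check.
func podFitsGroup(pod, nodeGroup string) bool {
    return nodeGroup == "ng-large"
}

func main() {
    eg := PodGroup{Pods: []string{"web-0"}, SchedulingErrors: map[string]string{}}
    for _, ng := range []string{"ng-small", "ng-large"} {
        if podFitsGroup(eg.Pods[0], ng) {
            // Remember every node group the sample pod fits; SchedulablePodGroups does the same below.
            eg.SchedulableGroups = append(eg.SchedulableGroups, ng)
            eg.Schedulable = true
        } else {
            eg.SchedulingErrors[ng] = "predicate checking error"
        }
    }
    fmt.Printf("%+v\n", eg)
}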
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
index e94ed4ccae49..236906e511e4 100644
--- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
+++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -89,6 +89,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
     nodes []*apiv1.Node,
     daemonSets []*appsv1.DaemonSet,
     nodeInfos map[string]*schedulerframework.NodeInfo,
+    allOrNothing bool,
 ) (*status.ScaleUpStatus, errors.AutoscalerError) {
     if !o.initialized {
         return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
@@ -146,11 +147,13 @@ func (o *ScaleUpOrchestrator) ScaleUp(
     }
 
     for _, nodeGroup := range validNodeGroups {
-        option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now)
+        option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now, allOrNothing)
         o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingContext, nodeGroup.Id())
 
         if len(option.Pods) == 0 || option.NodeCount == 0 {
             klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
+        } else if allOrNothing && len(option.Pods) < len(unschedulablePods) {
+            klog.V(4).Infof("Some pods can't fit to %s, giving up due to all-or-nothing scale-up strategy", nodeGroup.Id())
         } else {
             options = append(options, option)
         }
@@ -211,9 +214,20 @@ func (o *ScaleUpOrchestrator) ScaleUp(
             aErr)
     }
 
+    if newNodes < bestOption.NodeCount {
+        klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id())
+        if allOrNothing {
+            return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
+        }
+    }
+
     // If necessary, create the node group. This is no longer simulation, an empty node group will be created by cloud provider if supported.
     createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
     if !bestOption.NodeGroup.Exist() {
+        if allOrNothing && bestOption.NodeGroup.MaxSize() < newNodes {
+            klog.V(1).Infof("Can only create a new node group with max %d nodes, need %d nodes", bestOption.NodeGroup.MaxSize(), newNodes)
+            return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
+        }
         var scaleUpStatus *status.ScaleUpStatus
         createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets)
         if aErr != nil {
@@ -256,6 +270,18 @@ func (o *ScaleUpOrchestrator) ScaleUp(
             aErr)
     }
 
+    // Last check before scale-up. Node group capacity (both due to max size limits & current size) is only checked when balancing.
+    totalCapacity := 0
+    for _, sui := range scaleUpInfos {
+        totalCapacity += sui.NewSize - sui.CurrentSize
+    }
+    if totalCapacity < newNodes {
+        klog.V(1).Infof("Can only add %d nodes due to node group limits, need %d nodes", totalCapacity, newNodes)
+        if allOrNothing {
+            return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
+        }
+    }
+
     // Execute scale up.
     klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
     aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
@@ -447,6 +473,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
     nodeInfos map[string]*schedulerframework.NodeInfo,
     currentNodeCount int,
     now time.Time,
+    allOrNothing bool,
 ) expander.Option {
     option := expander.Option{NodeGroup: nodeGroup}
     podGroups := schedulablePodGroups[nodeGroup.Id()]
@@ -472,6 +499,15 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
         klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
     }
     if autoscalingOptions != nil && autoscalingOptions.ZeroOrMaxNodeScaling {
+        if allOrNothing && option.NodeCount > nodeGroup.MaxSize() {
+            // The following check can quietly cap the number of nodes and so breaks the
+            // assumption that as long as we're able to provision option.NodeCount of
+            // nodes, all pods will be accommodated.
+            // This fix isn't applicable to non-atomic node groups as we'll operate
+            // on an uncapped number of nodes in that case.
+            option.Pods = nil
+            option.NodeCount = 0
+        }
         if option.NodeCount > 0 && option.NodeCount != nodeGroup.MaxSize() {
             option.NodeCount = nodeGroup.MaxSize()
         }
@@ -564,6 +600,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
             })
             // Mark pod group as (theoretically) schedulable.
             eg.Schedulable = true
+            eg.SchedulableGroups = append(eg.SchedulableGroups, nodeGroup.Id())
         } else {
             klog.V(2).Infof("Pod %s/%s can't be scheduled on %s, predicate checking error: %v", samplePod.Namespace, samplePod.Name, nodeGroup.Id(), err.VerboseMessage())
             if podCount := len(eg.Pods); podCount > 1 {
@@ -709,6 +746,26 @@ func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, sim
     return true
 }
 
+func stopAllOrNothingScaleUp(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) (*status.ScaleUpStatus, errors.AutoscalerError) {
+    // Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
+    for _, eg := range egs {
+        if eg.Schedulable {
+            errs := map[string]status.Reasons{}
+            for _, sg := range eg.SchedulableGroups {
+                errs[sg] = AllOrNothingReason
+            }
+            eg.SchedulingErrors = errs
+            eg.Schedulable = false
+        }
+    }
+    klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
+    return &status.ScaleUpStatus{
+        Result:                  status.ScaleUpNoOptionsAvailable,
+        PodsRemainUnschedulable: GetRemainingPods(egs, skipped),
+        ConsideredNodeGroups:    ngs,
+    }, nil
+}
+
 // GetRemainingPods returns information about pods which CA is unable to help
 // at this moment.
 func GetRemainingPods(egs []*equivalence.PodGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
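To summarize the new control flow, here is a minimal, self-contained sketch of the three all-or-nothing checks added to ScaleUp above. The expansionOption and scaleUpInfo types are simplified stand-ins for expander.Option and the per-group scale-up plan entries, so field names are illustrative rather than the real API.

package main

import "fmt"

// expansionOption is a simplified stand-in for expander.Option.
type expansionOption struct {
    nodeGroup string
    podCount  int
    nodeCount int
}

// scaleUpInfo is a simplified stand-in for a per-node-group scale-up plan entry.
type scaleUpInfo struct {
    currentSize int
    newSize     int
}

// canSatisfyAllPods mirrors the checks the orchestrator now performs when allOrNothing
// is true: every pending pod must fit the chosen option, cluster-wide limits must allow
// the full node count, and the final per-group plan must add enough capacity.
func canSatisfyAllPods(opt expansionOption, pendingPods, newNodes int, plan []scaleUpInfo) bool {
    if opt.podCount < pendingPods {
        return false // some pods would be left behind by this node group
    }
    if newNodes < opt.nodeCount {
        return false // cluster-wide limits capped the scale-up below what is needed
    }
    totalCapacity := 0
    for _, sui := range plan {
        totalCapacity += sui.newSize - sui.currentSize
    }
    return totalCapacity >= newNodes
}

func main() {
    opt := expansionOption{nodeGroup: "ng-1", podCount: 5, nodeCount: 3}
    plan := []scaleUpInfo{{currentSize: 2, newSize: 4}} // only 2 extra nodes possible
    fmt.Println(canSatisfyAllPods(opt, 5, 3, plan))     // false: plan is short one node
}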
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
index 2c446dcf0f44..9b887a108313 100644
--- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
+++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
@@ -1032,7 +1032,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
     context.ExpanderStrategy = expander
 
     // scale up
-    scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+    scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
     processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
 
     // aggregate group size changes
@@ -1131,7 +1131,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
     processors := NewTestProcessors(&context)
     suOrchestrator := New()
     suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
-    scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+    scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
     assert.NoError(t, err)
 
     // Node group is unhealthy.
@@ -1185,7 +1185,7 @@ func TestBinpackingLimiter(t *testing.T) {
     expander := NewMockRepotingStrategy(t, nil)
     context.ExpanderStrategy = expander
 
-    scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+    scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
     processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
     assert.NoError(t, err)
     assert.True(t, scaleUpStatus.WasSuccessful())
@@ -1231,7 +1231,7 @@ func TestScaleUpNoHelp(t *testing.T) {
     processors := NewTestProcessors(&context)
     suOrchestrator := New()
     suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
-    scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+    scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
     processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
 
     assert.NoError(t, err)
@@ -1453,7 +1453,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
     processors := NewTestProcessors(&context)
     suOrchestrator := New()
     suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
-    scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+    scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
     assert.NoError(t, typedErr)
     assert.True(t, scaleUpStatus.WasSuccessful())
 
@@ -1515,7 +1515,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
 
     suOrchestrator := New()
     suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
-    scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+    scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
     assert.NoError(t, err)
     assert.True(t, scaleUpStatus.WasSuccessful())
     assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
@@ -1570,7 +1570,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
 
     suOrchestrator := New()
     suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
-    scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+    scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
     assert.NoError(t, err)
     assert.True(t, scaleUpStatus.WasSuccessful())
     assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/rejectedreasons.go b/cluster-autoscaler/core/scaleup/orchestrator/rejectedreasons.go
new file mode 100644
index 000000000000..b5e0ab92392a
--- /dev/null
+++ b/cluster-autoscaler/core/scaleup/orchestrator/rejectedreasons.go
@@ -0,0 +1,37 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package orchestrator
+
+// RejectedReasons contains information about why a given node group was rejected as a scale-up option.
+type RejectedReasons struct {
+    messages []string
+}
+
+// NewRejectedReasons creates a new RejectedReasons object.
+func NewRejectedReasons(m string) *RejectedReasons {
+    return &RejectedReasons{[]string{m}}
+}
+
+// Reasons returns a slice of reasons why the node group was not considered for scale-up.
+func (sr *RejectedReasons) Reasons() []string {
+    return sr.messages
+}
+
+var (
+    // AllOrNothingReason means the node group was rejected because not all pods would fit it when using the all-or-nothing strategy.
+    AllOrNothingReason = NewRejectedReasons("not all pods would fit and scale-up is using all-or-nothing strategy")
+)
diff --git a/cluster-autoscaler/core/scaleup/scaleup.go b/cluster-autoscaler/core/scaleup/scaleup.go
index 6f9781bff0e2..0da619134ea2 100644
--- a/cluster-autoscaler/core/scaleup/scaleup.go
+++ b/cluster-autoscaler/core/scaleup/scaleup.go
@@ -48,6 +48,7 @@ type Orchestrator interface {
         nodes []*apiv1.Node,
         daemonSets []*appsv1.DaemonSet,
         nodeInfos map[string]*schedulerframework.NodeInfo,
+        allOrNothing bool,
     ) (*status.ScaleUpStatus, errors.AutoscalerError)
     // ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
     // than the configured min size. The source of truth for the current node group
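For reference, the sketch below shows how a RejectedReasons value is consumed through a Reasons-style interface, which is how AllOrNothingReason is expected to surface per node group when pods are reported as remaining unschedulable. The Reasons interface here is a local stand-in for status.Reasons, and the map mirrors what stopAllOrNothingScaleUp records for each previously schedulable pod group.

package main

import (
    "fmt"
    "strings"
)

// Reasons is a local stand-in for the status.Reasons interface.
type Reasons interface {
    Reasons() []string
}

// RejectedReasons mirrors the type added in rejectedreasons.go.
type RejectedReasons struct {
    messages []string
}

// NewRejectedReasons creates a new RejectedReasons object.
func NewRejectedReasons(m string) *RejectedReasons {
    return &RejectedReasons{[]string{m}}
}

// Reasons returns a slice of reasons why the node group was not considered for scale-up.
func (sr *RejectedReasons) Reasons() []string {
    return sr.messages
}

var AllOrNothingReason = NewRejectedReasons("not all pods would fit and scale-up is using all-or-nothing strategy")

func main() {
    // A per-node-group rejection map, as stopAllOrNothingScaleUp builds for each pod group.
    rejected := map[string]Reasons{"ng-1": AllOrNothingReason, "ng-2": AllOrNothingReason}
    for ng, r := range rejected {
        fmt.Printf("%s: %s\n", ng, strings.Join(r.Reasons(), "; "))
    }
}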
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index a56ad8e4268f..b45733bdbb20 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -576,7 +576,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
         klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
     } else {
         scaleUpStart := preScaleUp()
-        scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups)
+        scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, false)
         if exit, err := postScaleUp(scaleUpStart); exit {
             return err
         }
     }
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
index d4d31ff05528..cd28df390465 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
@@ -91,6 +91,7 @@ func (o *provReqOrchestrator) ScaleUp(
     nodes []*apiv1.Node,
     daemonSets []*appsv1.DaemonSet,
     nodeInfos map[string]*schedulerframework.NodeInfo,
+    _ bool,
 ) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
     if !o.initialized {
         return &status.ScaleUpStatus{}, ca_errors.ToAutoscalerError(ca_errors.InternalError, fmt.Errorf("provisioningrequest.Orchestrator is not initialized"))
     }
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
index 7a75af43887c..44625effbb8e 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
@@ -174,7 +174,7 @@ func TestScaleUp(t *testing.T) {
             provisioningClasses: []provisioningClass{checkcapacity.New(client)},
         }
         orchestrator.Initialize(&autoscalingContext, nil, nil, nil, taints.TaintConfig{})
-        st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{})
+        st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{}, false)
         if !tc.err {
             assert.NoError(t, err)
             assert.Equal(t, tc.scaleUpResult, st.Result)
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go
index cda3bc297d70..5c7f792e55b2 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go
@@ -68,6 +68,7 @@ func (o *WrapperOrchestrator) ScaleUp(
     nodes []*apiv1.Node,
     daemonSets []*appsv1.DaemonSet,
     nodeInfos map[string]*schedulerframework.NodeInfo,
+    allOrNothing bool,
 ) (*status.ScaleUpStatus, errors.AutoscalerError) {
     defer func() { o.scaleUpRegularPods = !o.scaleUpRegularPods }()
 
@@ -79,9 +80,9 @@ func (o *WrapperOrchestrator) ScaleUp(
     }
 
     if o.scaleUpRegularPods {
-        return o.podsOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos)
+        return o.podsOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos, allOrNothing)
     }
-    return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos)
+    return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos, allOrNothing)
 }
 
 func splitOut(unschedulablePods []*apiv1.Pod) (provReqPods, regularPods []*apiv1.Pod) {
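The wrapper serves ProvisioningRequest pods and regular pods on alternating calls by flipping a flag in a defer, and simply forwards the new allOrNothing argument to whichever orchestrator runs. A stripped-down illustration of that pattern, using a hypothetical alternatingScaler type rather than the real WrapperOrchestrator:

package main

import "fmt"

// alternatingScaler is a hypothetical stand-in for WrapperOrchestrator: it serves
// one class of pods per invocation and flips to the other class afterwards.
type alternatingScaler struct {
    scaleUpRegularPods bool
}

func (a *alternatingScaler) scaleUp(allOrNothing bool) string {
    // The flag is flipped after the call returns, exactly like the defer in ScaleUp above.
    defer func() { a.scaleUpRegularPods = !a.scaleUpRegularPods }()
    if a.scaleUpRegularPods {
        return fmt.Sprintf("regular pods (allOrNothing=%v)", allOrNothing)
    }
    return fmt.Sprintf("provisioning-request pods (allOrNothing=%v)", allOrNothing)
}

func main() {
    a := &alternatingScaler{}
    fmt.Println(a.scaleUp(false)) // provisioning-request pods first
    fmt.Println(a.scaleUp(false)) // then regular pods on the next loop iteration
}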
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go
index 1e28b8f10779..bf31f59f8b1b 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go
@@ -56,9 +56,9 @@ func TestWrapperScaleUp(t *testing.T) {
         pod.Annotations[provreq.ProvisioningRequestPodAnnotationKey] = "true"
     }
     unschedulablePods := append(regularPods, provReqPods...)
-    _, err := o.ScaleUp(unschedulablePods, nil, nil, nil)
+    _, err := o.ScaleUp(unschedulablePods, nil, nil, nil, false)
     assert.Equal(t, err.Error(), provisioningRequestErrorMsg)
-    _, err = o.ScaleUp(unschedulablePods, nil, nil, nil)
+    _, err = o.ScaleUp(unschedulablePods, nil, nil, nil, false)
     assert.Equal(t, err.Error(), regularPodsErrorMsg)
 }
 
@@ -71,6 +71,7 @@ func (f *fakeScaleUp) ScaleUp(
     nodes []*apiv1.Node,
     daemonSets []*appsv1.DaemonSet,
     nodeInfos map[string]*schedulerframework.NodeInfo,
+    allOrNothing bool,
 ) (*status.ScaleUpStatus, errors.AutoscalerError) {
     return nil, errors.NewAutoscalerError(errors.InternalError, f.errorMsg)
 }
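Every call site touched in this diff passes allOrNothing=false, so default behaviour is unchanged; the flag only matters for a caller that wants atomic semantics, such as an all-or-nothing ProvisioningRequest path. The sketch below is a hedged illustration of that contract using a fake orchestrator with a made-up capacity field, not the real scaleup.Orchestrator implementation.

package main

import "fmt"

// scaleUpper is a trimmed-down, hypothetical view of the scaleup.Orchestrator interface:
// only the pending pods and the new allOrNothing flag are kept, everything else is elided.
type scaleUpper interface {
    ScaleUp(pendingPods []string, allOrNothing bool) (string, error)
}

// fakeOrchestrator pretends its node groups can absorb at most `capacity` more pods.
type fakeOrchestrator struct {
    capacity int
}

func (f fakeOrchestrator) ScaleUp(pendingPods []string, allOrNothing bool) (string, error) {
    fits := len(pendingPods)
    if fits > f.capacity {
        fits = f.capacity
    }
    if fits == 0 {
        return "NoOptionsAvailable", nil
    }
    if allOrNothing && fits < len(pendingPods) {
        // Mirrors stopAllOrNothingScaleUp: refuse a partial scale-up rather than help only some pods.
        return "NoOptionsAvailable", nil
    }
    return "Successful", nil
}

func main() {
    var o scaleUpper = fakeOrchestrator{capacity: 1}
    partial, _ := o.ScaleUp([]string{"pod-a", "pod-b"}, false) // default path: helps the one pod it can
    atomic, _ := o.ScaleUp([]string{"pod-a", "pod-b"}, true)   // all-or-nothing: gives up instead
    fmt.Println(partial, atomic)                               // Successful NoOptionsAvailable
}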