ScaleUp for check-capacity ProvisioningRequestClass
yaroslava-serdiuk committed Jan 16, 2024
1 parent 13c5875 commit cba20c1
Showing 13 changed files with 992 additions and 38 deletions.
32 changes: 13 additions & 19 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -88,7 +88,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !o.initialized {
return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
}

loggingQuota := klogx.PodsLoggingQuota()
@@ -103,7 +103,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(

upcomingNodes, aErr := o.UpcomingNodes(nodeInfos)
if aErr != nil {
return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not get upcoming nodes: "))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not get upcoming nodes: "))
}
klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))

@@ -112,7 +112,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
var err error
nodeGroups, nodeInfos, err = o.processors.NodeGroupListProcessor.Process(o.autoscalingContext, nodeGroups, nodeInfos, unschedulablePods)
if err != nil {
return scaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
}
}

@@ -121,7 +121,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(

resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingContext, nodeInfos, nodes)
if aErr != nil {
return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
}

now := time.Now()
@@ -186,15 +186,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(

newNodes, aErr := o.GetCappedNewNodeCount(bestOption.NodeCount, len(nodes)+len(upcomingNodes))
if aErr != nil {
return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
return status.UpdateScaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
}

createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
if !bestOption.NodeGroup.Exist() {
oldId := bestOption.NodeGroup.Id()
createNodeGroupResult, aErr := o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, bestOption.NodeGroup)
if aErr != nil {
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{FailedCreationNodeGroups: []cloudprovider.NodeGroup{bestOption.NodeGroup}, PodsTriggeredScaleUp: bestOption.Pods},
aErr)
}
@@ -253,7 +253,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
if !found {
// This should never happen, as we already should have retrieved nodeInfo for any considered nodegroup.
klog.Errorf("No node info for: %s", bestOption.NodeGroup.Id())
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
errors.NewAutoscalerError(
errors.CloudProviderError,
@@ -263,7 +263,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
// Apply upper limits for CPU and memory.
newNodes, aErr = o.resourceManager.ApplyLimits(o.autoscalingContext, newNodes, resourcesLeft, nodeInfo, bestOption.NodeGroup)
if aErr != nil {
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
aErr)
}
@@ -283,15 +283,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(

scaleUpInfos, aErr := o.processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(o.autoscalingContext, targetNodeGroups, newNodes)
if aErr != nil {
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
aErr)
}

klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
if aErr != nil {
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{
CreateNodeGroupResults: createNodeGroupResults,
FailedResizeNodeGroups: failedNodeGroups,
@@ -322,7 +322,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !o.initialized {
return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
}

now := time.Now()
@@ -331,7 +331,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(

resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingContext, nodeInfos, nodes)
if aErr != nil {
return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
}

for _, ng := range nodeGroups {
@@ -397,7 +397,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
if aErr != nil {
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{
FailedResizeNodeGroups: failedNodeGroups,
},
@@ -717,9 +717,3 @@ func GetPodsAwaitingEvaluation(egs []*equivalence.PodGroup, bestOption string) [
}
return awaitsEvaluation
}

func scaleUpError(s *status.ScaleUpStatus, err errors.AutoscalerError) (*status.ScaleUpStatus, errors.AutoscalerError) {
s.ScaleUpError = &err
s.Result = status.ScaleUpError
return s, err
}
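
Every error path in this file now funnels through status.UpdateScaleUpError (added later in this commit), which records the error on the status and marks the result before returning both. Below is a minimal sketch of how a caller could consume that pair, assuming only the fields and constructors visible in this diff; handleScaleUpResult itself is illustrative and not part of the commit.

package main

import (
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	klog "k8s.io/klog/v2"
)

// handleScaleUpResult logs the outcome of a ScaleUp call; the error and the
// status Result stay consistent because both are set by UpdateScaleUpError.
func handleScaleUpResult(s *status.ScaleUpStatus, err errors.AutoscalerError) {
	if err != nil {
		klog.Errorf("scale-up failed (result=%v): %v", s.Result, err)
		return
	}
	klog.V(2).Infof("scale-up finished with result %v", s.Result)
}

func main() {
	aErr := errors.NewAutoscalerError(errors.InternalError, "example failure")
	handleScaleUpResult(status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr))
}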
112 changes: 112 additions & 0 deletions cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go
@@ -0,0 +1,112 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/client-go/rest"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
consumeProvReq = "cluster-autoscaler.kubernetes.io/consume-provisioning-request"
)

// WrapperOrchestrator is an orchestrator that wraps scale-up for ProvisioningRequests and regular pods.
type WrapperOrchestrator struct {
scaleUpRegularPods bool
scaleUpOrchestrator scaleup.Orchestrator
provReqOrchestrator scaleup.Orchestrator
}

// NewWrapperOrchestrator returns a WrapperOrchestrator.
func NewWrapperOrchestrator(kubeConfig *rest.Config) (scaleup.Orchestrator, error) {
provReqOrchestrator, err := checkcapacity.New(kubeConfig)
if err != nil {
return nil, fmt.Errorf("failed create ScaleUp orchestrator for ProvisioningRequests, error: %v", err)
}
return &WrapperOrchestrator{
scaleUpOrchestrator: New(),
provReqOrchestrator: provReqOrchestrator,
}, nil
}

// Initialize initializes the orchestrator object with required fields.
func (o *WrapperOrchestrator) Initialize(
autoscalingContext *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
taintConfig taints.TaintConfig,
) {
o.scaleUpOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, taintConfig)
o.provReqOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, taintConfig)
}

// ScaleUp runs scale-up either for regular pods or for pods coming from a ProvisioningRequest.
func (o *WrapperOrchestrator) ScaleUp(
unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
provReqPods, regularPods := sortOut(unschedulablePods)
if len(provReqPods) == 0 {
return o.scaleUpOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos)
}
if len(regularPods) == 0 {
return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos)
}
if o.scaleUpRegularPods {
o.scaleUpRegularPods = false
return o.scaleUpOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos)
}
o.scaleUpRegularPods = true
return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos)
}

// sortOut splits the unschedulable pods into pods that consume a ProvisioningRequest and regular pods.
func sortOut(unschedulablePods []*apiv1.Pod) (provReqPods, regularPods []*apiv1.Pod) {
for _, pod := range unschedulablePods {
if _, ok := pod.Annotations[consumeProvReq]; ok {
provReqPods = append(provReqPods, pod)
} else {
regularPods = append(regularPods, pod)
}
}
return
}

// ScaleUpToNodeGroupMinSize tries to scale up node groups that have fewer nodes
// than the configured min size. The source of truth for the current node group
// size is the TargetSize queried directly from cloud providers. Returns
// appropriate status or error if an unexpected error occurred.
func (o *WrapperOrchestrator) ScaleUpToNodeGroupMinSize(
nodes []*apiv1.Node,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
return o.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos)
}
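
When both kinds of unschedulable pods are present, the wrapper alternates between the two orchestrators on consecutive calls via the scaleUpRegularPods flag, so neither class of pods starves the other. Pods select the ProvisioningRequest path purely through the consume-provisioning-request annotation. A minimal, self-contained sketch of that split follows, assuming only the annotation key from this file; the pod names and the annotation value are illustrative.

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Same annotation key as declared in wrapper_orchestrator.go above.
const consumeProvReq = "cluster-autoscaler.kubernetes.io/consume-provisioning-request"

// splitPods mirrors sortOut: pods carrying the annotation are handed to the
// ProvisioningRequest orchestrator, everything else to the regular one.
func splitPods(pods []*apiv1.Pod) (provReqPods, regularPods []*apiv1.Pod) {
	for _, pod := range pods {
		if _, ok := pod.Annotations[consumeProvReq]; ok {
			provReqPods = append(provReqPods, pod)
		} else {
			regularPods = append(regularPods, pod)
		}
	}
	return provReqPods, regularPods
}

func main() {
	pods := []*apiv1.Pod{
		{ObjectMeta: metav1.ObjectMeta{
			Name:        "batch-worker-0",
			Annotations: map[string]string{consumeProvReq: "my-check-capacity-request"},
		}},
		{ObjectMeta: metav1.ObjectMeta{Name: "web-0"}},
	}
	provReq, regular := splitPods(pods)
	fmt.Printf("ProvisioningRequest pods: %d, regular pods: %d\n", len(provReq), len(regular))
}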
11 changes: 11 additions & 0 deletions cluster-autoscaler/main.go
@@ -29,6 +29,7 @@ import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
@@ -468,6 +469,15 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)

scaleUpOrchestrator := orchestrator.New()
if *provisioningRequestsEnabled {
kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err = orchestrator.NewWrapperOrchestrator(kubeClient)
if err != nil {
return nil, err
}
}

opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(),
@@ -477,6 +487,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
PredicateChecker: predicateChecker,
DeleteOptions: deleteOptions,
DrainabilityRules: drainabilityRules,
ScaleUpOrchestrator: scaleUpOrchestrator,
}

opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
@@ -99,3 +99,10 @@ func (p *NoOpScaleUpStatusProcessor) Process(context *context.AutoscalingContext
// CleanUp cleans up the processor's internal structures.
func (p *NoOpScaleUpStatusProcessor) CleanUp() {
}

// UpdateScaleUpError updates ScaleUpStatus.
func UpdateScaleUpError(s *ScaleUpStatus, err errors.AutoscalerError) (*ScaleUpStatus, errors.AutoscalerError) {
s.ScaleUpError = &err
s.Result = ScaleUpError
return s, err
}
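
A minimal test sketch for the helper above (not part of the commit); it relies only on the fields and the ScaleUpError result constant visible in this diff and would live in the same status package.

package status

import (
	"testing"

	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

func TestUpdateScaleUpError(t *testing.T) {
	inErr := errors.NewAutoscalerError(errors.InternalError, "example failure")
	s, outErr := UpdateScaleUpError(&ScaleUpStatus{}, inErr)
	// The same error is passed back to the caller...
	if outErr.Error() != inErr.Error() {
		t.Errorf("expected %q, got %q", inErr.Error(), outErr.Error())
	}
	// ...and recorded on the status together with the ScaleUpError result.
	if s.Result != ScaleUpError {
		t.Errorf("expected Result to be ScaleUpError, got %v", s.Result)
	}
	if s.ScaleUpError == nil || (*s.ScaleUpError).Error() != inErr.Error() {
		t.Errorf("expected ScaleUpError to be set to the input error")
	}
}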
77 changes: 77 additions & 0 deletions cluster-autoscaler/provisioningrequest/checkcapacity/condition.go
@@ -0,0 +1,77 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package checkcapacity

import (
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)

// ProvisioningRequestCondition is a type of Condition that ClusterAutoscaler appends to ProvisioningRequest.
type ProvisioningRequestCondition string

const (
// BookCapacityCondition is appended if capacity for the ProvisioningRequest was found in the cluster.
BookCapacityCondition = ProvisioningRequestCondition("BookCapacity")

// ExpiredCondition is appended if the ProvisioningRequest previously had the BookCapacity condition
// and the reservation time has expired, or previously had the Pending condition
// and the expiration time has expired.
ExpiredCondition = ProvisioningRequestCondition("Expired")

// PendingCondition is appended if no capacity for the ProvisioningRequest was found in the cluster
// and ClusterAutoscaler will try to find capacity later.
PendingCondition = ProvisioningRequestCondition("Pending")

// RejectedCondition is appended if the ProvisioningRequest is invalid.
RejectedCondition = ProvisioningRequestCondition("Rejected")

// CheckCapacityClass is the name of the check-capacity ProvisioningRequestClass.
CheckCapacityClass = "check-capacity.kubernetes.io"
defaultReservationTime = 10 * time.Minute
defaultExpirationTime = 7 * 24 * time.Hour // 7 days
)

// HasBookCapacityCondition returns whether the ProvisioningRequest has the BookCapacity condition.
func HasBookCapacityCondition(pr *provreqwrapper.ProvisioningRequest) bool {
if pr.V1Beta1().Spec.ProvisioningClassName != CheckCapacityClass {
return false
}
if len(pr.Conditions()) == 0 {
return false
}
condition := pr.Conditions()[len(pr.Conditions())-1]
if condition.Type == string(BookCapacityCondition) && condition.Status == v1.ConditionTrue {
return true
}
return false
}

// setCondition appends a condition of the given type to the ProvisioningRequest.
func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType ProvisioningRequestCondition, reason, message string) {
conditions := pr.Conditions()
conditions = append(conditions, v1.Condition{
Type: string(conditionType),
Status: v1.ConditionTrue,
ObservedGeneration: pr.V1Beta1().GetObjectMeta().GetGeneration(),
LastTransitionTime: v1.Now(),
Reason: reason,
Message: message,
})
pr.SetConditions(conditions)
}
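
The helpers above are what the check-capacity orchestrator uses to record the outcome of a capacity check on the ProvisioningRequest object. Below is a minimal sketch of how they could be combined inside this package, assuming a hypothetical capacityFound result; the reason and message strings are illustrative, and only the helpers and condition names come from this file.

package checkcapacity

import (
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)

// recordCheckResult is an illustrative helper: it books capacity for the
// request when the check succeeded and marks it Pending otherwise, skipping
// requests whose capacity is already booked.
func recordCheckResult(pr *provreqwrapper.ProvisioningRequest, capacityFound bool) {
	if HasBookCapacityCondition(pr) {
		// Capacity is already reserved for this request; nothing to update.
		return
	}
	if capacityFound {
		setCondition(pr, BookCapacityCondition, "CapacityFound", "Capacity was found in the cluster")
		return
	}
	setCondition(pr, PendingCondition, "CapacityNotFound", "Capacity will be checked again later")
}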