From 0e74dcc60372a5ad19eea34a00485cd10c329a13 Mon Sep 17 00:00:00 2001 From: Yaroslava Serdiuk Date: Wed, 31 Jan 2024 10:26:50 +0000 Subject: [PATCH] Add ProvisioningRequestProcessor --- cluster-autoscaler/core/static_autoscaler.go | 4 + cluster-autoscaler/main.go | 25 +-- cluster-autoscaler/processors/processors.go | 26 +-- ...go => provisioning_request_pods_filter.go} | 0 ... provisioning_request_pods_filter_test.go} | 0 .../provreq/provsioning_request_processor.go | 41 +++++ .../autoscaling.x-k8s.io/v1beta1/types.go | 7 +- .../checkcapacity/condition.go | 23 +-- .../checkcapacity/condition_test.go | 61 +++---- .../checkcapacity/orchestrator.go | 6 +- .../checkcapacity/orchestrator_test.go | 2 +- .../checkcapacity/processor.go | 79 +++++++++ .../checkcapacity/processor_test.go | 152 ++++++++++++++++++ .../provreqclient/client.go | 14 +- .../provreqclient/client_test.go | 4 +- .../provreqclient/testutils.go | 7 +- 16 files changed, 359 insertions(+), 92 deletions(-) rename cluster-autoscaler/processors/provreq/{provisioning_request_processors.go => provisioning_request_pods_filter.go} (100%) rename cluster-autoscaler/processors/provreq/{provisioning_request_processors_test.go => provisioning_request_pods_filter_test.go} (100%) create mode 100644 cluster-autoscaler/processors/provreq/provsioning_request_processor.go create mode 100644 cluster-autoscaler/provisioningrequest/checkcapacity/processor.go create mode 100644 cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 04a30fc609b0..ae6d5f085a36 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -507,6 +507,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l) } + if err := a.processors.ProvisioningRequestProcessor.Process(); err != 
nil { + klog.Errorf("Failed to process ProvisioningRequests, err: %v", err) + } + unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods) if err != nil { diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index d0480eb1f99a..2d5656329860 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -31,6 +31,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -469,15 +470,6 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions) drainabilityRules := rules.Default(deleteOptions) - scaleUpOrchestrator := orchestrator.New() - if *provisioningRequestsEnabled { - kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) - scaleUpOrchestrator, err = orchestrator.NewWrapperOrchestrator(kubeClient) - if err != nil { - return nil, err - } - } - opts := core.AutoscalerOptions{ AutoscalingOptions: autoscalingOptions, ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(), @@ -487,14 +479,27 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter PredicateChecker: predicateChecker, DeleteOptions: deleteOptions, DrainabilityRules: drainabilityRules, - ScaleUpOrchestrator: scaleUpOrchestrator, + ScaleUpOrchestrator: orchestrator.New(), } opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions) opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets) podListProcessor := 
podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker) + if autoscalingOptions.ProvisioningRequestEnabled { podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager())) + + kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) + scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(kubeClient) + if err != nil { + return nil, err + } + opts.ScaleUpOrchestrator = scaleUpOrchestrator + provReqProcessor, err := checkcapacity.NewCheckCapacityProcessor(kubeClient) + if err != nil { + return nil, err + } + opts.Processors.ProvisioningRequestProcessor = provReqProcessor } opts.Processors.PodListProcessor = podListProcessor scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{} diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index 43957b14889b..7e1cb3d31a6b 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -29,6 +29,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" "k8s.io/autoscaler/cluster-autoscaler/processors/nodes" "k8s.io/autoscaler/cluster-autoscaler/processors/pods" + "k8s.io/autoscaler/cluster-autoscaler/processors/provreq" "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates" "k8s.io/autoscaler/cluster-autoscaler/processors/status" ) @@ -73,7 +74,8 @@ type AutoscalingProcessors struct { // * scale-downs per nodegroup // * scale-up failures per nodegroup // * scale-down failures per nodegroup - ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList + ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList + ProvisioningRequestProcessor provreq.ProvisoningRequestProcessor } // DefaultProcessors returns default set of processors. 
@@ -95,16 +97,17 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors nodes.NewAtomicResizeFilteringProcessor(), }, ), - ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(), - AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(), - NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), - NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(), - NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), - CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), - ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), - TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false), - ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(), - ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(), + ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(), + AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(), + NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), + NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(), + NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), + CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), + ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), + TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false), + ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(), + ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(), + ProvisioningRequestProcessor: provreq.NewDefaultProvisioningRequestProcessor(), } } @@ -124,4 +127,5 @@ func (ap *AutoscalingProcessors) CleanUp() { ap.CustomResourcesProcessor.CleanUp() ap.TemplateNodeInfoProvider.CleanUp() ap.ActionableClusterProcessor.CleanUp() + 
ap.ProvisioningRequestProcessor.CleanUp() } diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processors.go b/cluster-autoscaler/processors/provreq/provisioning_request_pods_filter.go similarity index 100% rename from cluster-autoscaler/processors/provreq/provisioning_request_processors.go rename to cluster-autoscaler/processors/provreq/provisioning_request_pods_filter.go diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processors_test.go b/cluster-autoscaler/processors/provreq/provisioning_request_pods_filter_test.go similarity index 100% rename from cluster-autoscaler/processors/provreq/provisioning_request_processors_test.go rename to cluster-autoscaler/processors/provreq/provisioning_request_pods_filter_test.go diff --git a/cluster-autoscaler/processors/provreq/provsioning_request_processor.go b/cluster-autoscaler/processors/provreq/provsioning_request_processor.go new file mode 100644 index 000000000000..af7bf4db2742 --- /dev/null +++ b/cluster-autoscaler/processors/provreq/provsioning_request_processor.go @@ -0,0 +1,41 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provreq + +// ProvisoningRequestProcessor process ProvisionignRequests in the cluster. +type ProvisoningRequestProcessor interface { + Process() error + CleanUp() +} + +// NoOpProvisoningRequestProcessor do nothing. 
+type NoOpProvisoningRequestProcessor struct { +} + +// NewDefaultProvisioningRequestProcessor creates an instance of PodListProcessor. +func NewDefaultProvisioningRequestProcessor() ProvisoningRequestProcessor { + return &NoOpProvisoningRequestProcessor{} +} + +// Process processes lists of unschedulable and scheduled pods before scaling of the cluster. +func (p *NoOpProvisoningRequestProcessor) Process() error { + return nil +} + +// CleanUp cleans up the processor's internal structures. +func (p *NoOpProvisoningRequestProcessor) CleanUp() { +} diff --git a/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go b/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go index 881b9ca29b4b..ee6b93e7d31e 100644 --- a/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go +++ b/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go @@ -175,11 +175,8 @@ type Detail string // The following constants list all currently available Conditions Type values. // See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition const ( - // CapacityFound indicates that all of the requested resources were - // fount in the cluster. - CapacityFound string = "CapacityFound" - // Expired indicates that the ProvisioningRequest had CapacityFound condition before - // and the reservation time is expired. + // BookingExpired indicates that the ProvisioningRequest had Provisioned condition before + // and capacity reservation time is expired. BookingExpired string = "BookingExpired" // Provisioned indicates that all of the requested resources were created // and are available in the cluster. 
CA will set this condition when the diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/condition.go b/cluster-autoscaler/provisioningrequest/checkcapacity/condition.go index 896428371095..1638bdfb91e4 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/condition.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/condition.go @@ -17,19 +17,12 @@ limitations under the License. package checkcapacity import ( - "time" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" "k8s.io/klog/v2" ) -const ( - defaultReservationTime = 10 * time.Minute - defaultExpirationTime = 7 * 24 * time.Hour // 7 days -) - const ( //CapacityIsNotFoundReason is added when capacity was not found in the cluster. CapacityIsNotFoundReason = "CapacityIsNotFound" @@ -37,6 +30,14 @@ const ( CapacityIsFoundReason = "CapacityIsFound" //FailedToBookCapacityReason is added when Cluster Autoscaler failed to book capacity in the cluster. FailedToBookCapacityReason = "FailedToBookCapacity" + //CapacityReservationTimeExpiredReason is added whed capacity reservation time is expired. + CapacityReservationTimeExpiredReason = "CapacityReservationTimeExpired" + //CapacityReservationTimeExpiredMsg is added if capacity reservation time is expired. + CapacityReservationTimeExpiredMsg = "Capacity reservation time is expired" + //ExpiredReason is added if ProvisioningRequest is expired. + ExpiredReason = "Expired" + //ExpiredMsg is added if ProvisioningRequest is expired. 
+ ExpiredMsg = "ProvisioningRequest is expired" ) func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool { @@ -50,26 +51,26 @@ func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool { for _, condition := range pr.Conditions() { if checkConditionType(condition, v1beta1.BookingExpired) || checkConditionType(condition, v1beta1.Failed) { return false - } else if checkConditionType(condition, v1beta1.CapacityFound) { + } else if checkConditionType(condition, v1beta1.Provisioned) { book = true } } return book } -func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string) { +func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string, now v1.Time) { var newConditions []v1.Condition newCondition := v1.Condition{ Type: conditionType, Status: conditionStatus, ObservedGeneration: pr.V1Beta1().GetObjectMeta().GetGeneration(), - LastTransitionTime: v1.Now(), + LastTransitionTime: now, Reason: reason, Message: message, } prevConditions := pr.Conditions() switch conditionType { - case v1beta1.CapacityFound, v1beta1.BookingExpired, v1beta1.Failed: + case v1beta1.Provisioned, v1beta1.BookingExpired, v1beta1.Failed: conditionFound := false for _, condition := range prevConditions { if condition.Type == conditionType { diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/condition_test.go b/cluster-autoscaler/provisioningrequest/checkcapacity/condition_test.go index 5a8efe78a770..f95b410637f5 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/condition_test.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/condition_test.go @@ -34,7 +34,7 @@ func TestBookCapacity(t *testing.T) { name: "BookingExpired", prConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, { @@ -48,7 +48,7 @@ func 
TestBookCapacity(t *testing.T) { name: "Failed", prConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, { @@ -66,7 +66,7 @@ func TestBookCapacity(t *testing.T) { name: "Capacity found and provisioned", prConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, { @@ -80,7 +80,7 @@ func TestBookCapacity(t *testing.T) { name: "Capacity is not found", prConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionFalse, }, }, @@ -115,29 +115,29 @@ func TestSetCondition(t *testing.T) { want []v1.Condition }{ { - name: "CapacityFound added, empty conditions before", - newType: v1beta1.CapacityFound, + name: "Provisioned added, empty conditions before", + newType: v1beta1.Provisioned, newStatus: v1.ConditionTrue, want: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, }, { - name: "CapacityFound updated", + name: "Provisioned updated", oldConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionFalse, }, }, - newType: v1beta1.CapacityFound, + newType: v1beta1.Provisioned, newStatus: v1.ConditionTrue, want: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -146,7 +146,7 @@ func TestSetCondition(t *testing.T) { name: "Failed added, non-empty conditions before", oldConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -154,7 +154,7 @@ func TestSetCondition(t *testing.T) { newStatus: v1.ConditionTrue, want: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, { @@ -163,28 +163,11 @@ func TestSetCondition(t *testing.T) { }, }, }, - { - name: "Provisioned condition type, conditions are not updated", - 
oldConditions: []v1.Condition{ - { - Type: v1beta1.CapacityFound, - Status: v1.ConditionTrue, - }, - }, - newType: v1beta1.Provisioned, - newStatus: v1.ConditionFalse, - want: []v1.Condition{ - { - Type: v1beta1.CapacityFound, - Status: v1.ConditionTrue, - }, - }, - }, { name: "Unknown condition status, conditions are updated", oldConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -192,7 +175,7 @@ func TestSetCondition(t *testing.T) { newStatus: v1.ConditionUnknown, want: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, { @@ -205,7 +188,7 @@ func TestSetCondition(t *testing.T) { name: "Unknown condition type, conditions are not updated", oldConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -213,7 +196,7 @@ func TestSetCondition(t *testing.T) { newStatus: v1.ConditionTrue, want: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -233,19 +216,19 @@ func TestSetCondition(t *testing.T) { name: "Capacity found with unknown condition before", oldConditions: []v1.Condition{ { - Type: v1beta1.Provisioned, + Type: "unknown", Status: v1.ConditionTrue, }, }, - newType: v1beta1.CapacityFound, + newType: v1beta1.Provisioned, newStatus: v1.ConditionTrue, want: []v1.Condition{ { - Type: v1beta1.Provisioned, + Type: "unknown", Status: v1.ConditionTrue, }, { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -259,7 +242,7 @@ func TestSetCondition(t *testing.T) { Conditions: test.oldConditions, }, }, nil) - setCondition(pr, test.newType, test.newStatus, "", "") + setCondition(pr, test.newType, test.newStatus, "", "", v1.Now()) got := pr.Conditions() if len(got) > 2 || len(got) != len(test.want) || got[0].Type != test.want[0].Type || got[0].Status != test.want[0].Status { 
t.Errorf("want %v, got: %v", test.want, got) diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go index f8e26eabd7d0..260f0ca1d639 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go @@ -125,7 +125,7 @@ func (o *provReqOrchestrator) bookCapacity() error { // ClusterAutoscaler was able to create pods before, so we shouldn't have error here. // If there is an error, mark PR as invalid, because we won't be able to book capacity // for it anyway. - setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err)) + setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now()) continue } podsToCreate = append(podsToCreate, pods...) 
@@ -149,10 +149,10 @@ func (o *provReqOrchestrator) scaleUp(unschedulablePods []*apiv1.Pod) (bool, err } st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true) if len(st) < len(unschedulablePods) || err != nil { - setCondition(provReq, v1beta1.CapacityFound, metav1.ConditionFalse, CapacityIsFoundReason, "Capacity is not found, CA will try to find it later.") + setCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, CapacityIsFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now()) return false, err } - setCondition(provReq, v1beta1.CapacityFound, metav1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.") + setCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.", metav1.Now()) return true, nil } diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go index 4fe874b96c2e..62ad7a9d017a 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go @@ -56,7 +56,7 @@ func TestScaleUp(t *testing.T) { newCpuProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "newCpuProvReq", "5m", "5", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) newMemProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "newMemProvReq", "1m", "100", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) bookedCapacityProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "bookedCapacity", "1m", "200", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) - bookedCapacityProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.CapacityFound, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}}) + 
bookedCapacityProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.Provisioned, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}}) expiredProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "bookedCapacity", "1m", "200", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) expiredProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.BookingExpired, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}}) differentProvReqClass := provreqwrapper.BuildTestProvisioningRequest("ns", "differentProvReqClass", "1", "1", "", int32(5), false, time.Now(), v1beta1.ProvisioningClassAtomicScaleUp) diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go b/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go new file mode 100644 index 000000000000..0cce589baba4 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go @@ -0,0 +1,79 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package checkcapacity + +import ( + "fmt" + "time" + + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/client-go/rest" +) + +const ( + defaultReservationTime = 10 * time.Minute + defaultExpirationTime = 7 * 24 * time.Hour // 7 days +) + +type checkCapacityProcessor struct { + client provisioningRequestClient + now func() time.Time +} + +// NewCheckCapacityProcessor return ProvisioningRequestProcessor for Check-capacity ProvisioningClass. +func NewCheckCapacityProcessor(kubeConfig *rest.Config) (*checkCapacityProcessor, error) { + client, err := provreqclient.NewProvisioningRequestClient(kubeConfig) + if err != nil { + return nil, err + } + return &checkCapacityProcessor{client: client, now: time.Now}, nil +} + +// Process iterates over ProvisioningRequests and apply: +// -BookingExpired condition for Provisioned ProvisioningRequest if capacity reservation time is expired. +// -Failed condition for ProvisioningRequest that were not provisioned during defaultExpirationTime. 
+// TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest +func (p *checkCapacityProcessor) Process() error { + provReqs, err := p.client.ProvisioningRequests() + if err != nil { + return fmt.Errorf("Couldn't fetch ProvisioningRequests in the cluster: %v", err) + } + for _, provReq := range provReqs { + conditions := provReq.Conditions() + if apimeta.IsStatusConditionTrue(conditions, v1beta1.BookingExpired) || apimeta.IsStatusConditionTrue(conditions, v1beta1.Failed) { + continue + } + provisioned := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned) + if provisioned != nil && provisioned.Status == metav1.ConditionTrue { + if provisioned.LastTransitionTime.Add(defaultReservationTime).Before(p.now()) { + setCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, CapacityReservationTimeExpiredReason, CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now())) + } + } else { + created := provReq.CreationTimestamp() + if created.Add(defaultExpirationTime).Before(p.now()) { + setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, ExpiredReason, ExpiredMsg, metav1.NewTime(p.now())) + } + } + } + return nil +} + +// Cleanup cleans up internal state. +func (p *checkCapacityProcessor) CleanUp() {} diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go b/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go new file mode 100644 index 000000000000..54b14f04d17a --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go @@ -0,0 +1,152 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkcapacity + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + // "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" +) + +func TestProcess(t *testing.T) { + now := time.Now() + dayAgo := now.Add(-1 * 24 * time.Hour) + weekAgo := now.Add(-1 * defaultExpirationTime).Add(-1 * 5 * time.Minute) + + testCases := []struct { + name string + creationTime time.Time + conditions []metav1.Condition + wantConditions []metav1.Condition + }{ + { + name: "New ProvisioningRequest, empty conditions", + creationTime: now, + }, + { + name: "ProvisioningRequest with empty conditions, expired", + creationTime: weekAgo, + wantConditions: []metav1.Condition{ + { + Type: v1beta1.Failed, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(now), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + }, + }, + { + name: "ProvisioningRequest wasn't provisioned, expired", + creationTime: weekAgo, + conditions: []metav1.Condition{ + { + Type: v1beta1.Provisioned, + Status: metav1.ConditionFalse, + LastTransitionTime: metav1.NewTime(dayAgo), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + }, + wantConditions: []metav1.Condition{ + { + Type: v1beta1.Provisioned, + Status: metav1.ConditionFalse, + LastTransitionTime: metav1.NewTime(dayAgo), + Reason: ExpiredReason, + 
Message: ExpiredMsg, + }, + { + Type: v1beta1.Failed, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(now), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + }, + }, + { + name: "BookingCapacity time is expired ", + creationTime: dayAgo, + conditions: []metav1.Condition{ + { + Type: v1beta1.Provisioned, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(dayAgo), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + }, + wantConditions: []metav1.Condition{ + { + Type: v1beta1.Provisioned, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(dayAgo), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + { + Type: v1beta1.BookingExpired, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(now), + Reason: CapacityReservationTimeExpiredReason, + Message: CapacityReservationTimeExpiredMsg, + }, + }, + }, + { + name: "Failed ProvisioningRequest", + creationTime: dayAgo, + conditions: []metav1.Condition{ + { + Type: v1beta1.Failed, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(dayAgo), + Reason: "Failed", + Message: "Failed", + }, + }, + wantConditions: []metav1.Condition{ + { + Type: v1beta1.Failed, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(dayAgo), + Reason: "Failed", + Message: "Failed", + }, + }, + }, + } + for _, test := range testCases { + pr := provreqclient.ProvisioningRequestWrapperTests("namespace", "name-1") + pr.V1Beta1().Status.Conditions = test.conditions + pr.V1Beta1().CreationTimestamp = metav1.NewTime(test.creationTime) + client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, pr) + processor := checkCapacityProcessor{client, func() time.Time { return now }} + processor.Process() + got, err := client.ProvisioningRequests() + assert.NoError(t, err) + assert.ElementsMatch(t, test.wantConditions, got[0].Conditions()) + } +} diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/client.go 
b/cluster-autoscaler/provisioningrequest/provreqclient/client.go index f11c5cd42555..74d9d958b0b8 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/client.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/client.go @@ -41,15 +41,15 @@ const ( provisioningRequestClientCallTimeout = 4 * time.Second ) -// ProvisioningRequestClientV1beta1 represents client for v1beta1 ProvReq CRD. -type ProvisioningRequestClientV1beta1 struct { +// ProvisioningRequestClient represents client for v1beta1 ProvReq CRD. +type ProvisioningRequestClient struct { client versioned.Interface provReqLister listers.ProvisioningRequestLister podTemplLister v1.PodTemplateLister } // NewProvisioningRequestClient configures and returns a provisioningRequestClient. -func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequestClientV1beta1, error) { +func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequestClient, error) { prClient, err := newPRClient(kubeConfig) if err != nil { return nil, fmt.Errorf("Failed to create Provisioning Request client: %v", err) @@ -70,7 +70,7 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequest return nil, err } - return &ProvisioningRequestClientV1beta1{ + return &ProvisioningRequestClient{ client: prClient, provReqLister: provReqLister, podTemplLister: podTemplLister, @@ -78,7 +78,7 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequest } // ProvisioningRequest gets a specific ProvisioningRequest CR. 
-func (c *ProvisioningRequestClientV1beta1) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) { +func (c *ProvisioningRequestClient) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) { v1Beta1PR, err := c.provReqLister.ProvisioningRequests(namespace).Get(name) if err != nil { return nil, err @@ -91,7 +91,7 @@ func (c *ProvisioningRequestClientV1beta1) ProvisioningRequest(namespace, name s } // ProvisioningRequests gets all ProvisioningRequest CRs. -func (c *ProvisioningRequestClientV1beta1) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) { +func (c *ProvisioningRequestClient) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) { v1Beta1PRs, err := c.provReqLister.List(labels.Everything()) if err != nil { return nil, fmt.Errorf("error fetching provisioningRequests: %w", err) @@ -108,7 +108,7 @@ func (c *ProvisioningRequestClientV1beta1) ProvisioningRequests() ([]*provreqwra } // FetchPodTemplates fetches PodTemplates referenced by the Provisioning Request. 
-func (c *ProvisioningRequestClientV1beta1) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) { +func (c *ProvisioningRequestClient) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) { podTemplates := make([]*apiv1.PodTemplate, 0, len(pr.Spec.PodSets)) for _, podSpec := range pr.Spec.PodSets { podTemplate, err := c.podTemplLister.PodTemplates(pr.Namespace).Get(podSpec.PodTemplateRef.Name) diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go b/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go index 333673ec51bb..c230ca0d18d9 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go @@ -25,8 +25,8 @@ import ( ) func TestFetchPodTemplates(t *testing.T) { - pr1 := provisioningRequestBetaForTests("namespace", "name-1") - pr2 := provisioningRequestBetaForTests("namespace", "name-2") + pr1 := ProvisioningRequestWrapperTests("namespace", "name-1") + pr2 := ProvisioningRequestWrapperTests("namespace", "name-2") mockProvisioningRequests := []*provreqwrapper.ProvisioningRequest{pr1, pr2} ctx := context.Background() diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go b/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go index b2c4a78a8380..43a8796df13e 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go @@ -35,7 +35,7 @@ import ( ) // NewFakeProvisioningRequestClient mock ProvisioningRequestClient for tests. 
-func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *ProvisioningRequestClientV1beta1 { +func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *ProvisioningRequestClient { t.Helper() provReqClient := fake.NewSimpleClientset() podTemplClient := fake_kubernetes.NewSimpleClientset() @@ -60,7 +60,7 @@ func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ... if err != nil { t.Fatalf("Failed to create Provisioning Request lister. Error was: %v", err) } - return &ProvisioningRequestClientV1beta1{ + return &ProvisioningRequestClient{ client: provReqClient, provReqLister: provReqLister, podTemplLister: podTemplLister, @@ -83,7 +83,8 @@ func newFakePodTemplatesLister(t *testing.T, client kubernetes.Interface, channe return podTemplLister, nil } -func provisioningRequestBetaForTests(namespace, name string) *provreqwrapper.ProvisioningRequest { +// ProvisioningRequestWrapperTests returns a mock ProvisioningRequest for tests. +func ProvisioningRequestWrapperTests(namespace, name string) *provreqwrapper.ProvisioningRequest { if namespace == "" { namespace = "default" }