From d3bb89dcc8d9706cf1a273613c22c5d9914c75a6 Mon Sep 17 00:00:00 2001
From: Devansh Das
Date: Mon, 9 Sep 2024 18:34:37 +0000
Subject: [PATCH] Add support for frequent loops when provisioningrequest is encountered in last iteration

---
 cluster-autoscaler/loop/trigger.go      | 23 ++++++++++-----
 cluster-autoscaler/main.go              | 29 ++++++++++---------
 .../processors/provreq/injector.go      | 15 +++++++---
 .../processors/provreq/injector_test.go |  2 +-
 4 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/cluster-autoscaler/loop/trigger.go b/cluster-autoscaler/loop/trigger.go
index 52ef962ba1b0..85d6e14b2237 100644
--- a/cluster-autoscaler/loop/trigger.go
+++ b/cluster-autoscaler/loop/trigger.go
@@ -43,19 +43,25 @@ type scalingTimesGetter interface {
 	LastScaleDownDeleteTime() time.Time
 }
 
+type provisioningRequestProcessTimeGetter interface {
+	LastProvisioningRequestProcessedTime() time.Time
+}
+
 // LoopTrigger object implements criteria used to start new autoscaling iteration
 type LoopTrigger struct {
-	podObserver        *UnschedulablePodObserver
-	scanInterval       time.Duration
-	scalingTimesGetter scalingTimesGetter
+	podObserver                          *UnschedulablePodObserver
+	scanInterval                         time.Duration
+	scalingTimesGetter                   scalingTimesGetter
+	provisioningRequestProcessTimeGetter provisioningRequestProcessTimeGetter
 }
 
 // NewLoopTrigger creates a LoopTrigger object
-func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration) *LoopTrigger {
+func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration, provisioningRequestProcessTimeGetter provisioningRequestProcessTimeGetter) *LoopTrigger {
 	return &LoopTrigger{
-		podObserver:        podObserver,
-		scanInterval:       scanInterval,
-		scalingTimesGetter: scalingTimesGetter,
+		podObserver:                          podObserver,
+		scanInterval:                         scanInterval,
+		scalingTimesGetter:                   scalingTimesGetter,
+		provisioningRequestProcessTimeGetter: provisioningRequestProcessTimeGetter,
 	}
 }
 
@@ -67,7 +73,8 @@ func (t *LoopTrigger) Wait(lastRun time.Time) {
 	// To improve scale-up throughput, Cluster Autoscaler starts new iteration
 	// immediately if the previous one was productive.
 	if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) ||
-		!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
+		!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) ||
+		!t.provisioningRequestProcessTimeGetter.LastProvisioningRequestProcessedTime().Before(lastRun) {
 		select {
 		case <-t.podObserver.unschedulablePodChan:
 			klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index d1659ec45751..3d8b57b890bf 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -465,12 +465,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
 	}()
 }
 
-func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
-	// Create basic config from flags.
-	autoscalingOptions := createAutoscalingOptions()
-
-	autoscalingOptions.KubeClientOpts.KubeClientBurst = int(*kubeClientBurst)
-	autoscalingOptions.KubeClientOpts.KubeClientQPS = float32(*kubeClientQPS)
+func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter, autoscalingOptions config.AutoscalingOptions, restConfig *rest.Config, injector pods.PodListProcessor) (core.Autoscaler, error) {
 	kubeClient := kube_util.CreateKubeClient(autoscalingOptions.KubeClientOpts)
 
 	// Informer transform to trim ManagedFields for memory efficiency.
@@ -508,7 +503,6 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 
 	if autoscalingOptions.ProvisioningRequestEnabled {
 		podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))
-		restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
 		client, err := provreqclient.NewProvisioningRequestClient(restConfig)
 		if err != nil {
 			return nil, err
@@ -525,10 +519,6 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 			return nil, err
 		}
 		opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
-		injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
-		if err != nil {
-			return nil, err
-		}
 		podListProcessor.AddProcessor(injector)
 		podListProcessor.AddProcessor(provreqProcesor)
 	}
@@ -609,7 +599,20 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 
 func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
 	metrics.RegisterAll(*emitPerNodeGroupMetrics)
-	autoscaler, err := buildAutoscaler(debuggingSnapshotter)
+	// Create basic config from flags.
+	autoscalingOptions := createAutoscalingOptions()
+
+	autoscalingOptions.KubeClientOpts.KubeClientBurst = int(*kubeClientBurst)
+	autoscalingOptions.KubeClientOpts.KubeClientQPS = float32(*kubeClientQPS)
+
+	restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
+
+	injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
+	if err != nil {
+		klog.Fatalf("Failed to create provisioning request pods injector: %v", err)
+	}
+
+	autoscaler, err := buildAutoscaler(debuggingSnapshotter, autoscalingOptions, restConfig, injector)
 	if err != nil {
 		klog.Fatalf("Failed to create autoscaler: %v", err)
 	}
@@ -630,7 +633,7 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
 	defer cancel()
 	if *frequentLoopsEnabled {
 		podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
-		trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval)
+		trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval, injector)
 		lastRun := time.Now()
 		for {
 			trigger.Wait(lastRun)
diff --git a/cluster-autoscaler/processors/provreq/injector.go b/cluster-autoscaler/processors/provreq/injector.go
index 538563d24cbe..b4718c1ec5d7 100644
--- a/cluster-autoscaler/processors/provreq/injector.go
+++ b/cluster-autoscaler/processors/provreq/injector.go
@@ -24,7 +24,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
-	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
 	provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
 	provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
@@ -41,8 +41,9 @@ const (
 
 // ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
 type ProvisioningRequestPodsInjector struct {
-	client *provreqclient.ProvisioningRequestClient
-	clock  clock.PassiveClock
+	client                                *provreqclient.ProvisioningRequestClient
+	clock                                 clock.PassiveClock
+	lastProvisioningRequestProcessedTime  time.Time
 }
 
 // IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
@@ -68,6 +69,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsAccepted(pr *provreqwrapper.Prov
 		klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
 		return err
 	}
+	p.lastProvisioningRequestProcessedTime = p.clock.Now()
 	return nil
 }
 
@@ -109,6 +111,7 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
 		if err := p.MarkAsAccepted(pr); err != nil {
 			continue
 		}
+
 		return podsFromProvReq, nil
 	}
 	return nil, nil
@@ -136,10 +139,15 @@ func (p *ProvisioningRequestPodsInjector) Process(
 func (p *ProvisioningRequestPodsInjector) CleanUp() {}
 
 // NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
-func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListProcessor, error) {
+func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (*ProvisioningRequestPodsInjector, error) {
 	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
 	if err != nil {
 		return nil, err
 	}
 	return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil
 }
+
+// LastProvisioningRequestProcessedTime returns the time when the last provisioning request was processed.
+func (p *ProvisioningRequestPodsInjector) LastProvisioningRequestProcessedTime() time.Time {
+	return p.lastProvisioningRequestProcessedTime
+}
diff --git a/cluster-autoscaler/processors/provreq/injector_test.go b/cluster-autoscaler/processors/provreq/injector_test.go
index 533c2b979bdf..4a9a0d0e0967 100644
--- a/cluster-autoscaler/processors/provreq/injector_test.go
+++ b/cluster-autoscaler/processors/provreq/injector_test.go
@@ -124,7 +124,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
 	}
 	for _, tc := range testCases {
 		client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
-		injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)}
+		injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now), now}
 		getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
 		if err != nil {
 			t.Errorf("%s failed: injector.Process return error %v", tc.name, err)