From c3758271fde327f08735554cd64ff95b57cddde8 Mon Sep 17 00:00:00 2001 From: Knative Prow Robot Date: Thu, 9 Feb 2023 10:47:48 +0000 Subject: [PATCH] [release-1.8] Extract scheduler config in a dedicate struct instead of many parameters (#6739) This is an automated cherry-pick of #6736 Signed-off-by: Pierangelo Di Pilato Co-authored-by: Pierangelo Di Pilato --- pkg/scheduler/statefulset/autoscaler.go | 26 +++---- pkg/scheduler/statefulset/autoscaler_test.go | 33 +++++++-- pkg/scheduler/statefulset/scheduler.go | 75 +++++++++++++++----- pkg/scheduler/statefulset/scheduler_test.go | 14 +++- 4 files changed, 108 insertions(+), 40 deletions(-) diff --git a/pkg/scheduler/statefulset/autoscaler.go b/pkg/scheduler/statefulset/autoscaler.go index 6b66a0584a0..a829668d35d 100644 --- a/pkg/scheduler/statefulset/autoscaler.go +++ b/pkg/scheduler/statefulset/autoscaler.go @@ -28,10 +28,11 @@ import ( "k8s.io/apimachinery/pkg/util/wait" clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" - "knative.dev/eventing/pkg/scheduler" - st "knative.dev/eventing/pkg/scheduler/state" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/logging" + + "knative.dev/eventing/pkg/scheduler" + st "knative.dev/eventing/pkg/scheduler/state" ) type Autoscaler interface { @@ -60,24 +61,17 @@ type autoscaler struct { lock sync.Locker } -func NewAutoscaler(ctx context.Context, - namespace, name string, - lister scheduler.VPodLister, - stateAccessor st.StateAccessor, - evictor scheduler.Evictor, - refreshPeriod time.Duration, - capacity int32) Autoscaler { - +func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) Autoscaler { return &autoscaler{ logger: logging.FromContext(ctx), - statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace), - statefulSetName: name, - vpodLister: lister, + statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), + statefulSetName: cfg.StatefulSetName, + vpodLister: cfg.VPodLister, stateAccessor: stateAccessor, - evictor: evictor, + evictor: cfg.Evictor, trigger: make(chan int32, 1), - capacity: capacity, - refreshPeriod: refreshPeriod, + capacity: cfg.PodCapacity, + refreshPeriod: cfg.RefreshPeriod, lock: new(sync.Mutex), } } diff --git a/pkg/scheduler/statefulset/autoscaler_test.go b/pkg/scheduler/statefulset/autoscaler_test.go index 830186d878f..6f0de384329 100644 --- a/pkg/scheduler/statefulset/autoscaler_test.go +++ b/pkg/scheduler/statefulset/autoscaler_test.go @@ -30,10 +30,11 @@ import ( v1 "k8s.io/client-go/listers/core/v1" gtesting "k8s.io/client-go/testing" - listers "knative.dev/eventing/pkg/reconciler/testing/v1" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" + listers "knative.dev/eventing/pkg/reconciler/testing/v1" + duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/scheduler" "knative.dev/eventing/pkg/scheduler/state" @@ -377,7 +378,15 @@ func TestAutoscaler(t *testing.T) { return nil } - autoscaler := NewAutoscaler(ctx, testNs, sfsName, vpodClient.List, stateAccessor, noopEvictor, 10*time.Second, int32(10)).(*autoscaler) + cfg := &Config{ + StatefulSetNamespace: testNs, + StatefulSetName: sfsName, + VPodLister: vpodClient.List, + Evictor: noopEvictor, + RefreshPeriod: 10 * time.Second, + PodCapacity: 10, + } + autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler) for _, vpod := range tc.vpods { vpodClient.Append(vpod) @@ -425,7 +434,15 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { return nil } - autoscaler := NewAutoscaler(ctx, testNs, sfsName, vpodClient.List, stateAccessor, noopEvictor, 2*time.Second, int32(10)).(*autoscaler) + cfg := &Config{ + StatefulSetNamespace: testNs, + StatefulSetName: sfsName, + VPodLister: vpodClient.List, + Evictor: noopEvictor, + RefreshPeriod: 2 * time.Second, + PodCapacity: 10, + } + autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler) done := make(chan bool) go func() { @@ -846,7 +863,15 @@ func TestCompactor(t *testing.T) { return nil } - autoscaler := NewAutoscaler(ctx, testNs, sfsName, vpodClient.List, stateAccessor, recordEviction, 10*time.Second, int32(10)).(*autoscaler) + cfg := &Config{ + StatefulSetNamespace: testNs, + StatefulSetName: sfsName, + VPodLister: vpodClient.List, + Evictor: recordEviction, + RefreshPeriod: 10 * time.Second, + PodCapacity: 10, + } + autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler) for _, vpod := range tc.vpods { vpodClient.Append(vpod) diff --git a/pkg/scheduler/statefulset/scheduler.go b/pkg/scheduler/statefulset/scheduler.go index 59f56f29aca..52a3465ef20 100644 --- a/pkg/scheduler/statefulset/scheduler.go +++ b/pkg/scheduler/statefulset/scheduler.go @@ -57,7 +57,40 @@ import ( _ "knative.dev/eventing/pkg/scheduler/plugins/kafka/nomaxresourcecount" ) +type Config struct { + StatefulSetNamespace string `json:"statefulSetNamespace"` + StatefulSetName string `json:"statefulSetName"` + + // PodCapacity max capacity for each StatefulSet's pod. + PodCapacity int32 `json:"podCapacity"` + // Autoscaler refresh period + RefreshPeriod time.Duration `json:"refreshPeriod"` + + SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"` + SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"` + DeschedPolicy *scheduler.SchedulerPolicy `json:"deschedPolicy"` + + Evictor scheduler.Evictor `json:"-"` + + VPodLister scheduler.VPodLister `json:"-"` + NodeLister corev1listers.NodeLister `json:"-"` +} + +func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { + + podInformer := podinformer.Get(ctx) + podLister := podInformer.Lister().Pods(cfg.StatefulSetNamespace) + + stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister) + autoscaler := newAutoscaler(ctx, cfg, stateAccessor) + + go autoscaler.Start(ctx) + + return newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler, podLister), nil +} + // NewScheduler creates a new scheduler with pod autoscaling enabled. +// Deprecated: Use New func NewScheduler(ctx context.Context, namespace, name string, lister scheduler.VPodLister, @@ -69,15 +102,21 @@ func NewScheduler(ctx context.Context, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy) scheduler.Scheduler { - podInformer := podinformer.Get(ctx) - podLister := podInformer.Lister().Pods(namespace) - - stateAccessor := st.NewStateBuilder(ctx, namespace, name, lister, capacity, schedulerPolicy, schedPolicy, deschedPolicy, podLister, nodeLister) - autoscaler := NewAutoscaler(ctx, namespace, name, lister, stateAccessor, evictor, refreshPeriod, capacity) - - go autoscaler.Start(ctx) - - return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler, podLister) + cfg := &Config{ + StatefulSetNamespace: namespace, + StatefulSetName: name, + PodCapacity: capacity, + RefreshPeriod: refreshPeriod, + SchedulerPolicy: schedulerPolicy, + SchedPolicy: schedPolicy, + DeschedPolicy: deschedPolicy, + Evictor: evictor, + VPodLister: lister, + NodeLister: nodeLister, + } + + s, _ := New(ctx, cfg) + return s } // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods @@ -106,20 +145,20 @@ type StatefulSetScheduler struct { reserved map[types.NamespacedName]map[string]int32 } -func NewStatefulSetScheduler(ctx context.Context, - namespace, name string, - lister scheduler.VPodLister, +func newStatefulSetScheduler(ctx context.Context, + cfg *Config, stateAccessor st.StateAccessor, - autoscaler Autoscaler, podlister corev1listers.PodNamespaceLister) scheduler.Scheduler { + autoscaler Autoscaler, + podlister corev1listers.PodNamespaceLister) scheduler.Scheduler { scheduler := &StatefulSetScheduler{ ctx: ctx, logger: logging.FromContext(ctx), - statefulSetNamespace: namespace, - statefulSetName: name, - statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace), + statefulSetNamespace: cfg.StatefulSetNamespace, + statefulSetName: cfg.StatefulSetName, + statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), podLister: podlister, - vpodLister: lister, + vpodLister: cfg.VPodLister, pending: make(map[types.NamespacedName]int32), lock: new(sync.Mutex), stateAccessor: stateAccessor, @@ -130,7 +169,7 @@ func NewStatefulSetScheduler(ctx context.Context, // Monitor our statefulset statefulsetInformer := statefulsetinformer.Get(ctx) statefulsetInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterWithNameAndNamespace(namespace, name), + FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName), Handler: controller.HandleAll(scheduler.updateStatefulset), }) diff --git a/pkg/scheduler/statefulset/scheduler_test.go b/pkg/scheduler/statefulset/scheduler_test.go index 4905c9d3cd1..9c534b32e31 100644 --- a/pkg/scheduler/statefulset/scheduler_test.go +++ b/pkg/scheduler/statefulset/scheduler_test.go @@ -774,7 +774,12 @@ func TestStatefulsetScheduler(t *testing.T) { lsp := listers.NewListers(podlist) lsn := listers.NewListers(nodelist) sa := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister()) - s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler) + cfg := &Config{ + StatefulSetNamespace: testNs, + StatefulSetName: sfsName, + VPodLister: vpodClient.List, + } + s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler) if tc.pending != nil { s.pending = tc.pending } @@ -867,7 +872,12 @@ func TestReservePlacements(t *testing.T) { vpodClient := tscheduler.NewVPodClient() vpodClient.Append(tc.vpod) - s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, nil, nil, nil).(*StatefulSetScheduler) + cfg := &Config{ + StatefulSetNamespace: testNs, + StatefulSetName: sfsName, + VPodLister: vpodClient.List, + } + s := newStatefulSetScheduler(ctx, cfg, nil, nil, nil).(*StatefulSetScheduler) s.reservePlacements(tc.vpod, tc.vpod.GetPlacements()) //initial reserve s.reservePlacements(tc.vpod, tc.placements) //new reserve