From 7c9ac0bb1b8823a57aec573930bb19cf03cc02a9 Mon Sep 17 00:00:00 2001 From: lijunxin Date: Fri, 17 Jan 2025 18:48:05 +0800 Subject: [PATCH] koordlet: fix prodReclaimablePredictor result to avoid influence of oversold Signed-off-by: lijunxin --- pkg/koordlet/metrics/resource_summary.go | 16 + pkg/koordlet/prediction/peak_predictor.go | 116 +++++-- .../prediction/peak_predictor_test.go | 312 +++++++++++++----- .../statesinformer/impl/states_nodemetric.go | 18 +- .../impl/states_nodemetric_test.go | 24 +- 5 files changed, 368 insertions(+), 118 deletions(-) diff --git a/pkg/koordlet/metrics/resource_summary.go b/pkg/koordlet/metrics/resource_summary.go index 852fc9bb3..6da7b890d 100644 --- a/pkg/koordlet/metrics/resource_summary.go +++ b/pkg/koordlet/metrics/resource_summary.go @@ -35,6 +35,12 @@ var ( Help: "the node reclaimable of different priorities resources updated by koordinator", }, []string{NodeKey, PriorityKey, ResourceKey, UnitKey}) + NodeResourcePriorityReclaimableStatus = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: KoordletSubsystem, + Name: "node_resource_priority_reclaimable_status", + Help: "status of node reclaimable of different priorities resources updated by koordinator", + }, []string{NodeKey, PriorityKey}) + ContainerResourceRequests = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: KoordletSubsystem, Name: "container_resource_requests", @@ -50,6 +56,7 @@ var ( ResourceSummaryCollectors = []prometheus.Collector{ NodeResourceAllocatable, NodeResourcePriorityReclaimable, + NodeResourcePriorityReclaimableStatus, ContainerResourceRequests, ContainerResourceLimits, } @@ -76,6 +83,15 @@ func RecordNodeResourcePriorityReclaimable(resourceName string, unit string, pri NodeResourcePriorityReclaimable.With(labels).Set(value) } +func RecordNodeResourcePriorityReclaimableStatus(priority string, value float64) { + labels := genNodeLabels() + if labels == nil { + return + } + labels[PriorityKey] = priority + NodeResourcePriorityReclaimableStatus.With(labels).Set(value) +} + func RecordContainerResourceRequests(resourceName string, unit string, status *corev1.ContainerStatus, pod *corev1.Pod, value float64) { labels := genNodeLabels() if labels == nil { diff --git a/pkg/koordlet/prediction/peak_predictor.go b/pkg/koordlet/prediction/peak_predictor.go index a93b7da17..0a33c9955 100644 --- a/pkg/koordlet/prediction/peak_predictor.go +++ b/pkg/koordlet/prediction/peak_predictor.go @@ -33,6 +33,10 @@ import ( // PredictorType defines constants for different types of predictors. type PredictorType int +type PredictorContext struct { + Node *v1.Node +} + const ( // ProdReclaimablePredictor represents the type of a reclaimable production predictor. ProdReclaimablePredictor PredictorType = iota @@ -40,7 +44,7 @@ const ( // PredictorFactory is an interface for creating predictors of different types. type PredictorFactory interface { - New(PredictorType) Predictor + New(predictorType PredictorType, context PredictorContext) Predictor } type Predictor interface { @@ -65,19 +69,22 @@ func NewPredictorFactory(predictServer PredictServer, coldStartDuration time.Dur } // New creates a new instance of a predictor based on the given type. -func (f *predictorFactory) New(t PredictorType) Predictor { +func (f *predictorFactory) New(t PredictorType, context PredictorContext) Predictor { switch t { case ProdReclaimablePredictor: podPredictor := &podReclaimablePredictor{ predictServer: f.predictServer, + node: context.Node, coldStartDuration: f.coldStartDuration, safetyMarginPercent: f.safetyMarginPercent, podFilterFn: isPodReclaimableForProd, reclaimable: util.NewZeroResourceList(), + unReclaimable: util.NewZeroResourceList(), pods: make(map[string]bool), } priorityPredictor := &priorityReclaimablePredictor{ predictServer: f.predictServer, + node: context.Node, safetyMarginPercent: f.safetyMarginPercent, priorityClassFilterFn: isPriorityClassReclaimableForProd, reclaimRequest: util.NewZeroResourceList(), @@ -119,7 +126,7 @@ func NewEmptyPredictorFactory() PredictorFactory { type emptyPredictorFactory struct { } -func (f *emptyPredictorFactory) New(t PredictorType) Predictor { +func (f *emptyPredictorFactory) New(t PredictorType, context PredictorContext) Predictor { return &emptyPredictor{} } @@ -129,12 +136,13 @@ var _ Predictor = (*podReclaimablePredictor)(nil) // e.g. A podReclaimablePredictor for Prod pods calculates the result based on the sum of the percentile of Prod pods. type podReclaimablePredictor struct { predictServer PredictServer + node *v1.Node coldStartDuration time.Duration safetyMarginPercent int podFilterFn func(pod *v1.Pod) bool // return true if the pod is reclaimable - - reclaimable v1.ResourceList - pods map[string]bool + reclaimable v1.ResourceList + unReclaimable v1.ResourceList + pods map[string]bool } // GetPredictorName is used to obtain the predictor name. @@ -180,19 +188,36 @@ func (p *podReclaimablePredictor) AddPod(pod *v1.Pod) error { podCPURequest := podRequests[v1.ResourceCPU] podMemoryRequest := podRequests[v1.ResourceMemory] + // calculate the reclaimable resources: reclaimable = podRequest - peak + // calculate the unReclaimable resources: unReclaimable = peak reclaimableCPUMilli := int64(0) reclaimableMemoryBytes := int64(0) - + unReclaimableCPUMilli := int64(0) + unReclaimableMemoryBytes := int64(0) ratioAfterSafetyMargin := float64(100+p.safetyMarginPercent) / 100 if p95CPU, ok := p95Resources[v1.ResourceCPU]; ok { peakCPU := util.MultiplyMilliQuant(p95CPU, ratioAfterSafetyMargin) + unReclaimableCPUMilli = peakCPU.MilliValue() reclaimableCPUMilli = podCPURequest.MilliValue() - peakCPU.MilliValue() } if p98Memory, ok := p98Resources[v1.ResourceMemory]; ok { peakMemory := util.MultiplyQuant(p98Memory, ratioAfterSafetyMargin) + unReclaimableMemoryBytes = peakMemory.Value() reclaimableMemoryBytes = podMemoryRequest.Value() - peakMemory.Value() } + // update the unReclaimable resources + cpu := p.unReclaimable[v1.ResourceCPU] + unReclaimableCPU := resource.NewMilliQuantity(unReclaimableCPUMilli, resource.DecimalSI) + cpu.Add(*unReclaimableCPU) + p.unReclaimable[v1.ResourceCPU] = cpu + + memory := p.unReclaimable[v1.ResourceMemory] + unReclaimableMemory := resource.NewQuantity(unReclaimableMemoryBytes, resource.BinarySI) + memory.Add(*unReclaimableMemory) + p.unReclaimable[v1.ResourceMemory] = memory + + // update the reclaimableCPUMilli resources if reclaimableCPUMilli > 0 { cpu := p.reclaimable[v1.ResourceCPU] reclaimableCPU := resource.NewMilliQuantity(reclaimableCPUMilli, resource.DecimalSI) @@ -210,10 +235,21 @@ func (p *podReclaimablePredictor) AddPod(pod *v1.Pod) error { } // GetResult returns the predicted resource list for the added pods. +// The result is the sum of the reclaimable resources of the added pods. func (p *podReclaimablePredictor) GetResult() (v1.ResourceList, error) { - metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, p.GetPredictorName(), float64(p.reclaimable.Cpu().MilliValue())/1000) - metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, p.GetPredictorName(), float64(p.reclaimable.Memory().Value())) - return p.reclaimable, nil + // if failed to get node info, stop the reclaimPredictor + if p.node == nil { + return nil, fmt.Errorf("failed to get podReclaimablePredictor result for node is nil") + } + nodeAllocatable, err := getNodeAllocatable(p.node) + if err != nil { + return nil, fmt.Errorf("failed to get allocatable of node, err=%v", err) + } + fixReclaimable := quotav1.SubtractWithNonNegativeResult(nodeAllocatable, p.unReclaimable) + fixReclaimable = util.MinResourceList(fixReclaimable, p.reclaimable) + metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, p.GetPredictorName(), float64(fixReclaimable.Cpu().MilliValue())/1000) + metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, p.GetPredictorName(), float64(fixReclaimable.Memory().Value())) + return fixReclaimable, nil } var _ Predictor = (*priorityReclaimablePredictor)(nil) @@ -223,6 +259,7 @@ var _ Predictor = (*priorityReclaimablePredictor)(nil) // Prod-tier and the system components parts. type priorityReclaimablePredictor struct { predictServer PredictServer + node *v1.Node safetyMarginPercent int priorityClassFilterFn func(p extension.PriorityClass) bool // return true if the priority class is reclaimable @@ -230,13 +267,13 @@ type priorityReclaimablePredictor struct { } // GetPredictorName is used to obtain the predictor name. -func (n *priorityReclaimablePredictor) GetPredictorName() string { +func (p *priorityReclaimablePredictor) GetPredictorName() string { return "priorityReclaimablePredictor" } -func (n *priorityReclaimablePredictor) AddPod(pod *v1.Pod) error { +func (p *priorityReclaimablePredictor) AddPod(pod *v1.Pod) error { priorityClass := extension.GetPodPriorityClassWithDefault(pod) - if !n.priorityClassFilterFn(priorityClass) { + if !p.priorityClassFilterFn(priorityClass) { klog.V(6).Infof("priorityReclaimablePredictor skip pod %s whose priority %s is not reclaimable", pod.UID, priorityClass) return nil @@ -250,31 +287,39 @@ func (n *priorityReclaimablePredictor) AddPod(pod *v1.Pod) error { } podRequests := util.GetPodRequest(pod, v1.ResourceCPU, v1.ResourceMemory) - n.reclaimRequest = quotav1.Add(n.reclaimRequest, podRequests) + p.reclaimRequest = quotav1.Add(p.reclaimRequest, podRequests) return nil } -func (n *priorityReclaimablePredictor) GetResult() (v1.ResourceList, error) { +func (p *priorityReclaimablePredictor) GetResult() (v1.ResourceList, error) { + // if failed to get node info, stop the reclaimPredictor + if p.node == nil { + return nil, fmt.Errorf("failed to get priorityReclaimablePredictor result for node is nil") + } + nodeAllocatable, err := getNodeAllocatable(p.node) + if err != nil { + return nil, fmt.Errorf("failed to get allocatable of node, err=%v", err) + } // get sys prediction - sysResult, err := n.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(SystemItemID)}) + sysResult, err := p.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(SystemItemID)}) if err != nil { return nil, fmt.Errorf("failed to get prediction of sys, err: %w", err) } sysResultForCPU := sysResult.Data["p95"] sysResultForMemory := sysResult.Data["p98"] - reclaimPredict := v1.ResourceList{ + unReclaimable := v1.ResourceList{ v1.ResourceCPU: *sysResultForCPU.Cpu(), v1.ResourceMemory: *sysResultForMemory.Memory(), } // get reclaimable priority class prediction for _, priorityClass := range extension.KnownPriorityClasses { - if !n.priorityClassFilterFn(priorityClass) { + if !p.priorityClassFilterFn(priorityClass) { continue } - result, err := n.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(string(priorityClass))}) + result, err := p.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(string(priorityClass))}) if err != nil { return nil, fmt.Errorf("failed to get prediction of priority %s, err: %s", priorityClass, err) } @@ -285,21 +330,24 @@ func (n *priorityReclaimablePredictor) GetResult() (v1.ResourceList, error) { v1.ResourceCPU: *resultForCPU.Cpu(), v1.ResourceMemory: *resultForMemory.Memory(), } - reclaimPredict = quotav1.Add(reclaimPredict, predictResource) + unReclaimable = quotav1.Add(unReclaimable, predictResource) } // scale with the safety margin - ratioAfterSafetyMargin := float64(100+n.safetyMarginPercent) / 100 - reclaimPredict = v1.ResourceList{ - v1.ResourceCPU: util.MultiplyMilliQuant(*reclaimPredict.Cpu(), ratioAfterSafetyMargin), - v1.ResourceMemory: util.MultiplyQuant(*reclaimPredict.Memory(), ratioAfterSafetyMargin), + ratioAfterSafetyMargin := float64(100+p.safetyMarginPercent) / 100 + unReclaimable = v1.ResourceList{ + v1.ResourceCPU: util.MultiplyMilliQuant(*unReclaimable.Cpu(), ratioAfterSafetyMargin), + v1.ResourceMemory: util.MultiplyQuant(*unReclaimable.Memory(), ratioAfterSafetyMargin), } // reclaimable[P] := max(request[P] - peak[P], 0) - reclaimable := quotav1.Max(quotav1.Subtract(n.reclaimRequest, reclaimPredict), util.NewZeroResourceList()) - metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, n.GetPredictorName(), float64(reclaimable.Cpu().MilliValue())/1000) - metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, n.GetPredictorName(), float64(reclaimable.Memory().Value())) - return reclaimable, nil + reclaimable := quotav1.Max(quotav1.Subtract(p.reclaimRequest, unReclaimable), util.NewZeroResourceList()) + // fixReclaimable[P] := min(nodeAllocatable[P]-unReclaimable[P],reclaimable[P]) + fixReclaimable := quotav1.SubtractWithNonNegativeResult(nodeAllocatable, unReclaimable) + fixReclaimable = util.MinResourceList(fixReclaimable, reclaimable) + metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, p.GetPredictorName(), float64(fixReclaimable.Cpu().MilliValue())/1000) + metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, p.GetPredictorName(), float64(fixReclaimable.Memory().Value())) + return fixReclaimable, nil } var _ Predictor = (*minPredictor)(nil) @@ -356,3 +404,15 @@ func isPodReclaimableForProd(pod *v1.Pod) bool { func isPriorityClassReclaimableForProd(priorityClass extension.PriorityClass) bool { return priorityClass == extension.PriorityProd || priorityClass == extension.PriorityNone } + +func getNodeAllocatable(node *v1.Node) (v1.ResourceList, error) { + res, err := extension.GetNodeRawAllocatable(node.Annotations) + if err == nil && res != nil { + return res, nil + } + if node.Status.Allocatable != nil { + return node.Status.Allocatable, nil + } else { + return nil, fmt.Errorf("invalid node, for node.status.allocation is nil") + } +} diff --git a/pkg/koordlet/prediction/peak_predictor_test.go b/pkg/koordlet/prediction/peak_predictor_test.go index 8f4f16f5e..aeb622998 100644 --- a/pkg/koordlet/prediction/peak_predictor_test.go +++ b/pkg/koordlet/prediction/peak_predictor_test.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + quotav1 "k8s.io/apiserver/pkg/quota/v1" "k8s.io/utils/pointer" "github.com/koordinator-sh/koordinator/apis/extension" @@ -81,6 +82,27 @@ func (m *mockPredictServer) GetPrediction(desc MetricDesc) (Result, error) { } func TestProdReclaimablePredictor_AddPod(t *testing.T) { + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(3*1024*1024*1024, resource.BinarySI), + }, + }, + } + node_huge := node.DeepCopy() + node_huge.Status.Allocatable = v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(8000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(8*1024*1024*1024, resource.BinarySI), + } + node_small := node.DeepCopy() + node_small.Status.Allocatable = v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(1000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1*1024*1024*1024, resource.BinarySI), + } priority := extension.PriorityProdValueMin sysPrediction := Result{ Data: map[string]v1.ResourceList{ @@ -189,44 +211,80 @@ func TestProdReclaimablePredictor_AddPod(t *testing.T) { coldStartDuration := time.Hour factory := NewPredictorFactory(predictServer, coldStartDuration, 10) - predictor := factory.New(ProdReclaimablePredictor) - assert.Equal(t, 2, len(predictor.(*minPredictor).predictors)) - - err := predictor.AddPod(pod1) - assert.NoError(t, err) - err = predictor.AddPod(pod2) - assert.NoError(t, err) - - err = predictor.AddPod(pod3) - assert.NoError(t, err) - - result, err := predictor.GetResult() - assert.NoError(t, err) - - podMemPeak := 1.1 * 1024 * 1024 * 1024 - podPredictResult := v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(2000-500*1.1, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(2*1024*1024*1024-int64(podMemPeak), resource.BinarySI), + number1 := 1.1 * 1024 * 1024 * 1024 + number2 := 1.1 * 1536 * 1024 * 1024 + testCases := []struct { + name string + predictor Predictor + podsList []*v1.Pod + expectedPodPredictResult v1.ResourceList + expectedPriorityPredictResult v1.ResourceList + }{ + { + name: "node allocatable == pods' requests", + predictor: factory.New(ProdReclaimablePredictor, PredictorContext{node}), + podsList: []*v1.Pod{pod1, pod2, pod3}, + expectedPodPredictResult: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000-500*1.1, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(2*1024*1024*1024-int64(number1), resource.BinarySI), + }, + expectedPriorityPredictResult: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(3000-1300*1.1, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(3*1024*1024*1024-int64(number2), resource.BinarySI), + }, + }, + { + name: "node allocatable > pods' requests", + predictor: factory.New(ProdReclaimablePredictor, PredictorContext{node_huge}), + podsList: []*v1.Pod{pod1, pod2, pod3}, + expectedPodPredictResult: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000-500*1.1, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(2*1024*1024*1024-int64(number1), resource.BinarySI), + }, + expectedPriorityPredictResult: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(3000-1300*1.1, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(3*1024*1024*1024-int64(number2), resource.BinarySI), + }, + }, + { + name: "node allocatable < pods' requests", + predictor: factory.New(ProdReclaimablePredictor, PredictorContext{node_small}), + podsList: []*v1.Pod{pod1, pod2, pod3}, + expectedPodPredictResult: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(1000-500*1.1, resource.DecimalSI), + v1.ResourceMemory: resource.MustParse("0"), + }, + expectedPriorityPredictResult: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0"), + v1.ResourceMemory: resource.MustParse("0"), + }, + }, } - gotPodResult, err := predictor.(*minPredictor).predictors[0].GetResult() - assert.NoError(t, err) - assert.Equal(t, podPredictResult, gotPodResult) - prodMemPeak := 1.1 * 1536 * 1024 * 1024 - priorityPredictResult := v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(3000-1300*1.1, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(3*1024*1024*1024-int64(prodMemPeak), resource.BinarySI), + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + predictor := tc.predictor + assert.Equal(t, 2, len(predictor.(*minPredictor).predictors)) + for _, pod := range tc.podsList { + err := predictor.AddPod(pod) + assert.NoError(t, err) + } + result, err := predictor.GetResult() + assert.NoError(t, err) + gotPodResult, err := predictor.(*minPredictor).predictors[0].GetResult() + assert.NoError(t, err) + assert.Equal(t, true, quotav1.Equals(tc.expectedPodPredictResult, gotPodResult)) + gotPriorityResult, err := predictor.(*minPredictor).predictors[1].GetResult() + assert.NoError(t, err) + assert.Equal(t, true, quotav1.Equals(tc.expectedPriorityPredictResult, gotPriorityResult)) + expected := util.MinResourceList(tc.expectedPodPredictResult, tc.expectedPriorityPredictResult) + assert.Equal(t, expected, result) + assert.Equal(t, true, quotav1.Equals(expected, result)) + }) } - gotPriorityResult, err := predictor.(*minPredictor).predictors[1].GetResult() - assert.NoError(t, err) - assert.Equal(t, priorityPredictResult, gotPriorityResult) - - // min() - expected := util.MinResourceList(podPredictResult, priorityPredictResult) - assert.Equal(t, expected, result) } -func Test_podReclaimablePredictor(t *testing.T) { +func TestPodReclaimablePredictor(t *testing.T) { predictServer := &mockPredictServer{ DefaultResult: testPredictionResult, } @@ -234,16 +292,7 @@ func Test_podReclaimablePredictor(t *testing.T) { priority := extension.PriorityProdValueMin priorityBatch := extension.PriorityBatchValueMin - predictor := &podReclaimablePredictor{ - predictServer: predictServer, - coldStartDuration: coldStartDuration, - safetyMarginPercent: 10, - podFilterFn: isPodReclaimableForProd, - reclaimable: util.NewZeroResourceList(), - pods: make(map[string]bool), - } - - pod1 := &v1.Pod{ + pod1 := &v1.Pod{ // cool start ObjectMeta: metav1.ObjectMeta{ UID: "pod-1-uid", CreationTimestamp: metav1.Time{Time: time.Now().Add(-time.Minute)}, @@ -263,7 +312,7 @@ func Test_podReclaimablePredictor(t *testing.T) { }, } - pod2 := &v1.Pod{ + pod2 := &v1.Pod{ // normal ObjectMeta: metav1.ObjectMeta{ UID: "pod-2-uid", CreationTimestamp: metav1.Time{Time: time.Now().Add(-2 * time.Hour)}, @@ -283,7 +332,7 @@ func Test_podReclaimablePredictor(t *testing.T) { }, } - pod3 := &v1.Pod{ + pod3 := &v1.Pod{ // deleted ObjectMeta: metav1.ObjectMeta{ UID: "pod-3-uid", CreationTimestamp: metav1.Time{Time: time.Now().Add(-2 * time.Hour)}, @@ -304,7 +353,7 @@ func Test_podReclaimablePredictor(t *testing.T) { }, } - pod4 := &v1.Pod{ + pod4 := &v1.Pod{ // batch ObjectMeta: metav1.ObjectMeta{ UID: "pod-4-uid", CreationTimestamp: metav1.Time{Time: time.Now().Add(-2 * time.Hour)}, @@ -324,28 +373,76 @@ func Test_podReclaimablePredictor(t *testing.T) { }, } - err := predictor.AddPod(pod1) - assert.NoError(t, err) - - err = predictor.AddPod(pod2) - assert.NoError(t, err) - - err = predictor.AddPod(pod3) - assert.NoError(t, err) - - err = predictor.AddPod(pod4) - assert.NoError(t, err) - - result, err := predictor.GetResult() - assert.NoError(t, err) - + newPodReclaimablePredictor := func(node *v1.Node) *podReclaimablePredictor { + return &podReclaimablePredictor{ + predictServer: predictServer, + coldStartDuration: coldStartDuration, + safetyMarginPercent: 10, + podFilterFn: isPodReclaimableForProd, + reclaimable: util.NewZeroResourceList(), + unReclaimable: util.NewZeroResourceList(), + node: node, + pods: make(map[string]bool), + } + } peak := 1.1 * 768 * 1024 * 1024 - expected := v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(2000-550, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(2*1024*1024*1024-int64(peak), resource.BinarySI), + testCase := []struct { + name string + predictor Predictor + podList []*v1.Pod + expectedReclaimable v1.ResourceList + }{ + { + name: "node allocatable > pods' requests ", + predictor: newPodReclaimablePredictor(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(3*1024*1024*1024, resource.BinarySI), + }, + }, + }), + podList: []*v1.Pod{pod1, pod2, pod3, pod4}, + expectedReclaimable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000-550, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(2*1024*1024*1024-int64(peak), resource.BinarySI), + }, + }, + { + name: "node allocatable < pods' requests ", + predictor: newPodReclaimablePredictor(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-2", + }, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(1000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024*1024*1024, resource.BinarySI), + }, + }, + }), + podList: []*v1.Pod{pod1, pod2, pod3, pod4}, + expectedReclaimable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(1000-550, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024*1024*1024-int64(peak), resource.BinarySI), + }, + }, + } + for _, tc := range testCase { + t.Run(tc.name, func(t *testing.T) { + predictor := tc.predictor + for _, pod := range tc.podList { + err := predictor.AddPod(pod) + assert.NoError(t, err) + } + result, err := predictor.GetResult() + assert.NoError(t, err) + assert.Equal(t, true, quotav1.Equals(tc.expectedReclaimable, result)) + }) } - - assert.Equal(t, expected, result) } func Test_priorityReclaimablePredictor(t *testing.T) { @@ -435,25 +532,70 @@ func Test_priorityReclaimablePredictor(t *testing.T) { UIDType(podBatch.UID): batchPrediction, }, } - - predictor := &priorityReclaimablePredictor{ - predictServer: predictServer, - safetyMarginPercent: 0, - priorityClassFilterFn: isPriorityClassReclaimableForProd, - reclaimRequest: util.NewZeroResourceList(), + newPodReclaimablePredictor := func(node *v1.Node) *priorityReclaimablePredictor { + return &priorityReclaimablePredictor{ + predictServer: predictServer, + node: node, + safetyMarginPercent: 0, + priorityClassFilterFn: isPriorityClassReclaimableForProd, + reclaimRequest: util.NewZeroResourceList(), + } } - - err := predictor.AddPod(podProd) - assert.NoError(t, err) - - err = predictor.AddPod(podBatch) - assert.NoError(t, err) - - got, err := predictor.GetResult() - assert.NoError(t, err) - expected := v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(1000-(300+500)*1.0, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity((2048-(512+1024)*1.0)*1024*1024, resource.BinarySI), + testCase := []struct { + name string + predictor Predictor + podList []*v1.Pod + expectedReclaimable v1.ResourceList + }{ + { + name: "node allocatable > pods' requests ", + predictor: newPodReclaimablePredictor(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(3*1024*1024*1024, resource.BinarySI), + }, + }, + }), + podList: []*v1.Pod{podProd, podBatch}, + expectedReclaimable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(1000-(300+500)*1.0, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity((2048-(512+1024)*1.0)*1024*1024, resource.BinarySI), + }, + }, + { + name: "node allocatable < pods' requests ", + predictor: newPodReclaimablePredictor(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(900, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024*1024*1024, resource.BinarySI), + }, + }, + }), + podList: []*v1.Pod{podProd, podBatch}, + expectedReclaimable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(900-(300+500)*1.0, resource.DecimalSI), + v1.ResourceMemory: resource.MustParse("0"), + }, + }, + } + for _, tc := range testCase { + t.Run(tc.name, func(t *testing.T) { + predictor := tc.predictor + for _, pod := range tc.podList { + err := predictor.AddPod(pod) + assert.NoError(t, err) + } + got, err := predictor.GetResult() + assert.NoError(t, err) + assert.Equal(t, true, quotav1.Equals(tc.expectedReclaimable, got)) + }) } - assert.Equal(t, expected, got) } diff --git a/pkg/koordlet/statesinformer/impl/states_nodemetric.go b/pkg/koordlet/statesinformer/impl/states_nodemetric.go index 14dd3dfd0..756b7ece0 100644 --- a/pkg/koordlet/statesinformer/impl/states_nodemetric.go +++ b/pkg/koordlet/statesinformer/impl/states_nodemetric.go @@ -97,6 +97,7 @@ type nodeMetricInformer struct { statusUpdater *statusUpdater podsInformer *podsInformer + nodeInformer *nodeInformer nodeSLOInformer *nodeSLOInformer metricCache metriccache.MetricCache predictorFactory prediction.PredictorFactory @@ -140,6 +141,14 @@ func (r *nodeMetricInformer) Setup(ctx *PluginOption, state *PluginState) { } else { r.podsInformer = podsInformer } + + nodeInformerIf := state.informerPlugins[nodeInformerName] + if nodeInformer, ok := nodeInformerIf.(*nodeInformer); !ok { + klog.Fatalf("node informer format error") + } else { + r.nodeInformer = nodeInformer + } + nodeSLOInformerIf := state.informerPlugins[nodeSLOInformerName] if nodeSLOInformer, ok := nodeSLOInformerIf.(*nodeSLOInformer); !ok { klog.Fatalf("node slo informer format error") @@ -189,7 +198,7 @@ func (r *nodeMetricInformer) Start(stopCh <-chan struct{}) { } go r.nodeMetricInformer.Run(stopCh) - if !cache.WaitForCacheSync(stopCh, r.nodeMetricInformer.HasSynced, r.podsInformer.HasSynced, r.nodeSLOInformer.HasSynced) { + if !cache.WaitForCacheSync(stopCh, r.nodeMetricInformer.HasSynced, r.podsInformer.HasSynced, r.nodeInformer.HasSynced, r.nodeSLOInformer.HasSynced) { klog.Errorf("timed out waiting for node metric caches to sync") } go r.syncNodeMetricWorker(stopCh) @@ -360,8 +369,8 @@ func (r *nodeMetricInformer) collectMetric() (*slov1alpha1.NodeMetricInfo, []*sl Start: &startTime, End: &endTime, } - prodPredictor := r.predictorFactory.New(prediction.ProdReclaimablePredictor) - + node := r.nodeInformer.GetNode() + prodPredictor := r.predictorFactory.New(prediction.ProdReclaimablePredictor, prediction.PredictorContext{Node: node}) for _, podMeta := range podsMeta { podMetric, err := r.collectPodMetric(podMeta, queryParam) if err != nil { @@ -394,10 +403,13 @@ func (r *nodeMetricInformer) collectMetric() (*slov1alpha1.NodeMetricInfo, []*sl klog.Errorf("failed to get prediction, err %v", err) metrics.RecordNodeResourcePriorityReclaimable(string(corev1.ResourceCPU), metrics.UnitCore, string(apiext.PriorityProd), 0) metrics.RecordNodeResourcePriorityReclaimable(string(corev1.ResourceMemory), metrics.UnitByte, string(apiext.PriorityProd), 0) + // Todo: expose status in nodeMetrics + metrics.RecordNodeResourcePriorityReclaimableStatus(string(apiext.PriorityProd), 1) } else { prodReclaimable.Resource = slov1alpha1.ResourceMap{ResourceList: p} metrics.RecordNodeResourcePriorityReclaimable(string(corev1.ResourceCPU), metrics.UnitCore, string(apiext.PriorityProd), float64(p.Cpu().MilliValue())/1000) metrics.RecordNodeResourcePriorityReclaimable(string(corev1.ResourceMemory), metrics.UnitByte, string(apiext.PriorityProd), float64(p.Memory().Value())) + metrics.RecordNodeResourcePriorityReclaimableStatus(string(apiext.PriorityProd), 0) } return nodeMetricInfo, podsMetricInfo, hostAppMetricInfo, prodReclaimable diff --git a/pkg/koordlet/statesinformer/impl/states_nodemetric_test.go b/pkg/koordlet/statesinformer/impl/states_nodemetric_test.go index aee8b9e69..fd12345fa 100644 --- a/pkg/koordlet/statesinformer/impl/states_nodemetric_test.go +++ b/pkg/koordlet/statesinformer/impl/states_nodemetric_test.go @@ -163,12 +163,25 @@ func (c *fakeNodeMetricClient) UpdateStatus(ctx context.Context, nodeMetric *slo func Test_reporter_sync_with_single_node_metric(t *testing.T) { endTime := time.Now() startTime := endTime.Add(-30 * time.Second) - + ctrl := gomock.NewController(t) + defer ctrl.Finish() + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode", + }, + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("16Gi"), + }, + }, + } type fields struct { nodeName string nodeMetric *slov1alpha1.NodeMetric metricCache func(ctrl *gomock.Controller) metriccache.MetricCache podsInformer *podsInformer + nodeInformer *nodeInformer nodeSLOInformer *nodeSLOInformer nodeMetricLister listerv1alpha1.NodeMetricLister nodeMetricClient clientsetv1alpha1.NodeMetricInterface @@ -191,6 +204,7 @@ func Test_reporter_sync_with_single_node_metric(t *testing.T) { return nil }, podsInformer: NewPodsInformer(), + nodeInformer: NewNodeInformer(), nodeSLOInformer: NewNodeSLOInformer(), nodeMetricLister: nil, nodeMetricClient: &fakeNodeMetricClient{}, @@ -203,7 +217,7 @@ func Test_reporter_sync_with_single_node_metric(t *testing.T) { wantErr: true, }, { - name: "successfully report nodeMetric", + name: "successfully report nodeMetric - sum of pods request < node.allocatable", fields: fields{ nodeName: "test", nodeMetric: &slov1alpha1.NodeMetric{ @@ -315,6 +329,9 @@ func Test_reporter_sync_with_single_node_metric(t *testing.T) { }, }, }, + nodeInformer: &nodeInformer{ + node: node, + }, nodeSLOInformer: &nodeSLOInformer{ nodeSLO: &slov1alpha1.NodeSLO{ Spec: slov1alpha1.NodeSLOSpec{}, @@ -428,6 +445,7 @@ func Test_reporter_sync_with_single_node_metric(t *testing.T) { return c }, podsInformer: NewPodsInformer(), + nodeInformer: NewNodeInformer(), nodeSLOInformer: &nodeSLOInformer{ nodeSLO: &slov1alpha1.NodeSLO{ Spec: slov1alpha1.NodeSLOSpec{}, @@ -466,6 +484,7 @@ func Test_reporter_sync_with_single_node_metric(t *testing.T) { nodeMetric: tt.fields.nodeMetric, metricCache: tt.fields.metricCache(ctrl), podsInformer: tt.fields.podsInformer, + nodeInformer: tt.fields.nodeInformer, nodeSLOInformer: tt.fields.nodeSLOInformer, nodeMetricLister: tt.fields.nodeMetricLister, statusUpdater: newStatusUpdater(tt.fields.nodeMetricClient), @@ -704,6 +723,7 @@ func Test_nodeMetricInformer_NewAndSetup(t *testing.T) { metricCache: mockmetriccache.NewMockMetricCache(ctrl), informerPlugins: map[PluginName]informerPlugin{ podsInformerName: NewPodsInformer(), + nodeInformerName: NewNodeInformer(), nodeSLOInformerName: NewNodeSLOInformer(), }, },