Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koordlet: fix prodReclaimablePredictor result to avoid influence of o… #2325

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/koordlet/metrics/resource_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ var (
Help: "the node reclaimable of different priorities resources updated by koordinator",
}, []string{NodeKey, PriorityKey, ResourceKey, UnitKey})

// NodeResourcePriorityReclaimableStatus is a gauge vector in the koordlet subsystem, labeled by
// NodeKey and PriorityKey. It is written by RecordNodeResourcePriorityReclaimableStatus.
// NOTE(review): the meaning of the individual gauge values is not visible here — confirm with the callers.
NodeResourcePriorityReclaimableStatus = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: KoordletSubsystem,
Name: "node_resource_priority_reclaimable_status",
Help: "status of node reclaimable of different priorities resources updated by koordinator",
}, []string{NodeKey, PriorityKey})

ContainerResourceRequests = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: KoordletSubsystem,
Name: "container_resource_requests",
Expand All @@ -50,6 +56,7 @@ var (
// ResourceSummaryCollectors gathers the resource-summary metric collectors into a single slice.
// NOTE(review): presumably consumed by the metrics registration code elsewhere — confirm.
ResourceSummaryCollectors = []prometheus.Collector{
NodeResourceAllocatable,
NodeResourcePriorityReclaimable,
NodeResourcePriorityReclaimableStatus,
ContainerResourceRequests,
ContainerResourceLimits,
}
Expand All @@ -76,6 +83,15 @@ func RecordNodeResourcePriorityReclaimable(resourceName string, unit string, pri
NodeResourcePriorityReclaimable.With(labels).Set(value)
}

func RecordNodeResourcePriorityReclaimableStatus(priority string, value float64) {
lijunxin559 marked this conversation as resolved.
Show resolved Hide resolved
labels := genNodeLabels()
if labels == nil {
return
}
labels[PriorityKey] = priority
NodeResourcePriorityReclaimableStatus.With(labels).Set(value)
}

func RecordContainerResourceRequests(resourceName string, unit string, status *corev1.ContainerStatus, pod *corev1.Pod, value float64) {
labels := genNodeLabels()
if labels == nil {
Expand Down
116 changes: 88 additions & 28 deletions pkg/koordlet/prediction/peak_predictor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@ import (
// PredictorType defines constants for different types of predictors.
type PredictorType int

// PredictorContext carries the inputs handed to PredictorFactory.New when a predictor is built.
type PredictorContext struct {
// Node is copied by predictorFactory.New into the predictors it creates; their GetResult
// methods return an error when it is nil.
Node *v1.Node
}

const (
// ProdReclaimablePredictor represents the type of a reclaimable production predictor.
ProdReclaimablePredictor PredictorType = iota
)

// PredictorFactory is an interface for creating predictors of different types.
type PredictorFactory interface {
New(PredictorType) Predictor
New(predictorType PredictorType, context PredictorContext) Predictor
}

type Predictor interface {
Expand All @@ -65,19 +69,22 @@ func NewPredictorFactory(predictServer PredictServer, coldStartDuration time.Dur
}

// New creates a new instance of a predictor based on the given type.
func (f *predictorFactory) New(t PredictorType) Predictor {
func (f *predictorFactory) New(t PredictorType, context PredictorContext) Predictor {
switch t {
case ProdReclaimablePredictor:
podPredictor := &podReclaimablePredictor{
predictServer: f.predictServer,
node: context.Node,
coldStartDuration: f.coldStartDuration,
safetyMarginPercent: f.safetyMarginPercent,
podFilterFn: isPodReclaimableForProd,
reclaimable: util.NewZeroResourceList(),
unReclaimable: util.NewZeroResourceList(),
pods: make(map[string]bool),
}
priorityPredictor := &priorityReclaimablePredictor{
predictServer: f.predictServer,
node: context.Node,
safetyMarginPercent: f.safetyMarginPercent,
priorityClassFilterFn: isPriorityClassReclaimableForProd,
reclaimRequest: util.NewZeroResourceList(),
Expand Down Expand Up @@ -119,7 +126,7 @@ func NewEmptyPredictorFactory() PredictorFactory {
type emptyPredictorFactory struct {
}

func (f *emptyPredictorFactory) New(t PredictorType) Predictor {
func (f *emptyPredictorFactory) New(t PredictorType, context PredictorContext) Predictor {
return &emptyPredictor{}
}

Expand All @@ -129,12 +136,13 @@ var _ Predictor = (*podReclaimablePredictor)(nil)
// e.g. A podReclaimablePredictor for Prod pods calculates the result based on the sum of the percentile of Prod pods.
type podReclaimablePredictor struct {
predictServer PredictServer
node *v1.Node
coldStartDuration time.Duration
safetyMarginPercent int
podFilterFn func(pod *v1.Pod) bool // return true if the pod is reclaimable

reclaimable v1.ResourceList
pods map[string]bool
reclaimable v1.ResourceList
unReclaimable v1.ResourceList
pods map[string]bool
}

// GetPredictorName is used to obtain the predictor name.
Expand Down Expand Up @@ -180,19 +188,36 @@ func (p *podReclaimablePredictor) AddPod(pod *v1.Pod) error {
podCPURequest := podRequests[v1.ResourceCPU]
podMemoryRequest := podRequests[v1.ResourceMemory]

// calculate the reclaimable resources: reclaimable = podRequest - peak
// calculate the unReclaimable resources: unReclaimable = peak
reclaimableCPUMilli := int64(0)
reclaimableMemoryBytes := int64(0)

unReclaimableCPUMilli := int64(0)
unReclaimableMemoryBytes := int64(0)
ratioAfterSafetyMargin := float64(100+p.safetyMarginPercent) / 100
if p95CPU, ok := p95Resources[v1.ResourceCPU]; ok {
peakCPU := util.MultiplyMilliQuant(p95CPU, ratioAfterSafetyMargin)
unReclaimableCPUMilli = peakCPU.MilliValue()
reclaimableCPUMilli = podCPURequest.MilliValue() - peakCPU.MilliValue()
}
if p98Memory, ok := p98Resources[v1.ResourceMemory]; ok {
peakMemory := util.MultiplyQuant(p98Memory, ratioAfterSafetyMargin)
unReclaimableMemoryBytes = peakMemory.Value()
reclaimableMemoryBytes = podMemoryRequest.Value() - peakMemory.Value()
}

// update the unReclaimable resources
cpu := p.unReclaimable[v1.ResourceCPU]
unReclaimableCPU := resource.NewMilliQuantity(unReclaimableCPUMilli, resource.DecimalSI)
cpu.Add(*unReclaimableCPU)
p.unReclaimable[v1.ResourceCPU] = cpu

memory := p.unReclaimable[v1.ResourceMemory]
unReclaimableMemory := resource.NewQuantity(unReclaimableMemoryBytes, resource.BinarySI)
memory.Add(*unReclaimableMemory)
p.unReclaimable[v1.ResourceMemory] = memory

// update the reclaimableCPUMilli resources
if reclaimableCPUMilli > 0 {
cpu := p.reclaimable[v1.ResourceCPU]
reclaimableCPU := resource.NewMilliQuantity(reclaimableCPUMilli, resource.DecimalSI)
Expand All @@ -210,10 +235,21 @@ func (p *podReclaimablePredictor) AddPod(pod *v1.Pod) error {
}

// GetResult returns the predicted resource list for the added pods.
// The result is the sum of the reclaimable resources of the added pods.
func (p *podReclaimablePredictor) GetResult() (v1.ResourceList, error) {
saintube marked this conversation as resolved.
Show resolved Hide resolved
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, p.GetPredictorName(), float64(p.reclaimable.Cpu().MilliValue())/1000)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, p.GetPredictorName(), float64(p.reclaimable.Memory().Value()))
return p.reclaimable, nil
// if failed to get node info, stop the reclaimPredictor
if p.node == nil {
return nil, fmt.Errorf("failed to get podReclaimablePredictor result for node is nil")
}
nodeAllocatable, err := getNodeAllocatable(p.node)
if err != nil {
return nil, fmt.Errorf("failed to get allocatable of node, err=%v", err)
}
fixReclaimable := quotav1.SubtractWithNonNegativeResult(nodeAllocatable, p.unReclaimable)
fixReclaimable = util.MinResourceList(fixReclaimable, p.reclaimable)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, p.GetPredictorName(), float64(fixReclaimable.Cpu().MilliValue())/1000)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, p.GetPredictorName(), float64(fixReclaimable.Memory().Value()))
return fixReclaimable, nil
}

var _ Predictor = (*priorityReclaimablePredictor)(nil)
Expand All @@ -223,20 +259,21 @@ var _ Predictor = (*priorityReclaimablePredictor)(nil)
// Prod-tier and the system components parts.
type priorityReclaimablePredictor struct {
// predictServer is queried in GetResult for the system and per-priority-class predictions.
predictServer PredictServer
// node supplies the allocatable that caps the result; GetResult fails when it is nil.
node *v1.Node
// safetyMarginPercent inflates the predicted peak by (100+safetyMarginPercent)/100.
safetyMarginPercent int
priorityClassFilterFn func(p extension.PriorityClass) bool // return true if the priority class is reclaimable

// reclaimRequest accumulates the CPU/memory requests of the pods accepted by AddPod.
reclaimRequest v1.ResourceList
}

// GetPredictorName is used to obtain the predictor name.
func (n *priorityReclaimablePredictor) GetPredictorName() string {
func (p *priorityReclaimablePredictor) GetPredictorName() string {
return "priorityReclaimablePredictor"
}

func (n *priorityReclaimablePredictor) AddPod(pod *v1.Pod) error {
func (p *priorityReclaimablePredictor) AddPod(pod *v1.Pod) error {
priorityClass := extension.GetPodPriorityClassWithDefault(pod)
if !n.priorityClassFilterFn(priorityClass) {
if !p.priorityClassFilterFn(priorityClass) {
klog.V(6).Infof("priorityReclaimablePredictor skip pod %s whose priority %s is not reclaimable",
pod.UID, priorityClass)
return nil
Expand All @@ -250,31 +287,39 @@ func (n *priorityReclaimablePredictor) AddPod(pod *v1.Pod) error {
}

podRequests := util.GetPodRequest(pod, v1.ResourceCPU, v1.ResourceMemory)
n.reclaimRequest = quotav1.Add(n.reclaimRequest, podRequests)
p.reclaimRequest = quotav1.Add(p.reclaimRequest, podRequests)

return nil
}

func (n *priorityReclaimablePredictor) GetResult() (v1.ResourceList, error) {
func (p *priorityReclaimablePredictor) GetResult() (v1.ResourceList, error) {
// if failed to get node info, stop the reclaimPredictor
if p.node == nil {
return nil, fmt.Errorf("failed to get priorityReclaimablePredictor result for node is nil")
}
nodeAllocatable, err := getNodeAllocatable(p.node)
if err != nil {
return nil, fmt.Errorf("failed to get allocatable of node, err=%v", err)
}
// get sys prediction
sysResult, err := n.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(SystemItemID)})
sysResult, err := p.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(SystemItemID)})
if err != nil {
return nil, fmt.Errorf("failed to get prediction of sys, err: %w", err)
}
sysResultForCPU := sysResult.Data["p95"]
sysResultForMemory := sysResult.Data["p98"]
reclaimPredict := v1.ResourceList{
unReclaimable := v1.ResourceList{
v1.ResourceCPU: *sysResultForCPU.Cpu(),
v1.ResourceMemory: *sysResultForMemory.Memory(),
}

// get reclaimable priority class prediction
for _, priorityClass := range extension.KnownPriorityClasses {
if !n.priorityClassFilterFn(priorityClass) {
if !p.priorityClassFilterFn(priorityClass) {
continue
}

result, err := n.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(string(priorityClass))})
result, err := p.predictServer.GetPrediction(MetricDesc{UID: getNodeItemUID(string(priorityClass))})
if err != nil {
return nil, fmt.Errorf("failed to get prediction of priority %s, err: %s", priorityClass, err)
}
Expand All @@ -285,21 +330,24 @@ func (n *priorityReclaimablePredictor) GetResult() (v1.ResourceList, error) {
v1.ResourceCPU: *resultForCPU.Cpu(),
v1.ResourceMemory: *resultForMemory.Memory(),
}
reclaimPredict = quotav1.Add(reclaimPredict, predictResource)
unReclaimable = quotav1.Add(unReclaimable, predictResource)
}

// scale with the safety margin
ratioAfterSafetyMargin := float64(100+n.safetyMarginPercent) / 100
reclaimPredict = v1.ResourceList{
v1.ResourceCPU: util.MultiplyMilliQuant(*reclaimPredict.Cpu(), ratioAfterSafetyMargin),
v1.ResourceMemory: util.MultiplyQuant(*reclaimPredict.Memory(), ratioAfterSafetyMargin),
ratioAfterSafetyMargin := float64(100+p.safetyMarginPercent) / 100
unReclaimable = v1.ResourceList{
v1.ResourceCPU: util.MultiplyMilliQuant(*unReclaimable.Cpu(), ratioAfterSafetyMargin),
v1.ResourceMemory: util.MultiplyQuant(*unReclaimable.Memory(), ratioAfterSafetyMargin),
}

// reclaimable[P] := max(request[P] - peak[P], 0)
reclaimable := quotav1.Max(quotav1.Subtract(n.reclaimRequest, reclaimPredict), util.NewZeroResourceList())
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, n.GetPredictorName(), float64(reclaimable.Cpu().MilliValue())/1000)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, n.GetPredictorName(), float64(reclaimable.Memory().Value()))
return reclaimable, nil
reclaimable := quotav1.Max(quotav1.Subtract(p.reclaimRequest, unReclaimable), util.NewZeroResourceList())
// fixReclaimable[P] := min(nodeAllocatable[P]-unReclaimable[P],reclaimable[P])
fixReclaimable := quotav1.SubtractWithNonNegativeResult(nodeAllocatable, unReclaimable)
fixReclaimable = util.MinResourceList(fixReclaimable, reclaimable)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceCPU), metrics.UnitCore, p.GetPredictorName(), float64(fixReclaimable.Cpu().MilliValue())/1000)
metrics.RecordNodePredictedResourceReclaimable(string(v1.ResourceMemory), metrics.UnitByte, p.GetPredictorName(), float64(fixReclaimable.Memory().Value()))
return fixReclaimable, nil
}

var _ Predictor = (*minPredictor)(nil)
Expand Down Expand Up @@ -356,3 +404,15 @@ func isPodReclaimableForProd(pod *v1.Pod) bool {
// isPriorityClassReclaimableForProd reports whether priorityClass is one of
// the classes treated as reclaimable for Prod: PriorityProd or PriorityNone.
func isPriorityClassReclaimableForProd(priorityClass extension.PriorityClass) bool {
	switch priorityClass {
	case extension.PriorityProd, extension.PriorityNone:
		return true
	default:
		return false
	}
}

// getNodeAllocatable returns the allocatable resources to use for node.
//
// The value obtained from the node annotations through
// extension.GetNodeRawAllocatable is preferred. When that lookup returns an
// error or a nil list, node.Status.Allocatable is used instead. An error is
// returned when node is nil or when neither source provides a value.
func getNodeAllocatable(node *v1.Node) (v1.ResourceList, error) {
	// Guard against a nil node so the helper never panics on node.Annotations.
	if node == nil {
		return nil, fmt.Errorf("invalid node, for node is nil")
	}
	// Best effort: an error from the annotation lookup is deliberately not
	// fatal, the node status below is the fallback.
	if res, err := extension.GetNodeRawAllocatable(node.Annotations); err == nil && res != nil {
		return res, nil
	}
	if node.Status.Allocatable == nil {
		return nil, fmt.Errorf("invalid node, for node.status.allocatable is nil")
	}
	return node.Status.Allocatable, nil
}
Loading
Loading