diff --git a/pkg/scheduler/plugins/factory.go b/pkg/scheduler/plugins/factory.go
index 9a04ae9d449..8caebf69deb 100644
--- a/pkg/scheduler/plugins/factory.go
+++ b/pkg/scheduler/plugins/factory.go
@@ -29,6 +29,7 @@ import (
 	"volcano.sh/volcano/pkg/scheduler/plugins/predicates"
 	"volcano.sh/volcano/pkg/scheduler/plugins/priority"
 	"volcano.sh/volcano/pkg/scheduler/plugins/proportion"
+	"volcano.sh/volcano/pkg/scheduler/plugins/rescheduling"
 	"volcano.sh/volcano/pkg/scheduler/plugins/reservation"
 	"volcano.sh/volcano/pkg/scheduler/plugins/sla"
 	tasktopology "volcano.sh/volcano/pkg/scheduler/plugins/task-topology"
@@ -50,6 +51,7 @@ func init() {
 	framework.RegisterPluginBuilder(sla.PluginName, sla.New)
 	framework.RegisterPluginBuilder(tasktopology.PluginName, tasktopology.New)
 	framework.RegisterPluginBuilder(numaaware.PluginName, numaaware.New)
+	framework.RegisterPluginBuilder(rescheduling.PluginName, rescheduling.New)
 
 	// Plugins for Queues
 	framework.RegisterPluginBuilder(proportion.PluginName, proportion.New)
diff --git a/pkg/scheduler/plugins/rescheduling/low_node_utilization.go b/pkg/scheduler/plugins/rescheduling/low_node_utilization.go
new file mode 100644
index 00000000000..a714ffc455e
--- /dev/null
+++ b/pkg/scheduler/plugins/rescheduling/low_node_utilization.go
@@ -0,0 +1,211 @@
+/*
+Copyright 2022 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rescheduling
+
+import (
+	"reflect"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/klog"
+
+	"volcano.sh/volcano/pkg/scheduler/api"
+)
+
+// DefaultLowNodeConf defines the default configuration for the LNU strategy
+var DefaultLowNodeConf = map[string]interface{}{
+	"thresholds":                 map[string]float64{"cpu": 100, "memory": 100, "pods": 100},
+	"targetThresholds":           map[string]float64{"cpu": 100, "memory": 100, "pods": 100},
+	"thresholdPriorityClassName": "system-cluster-critical",
+	"nodeFit":                    true,
+}
+
+type LowNodeUtilizationConf struct {
+	Thresholds                 map[string]float64
+	TargetThresholds           map[string]float64
+	NumberOfNodes              int
+	ThresholdPriority          int
+	ThresholdPriorityClassName string
+	NodeFit                    bool
+}
+
+// NewLowNodeUtilizationConf returns a pointer to a LowNodeUtilizationConf object with default values
+func NewLowNodeUtilizationConf() *LowNodeUtilizationConf {
+	return &LowNodeUtilizationConf{
+		Thresholds:                 map[string]float64{"cpu": 100, "memory": 100, "pods": 100},
+		TargetThresholds:           map[string]float64{"cpu": 100, "memory": 100, "pods": 100},
+		ThresholdPriorityClassName: "system-cluster-critical",
+		NodeFit:                    true,
+	}
+}
+
+// parse converts the config map into the struct object
+func (lnuc *LowNodeUtilizationConf) parse(configs map[string]interface{}) {
+	if len(configs) == 0 {
+		return
+	}
+	lowThresholdsConfigs, ok := configs["thresholds"]
+	if ok {
+		lowConfigs, _ := lowThresholdsConfigs.(map[string]int)
+		parseThreshold(lowConfigs, lnuc, "Thresholds")
+	}
+	targetThresholdsConfigs, ok := configs["targetThresholds"]
+	if ok {
+		targetConfigs, _ := targetThresholdsConfigs.(map[string]int)
+		parseThreshold(targetConfigs, lnuc, "TargetThresholds")
+	}
+}
+
+func parseThreshold(thresholdsConfig map[string]int, lnuc *LowNodeUtilizationConf, param string) {
+	if len(thresholdsConfig) > 0 {
+		configValue := reflect.ValueOf(lnuc).Elem().FieldByName(param)
+		config := configValue.Interface().(map[string]float64)
+
+		cpuThreshold, ok := thresholdsConfig["cpu"]
+		if ok {
+			config["cpu"] = float64(cpuThreshold)
+		}
+		memoryThreshold, ok := thresholdsConfig["memory"]
+		if ok {
+			config["memory"] = float64(memoryThreshold)
+		}
+		podThreshold, ok := thresholdsConfig["pods"]
+		if ok {
+			config["pods"] = float64(podThreshold)
+		}
+	}
+}
+
+var victimsFnForLnu = func(tasks []*api.TaskInfo) []*api.TaskInfo {
+	victims := make([]*api.TaskInfo, 0)
+
+	// parse configuration arguments
+	utilizationConfig := NewLowNodeUtilizationConf()
+	parametersConfig := RegisteredStrategyConfigs["lowNodeUtilization"]
+	var config map[string]interface{}
+	config, ok := parametersConfig.(map[string]interface{})
+	if !ok {
+		klog.Error("parameters parse error for lowNodeUtilization")
+		return victims
+	}
+	utilizationConfig.parse(config)
+	klog.V(4).Infof("The configuration for lowNodeUtilization: %v", *utilizationConfig)
+
+	// group the nodes into lowNodes and highNodes
+	nodeUtilizationList := getNodeUtilization()
+	klog.V(4).Infoln("The nodeUtilizationList:")
+	for _, nodeUtilization := range nodeUtilizationList {
+		klog.V(4).Infof("node: %s, utilization: %v", nodeUtilization.nodeInfo.Name, nodeUtilization.utilization)
+		for _, pod := range nodeUtilization.pods {
+			klog.V(4).Infof("pod: %s", pod.Name)
+		}
+	}
+
+	lowNodes, highNodes := groupNodesByUtilization(nodeUtilizationList, lowThresholdFilter, highThresholdFilter, *utilizationConfig)
+	klog.V(4).Infoln("The low nodes:")
+	for _, node := range lowNodes {
+		klog.V(4).Infoln(node.nodeInfo.Name)
+	}
klog.V(4).Infoln("The high nodes:") + for _, node := range highNodes { + klog.V(4).Infoln(node.nodeInfo.Name) + } + if len(lowNodes) == 0 { + klog.V(4).Infof("The resource utilization of all nodes is above the threshold") + return victims + } + if len(lowNodes) == len(Session.Nodes) { + klog.V(4).Infof("The resource utilization of all nodes is below the threshold") + return victims + } + if len(highNodes) == 0 { + klog.V(4).Infof("The resource utilization of all nodes is below the target threshold") + return victims + } + + // select victims from lowNodes + return evictPodsFromSourceNodes(highNodes, lowNodes, tasks, isContinueEvictPods, *utilizationConfig) +} + +// lowThresholdFilter filter nodes which all resource dimensions are under the low utilization threshold +func lowThresholdFilter(node *v1.Node, usage *NodeUtilization, config interface{}) bool { + utilizationConfig := parseArgToConfig(config) + if utilizationConfig == nil { + klog.V(4).Infof("lack of LowNodeUtilizationConf pointer parameter") + return false + } + + if node.Spec.Unschedulable { + return false + } + nodeCapacity := getNodeCapacity(node) + for rName, usage := range usage.utilization { + if thresholdPercent, ok := utilizationConfig.Thresholds[string(rName)]; ok { + threshold := getThresholdForNode(rName, thresholdPercent, nodeCapacity) + if usage.Cmp(*threshold) == 1 { + return false + } + } + } + return true +} + +// highThresholdFilter filter nodes which at least one resource dimension above the target utilization threshold +func highThresholdFilter(node *v1.Node, usage *NodeUtilization, config interface{}) bool { + utilizationConfig := parseArgToConfig(config) + if utilizationConfig == nil { + klog.V(4).Infof("lack of LowNodeUtilizationConf pointer parameter") + return false + } + + nodeCapacity := getNodeCapacity(node) + for rName, usage := range usage.utilization { + if thresholdPercent, ok := utilizationConfig.TargetThresholds[string(rName)]; ok { + threshold := getThresholdForNode(rName, thresholdPercent, nodeCapacity) + if usage.Cmp(*threshold) == 1 { + return true + } + } + } + return false +} + +// isContinueEvictPods judges whether continue to select victim pods +func isContinueEvictPods(node *v1.Node, usage *NodeUtilization, totalAllocatableResource map[v1.ResourceName]*resource.Quantity, config interface{}) bool { + var isNodeOverused bool + utilizationConfig := parseArgToConfig(config) + nodeCapacity := getNodeCapacity(node) + for rName, usage := range usage.utilization { + if thresholdPercent, ok := utilizationConfig.TargetThresholds[string(rName)]; ok { + threshold := getThresholdForNode(rName, thresholdPercent, nodeCapacity) + if usage.Cmp(*threshold) == 1 { + isNodeOverused = true + break + } + } + } + if !isNodeOverused { + return false + } + + for rName := range totalAllocatableResource { + if totalAllocatableResource[rName].CmpInt64(0) == 0 { + return false + } + } + return true +} diff --git a/pkg/scheduler/plugins/rescheduling/node_utilization_util.go b/pkg/scheduler/plugins/rescheduling/node_utilization_util.go new file mode 100644 index 00000000000..e8e0b9d202c --- /dev/null +++ b/pkg/scheduler/plugins/rescheduling/node_utilization_util.go @@ -0,0 +1,205 @@ +/* +Copyright 2022 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rescheduling
+
+import (
+	"sort"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/klog"
+	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
+
+	"volcano.sh/volcano/pkg/scheduler/api"
+)
+
+const FiveMinutes = "5m"
+
+type NodeUtilization struct {
+	nodeInfo    *v1.Node
+	utilization map[v1.ResourceName]*resource.Quantity
+	pods        []*v1.Pod
+}
+
+type thresholdFilter func(*v1.Node, *NodeUtilization, interface{}) bool
+
+type isContinueEviction func(node *v1.Node, usage *NodeUtilization, totalAllocatableResource map[v1.ResourceName]*resource.Quantity, config interface{}) bool
+
+// groupNodesByUtilization divides the nodes into two groups by resource utilization filters
+func groupNodesByUtilization(nodeUtilizationList []*NodeUtilization, lowThresholdFilter, highThresholdFilter thresholdFilter, config interface{}) ([]*NodeUtilization, []*NodeUtilization) {
+	lowNodes := make([]*NodeUtilization, 0)
+	highNodes := make([]*NodeUtilization, 0)
+
+	for _, nodeUtilization := range nodeUtilizationList {
+		if lowThresholdFilter(nodeUtilization.nodeInfo, nodeUtilization, config) {
+			lowNodes = append(lowNodes, nodeUtilization)
+		} else if highThresholdFilter(nodeUtilization.nodeInfo, nodeUtilization, config) {
+			highNodes = append(highNodes, nodeUtilization)
+		}
+	}
+
+	return lowNodes, highNodes
+}
+
+// getNodeUtilization returns the resource utilization list for all nodes
+func getNodeUtilization() []*NodeUtilization {
+	nodeUtilizationList := make([]*NodeUtilization, 0)
+	for _, nodeInfo := range Session.Nodes {
+		nodeUtilization := &NodeUtilization{
+			nodeInfo:    nodeInfo.Node,
+			utilization: map[v1.ResourceName]*resource.Quantity{},
+			pods:        nodeInfo.Pods(),
+		}
+		nodeUtilization.utilization[v1.ResourceCPU] = resource.NewMilliQuantity(int64(nodeInfo.ResourceUsage.CPUUsageAvg[FiveMinutes]), resource.DecimalSI)
+		nodeUtilization.utilization[v1.ResourceMemory] = resource.NewQuantity(int64(nodeInfo.ResourceUsage.MemUsageAvg[FiveMinutes]), resource.BinarySI)
+		nodeUtilizationList = append(nodeUtilizationList, nodeUtilization)
+	}
+	return nodeUtilizationList
+}
+
+// evictPodsFromSourceNodes evicts pods from source nodes to target nodes according to priority and QoS
+func evictPodsFromSourceNodes(sourceNodes, targetNodes []*NodeUtilization, tasks []*api.TaskInfo, evictionCon isContinueEviction, config interface{}) []*api.TaskInfo {
+	resourceNames := []v1.ResourceName{
+		v1.ResourceCPU,
+		v1.ResourceMemory,
+	}
+	utilizationConfig := parseArgToConfig(config)
+	totalAllocatableResource := map[v1.ResourceName]*resource.Quantity{
+		v1.ResourceCPU:    {},
+		v1.ResourceMemory: {},
+	}
+	for _, node := range targetNodes {
+		nodeCapacity := getNodeCapacity(node.nodeInfo)
+		for _, rName := range resourceNames {
+			totalAllocatableResource[rName].Add(*getThresholdForNode(rName, utilizationConfig.TargetThresholds[string(rName)], nodeCapacity))
+			totalAllocatableResource[rName].Sub(*node.utilization[rName])
+		}
+	}
+	klog.V(4).Infof("totalAllocatableResource: %v", totalAllocatableResource)
+
+	// sort the source nodes in descending order of utilization
+	sortNodes(sourceNodes, Session.Nodes)
klog.V(4).Infoln("sourceNodes:") + for _, node := range sourceNodes { + klog.V(4).Infoln(node.nodeInfo.Name) + } + + // victims select algorithm: + // 1. Evict pods from nodes with high utilization to low utilization + // 2. As to one node, evict pods from low priority to high priority. If the priority is same, evict pods according to QoS from low to high + victims := make([]*api.TaskInfo, 0) + for _, node := range sourceNodes { + if len(node.pods) == 0 { + klog.V(4).Infof("No pods can be removed on node: %s", node.nodeInfo.Name) + continue + } + sortPods(node.pods) + victims = append(victims, evict(node.pods, node, totalAllocatableResource, evictionCon, tasks, config)...) + } + return victims +} + +// parseArgToConfig returns a nodeUtilizationConfig object from parameters +// TODO: It is just for lowNodeUtilization now, which should be abstracted as a common function. +func parseArgToConfig(config interface{}) *LowNodeUtilizationConf { + var utilizationConfig *LowNodeUtilizationConf + if arg, ok := config.(LowNodeUtilizationConf); ok { + utilizationConfig = &arg + } + + return utilizationConfig +} + +// sortNodes sorts all the nodes according the usage of cpu and memory with weight score +func sortNodes(nodeUtilizationList []*NodeUtilization, nodes map[string]*api.NodeInfo) { + cmpFn := func(i, j int) bool { + return getScoreForNode(i, nodeUtilizationList, nodes) > getScoreForNode(j, nodeUtilizationList, nodes) + } + sort.Slice(nodeUtilizationList, cmpFn) +} + +// getScoreForNode returns the score for node which considers only for CPU and memory +func getScoreForNode(index int, nodeUtilizationList []*NodeUtilization, nodes map[string]*api.NodeInfo) float64 { + nodeName := nodeUtilizationList[index].nodeInfo.Name + cpuScore := float64(nodeUtilizationList[index].utilization[v1.ResourceCPU].MilliValue()) / nodes[nodeName].Capability.MilliCPU + memoryScore := float64(nodeUtilizationList[index].utilization[v1.ResourceMemory].MilliValue()) / nodes[nodeName].Capability.Memory + return cpuScore + memoryScore +} + +// getThresholdForNode returns resource threshold on some dimension +func getThresholdForNode(rName v1.ResourceName, thresholdPercent float64, nodeCapacity v1.ResourceList) *resource.Quantity { + var threshold *resource.Quantity + if rName == v1.ResourceCPU { + threshold = resource.NewMilliQuantity(int64(thresholdPercent*float64(nodeCapacity.Cpu().MilliValue())*0.01), resource.DecimalSI) + } else if rName == v1.ResourceMemory { + threshold = resource.NewQuantity(int64(thresholdPercent*float64(nodeCapacity.Memory().Value())*0.01), resource.BinarySI) + } + return threshold +} + +// getNodeCapacity returns node's capacity +func getNodeCapacity(node *v1.Node) v1.ResourceList { + nodeCapacity := node.Status.Capacity + if len(node.Status.Allocatable) > 0 { + nodeCapacity = node.Status.Allocatable + } + return nodeCapacity +} + +// sortPods return the pods in order according the priority and QoS +func sortPods(pods []*v1.Pod) { + cmp := func(i, j int) bool { + if pods[i].Spec.Priority == nil && pods[j].Spec.Priority != nil { + return true + } + if pods[j].Spec.Priority == nil && pods[i].Spec.Priority != nil { + return false + } + if (pods[j].Spec.Priority == nil && pods[i].Spec.Priority == nil) || (*pods[i].Spec.Priority == *pods[j].Spec.Priority) { + if v1qos.GetPodQOS(pods[i]) == v1.PodQOSBestEffort { + return true + } + if v1qos.GetPodQOS(pods[i]) == v1.PodQOSBurstable && v1qos.GetPodQOS(pods[j]) == v1.PodQOSGuaranteed { + return true + } + return false + } + return *pods[i].Spec.Priority < 
+	}
+	sort.Slice(pods, cmp)
+}
+
+// evict selects victim pods and adds them to the eviction list
+func evict(pods []*v1.Pod, utilization *NodeUtilization, totalAllocatableResource map[v1.ResourceName]*resource.Quantity, continueEviction isContinueEviction, tasks []*api.TaskInfo, config interface{}) []*api.TaskInfo {
+	victims := make([]*api.TaskInfo, 0)
+	for _, pod := range pods {
+		if !continueEviction(utilization.nodeInfo, utilization, totalAllocatableResource, config) {
+			return victims
+		}
+		for _, task := range tasks {
+			if task.Pod.UID == pod.UID {
+				totalAllocatableResource[v1.ResourceCPU].Sub(*resource.NewMilliQuantity(int64(task.Resreq.MilliCPU), resource.DecimalSI))
+				totalAllocatableResource[v1.ResourceMemory].Sub(*resource.NewQuantity(int64(task.Resreq.Memory), resource.BinarySI))
+				utilization.utilization[v1.ResourceCPU].Sub(*resource.NewMilliQuantity(int64(task.Resreq.MilliCPU), resource.DecimalSI))
+				utilization.utilization[v1.ResourceMemory].Sub(*resource.NewQuantity(int64(task.Resreq.Memory), resource.BinarySI))
+				victims = append(victims, task)
+				break
+			}
+		}
+	}
+	return victims
+}
diff --git a/pkg/scheduler/plugins/rescheduling/rescheduling.go b/pkg/scheduler/plugins/rescheduling/rescheduling.go
new file mode 100644
index 00000000000..047acde9dfb
--- /dev/null
+++ b/pkg/scheduler/plugins/rescheduling/rescheduling.go
@@ -0,0 +1,170 @@
+/*
+Copyright 2022 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rescheduling
+
+import (
+	"time"
+
+	"k8s.io/klog"
+
+	"volcano.sh/volcano/pkg/scheduler/api"
+	"volcano.sh/volcano/pkg/scheduler/framework"
+	"volcano.sh/volcano/pkg/scheduler/util"
+)
+
+const (
+	// PluginName indicates the name of the volcano scheduler plugin
+	PluginName = "rescheduling"
+	// DefaultInterval indicates the default interval at which the rescheduling plugin runs
+	DefaultInterval = 5 * time.Minute
+	// DefaultStrategy indicates the default strategy used by the rescheduling plugin
+	DefaultStrategy = "lowNodeUtilization"
+)
+
+var (
+	// Session holds the session object shared by the whole rescheduling package
+	Session *framework.Session
+
+	// RegisteredStrategies collects all the strategies registered.
+	RegisteredStrategies []string
+
+	// RegisteredStrategyConfigs collects all the strategy configurations registered.
+	RegisteredStrategyConfigs map[string]interface{}
+
+	// VictimFns contains all the victim functions for the registered strategies
+	VictimFns map[string]api.VictimsFromCandidatesFn
+)
+
+func init() {
+	RegisteredStrategies = make([]string, 0)
+	RegisteredStrategyConfigs = make(map[string]interface{})
+	VictimFns = make(map[string]api.VictimsFromCandidatesFn)
+
+	// register victim functions for all strategies here
+	VictimFns["lowNodeUtilization"] = victimsFnForLnu
+}
+
+type reschedulingPlugin struct {
+	// Arguments given for the rescheduling plugin
+	pluginArguments framework.Arguments
+}
+
+// New returns a rescheduling plugin object
+func New(arguments framework.Arguments) framework.Plugin {
+	return &reschedulingPlugin{
+		pluginArguments: arguments,
+	}
+}
+
+// Name returns the name of the rescheduling plugin
+func (rp *reschedulingPlugin) Name() string {
+	return PluginName
+}
+
+func (rp *reschedulingPlugin) OnSessionOpen(ssn *framework.Session) {
+	klog.V(4).Infof("Enter rescheduling plugin ...")
+	defer klog.V(4).Infof("Leaving rescheduling plugin.")
+
+	// Parse all the rescheduling strategies and the execution interval
+	Session = ssn
+	configs := NewReschedulingConfigs()
+	for _, tier := range ssn.Tiers {
+		for _, pluginOption := range tier.Plugins {
+			if pluginOption.Name == PluginName {
+				configs.parseArguments(pluginOption.Arguments)
+				break
+			}
+		}
+	}
+	klog.V(4).Infof("rescheduling config: %v", configs)
+
+	// Judge whether it is time to execute rescheduling now
+	if !util.IsToBeExecuted("reschedulingFns", configs.interval) {
+		klog.V(4).Infof("It is not the time to execute rescheduling strategies.")
+		return
+	}
+
+	// Get all strategies and register the VictimsFromCandidatesFns
+	victimFns := make([]api.VictimsFromCandidatesFn, 0)
+	for _, strategy := range configs.strategies {
+		victimFns = append(victimFns, VictimFns[strategy.Name])
+	}
+	ssn.AddVictimsFromCandidatesFns(rp.Name(), victimFns)
+}
+
+func (rp *reschedulingPlugin) OnSessionClose(ssn *framework.Session) {
+	Session = nil
+	RegisteredStrategies = RegisteredStrategies[0:0]
+	for k := range RegisteredStrategyConfigs {
+		delete(RegisteredStrategyConfigs, k)
+	}
+	VictimFns = nil
+}
+
+// ReschedulingConfigs is the struct for rescheduling plugin arguments
+type ReschedulingConfigs struct {
+	interval   time.Duration
+	strategies []Strategy
+}
+
+// Strategy is the struct for a rescheduling strategy
+type Strategy struct {
+	Name       string
+	Parameters map[string]interface{}
+}
+
+// NewReschedulingConfigs creates a ReschedulingConfigs object with the default configuration
+func NewReschedulingConfigs() *ReschedulingConfigs {
+	config := &ReschedulingConfigs{
+		interval: DefaultInterval,
+		strategies: []Strategy{
+			{
+				Name:       DefaultStrategy,
+				Parameters: DefaultLowNodeConf,
+			},
+		},
+	}
+	RegisteredStrategies = append(RegisteredStrategies, DefaultStrategy)
+	RegisteredStrategyConfigs[DefaultStrategy] = DefaultLowNodeConf
+	return config
+}
+
+// parseArguments parses all the rescheduling arguments
+func (rc *ReschedulingConfigs) parseArguments(arguments framework.Arguments) {
+	var intervalStr string
+	var err error
+	if intervalArg, ok := arguments["interval"]; ok {
+		intervalStr = intervalArg.(string)
+	}
+	rc.interval, err = time.ParseDuration(intervalStr)
+	if err != nil {
+		klog.V(4).Infof("Parse rescheduling interval failed. Reset the interval to 5m by default.")
+		rc.interval = DefaultInterval
+	}
+	strategies, ok := arguments["strategies"]
+	if ok {
+		rc.strategies = strategies.([]Strategy)
+		RegisteredStrategies = RegisteredStrategies[0:0]
+		for k := range RegisteredStrategyConfigs {
+			delete(RegisteredStrategyConfigs, k)
+		}
+		for _, strategy := range rc.strategies {
+			RegisteredStrategies = append(RegisteredStrategies, strategy.Name)
+			RegisteredStrategyConfigs[strategy.Name] = strategy.Parameters
+		}
+	}
+}
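
For reference, a minimal scheduler configuration enabling this plugin could look like the sketch below. The interval, strategies, thresholds, and targetThresholds keys mirror what parseArguments and LowNodeUtilizationConf.parse read in this patch; the exact nesting under arguments and the key casing are illustrative assumptions, not part of the change itself.

    actions: "enqueue, allocate, backfill"
    tiers:
    - plugins:
      - name: rescheduling
        arguments:
          interval: 10m
          strategies:
          - name: lowNodeUtilization
            parameters:
              thresholds:
                cpu: 20
                memory: 20
              targetThresholds:
                cpu: 50
                memory: 50

With such a configuration, nodes under the thresholds are treated as underutilized targets and nodes above the targetThresholds as overutilized sources. Note that, as written, parseArguments expects the strategies value to assert directly to []Strategy and the threshold maps to map[string]int, so the argument shape that is actually accepted depends on how framework.Arguments unmarshals the scheduler ConfigMap.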