Skip to content

Commit

Permalink
Merge pull request #2129 from william-wang/usage
Browse files Browse the repository at this point in the history
Add usage-based scheduling plugin
  • Loading branch information
volcano-sh-bot authored Apr 21, 2022
2 parents eb4cdde + a68c165 commit e3c08cb
Show file tree
Hide file tree
Showing 15 changed files with 1,608 additions and 15 deletions.
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9q
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -453,6 +454,7 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mvdan/xurls v1.1.0/go.mod h1:tQlNn3BED8bE/15hnSL2HLkDeLWpNPAwtw7wkEq44oU=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
Expand Down
33 changes: 29 additions & 4 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ type NodeInfo struct {
// pods
Used *Resource

Allocatable *Resource
Capability *Resource
Allocatable *Resource
Capability *Resource
ResourceUsage *NodeUsage

Tasks map[TaskID]*TaskInfo
NumaInfo *NumatopoInfo
Expand Down Expand Up @@ -92,6 +93,26 @@ type NodeState struct {
Reason string
}

// NodeUsage defines the real load usage of node
type NodeUsage struct {
	CPUUsageAvg map[string]float64
	MEMUsageAvg map[string]float64
}

// DeepCopy returns an independent copy of the node usage data, so that the
// caller may mutate the result without affecting the receiver.
func (nu *NodeUsage) DeepCopy() *NodeUsage {
	// cloneAvg copies one period->average map into fresh storage.
	cloneAvg := func(src map[string]float64) map[string]float64 {
		dst := make(map[string]float64, len(src))
		for period, avg := range src {
			dst[period] = avg
		}
		return dst
	}
	return &NodeUsage{
		CPUUsageAvg: cloneAvg(nu.CPUUsageAvg),
		MEMUsageAvg: cloneAvg(nu.MEMUsageAvg),
	}
}

// NewNodeInfo is used to create new nodeInfo object
func NewNodeInfo(node *v1.Node) *NodeInfo {
nodeInfo := &NodeInfo{
Expand All @@ -100,8 +121,9 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
Idle: EmptyResource(),
Used: EmptyResource(),

Allocatable: EmptyResource(),
Capability: EmptyResource(),
Allocatable: EmptyResource(),
Capability: EmptyResource(),
ResourceUsage: &NodeUsage{},

OversubscriptionResource: EmptyResource(),
Tasks: make(map[TaskID]*TaskInfo),
Expand Down Expand Up @@ -160,6 +182,9 @@ func (ni *NodeInfo) Clone() *NodeInfo {
if ni.NumaInfo != nil {
res.NumaInfo = ni.NumaInfo.DeepCopy()
}
if ni.ResourceUsage != nil {
res.ResourceUsage = ni.ResourceUsage.DeepCopy()
}

if ni.NumaSchedulerInfo != nil {
res.NumaSchedulerInfo = ni.NumaSchedulerInfo.DeepCopy()
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/api/node_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
OversubscriptionResource: EmptyResource(),
Allocatable: buildResource("8000m", "10G"),
Capability: buildResource("8000m", "10G"),
ResourceUsage: &NodeUsage{},
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{
"c1/p1": NewTaskInfo(case01Pod1),
Expand All @@ -80,6 +81,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
OversubscriptionResource: EmptyResource(),
Allocatable: buildResource("2000m", "1G"),
Capability: buildResource("2000m", "1G"),
ResourceUsage: &NodeUsage{},
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{},
GPUDevices: make(map[int]*GPUDevice),
Expand Down Expand Up @@ -138,6 +140,7 @@ func TestNodeInfo_RemovePod(t *testing.T) {
Pipelined: EmptyResource(),
Allocatable: buildResource("8000m", "10G"),
Capability: buildResource("8000m", "10G"),
ResourceUsage: &NodeUsage{},
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{
"c1/p1": NewTaskInfo(case01Pod1),
Expand Down Expand Up @@ -198,6 +201,7 @@ func TestNodeInfo_SetNode(t *testing.T) {
Pipelined: EmptyResource(),
Allocatable: buildResource("10", "10G"),
Capability: buildResource("10", "10G"),
ResourceUsage: &NodeUsage{},
State: NodeState{Phase: NotReady, Reason: "OutOfSync"},
Tasks: map[TaskID]*TaskInfo{
"c1/p1": NewTaskInfo(case01Pod1),
Expand Down
94 changes: 94 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/api"
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"

v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -61,6 +64,15 @@ import (
"volcano.sh/volcano/pkg/scheduler/metrics"
)

const (
	// cpuUsageAvg is the record name of cpu average usage defined in prometheus rules.
	cpuUsageAvg = "cpu_usage_avg"
	// memUsageAvg is the record name of mem average usage defined in prometheus rules.
	memUsageAvg = "mem_usage_avg"
	// defaultMetricsInternal is the default interval for syncing data from the
	// metrics server. It is typed as a time.Duration so that the
	// time.Duration(defaultMetricsInternal) conversion in Run yields the
	// intended 5 seconds; the previous untyped value 5 converted to 5ns.
	// NOTE(review): the name keeps the historical "Internal" spelling
	// ("Interval" was intended) to avoid renaming at call sites.
	defaultMetricsInternal = 5 * time.Second
)

func init() {
schemeBuilder := runtime.SchemeBuilder{
v1.AddToScheme,
Expand All @@ -84,6 +96,7 @@ type SchedulerCache struct {
// schedulerName is the name for volcano scheduler
schedulerName string
nodeSelectorLabels map[string]string
metricsConf map[string]string

podInformer infov1.PodInformer
nodeInformer infov1.NodeInformer
Expand Down Expand Up @@ -614,6 +627,13 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
go wait.Until(sc.processCleanupJob, 0, stopCh)

go wait.Until(sc.processBindTask, time.Millisecond*20, stopCh)

// Get metrics data
interval, err := time.ParseDuration(sc.metricsConf["interval"])
if err != nil || interval <= 0 {
interval = time.Duration(defaultMetricsInternal)
}
go wait.Until(sc.GetMetricsData, interval, stopCh)
}

// WaitForCacheSync sync the cache with the api server
Expand Down Expand Up @@ -1155,3 +1175,77 @@ func (sc *SchedulerCache) recordPodGroupEvent(podGroup *schedulingapi.PodGroup,
}
sc.Recorder.Eventf(pg, eventType, reason, msg)
}

// SetMetricsConf stores the metrics-server configuration for later use: the
// "address" key is read by GetMetricsData and the "interval" key by Run.
// The caller's map is retained by reference, not copied.
func (sc *SchedulerCache) SetMetricsConf(conf map[string]string) {
	sc.metricsConf = conf
}

// GetMetricsData queries Prometheus for the per-node CPU/memory average-usage
// records and stores the results into the cached NodeInfo objects. It is a
// no-op when no metrics "address" has been configured via SetMetricsConf.
func (sc *SchedulerCache) GetMetricsData() {
	address := sc.metricsConf["address"]
	if len(address) == 0 {
		return
	}
	klog.V(4).Infof("Get metrics from Prometheus: %s", address)
	client, err := api.NewClient(api.Config{
		Address: address,
	})
	if err != nil {
		klog.Errorf("Error creating client: %v\n", err)
		return
	}
	v1api := prometheusv1.NewAPI(client)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
	defer cancel()

	// Snapshot the current node set under the lock so the (slow) Prometheus
	// queries below run without holding the cache mutex.
	nodeUsageMap := make(map[string]*schedulingapi.NodeUsage)
	sc.Mutex.Lock()
	for k := range sc.Nodes {
		nodeUsageMap[k] = &schedulingapi.NodeUsage{
			CPUUsageAvg: make(map[string]float64),
			MEMUsageAvg: make(map[string]float64),
		}
	}
	sc.Mutex.Unlock()

	supportedPeriods := []string{"5m"}
	supportedMetrics := []string{cpuUsageAvg, memUsageAvg}
	for node := range nodeUsageMap {
		for _, period := range supportedPeriods {
			for _, metric := range supportedMetrics {
				queryStr := fmt.Sprintf("%s_%s{instance=\"%s\"}", metric, period, node)
				klog.V(4).Infof("Query prometheus by %s", queryStr)
				res, warnings, err := v1api.Query(ctx, queryStr, time.Now())
				if err != nil {
					// FIX: previously the code fell through on a failed query
					// and indexed the split result below, panicking.
					klog.Errorf("Error querying Prometheus: %v", err)
					continue
				}
				if len(warnings) > 0 {
					klog.V(3).Infof("Warning querying Prometheus: %v", warnings)
				}

				// The rendered result has the form `{labels} => value @[ts]`;
				// guard against an empty result before indexing the parts.
				rowValues := strings.Split(strings.TrimSpace(res.String()), "=>")
				if len(rowValues) < 2 {
					klog.V(4).Infof("Empty query result for %s", queryStr)
					continue
				}
				value := strings.Split(strings.TrimSpace(rowValues[1]), " ")
				// FIX: parse errors were silently discarded before; skip the
				// sample instead of recording a bogus zero.
				usage, err := strconv.ParseFloat(value[0], 64)
				if err != nil {
					klog.Errorf("Error parsing prometheus result %q: %v", value[0], err)
					continue
				}
				switch metric {
				case cpuUsageAvg:
					nodeUsageMap[node].CPUUsageAvg[period] = usage
					klog.V(4).Infof("node: %v, CpuUsageAvg: %v, period:%v", node, usage, period)
				case memUsageAvg:
					nodeUsageMap[node].MEMUsageAvg[period] = usage
					klog.V(4).Infof("node: %v, MemUsageAvg: %v, period:%v", node, usage, period)
				}
			}
		}
	}
	sc.setMetricsData(nodeUsageMap)
}

// setMetricsData writes the freshly queried usage data onto the matching
// cached nodes, holding the cache lock for the duration of the update.
// Entries for nodes no longer present in the cache are ignored.
func (sc *SchedulerCache) setMetricsData(usageInfo map[string]*schedulingapi.NodeUsage) {
	sc.Mutex.Lock()
	defer sc.Mutex.Unlock()
	for name, usage := range usageInfo {
		if nodeInfo, found := sc.Nodes[name]; found {
			nodeInfo.ResourceUsage = usage
		}
	}
}
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func TestSchedulerCache_Bind_NodeWithInsufficientResources(t *testing.T) {
t.Errorf("expected task to remain the same after failed bind: \n %#v\n %#v", taskBeforeBind, taskAfterBind)
}

nodeAfterBind := cache.Nodes["n1"]
nodeAfterBind := cache.Nodes["n1"].Clone()
if !reflect.DeepEqual(nodeBeforeBind, nodeAfterBind) {
t.Errorf("expected node to remain the same after failed bind")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type Cache interface {

// SharedInformerFactory return scheduler SharedInformerFactory
SharedInformerFactory() informers.SharedInformerFactory

// SetMetricsConf set the metrics server related configuration
SetMetricsConf(conf map[string]string)
}

// VolumeBinder interface for allocate and bind volumes
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type SchedulerConfiguration struct {
// Tiers defines plugins in different tiers
Tiers []Tier `yaml:"tiers"`
// Configurations is configuration for actions
Configurations []Configuration `yaml:"configurations"`
Configurations []Configuration `yaml:"configurations"`
MetricsConfiguration map[string]string `yaml:"metrics"`
}

// Tier defines plugin tier
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/plugins/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"volcano.sh/volcano/pkg/scheduler/plugins/sla"
tasktopology "volcano.sh/volcano/pkg/scheduler/plugins/task-topology"
"volcano.sh/volcano/pkg/scheduler/plugins/tdm"
"volcano.sh/volcano/pkg/scheduler/plugins/usage"
)

func init() {
Expand All @@ -50,6 +51,7 @@ func init() {
framework.RegisterPluginBuilder(sla.PluginName, sla.New)
framework.RegisterPluginBuilder(tasktopology.PluginName, tasktopology.New)
framework.RegisterPluginBuilder(numaaware.PluginName, numaaware.New)
framework.RegisterPluginBuilder(usage.PluginName, usage.New)

// Plugins for Queues
framework.RegisterPluginBuilder(proportion.PluginName, proportion.New)
Expand Down
Loading

0 comments on commit e3c08cb

Please sign in to comment.