
add usage based scheduling plugin #2129

Merged: 5 commits, Apr 21, 2022 (showing changes from 4 commits)
2 changes: 2 additions & 0 deletions go.sum
@@ -373,6 +373,7 @@ github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9q
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@@ -453,6 +454,7 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mvdan/xurls v1.1.0/go.mod h1:tQlNn3BED8bE/15hnSL2HLkDeLWpNPAwtw7wkEq44oU=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
32 changes: 28 additions & 4 deletions pkg/scheduler/api/node_info.go
@@ -52,8 +52,9 @@ type NodeInfo struct {
    // pods
    Used *Resource

-   Allocatable *Resource
-   Capability *Resource
+   Allocatable   *Resource
+   Capability    *Resource
+   ResourceUsage *NodeUsage

    Tasks map[TaskID]*TaskInfo
    NumaInfo *NumatopoInfo
@@ -92,6 +93,25 @@ type NodeState struct {
    Reason string
}

type NodeUsage struct {

Contributor: Suggest adding a comment about NodeUsage so that other developers can better understand the keys and values of these maps.

Member (author): OK, I will add it, thanks.

    // CPUUsageAvg maps a sampling period (for example "5m") to the node's
    // average CPU usage over that period, as reported by Prometheus.
    CPUUsageAvg map[string]float64
    // MEMUsageAvg maps a sampling period to the node's average memory usage
    // over that period.
    MEMUsageAvg map[string]float64
}

func (nu *NodeUsage) DeepCopy() *NodeUsage {
    newUsage := &NodeUsage{
        CPUUsageAvg: make(map[string]float64),
        MEMUsageAvg: make(map[string]float64),
    }
    for k, v := range nu.CPUUsageAvg {
        newUsage.CPUUsageAvg[k] = v
    }
    for k, v := range nu.MEMUsageAvg {
        newUsage.MEMUsageAvg[k] = v
    }
    return newUsage
}

// NewNodeInfo is used to create new nodeInfo object
func NewNodeInfo(node *v1.Node) *NodeInfo {
    nodeInfo := &NodeInfo{
@@ -100,8 +120,9 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
        Idle: EmptyResource(),
        Used: EmptyResource(),

-       Allocatable: EmptyResource(),
-       Capability: EmptyResource(),
+       Allocatable:   EmptyResource(),
+       Capability:    EmptyResource(),
+       ResourceUsage: &NodeUsage{},

        OversubscriptionResource: EmptyResource(),
        Tasks: make(map[TaskID]*TaskInfo),
@@ -160,6 +181,9 @@ func (ni *NodeInfo) Clone() *NodeInfo {
    if ni.NumaInfo != nil {
        res.NumaInfo = ni.NumaInfo.DeepCopy()
    }
    if ni.ResourceUsage != nil {
        res.ResourceUsage = ni.ResourceUsage.DeepCopy()
    }

    if ni.NumaSchedulerInfo != nil {
        res.NumaSchedulerInfo = ni.NumaSchedulerInfo.DeepCopy()
4 changes: 4 additions & 0 deletions pkg/scheduler/api/node_info_test.go
@@ -58,6 +58,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
        OversubscriptionResource: EmptyResource(),
        Allocatable: buildResource("8000m", "10G"),
        Capability: buildResource("8000m", "10G"),
        ResourceUsage: &NodeUsage{},
        State: NodeState{Phase: Ready},
        Tasks: map[TaskID]*TaskInfo{
            "c1/p1": NewTaskInfo(case01Pod1),
@@ -80,6 +81,7 @@
        OversubscriptionResource: EmptyResource(),
        Allocatable: buildResource("2000m", "1G"),
        Capability: buildResource("2000m", "1G"),
        ResourceUsage: &NodeUsage{},
        State: NodeState{Phase: Ready},
        Tasks: map[TaskID]*TaskInfo{},
        GPUDevices: make(map[int]*GPUDevice),
@@ -138,6 +140,7 @@ func TestNodeInfo_RemovePod(t *testing.T) {
        Pipelined: EmptyResource(),
        Allocatable: buildResource("8000m", "10G"),
        Capability: buildResource("8000m", "10G"),
        ResourceUsage: &NodeUsage{},
        State: NodeState{Phase: Ready},
        Tasks: map[TaskID]*TaskInfo{
            "c1/p1": NewTaskInfo(case01Pod1),
@@ -198,6 +201,7 @@ func TestNodeInfo_SetNode(t *testing.T) {
        Pipelined: EmptyResource(),
        Allocatable: buildResource("10", "10G"),
        Capability: buildResource("10", "10G"),
        ResourceUsage: &NodeUsage{},
        State: NodeState{Phase: NotReady, Reason: "OutOfSync"},
        Tasks: map[TaskID]*TaskInfo{
            "c1/p1": NewTaskInfo(case01Pod1),
95 changes: 95 additions & 0 deletions pkg/scheduler/cache/cache.go
@@ -47,6 +47,8 @@ import (
    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    volumescheduling "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"

    "github.com/prometheus/client_golang/api"
    prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"

Contributor: Please sort the import packages.

Member (author): fixed

    batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
    "volcano.sh/apis/pkg/apis/scheduling"
    schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
@@ -61,6 +63,15 @@ import (
"volcano.sh/volcano/pkg/scheduler/metrics"
)

const (
    // record name of the CPU average usage defined in the Prometheus rules
    cpuUsageAvg = "cpu_usage_avg"
    // record name of the memory average usage defined in the Prometheus rules
    memUsageAvg = "mem_usage_avg"
    // default interval (5 seconds) for syncing data from the metrics server
    defaultMetricsInterval = 5 * time.Second

Contributor: 5s or 5m?

Member (author): 5s, just added comments to clarify this.

)
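These record names are produced by Prometheus recording rules that live outside this PR. As a rough illustration of what would yield the `cpu_usage_avg_5m` and `mem_usage_avg_5m` series queried below, a sketch of such rules follows; the PromQL expressions are assumptions, not part of this change:

```yaml
groups:
  - name: node-usage
    rules:
      # Average CPU utilization (percent) per node over the last 5 minutes.
      - record: cpu_usage_avg_5m
        expr: 100 - avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100
      # Average memory utilization (percent) per node over the last 5 minutes.
      - record: mem_usage_avg_5m
        expr: 100 * (1 - avg_over_time(node_memory_MemAvailable_bytes[5m]) / node_memory_MemTotal_bytes)
```

The `{instance="<node>"}` selector used by GetMetricsData below assumes the node name matches the instance label on these series.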

func init() {
    schemeBuilder := runtime.SchemeBuilder{
        v1.AddToScheme,
@@ -84,6 +95,7 @@ type SchedulerCache struct {
    // schedulerName is the name for volcano scheduler
    schedulerName      string
    nodeSelectorLabels map[string]string
    metricsConf        map[string]string

    podInformer  infov1.PodInformer
    nodeInformer infov1.NodeInformer
@@ -614,6 +626,13 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
    go wait.Until(sc.processCleanupJob, 0, stopCh)

    go wait.Until(sc.processBindTask, time.Millisecond*20, stopCh)

    // Periodically sync node usage metrics; fall back to the 5s default
    // when the configured interval is missing or invalid.
    interval, err := time.ParseDuration(sc.metricsConf["interval"])
    if err != nil || interval <= 0 {
        interval = defaultMetricsInterval
    }
    go wait.Until(sc.GetMetricsData, interval, stopCh)
}

// WaitForCacheSync sync the cache with the api server
@@ -1155,3 +1174,79 @@ func (sc *SchedulerCache) recordPodGroupEvent(podGroup *schedulingapi.PodGroup,
    }
    sc.Recorder.Eventf(pg, eventType, reason, msg)
}

func (sc *SchedulerCache) SetMetricsConf(conf map[string]string) {
    sc.metricsConf = conf
}

func (sc *SchedulerCache) GetMetricsData() {
    address := sc.metricsConf["address"]
    if len(address) == 0 {
        return
    }
    klog.V(4).Infof("Get metrics from Prometheus: %s", address)
    client, err := api.NewClient(api.Config{
        Address: address,
    })
    if err != nil {
        klog.Errorf("Error creating client: %v\n", err)
        return
    }
    v1api := prometheusv1.NewAPI(client)
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
    defer cancel()
    nodeUsageMap := make(map[string]*schedulingapi.NodeUsage)
    sc.Mutex.Lock()
    for k := range sc.Nodes {
        nodeUsageMap[k] = &schedulingapi.NodeUsage{
            CPUUsageAvg: make(map[string]float64),
            MEMUsageAvg: make(map[string]float64),
        }
    }
    sc.Mutex.Unlock()

    supportedPeriods := []string{"5m"}
    supportedMetrics := []string{cpuUsageAvg, memUsageAvg}
    for node := range nodeUsageMap {
        for _, period := range supportedPeriods {
            for _, metric := range supportedMetrics {
                queryStr := fmt.Sprintf("%s_%s{instance=\"%s\"}", metric, period, node)
                klog.V(4).Infof("Query prometheus by %s", queryStr)
                res, warnings, err := v1api.Query(ctx, queryStr, time.Now())
                if err != nil {
                    klog.Errorf("Error querying Prometheus: %v", err)
                    continue
                }
                if len(warnings) > 0 {
                    klog.V(3).Infof("Warning querying Prometheus: %v", warnings)
                }

Contributor: Is this judgement necessary?

Member (author): updated

                rowValues := strings.Split(strings.TrimSpace(res.String()), "=>")
                if len(rowValues) < 2 {
                    continue
                }
                value := strings.Split(strings.TrimSpace(rowValues[1]), " ")
                switch metric {
                case cpuUsageAvg:

Contributor: Suggest making values such as cpu_usage_avg and mem_usage_avg const variables, since they are used in several places.

Member (author): updated.

                    cpuUsage, _ := strconv.ParseFloat(value[0], 64)
                    nodeUsageMap[node].CPUUsageAvg[period] = cpuUsage
                    klog.V(4).Infof("node: %v, CpuUsageAvg: %v, period:%v", node, cpuUsage, period)
                case memUsageAvg:
                    memUsage, _ := strconv.ParseFloat(value[0], 64)
                    nodeUsageMap[node].MEMUsageAvg[period] = memUsage
                    klog.V(4).Infof("node: %v, MemUsageAvg: %v, period:%v", node, memUsage, period)
                }
            }
        }
    }
    sc.setMetricsData(nodeUsageMap)
}
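The string parsing above relies on the textual rendering of the query result. A sturdier alternative, shown here only as a sketch and not what this PR does, is to type-assert the `model.Value` returned by `Query` to `model.Vector` and read the sample value directly:

```go
import "github.com/prometheus/common/model"

// firstSampleValue returns the value of the first sample in a Prometheus
// query result, or false when the result is not a non-empty vector.
func firstSampleValue(res model.Value) (float64, bool) {
    vec, ok := res.(model.Vector)
    if !ok || vec.Len() == 0 {
        return 0, false
    }
    return float64(vec[0].Value), true
}
```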

func (sc *SchedulerCache) setMetricsData(usageInfo map[string]*schedulingapi.NodeUsage) {
    sc.Mutex.Lock()
    defer sc.Mutex.Unlock()
    for k := range usageInfo {
        nodeInfo, ok := sc.Nodes[k]
        if ok {
            nodeInfo.ResourceUsage = usageInfo[k]
        }
    }
}
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/cache_test.go
@@ -212,7 +212,7 @@ func TestSchedulerCache_Bind_NodeWithInsufficientResources(t *testing.T) {
        t.Errorf("expected task to remain the same after failed bind: \n %#v\n %#v", taskBeforeBind, taskAfterBind)
    }

-   nodeAfterBind := cache.Nodes["n1"]
+   nodeAfterBind := cache.Nodes["n1"].Clone()
    if !reflect.DeepEqual(nodeBeforeBind, nodeAfterBind) {
        t.Errorf("expected node to remain the same after failed bind")
    }
3 changes: 3 additions & 0 deletions pkg/scheduler/cache/interface.go
@@ -73,6 +73,9 @@ type Cache interface {

    // SharedInformerFactory return scheduler SharedInformerFactory
    SharedInformerFactory() informers.SharedInformerFactory

    // SetMetricsConf sets the metrics server related configuration.
    SetMetricsConf(conf map[string]string)
}

// VolumeBinder interface for allocate and bind volumes
3 changes: 2 additions & 1 deletion pkg/scheduler/conf/scheduler_conf.go
@@ -23,7 +23,8 @@ type SchedulerConfiguration struct {
    // Tiers defines plugins in different tiers
    Tiers []Tier `yaml:"tiers"`
    // Configurations is configuration for actions
-   Configurations []Configuration `yaml:"configurations"`
+   Configurations       []Configuration   `yaml:"configurations"`
+   MetricsConfiguration map[string]string `yaml:"metrics"`
}

// Tier defines plugin tier
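With this field in place, the scheduler configuration can carry a `metrics` section; `address` and `interval` are the two keys the cache reads. A hypothetical volcano-scheduler.conf wiring everything together (the tier placement of the usage plugin and the Prometheus URL are illustrative, not prescribed by the PR):

```yaml
actions: "enqueue, allocate, backfill"
tiers:
  - plugins:
      - name: priority
      - name: gang
  - plugins:
      - name: predicates
      - name: usage      # the plugin registered below in factory.go
      - name: nodeorder
metrics:
  address: http://prometheus.monitoring.svc:9090   # Prometheus endpoint (assumed URL)
  interval: 30s                                    # parsed with time.ParseDuration; 5s default on error
```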
2 changes: 2 additions & 0 deletions pkg/scheduler/plugins/factory.go
@@ -33,6 +33,7 @@ import (
"volcano.sh/volcano/pkg/scheduler/plugins/sla"
tasktopology "volcano.sh/volcano/pkg/scheduler/plugins/task-topology"
"volcano.sh/volcano/pkg/scheduler/plugins/tdm"
"volcano.sh/volcano/pkg/scheduler/plugins/usage"
)

func init() {
@@ -50,6 +51,7 @@ func init() {
    framework.RegisterPluginBuilder(sla.PluginName, sla.New)
    framework.RegisterPluginBuilder(tasktopology.PluginName, tasktopology.New)
    framework.RegisterPluginBuilder(numaaware.PluginName, numaaware.New)
    framework.RegisterPluginBuilder(usage.PluginName, usage.New)

    // Plugins for Queues
    framework.RegisterPluginBuilder(proportion.PluginName, proportion.New)
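The usage plugin package itself (pkg/scheduler/plugins/usage) is not shown in this diff. Purely to illustrate how the cached NodeUsage could drive node ordering, here is a hypothetical scoring helper; it is not the plugin's actual logic, and `api` refers to volcano.sh/volcano/pkg/scheduler/api:

```go
// usageScore favors nodes with lower recent utilization: an idle node scores
// near 100, a saturated one near 0. Illustrative sketch only; assumes the
// usage percentages collected by GetMetricsData under the "5m" period.
func usageScore(node *api.NodeInfo) float64 {
    if node.ResourceUsage == nil {
        return 0
    }
    cpu := node.ResourceUsage.CPUUsageAvg["5m"] // percent, 0-100
    mem := node.ResourceUsage.MEMUsageAvg["5m"] // percent, 0-100
    return ((100 - cpu) + (100 - mem)) / 2
}
```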