-
Notifications
You must be signed in to change notification settings - Fork 953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add usage based scheduling plugin #2129
Changes from 4 commits
9be34d4
7ac670e
c6cff6f
95f329f
a68c165
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,8 @@ import ( | |
podutil "k8s.io/kubernetes/pkg/api/v1/pod" | ||
volumescheduling "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" | ||
|
||
"github.com/prometheus/client_golang/api" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please sort the import packages. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1" | ||
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1" | ||
"volcano.sh/apis/pkg/apis/scheduling" | ||
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme" | ||
|
@@ -61,6 +63,15 @@ import ( | |
"volcano.sh/volcano/pkg/scheduler/metrics" | ||
) | ||
|
||
const (
	// cpuUsageAvg is the record name of the average CPU usage defined in the
	// Prometheus rules.
	cpuUsageAvg = "cpu_usage_avg"
	// memUsageAvg is the record name of the average memory usage defined in
	// the Prometheus rules.
	memUsageAvg = "mem_usage_avg"
	// defaultMetricsInternal is the default interval (5s) for syncing data
	// from the metrics server. It is expressed as a time.Duration so that
	// time.Duration(defaultMetricsInternal) yields five seconds; a bare 5
	// would be interpreted as five nanoseconds.
	defaultMetricsInternal = 5 * time.Second
)
|
||
func init() { | ||
schemeBuilder := runtime.SchemeBuilder{ | ||
v1.AddToScheme, | ||
|
@@ -84,6 +95,7 @@ type SchedulerCache struct { | |
// schedulerName is the name for volcano scheduler | ||
schedulerName string | ||
nodeSelectorLabels map[string]string | ||
metricsConf map[string]string | ||
|
||
podInformer infov1.PodInformer | ||
nodeInformer infov1.NodeInformer | ||
|
@@ -614,6 +626,13 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) { | |
go wait.Until(sc.processCleanupJob, 0, stopCh) | ||
|
||
go wait.Until(sc.processBindTask, time.Millisecond*20, stopCh) | ||
|
||
// Get metrics data | ||
interval, err := time.ParseDuration(sc.metricsConf["interval"]) | ||
if err != nil || interval <= 0 { | ||
interval = time.Duration(defaultMetricsInternal) | ||
} | ||
go wait.Until(sc.GetMetricsData, interval, stopCh) | ||
} | ||
|
||
// WaitForCacheSync sync the cache with the api server | ||
|
@@ -1155,3 +1174,79 @@ func (sc *SchedulerCache) recordPodGroupEvent(podGroup *schedulingapi.PodGroup, | |
} | ||
sc.Recorder.Eventf(pg, eventType, reason, msg) | ||
} | ||
|
||
func (sc *SchedulerCache) SetMetricsConf(conf map[string]string) { | ||
sc.metricsConf = conf | ||
} | ||
|
||
func (sc *SchedulerCache) GetMetricsData() { | ||
address := sc.metricsConf["address"] | ||
if len(address) == 0 { | ||
return | ||
} | ||
klog.V(4).Infof("Get metrics from Prometheus: %s", address) | ||
client, err := api.NewClient(api.Config{ | ||
Address: address, | ||
}) | ||
if err != nil { | ||
klog.Errorf("Error creating client: %v\n", err) | ||
return | ||
} | ||
v1api := prometheusv1.NewAPI(client) | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) | ||
defer cancel() | ||
nodeUsageMap := make(map[string]*schedulingapi.NodeUsage) | ||
sc.Mutex.Lock() | ||
for k := range sc.Nodes { | ||
nodeUsageMap[k] = &schedulingapi.NodeUsage{ | ||
CPUUsageAvg: make(map[string]float64), | ||
MEMUsageAvg: make(map[string]float64), | ||
} | ||
} | ||
sc.Mutex.Unlock() | ||
|
||
supportedPeriods := []string{"5m"} | ||
supportedMetrics := []string{cpuUsageAvg, memUsageAvg} | ||
for node := range nodeUsageMap { | ||
for _, period := range supportedPeriods { | ||
for _, metric := range supportedMetrics { | ||
queryStr := fmt.Sprintf("%s_%s{instance=\"%s\"}", metric, period, node) | ||
klog.V(4).Infof("Query prometheus by %s", queryStr) | ||
res, warnings, err := v1api.Query(ctx, queryStr, time.Now()) | ||
if err != nil { | ||
klog.Errorf("Error querying Prometheus: %v", err) | ||
} | ||
if len(warnings) > 0 { | ||
klog.V(3).Infof("Warning querying Prometheus: %v", warnings) | ||
} | ||
|
||
if err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this judgement necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
rowValues := strings.Split(strings.TrimSpace(res.String()), "=>") | ||
value := strings.Split(strings.TrimSpace(rowValues[1]), " ") | ||
switch metric { | ||
case "cpu_usage_avg": | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest make values such as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated. |
||
cpuUsage, _ := strconv.ParseFloat(value[0], 64) | ||
nodeUsageMap[node].CPUUsageAvg[period] = cpuUsage | ||
klog.V(4).Infof("node: %v, CpuUsageAvg: %v, period:%v", node, cpuUsage, period) | ||
case "mem_usage_avg": | ||
memUsage, _ := strconv.ParseFloat(value[0], 64) | ||
nodeUsageMap[node].MEMUsageAvg[period] = memUsage | ||
klog.V(4).Infof("node: %v, MemUsageAvg: %v, period:%v", node, memUsage, period) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
sc.setMetricsData(nodeUsageMap) | ||
} | ||
|
||
func (sc *SchedulerCache) setMetricsData(usageInfo map[string]*schedulingapi.NodeUsage) { | ||
sc.Mutex.Lock() | ||
defer sc.Mutex.Unlock() | ||
for k := range usageInfo { | ||
nodeInfo, ok := sc.Nodes[k] | ||
if ok { | ||
nodeInfo.ResourceUsage = usageInfo[k] | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest adding a comment about
NodeUsage
so that other developers can better understand the keys and values of these maps. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I will add it, thanks