From 683f4726de01912a139799195844731380df8dbd Mon Sep 17 00:00:00 2001 From: ZhangJian He Date: Mon, 20 Feb 2023 11:30:12 +0800 Subject: [PATCH] Extract MetricsClient and NodeMetrics to support other metrics platform Signed-off-by: ZhangJian He --- pkg/scheduler/cache/cache.go | 59 ++----------- .../cache/refermetrics/metrics_client.go | 34 +++++++ .../refermetrics/metrics_client_prometheus.go | 88 +++++++++++++++++++ pkg/scheduler/cache/refermetrics/module.go | 22 +++++ 4 files changed, 153 insertions(+), 50 deletions(-) create mode 100644 pkg/scheduler/cache/refermetrics/metrics_client.go create mode 100644 pkg/scheduler/cache/refermetrics/metrics_client_prometheus.go create mode 100644 pkg/scheduler/cache/refermetrics/module.go diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 2a4199a6608..10680dd2a3d 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -24,10 +24,7 @@ import ( "strings" "sync" "time" - - "github.com/prometheus/client_golang/api" - prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1" - pmodel "github.com/prometheus/common/model" + "volcano.sh/volcano/pkg/scheduler/cache/refermetrics" v1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" @@ -70,10 +67,6 @@ import ( ) const ( - // record name of cpu average usage defined in prometheus rules - cpuUsageAvg = "cpu_usage_avg" - // record name of mem average usage defined in prometheus rules - memUsageAvg = "mem_usage_avg" // default interval for sync data from metrics server, the value is 5s defaultMetricsInternal = 5000000000 ) @@ -1266,19 +1259,11 @@ func (sc *SchedulerCache) SetMetricsConf(conf map[string]string) { } func (sc *SchedulerCache) GetMetricsData() { - address := sc.metricsConf["address"] - if len(address) == 0 { - return - } - klog.V(4).Infof("Get metrics from Prometheus: %s", address) - client, err := api.NewClient(api.Config{ - Address: address, - }) + client, err := 
refermetrics.NewMetricsClient(sc.metricsConf) if err != nil { klog.Errorf("Error creating client: %v\n", err) return } - v1api := prometheusv1.NewAPI(client) ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) defer cancel() nodeUsageMap := make(map[string]*schedulingapi.NodeUsage) @@ -1292,42 +1277,16 @@ func (sc *SchedulerCache) GetMetricsData() { sc.Mutex.Unlock() supportedPeriods := []string{"5m"} - supportedMetrics := []string{cpuUsageAvg, memUsageAvg} for node := range nodeUsageMap { for _, period := range supportedPeriods { - for _, metric := range supportedMetrics { - queryStr := fmt.Sprintf("%s_%s{instance=\"%s\"}", metric, period, node) - klog.V(4).Infof("Query prometheus by %s", queryStr) - res, warnings, err := v1api.Query(ctx, queryStr, time.Now()) - if err != nil { - klog.Errorf("Error querying Prometheus: %v", err) - } - if len(warnings) > 0 { - klog.V(3).Infof("Warning querying Prometheus: %v", warnings) - } - if res == nil || res.String() == "" { - klog.Warningf("Warning querying Prometheus: no data found for %s", queryStr) - continue - } - // plugin.usage only need type pmodel.ValVector in Prometheus.rulues - if res.Type() != pmodel.ValVector { - continue - } - // only method res.String() can get data, dataType []pmodel.ValVector, eg: "{k1:v1, ...} => #[value] @#[timespace]\n {k2:v2, ...} => ..." 
-				firstRowValVector := strings.Split(res.String(), "\n")[0]
-				rowValues := strings.Split(strings.TrimSpace(firstRowValVector), "=>")
-				value := strings.Split(strings.TrimSpace(rowValues[1]), " ")
-				switch metric {
-				case cpuUsageAvg:
-					cpuUsage, _ := strconv.ParseFloat(value[0], 64)
-					nodeUsageMap[node].CPUUsageAvg[period] = cpuUsage
-					klog.V(4).Infof("node: %v, CpuUsageAvg: %v, period:%v", node, cpuUsage, period)
-				case memUsageAvg:
-					memUsage, _ := strconv.ParseFloat(value[0], 64)
-					nodeUsageMap[node].MEMUsageAvg[period] = memUsage
-					klog.V(4).Infof("node: %v, MemUsageAvg: %v, period:%v", node, memUsage, period)
-				}
+			nodeMetrics, err := client.NodeMetricsAvg(ctx, node, period)
+			if err != nil {
+				klog.Errorf("Error getting node metrics: %v\n", err)
+				continue
 			}
+			klog.V(4).Infof("node: %v, CpuUsageAvg: %v, MemUsageAvg: %v, period:%v", node, nodeMetrics.Cpu, nodeMetrics.Memory, period)
+			nodeUsageMap[node].CPUUsageAvg[period] = nodeMetrics.Cpu
+			nodeUsageMap[node].MEMUsageAvg[period] = nodeMetrics.Memory
 		}
 	}
 	sc.setMetricsData(nodeUsageMap)
diff --git a/pkg/scheduler/cache/refermetrics/metrics_client.go b/pkg/scheduler/cache/refermetrics/metrics_client.go
new file mode 100644
index 00000000000..661437e3687
--- /dev/null
+++ b/pkg/scheduler/cache/refermetrics/metrics_client.go
@@ -0,0 +1,44 @@
+/*
+ Copyright 2023 The Volcano Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package refermetrics
+
+import (
+	"context"
+	"errors"
+)
+
+// MetricsClient fetches averaged resource usage of a node from a metrics backend.
+type MetricsClient interface {
+	// NodeMetricsAvg returns the average usage of node nodeName over period (for example "5m").
+	NodeMetricsAvg(ctx context.Context, nodeName string, period string) (*NodeMetrics, error)
+}
+
+// NewMetricsClient builds a MetricsClient from the scheduler metrics configuration.
+// Prometheus is the only backend for now; its server is read from metricsConf["address"].
+// An error is returned when that entry is empty or the Prometheus client cannot be created.
+func NewMetricsClient(metricsConf map[string]string) (MetricsClient, error) {
+	address := metricsConf["address"]
+	if len(address) == 0 {
+		return nil, errors.New("metrics address is empty")
+	}
+	client, err := NewPrometheusMetricsClient(address)
+	if err != nil {
+		// Return a literal nil so callers never get a non-nil interface wrapping a nil pointer.
+		return nil, err
+	}
+	return client, nil
+}
diff --git a/pkg/scheduler/cache/refermetrics/metrics_client_prometheus.go b/pkg/scheduler/cache/refermetrics/metrics_client_prometheus.go
new file mode 100644
index 00000000000..1958e95a2a8
--- /dev/null
+++ b/pkg/scheduler/cache/refermetrics/metrics_client_prometheus.go
@@ -0,0 +1,93 @@
+/*
+ Copyright 2023 The Volcano Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package refermetrics
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/prometheus/client_golang/api"
+	prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	pmodel "github.com/prometheus/common/model"
+	"k8s.io/klog"
+)
+
+const (
+	// promCpuUsageAvg record name of cpu average usage defined in prometheus rules
+	promCpuUsageAvg = "cpu_usage_avg"
+	// promMemUsageAvg record name of mem average usage defined in prometheus rules
+	promMemUsageAvg = "mem_usage_avg"
+)
+
+// PrometheusMetricsClient is a MetricsClient that reads node usage from a Prometheus server.
+// Build it with NewPrometheusMetricsClient: the zero value has no query API and cannot be used.
+type PrometheusMetricsClient struct {
+	address string
+	v1api   prometheusv1.API
+}
+
+// NewPrometheusMetricsClient returns a client that queries the Prometheus server at address.
+// The underlying API client is created once here and reused by every NodeMetricsAvg call.
+func NewPrometheusMetricsClient(address string) (*PrometheusMetricsClient, error) {
+	client, err := api.NewClient(api.Config{
+		Address: address,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("creating prometheus client for %q: %w", address, err)
+	}
+	return &PrometheusMetricsClient{address: address, v1api: prometheusv1.NewAPI(client)}, nil
+}
+
+// NodeMetricsAvg queries the cpu_usage_avg and mem_usage_avg records suffixed with "_" + period
+// for the instance nodeName and stores the value of the first returned sample of each.
+// Lookups are best effort: a metric whose query fails or yields no vector sample is logged
+// and left at zero in the result, and the returned error is always nil.
+func (p *PrometheusMetricsClient) NodeMetricsAvg(ctx context.Context, nodeName string, period string) (*NodeMetrics, error) {
+	klog.V(4).Infof("Get node metrics from Prometheus: %s", p.address)
+	nodeMetrics := &NodeMetrics{}
+	for _, metric := range []string{promCpuUsageAvg, promMemUsageAvg} {
+		queryStr := fmt.Sprintf("%s_%s{instance=\"%s\"}", metric, period, nodeName)
+		klog.V(4).Infof("Query prometheus by %s", queryStr)
+		res, warnings, err := p.v1api.Query(ctx, queryStr, time.Now())
+		if len(warnings) > 0 {
+			klog.V(3).Infof("Warning querying Prometheus: %v", warnings)
+		}
+		if err != nil {
+			klog.Errorf("Error querying Prometheus: %v", err)
+			continue
+		}
+		// plugin.usage only needs results of type pmodel.ValVector from the Prometheus rules
+		vector, isVector := res.(pmodel.Vector)
+		if res != nil && !isVector {
+			continue
+		}
+		if len(vector) == 0 || vector[0] == nil {
+			klog.Warningf("Warning querying Prometheus: no data found for %s", queryStr)
+			continue
+		}
+		// Read the first sample directly rather than parsing the text form of res.String().
+		usage := float64(vector[0].Value)
+		switch metric {
+		case promCpuUsageAvg:
+			nodeMetrics.Cpu = usage
+		case promMemUsageAvg:
+			nodeMetrics.Memory = usage
+		}
+	}
+	return nodeMetrics, nil
+}
diff --git a/pkg/scheduler/cache/refermetrics/module.go b/pkg/scheduler/cache/refermetrics/module.go
new file mode 100644
index 00000000000..d4fbf3cbdf7
--- /dev/null
+++ b/pkg/scheduler/cache/refermetrics/module.go
@@ -0,0 +1,23 @@
+/*
+ Copyright 2023 The Volcano Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package refermetrics
+
+// NodeMetrics holds the average CPU and memory usage values a MetricsClient reports for a node.
+type NodeMetrics struct {
+	Cpu    float64
+	Memory float64
+}