Skip to content
This repository has been archived by the owner on May 25, 2023. It is now read-only.

Commit

Permalink
Merge pull request #592 from Jeffwan/metrics
Browse files Browse the repository at this point in the history
Add performance metrics for scheduling
  • Loading branch information
k8s-ci-robot authored Mar 10, 2019
2 parents 65a4311 + 45b6141 commit 82fdced
Show file tree
Hide file tree
Showing 40 changed files with 3,998 additions and 1,039 deletions.
15 changes: 12 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,7 @@
[prune]
go-tests = true
unused-packages = true

[[constraint]]
name = "github.com/prometheus/client_golang"
version = "0.9.2"
2 changes: 2 additions & 0 deletions cmd/kube-batch/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ServerOption struct {
LockObjectNamespace string
DefaultQueue string
PrintVersion bool
ListenAddress string
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -56,6 +57,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
"executing the main loop. Enable this when running replicated kube-batch for high availability")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object")
fs.StringVar(&s.ListenAddress, "listen-address", ":8080", "The address to listen on for HTTP requests.")
}

func (s *ServerOption) CheckOptionOrDie() error {
Expand Down
7 changes: 7 additions & 0 deletions cmd/kube-batch/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package app
import (
"context"
"fmt"
"net/http"
"os"
"time"

"github.com/golang/glog"
"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler"
"github.com/kubernetes-sigs/kube-batch/pkg/version"
"github.com/prometheus/client_golang/prometheus/promhttp"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
Expand Down Expand Up @@ -75,6 +77,11 @@ func Run(opt *options.ServerOption) error {
panic(err)
}

go func() {
http.Handle("/metrics", promhttp.Handler())
glog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()

run := func(ctx context.Context) {
sched.Run(ctx.Done())
<-ctx.Done()
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
for !tasks.Empty() {
predicateNodes := []*api.NodeInfo{}
nodeScores := map[int][]*api.NodeInfo{}

task := tasks.Pop().(*api.TaskInfo)
assigned := false

Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
)

Expand Down Expand Up @@ -212,6 +213,7 @@ func preempt(
}
}
victims := ssn.Preemptable(preemptor, preemptees)
metrics.UpdatePreemptionVictimsCount(len(victims))

if err := validateVictims(victims, resreq); err != nil {
glog.V(3).Infof("No validated victims on Node <%s>: %v", node.Name, err)
Expand All @@ -235,6 +237,7 @@ func preempt(
resreq.Sub(preemptee.Resreq)
}

metrics.RegisterPreemptionAttempts()
glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq)

Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ limitations under the License.
package framework

import (
"time"

"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
Expand All @@ -39,15 +42,19 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
}

for _, plugin := range ssn.plugins {
onSessionOpenStart := time.Now()
plugin.OnSessionOpen(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
}

return ssn
}

func CloseSession(ssn *Session) {
for _, plugin := range ssn.plugins {
onSessionCloseStart := time.Now()
plugin.OnSessionClose(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionClose, metrics.Duration(onSessionCloseStart))
}

closeSession(ssn)
Expand Down
4 changes: 3 additions & 1 deletion pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/golang/glog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
Expand All @@ -30,6 +30,7 @@ import (
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

type Session struct {
Expand Down Expand Up @@ -295,6 +296,7 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error {
task.Job, ssn.UID)
}

metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time))
return nil
}

Expand Down
191 changes: 191 additions & 0 deletions pkg/scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" // auto-registry collectors in default registry
)

// KubeBatchNamespace is the identifier kube-batch attaches to every
// Prometheus metric declared in this package.
const KubeBatchNamespace = "kube_batch"

// Values for the "OnSession" label of the plugin latency histogram; they name
// the plugin callback whose duration was measured.
const (
	// OnSessionOpen labels a measurement of a plugin's OnSessionOpen callback.
	OnSessionOpen = "OnSessionOpen"

	// OnSessionClose labels a measurement of a plugin's OnSessionClose callback.
	OnSessionClose = "OnSessionClose"
)

// Package-level collectors for kube-batch scheduling metrics. They are created
// through promauto (see the import comment: collectors are auto-registered in
// the default registry), so they exist as soon as the package is initialised.
// Every collector passes KubeBatchNamespace as its Subsystem; no Namespace is set.
var (
// e2eSchedulingLatency is fed by UpdateE2eDuration with values in milliseconds.
// Buckets are built by ExponentialBuckets(start=5, factor=2, count=10).
e2eSchedulingLatency = promauto.NewHistogram(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "e2e_scheduling_latency_milliseconds",
Help: "E2e scheduling latency in milliseconds (scheduling algorithm + binding)",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
},
)

// pluginSchedulingLatency is fed by UpdatePluginDuration with values in
// microseconds, partitioned by plugin name and by the callback measured
// (the "OnSession" label takes OnSessionOpen / OnSessionClose).
pluginSchedulingLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "plugin_scheduling_latency_microseconds",
Help: "Plugin scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
}, []string{"plugin", "OnSession"},
)

// actionSchedulingLatency is fed by UpdateActionDuration with values in
// microseconds, partitioned by action name.
actionSchedulingLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "action_scheduling_latency_microseconds",
Help: "Action scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
}, []string{"action"},
)

// taskSchedulingLatency is fed by UpdateTaskScheduleDuration with values in
// microseconds.
// NOTE(review): the session dispatch path observes the time elapsed since
// the pod's CreationTimestamp; that is presumably far larger than the top
// finite bucket produced by ExponentialBuckets(5, 2, 10) — confirm the
// bucket range is appropriate for this metric.
taskSchedulingLatency = promauto.NewHistogram(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "task_scheduling_latency_microseconds",
Help: "Task scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
},
)

// scheduleAttempts is incremented by UpdatePodScheduleStatus, partitioned by
// the "result" label.
scheduleAttempts = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "schedule_attempts_total",
Help: "Number of attempts to schedule pods, by the result. 'unschedulable' means a pod could not be scheduled, while 'error' means an internal scheduler problem.",
}, []string{"result"},
)

// preemptionVictims is overwritten (Set) by UpdatePreemptionVictimsCount, so
// it only reflects the most recently reported victim count.
preemptionVictims = promauto.NewGauge(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "pod_preemption_victims",
Help: "Number of selected preemption victims",
},
)

// preemptionAttempts is incremented by one on each RegisterPreemptionAttempts call.
preemptionAttempts = promauto.NewCounter(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "total_preemption_attempts",
Help: "Total preemption attempts in the cluster till now",
},
)

// unscheduleTaskCount is set by UpdateUnscheduleTaskCount, one series per job_id.
// NOTE(review): nothing in this file deletes a job_id series; presumably the
// label set grows with the number of jobs seen — confirm cardinality is acceptable.
unscheduleTaskCount = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "unschedule_task_count",
Help: "Number of tasks could not be scheduled",
}, []string{"job_id"},
)

// unscheduleJobCount is overwritten (Set) by UpdateUnscheduleJobCount.
unscheduleJobCount = promauto.NewGauge(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "unschedule_job_count",
Help: "Number of jobs could not be scheduled",
},
)

// jobRetryCount is incremented by RegisterJobRetries, one series per job_id.
// NOTE(review): same job_id cardinality question as unscheduleTaskCount — confirm.
jobRetryCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "job_retry_counts",
Help: "Number of retry counts for one job",
}, []string{"job_id"},
)
)

// UpdatePluginDuration records, in microseconds, how long a plugin callback
// took. pluginName and OnSessionStatus become the "plugin" and "OnSession"
// label values of the observation.
func UpdatePluginDuration(pluginName, OnSessionStatus string, duration time.Duration) {
	histogram := pluginSchedulingLatency.WithLabelValues(pluginName, OnSessionStatus)
	histogram.Observe(DurationInMicroseconds(duration))
}

// UpdateActionDuration records, in microseconds, how long a scheduling action
// took. actionName becomes the "action" label value of the observation.
func UpdateActionDuration(actionName string, duration time.Duration) {
	histogram := actionSchedulingLatency.WithLabelValues(actionName)
	histogram.Observe(DurationInMicroseconds(duration))
}

// UpdateE2eDuration updates entire end to end scheduling latency
func UpdateE2eDuration(duration time.Duration) {
e2eSchedulingLatency.Observe(DurationInMilliseconds(duration))
}

// UpdateTaskScheduleDuration updates single task scheduling latency
func UpdateTaskScheduleDuration(duration time.Duration) {
taskSchedulingLatency.Observe(DurationInMicroseconds(duration))
}

// UpdatePodScheduleStatus adds count to the schedule-attempts counter for the
// scheduling outcome named by label (the counter's "result" label).
//
// A negative count is ignored: Prometheus counters are monotonic and
// Counter.Add panics on a negative value, and a metrics helper must not be
// able to crash the scheduler. A zero count is still applied so that the
// labelled series gets created, as before.
func UpdatePodScheduleStatus(label string, count int) {
	if count < 0 {
		return
	}
	scheduleAttempts.WithLabelValues(label).Add(float64(count))
}

// UpdatePreemptionVictimsCount sets the preemption-victims gauge to
// victimsCount, replacing whatever value was reported previously.
func UpdatePreemptionVictimsCount(victimsCount int) {
	victims := float64(victimsCount)
	preemptionVictims.Set(victims)
}

// RegisterPreemptionAttempts counts one more preemption attempt.
func RegisterPreemptionAttempts() {
	preemptionAttempts.Add(1)
}

// UpdateUnscheduleTaskCount sets the number of unschedulable tasks reported
// for the job identified by jobID (the gauge's "job_id" label).
func UpdateUnscheduleTaskCount(jobID string, taskCount int) {
	gauge := unscheduleTaskCount.WithLabelValues(jobID)
	gauge.Set(float64(taskCount))
}

// UpdateUnscheduleJobCount sets the gauge holding the number of unschedulable
// jobs to jobCount.
func UpdateUnscheduleJobCount(jobCount int) {
	jobs := float64(jobCount)
	unscheduleJobCount.Set(jobs)
}

// RegisterJobRetries counts one more retry for the job identified by jobID
// (the counter's "job_id" label).
func RegisterJobRetries(jobID string) {
	retries := jobRetryCount.WithLabelValues(jobID)
	retries.Inc()
}

// DurationInMicroseconds converts duration to a (possibly fractional) number
// of microseconds.
func DurationInMicroseconds(duration time.Duration) float64 {
	const nanosPerMicro = float64(time.Microsecond)
	return float64(duration) / nanosPerMicro
}

// DurationInMilliseconds converts duration to a (possibly fractional) number
// of milliseconds.
func DurationInMilliseconds(duration time.Duration) float64 {
	const nanosPerMilli = float64(time.Millisecond)
	return float64(duration) / nanosPerMilli
}

// DurationInSeconds converts duration to a (possibly fractional) number of
// seconds.
func DurationInSeconds(duration time.Duration) float64 {
	seconds := duration.Seconds()
	return seconds
}

// Duration returns how much time has elapsed between start and now.
func Duration(start time.Time) time.Duration {
	now := time.Now()
	return now.Sub(start)
}
Loading

0 comments on commit 82fdced

Please sign in to comment.