Skip to content

Commit

Permalink
Merge pull request kubernetes-retired#592 from Jeffwan/metrics
Browse files Browse the repository at this point in the history
Add performance metrics for scheduling
  • Loading branch information
k8s-ci-robot authored Mar 10, 2019
2 parents fb96516 + 927c3b0 commit 2c07cd3
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 5 deletions.
15 changes: 12 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,7 @@
[prune]
go-tests = true
unused-packages = true

[[constraint]]
name = "github.com/prometheus/client_golang"
version = "0.9.2"
1 change: 1 addition & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
for !tasks.Empty() {
predicateNodes := []*api.NodeInfo{}
nodeScores := map[int][]*api.NodeInfo{}

task := tasks.Pop().(*api.TaskInfo)
assigned := false

Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
)

Expand Down Expand Up @@ -212,6 +213,7 @@ func preempt(
}
}
victims := ssn.Preemptable(preemptor, preemptees)
metrics.UpdatePreemptionVictimsCount(len(victims))

if err := validateVictims(victims, resreq); err != nil {
glog.V(3).Infof("No validated victims on Node <%s>: %v", node.Name, err)
Expand All @@ -235,6 +237,7 @@ func preempt(
resreq.Sub(preemptee.Resreq)
}

metrics.RegisterPreemptionAttempts()
glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq)

Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ limitations under the License.
package framework

import (
"time"

"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
Expand All @@ -39,15 +42,19 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
}

for _, plugin := range ssn.plugins {
onSessionOpenStart := time.Now()
plugin.OnSessionOpen(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
}

return ssn
}

func CloseSession(ssn *Session) {
for _, plugin := range ssn.plugins {
onSessionCloseStart := time.Now()
plugin.OnSessionClose(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionClose, metrics.Duration(onSessionCloseStart))
}

closeSession(ssn)
Expand Down
4 changes: 3 additions & 1 deletion pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/golang/glog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
Expand All @@ -30,6 +30,7 @@ import (
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

type Session struct {
Expand Down Expand Up @@ -295,6 +296,7 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error {
task.Job, ssn.UID)
}

metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time))
return nil
}

Expand Down
191 changes: 191 additions & 0 deletions pkg/scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" // auto-registry collectors in default registry
)

const (
// KubeBatchNamespace is the common "kube_batch" prefix of kube-batch metrics.
// Despite its name, every collector declared in this file passes it as the
// Subsystem field of its options rather than as the Namespace field.
KubeBatchNamespace = "kube_batch"

// OnSessionOpen is the value passed to UpdatePluginDuration, and recorded
// under the "OnSession" label, when a plugin's OnSessionOpen callback is timed.
OnSessionOpen = "OnSessionOpen"

// OnSessionClose is the value passed to UpdatePluginDuration, and recorded
// under the "OnSession" label, when a plugin's OnSessionClose callback is timed.
OnSessionClose = "OnSessionClose"
)

// Collectors exported by the scheduler. They are created through promauto,
// so they are registered at package initialisation without an explicit
// Register call.
var (
// e2eSchedulingLatency is a histogram of whole scheduling cycles; values are
// observed in milliseconds through UpdateE2eDuration.
e2eSchedulingLatency = promauto.NewHistogram(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "e2e_scheduling_latency_milliseconds",
Help: "E2e scheduling latency in milliseconds (scheduling algorithm + binding)",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
},
)

// pluginSchedulingLatency is a histogram of plugin callback durations in
// microseconds, partitioned by plugin name and by callback ("OnSession"
// label, see OnSessionOpen / OnSessionClose). Fed by UpdatePluginDuration.
pluginSchedulingLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "plugin_scheduling_latency_microseconds",
Help: "Plugin scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
}, []string{"plugin", "OnSession"},
)

// actionSchedulingLatency is a histogram of action execution durations in
// microseconds, partitioned by action name. Fed by UpdateActionDuration.
actionSchedulingLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "action_scheduling_latency_microseconds",
Help: "Action scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
}, []string{"action"},
)

// taskSchedulingLatency is a histogram of per-task scheduling latency in
// microseconds. Fed by UpdateTaskScheduleDuration.
// NOTE(review): ExponentialBuckets(5, 2, 10) presumably tops out at 2560
// (i.e. ~2.5ms here); callers that measure from Pod creation time would
// land almost entirely in the +Inf bucket - confirm the bucket layout.
taskSchedulingLatency = promauto.NewHistogram(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "task_scheduling_latency_microseconds",
Help: "Task scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
},
)

// scheduleAttempts counts scheduling attempts, partitioned by the "result"
// label. Incremented through UpdatePodScheduleStatus.
scheduleAttempts = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "schedule_attempts_total",
Help: "Number of attempts to schedule pods, by the result. 'unschedulable' means a pod could not be scheduled, while 'error' means an internal scheduler problem.",
}, []string{"result"},
)

// preemptionVictims is a gauge overwritten on every call to
// UpdatePreemptionVictimsCount, so it only reflects the latest value set.
preemptionVictims = promauto.NewGauge(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "pod_preemption_victims",
Help: "Number of selected preemption victims",
},
)

// preemptionAttempts counts calls to RegisterPreemptionAttempts.
preemptionAttempts = promauto.NewCounter(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "total_preemption_attempts",
Help: "Total preemption attempts in the cluster till now",
},
)

// unscheduleTaskCount is a per-job gauge of tasks that could not be
// scheduled, set through UpdateUnscheduleTaskCount.
// NOTE(review): one series is kept per distinct "job_id" value and nothing
// in this file deletes them - confirm label cardinality is acceptable.
unscheduleTaskCount = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "unschedule_task_count",
Help: "Number of tasks could not be scheduled",
}, []string{"job_id"},
)

// unscheduleJobCount is a gauge of jobs that could not be scheduled, set
// through UpdateUnscheduleJobCount.
unscheduleJobCount = promauto.NewGauge(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "unschedule_job_count",
Help: "Number of jobs could not be scheduled",
},
)

// jobRetryCount counts, per "job_id" label value, the calls made to
// RegisterJobRetries.
jobRetryCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "job_retry_counts",
Help: "Number of retry counts for one job",
}, []string{"job_id"},
)
)

// UpdatePluginDuration records how long a plugin spent in one of its session
// callbacks. pluginName and OnSessionStatus become the "plugin" and
// "OnSession" label values (see OnSessionOpen and OnSessionClose); duration is
// observed in microseconds.
func UpdatePluginDuration(pluginName, OnSessionStatus string, duration time.Duration) {
	observer := pluginSchedulingLatency.WithLabelValues(pluginName, OnSessionStatus)
	observer.Observe(DurationInMicroseconds(duration))
}

// UpdateActionDuration records how long one execution of the named action
// took. actionName becomes the "action" label value; duration is observed in
// microseconds.
func UpdateActionDuration(actionName string, duration time.Duration) {
	observer := actionSchedulingLatency.WithLabelValues(actionName)
	observer.Observe(DurationInMicroseconds(duration))
}

// UpdateE2eDuration updates entire end to end scheduling latency
func UpdateE2eDuration(duration time.Duration) {
e2eSchedulingLatency.Observe(DurationInMilliseconds(duration))
}

// UpdateTaskScheduleDuration updates single task scheduling latency
func UpdateTaskScheduleDuration(duration time.Duration) {
taskSchedulingLatency.Observe(DurationInMicroseconds(duration))
}

// UpdatePodScheduleStatus adds count to the schedule-attempts counter for the
// given result. label is used as the "result" label value (the metric's help
// text mentions 'unschedulable' and 'error').
//
// A negative count is ignored: Prometheus counters are monotonic and
// Counter.Add panics on a negative delta, which would otherwise take the
// scheduler down from inside a metrics helper. A zero count is still applied
// so that the labelled series gets created.
func UpdatePodScheduleStatus(label string, count int) {
	if count < 0 {
		return
	}
	scheduleAttempts.WithLabelValues(label).Add(float64(count))
}

// UpdatePreemptionVictimsCount overwrites the preemption-victims gauge with
// victimsCount; the gauge therefore only holds the most recently set value.
func UpdatePreemptionVictimsCount(victimsCount int) {
	current := float64(victimsCount)
	preemptionVictims.Set(current)
}

// RegisterPreemptionAttempts counts one more preemption attempt.
func RegisterPreemptionAttempts() {
	preemptionAttempts.Add(1)
}

// UpdateUnscheduleTaskCount sets the number of unschedulable tasks reported
// for one job. jobID is used as the "job_id" label value.
func UpdateUnscheduleTaskCount(jobID string, taskCount int) {
	gauge := unscheduleTaskCount.WithLabelValues(jobID)
	gauge.Set(float64(taskCount))
}

// UpdateUnscheduleJobCount overwrites the unschedulable-jobs gauge with
// jobCount.
func UpdateUnscheduleJobCount(jobCount int) {
	current := float64(jobCount)
	unscheduleJobCount.Set(current)
}

// RegisterJobRetries counts one more retry for the given job. jobID is used
// as the "job_id" label value.
func RegisterJobRetries(jobID string) {
	retries := jobRetryCount.WithLabelValues(jobID)
	retries.Inc()
}

// DurationInMicroseconds converts duration to a floating-point number of
// microseconds, keeping the sub-microsecond fraction.
func DurationInMicroseconds(duration time.Duration) float64 {
	// A time.Duration is an integer count of nanoseconds, so dividing it by
	// time.Microsecond in floating point yields fractional microseconds.
	return float64(duration) / float64(time.Microsecond)
}

// DurationInMilliseconds converts duration to a floating-point number of
// milliseconds, keeping the sub-millisecond fraction.
func DurationInMilliseconds(duration time.Duration) float64 {
	// A time.Duration is an integer count of nanoseconds, so dividing it by
	// time.Millisecond in floating point yields fractional milliseconds.
	return float64(duration) / float64(time.Millisecond)
}

// DurationInSeconds converts duration to a floating-point number of seconds.
func DurationInSeconds(duration time.Duration) float64 {
	seconds := duration.Seconds()
	return seconds
}

// Duration returns the time elapsed between start and the moment of the call.
func Duration(start time.Time) time.Duration {
	now := time.Now()
	return now.Sub(start)
}
12 changes: 11 additions & 1 deletion pkg/scheduler/plugins/gang/gang.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (

"github.com/golang/glog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

type gangPlugin struct {
Expand Down Expand Up @@ -156,11 +157,18 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
}

func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
var unreadyTaskCount int32
var unScheduleJobCount int
for _, job := range ssn.Jobs {
if !jobReady(job) {
unreadyTaskCount = job.MinAvailable - readyTaskNum(job)
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
job.MinAvailable-readyTaskNum(job), len(job.Tasks), job.FitError())

unScheduleJobCount += 1
metrics.UpdateUnscheduleTaskCount(job.Name, int(unreadyTaskCount))
metrics.RegisterJobRetries(job.Name)

jc := &v1alpha1.PodGroupCondition{
Type: v1alpha1.PodGroupUnschedulableType,
Status: v1.ConditionTrue,
Expand All @@ -176,4 +184,6 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
}
}
}

metrics.UpdateUnscheduleJobCount(unScheduleJobCount)
}
5 changes: 5 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
schedcache "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

type Scheduler struct {
Expand Down Expand Up @@ -82,12 +83,16 @@ func (pc *Scheduler) Run(stopCh <-chan struct{}) {

// runOnce executes a single scheduling cycle: it opens a session on the
// scheduler cache with the configured plugins, executes every configured
// action against that session in order, and closes the session on return.
//
// The duration of each action is reported through metrics.UpdateActionDuration
// and the duration of the whole cycle through metrics.UpdateE2eDuration.
func (pc *Scheduler) runOnce() {
	glog.V(4).Infof("Start scheduling ...")
	scheduleStartTime := time.Now()
	defer glog.V(4).Infof("End scheduling ...")
	// The arguments of a deferred call are evaluated when the defer statement
	// executes, not when the function returns. Deferring UpdateE2eDuration
	// directly would therefore record a near-zero duration, so the elapsed
	// time is computed inside a closure. Deferred calls run last-in-first-out,
	// which means this runs after CloseSession and covers the session close.
	defer func() {
		metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime))
	}()

	ssn := framework.OpenSession(pc.cache, pc.plugins)
	defer framework.CloseSession(ssn)

	for _, action := range pc.actions {
		actionStartTime := time.Now()
		action.Execute(ssn)
		metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
	}
}

0 comments on commit 2c07cd3

Please sign in to comment.