Add metrics for kube-batch
Jeffwan committed Mar 10, 2019
1 parent 6ec0670 commit 45b6141
Showing 9 changed files with 230 additions and 2 deletions.
2 changes: 2 additions & 0 deletions cmd/kube-batch/app/options/options.go
@@ -34,6 +34,7 @@ type ServerOption struct {
LockObjectNamespace string
DefaultQueue string
PrintVersion bool
ListenAddress string
}

// NewServerOption creates a new CMServer with a default config.
@@ -56,6 +57,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
"executing the main loop. Enable this when running replicated kube-batch for high availability")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object")
fs.StringVar(&s.ListenAddress, "listen-address", ":8080", "The address to listen on for HTTP requests.")
}

func (s *ServerOption) CheckOptionOrDie() error {
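For context only (not part of this commit), a minimal sketch of how the new ListenAddress option surfaces through the flag set, assuming NewServerOption returns a *ServerOption as the surrounding code suggests:

package main

import (
	"fmt"

	"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
	"github.com/spf13/pflag"
)

func main() {
	opt := options.NewServerOption()
	fs := pflag.NewFlagSet("kube-batch", pflag.ExitOnError)
	opt.AddFlags(fs)

	// StringVar applies the ":8080" default as soon as the flag is registered.
	fmt.Println(opt.ListenAddress) // ":8080"

	// Overriding it, e.g. kube-batch --listen-address=:9090
	_ = fs.Parse([]string{"--listen-address", ":9090"})
	fmt.Println(opt.ListenAddress) // ":9090"
}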
7 changes: 7 additions & 0 deletions cmd/kube-batch/app/server.go
@@ -19,13 +19,15 @@ package app
import (
"context"
"fmt"
"net/http"
"os"
"time"

"github.com/golang/glog"
"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler"
"github.com/kubernetes-sigs/kube-batch/pkg/version"
"github.com/prometheus/client_golang/prometheus/promhttp"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
@@ -75,6 +77,11 @@ func Run(opt *options.ServerOption) error {
panic(err)
}

go func() {
http.Handle("/metrics", promhttp.Handler())
glog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()

run := func(ctx context.Context) {
sched.Run(ctx.Done())
<-ctx.Done()
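Once kube-batch runs with the handler above, the endpoint can be scraped by any HTTP client. An illustrative check (the address assumes the ":8080" default of --listen-address):

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:8080/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("%s", body) // Prometheus text exposition format
}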
1 change: 1 addition & 0 deletions pkg/scheduler/actions/allocate/allocate.go
@@ -105,6 +105,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
for !tasks.Empty() {
predicateNodes := []*api.NodeInfo{}
nodeScores := map[int][]*api.NodeInfo{}

task := tasks.Pop().(*api.TaskInfo)
assigned := false

3 changes: 3 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
@@ -23,6 +23,7 @@ import (

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
)

@@ -212,6 +213,7 @@ func preempt(
}
}
victims := ssn.Preemptable(preemptor, preemptees)
metrics.UpdatePreemptionVictimsCount(len(victims))

if err := validateVictims(victims, resreq); err != nil {
glog.V(3).Infof("No validated victims on Node <%s>: %v", node.Name, err)
@@ -235,6 +237,7 @@
resreq.Sub(preemptee.Resreq)
}

metrics.RegisterPreemptionAttempts()
glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq)

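For reference (not part of this commit), the two metric calls above have different semantics: the victims count is a gauge that is overwritten on each preemption pass, while preemption attempts are a monotonically increasing counter. A minimal sketch:

package main

import "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"

func main() {
	metrics.UpdatePreemptionVictimsCount(3) // gauge: current value 3
	metrics.UpdatePreemptionVictimsCount(1) // gauge: overwritten, now 1

	metrics.RegisterPreemptionAttempts() // counter: 1
	metrics.RegisterPreemptionAttempts() // counter: 2
}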
7 changes: 7 additions & 0 deletions pkg/scheduler/framework/framework.go
@@ -17,10 +17,13 @@ limitations under the License.
package framework

import (
"time"

"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
@@ -39,15 +42,19 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session {
}

for _, plugin := range ssn.plugins {
onSessionOpenStart := time.Now()
plugin.OnSessionOpen(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
}

return ssn
}

func CloseSession(ssn *Session) {
for _, plugin := range ssn.plugins {
onSessionCloseStart := time.Now()
plugin.OnSessionClose(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionClose, metrics.Duration(onSessionCloseStart))
}

closeSession(ssn)
4 changes: 3 additions & 1 deletion pkg/scheduler/framework/session.go
@@ -21,7 +21,7 @@ import (

"github.com/golang/glog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
@@ -30,6 +30,7 @@ import (
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

type Session struct {
@@ -295,6 +296,7 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error {
task.Job, ssn.UID)
}

metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time))
return nil
}

191 changes: 191 additions & 0 deletions pkg/scheduler/metrics/metrics.go
@@ -0,0 +1,191 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" // auto-registry collectors in default registry
)

const (
// KubeBatchNamespace - namespace in prometheus used by kube-batch
KubeBatchNamespace = "kube_batch"

// OnSessionOpen label
OnSessionOpen = "OnSessionOpen"

// OnSessionClose label
OnSessionClose = "OnSessionClose"
)

var (
e2eSchedulingLatency = promauto.NewHistogram(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "e2e_scheduling_latency_milliseconds",
Help: "E2e scheduling latency in milliseconds (scheduling algorithm + binding)",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
},
)

pluginSchedulingLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "plugin_scheduling_latency_microseconds",
Help: "Plugin scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
}, []string{"plugin", "OnSession"},
)

actionSchedulingLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "action_scheduling_latency_microseconds",
Help: "Action scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
}, []string{"action"},
)

taskSchedulingLatency = promauto.NewHistogram(
prometheus.HistogramOpts{
Subsystem: KubeBatchNamespace,
Name: "task_scheduling_latency_microseconds",
Help: "Task scheduling latency in microseconds",
Buckets: prometheus.ExponentialBuckets(5, 2, 10),
},
)

scheduleAttempts = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "schedule_attempts_total",
Help: "Number of attempts to schedule pods, by the result. 'unschedulable' means a pod could not be scheduled, while 'error' means an internal scheduler problem.",
}, []string{"result"},
)

preemptionVictims = promauto.NewGauge(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "pod_preemption_victims",
Help: "Number of selected preemption victims",
},
)

preemptionAttempts = promauto.NewCounter(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "total_preemption_attempts",
Help: "Total preemption attempts in the cluster till now",
},
)

unscheduleTaskCount = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "unschedule_task_count",
Help: "Number of tasks could not be scheduled",
}, []string{"job_id"},
)

unscheduleJobCount = promauto.NewGauge(
prometheus.GaugeOpts{
Subsystem: KubeBatchNamespace,
Name: "unschedule_job_count",
Help: "Number of jobs could not be scheduled",
},
)

jobRetryCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeBatchNamespace,
Name: "job_retry_counts",
Help: "Number of retry counts for one job",
}, []string{"job_id"},
)
)

// UpdatePluginDuration updates latency for every plugin
func UpdatePluginDuration(pluginName, OnSessionStatus string, duration time.Duration) {
pluginSchedulingLatency.WithLabelValues(pluginName, OnSessionStatus).Observe(DurationInMicroseconds(duration))
}

// UpdateActionDuration updates latency for every action
func UpdateActionDuration(actionName string, duration time.Duration) {
actionSchedulingLatency.WithLabelValues(actionName).Observe(DurationInMicroseconds(duration))
}

// UpdateE2eDuration updates the end-to-end scheduling latency
func UpdateE2eDuration(duration time.Duration) {
e2eSchedulingLatency.Observe(DurationInMilliseconds(duration))
}

// UpdateTaskScheduleDuration updates single task scheduling latency
func UpdateTaskScheduleDuration(duration time.Duration) {
taskSchedulingLatency.Observe(DurationInMicroseconds(duration))
}

// UpdatePodScheduleStatus updates the pod schedule result; the label can be Success, Failure, or Error
func UpdatePodScheduleStatus(label string, count int) {
scheduleAttempts.WithLabelValues(label).Add(float64(count))
}

// UpdatePreemptionVictimsCount updates count of preemption victims
func UpdatePreemptionVictimsCount(victimsCount int) {
preemptionVictims.Set(float64(victimsCount))
}

// RegisterPreemptionAttempts records the number of preemption attempts
func RegisterPreemptionAttempts() {
preemptionAttempts.Inc()
}

// UpdateUnscheduleTaskCount records the total number of unschedulable tasks
func UpdateUnscheduleTaskCount(jobID string, taskCount int) {
unscheduleTaskCount.WithLabelValues(jobID).Set(float64(taskCount))
}

// UpdateUnscheduleJobCount records the total number of unschedulable jobs
func UpdateUnscheduleJobCount(jobCount int) {
unscheduleJobCount.Set(float64(jobCount))
}

// RegisterJobRetries increments the retry count for the given job
func RegisterJobRetries(jobID string) {
jobRetryCount.WithLabelValues(jobID).Inc()
}

// DurationInMicroseconds gets the time in microseconds.
func DurationInMicroseconds(duration time.Duration) float64 {
return float64(duration.Nanoseconds()) / float64(time.Microsecond.Nanoseconds())
}

// DurationInMilliseconds gets the time in milliseconds.
func DurationInMilliseconds(duration time.Duration) float64 {
return float64(duration.Nanoseconds()) / float64(time.Millisecond.Nanoseconds())
}

// DurationInSeconds gets the time in seconds.
func DurationInSeconds(duration time.Duration) float64 {
return duration.Seconds()
}

// Duration gets the time elapsed since the specified start
func Duration(start time.Time) time.Duration {
return time.Since(start)
}
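A minimal usage sketch (not part of this commit) of the helpers above; the "allocate" action name is illustrative:

package main

import (
	"fmt"
	"time"

	"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

func main() {
	start := time.Now()
	time.Sleep(10 * time.Millisecond) // stand-in for an action's Execute

	// Duration(start) is time.Since(start); the helper converts it to
	// microseconds before observing it in the histogram.
	metrics.UpdateActionDuration("allocate", metrics.Duration(start))

	// The conversion helpers are plain unit arithmetic:
	fmt.Println(metrics.DurationInMilliseconds(1500 * time.Microsecond)) // 1.5
}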
12 changes: 11 additions & 1 deletion pkg/scheduler/plugins/gang/gang.go
@@ -21,12 +21,13 @@ import (

"github.com/golang/glog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

type gangPlugin struct {
@@ -156,11 +157,18 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
}

func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
var unreadyTaskCount int32
var unScheduleJobCount int
for _, job := range ssn.Jobs {
if !jobReady(job) {
unreadyTaskCount = job.MinAvailable - readyTaskNum(job)
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
job.MinAvailable-readyTaskNum(job), len(job.Tasks), job.FitError())

unScheduleJobCount += 1
metrics.UpdateUnscheduleTaskCount(job.Name, int(unreadyTaskCount))
metrics.RegisterJobRetries(job.Name)

jc := &v1alpha1.PodGroupCondition{
Type: v1alpha1.PodGroupUnschedulableType,
Status: v1.ConditionTrue,
@@ -176,4 +184,6 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
}
}
}

metrics.UpdateUnscheduleJobCount(unScheduleJobCount)
}
5 changes: 5 additions & 0 deletions pkg/scheduler/scheduler.go
@@ -27,6 +27,7 @@ import (
schedcache "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

type Scheduler struct {
@@ -82,12 +83,16 @@ func (pc *Scheduler) Run(stopCh <-chan struct{}) {

func (pc *Scheduler) runOnce() {
glog.V(4).Infof("Start scheduling ...")
scheduleStartTime := time.Now()
defer glog.V(4).Infof("End scheduling ...")
defer metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime))

ssn := framework.OpenSession(pc.cache, pc.plugins)
defer framework.CloseSession(ssn)

for _, action := range pc.actions {
actionStartTime := time.Now()
action.Execute(ssn)
metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
}
}
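Because the collectors above are created with promauto, they register with Prometheus's default registry at package initialization, so the series can also be inspected without the HTTP server; exposed names carry the kube_batch subsystem prefix (e.g. kube_batch_e2e_scheduling_latency_milliseconds). An illustrative sketch:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	// Blank import registers the kube-batch collectors in the default registry.
	_ "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

func main() {
	families, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		panic(err)
	}
	// Vec collectors only appear after at least one label combination is observed.
	for _, mf := range families {
		fmt.Println(mf.GetName())
	}
}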
