Merge pull request volcano-sh#35 from lminzhw/performance_improvement
Performance improvement
volcano-sh-bot authored Jun 28, 2019
2 parents ab057d3 + 65e615e commit a92572f
Showing 19 changed files with 569 additions and 125 deletions.
7 changes: 7 additions & 0 deletions cmd/kube-batch/app/options/options.go
@@ -28,6 +28,9 @@ const (
defaultSchedulerPeriod = time.Second
defaultQueue = "default"
defaultListenAddress = ":8080"

defaultQPS = 50.0
defaultBurst = 100
)

// ServerOption is the main context object for the controller manager.
@@ -43,6 +46,8 @@ type ServerOption struct {
PrintVersion bool
ListenAddress string
EnablePriorityClass bool
KubeAPIBurst int
KubeAPIQPS float32
}

// ServerOpts server options
@@ -71,6 +76,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
fs.BoolVar(&s.EnablePriorityClass, "priority-class", true,
"Enable PriorityClass to provide the capacity of preemption at pod group level; to disable it, set it false")
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
}

// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled
2 changes: 2 additions & 0 deletions cmd/kube-batch/app/options/options_test.go
@@ -41,6 +41,8 @@ func TestAddFlags(t *testing.T) {
SchedulePeriod: 5 * time.Minute,
DefaultQueue: defaultQueue,
ListenAddress: defaultListenAddress,
KubeAPIBurst: defaultBurst,
KubeAPIQPS: defaultQPS,
}

if !reflect.DeepEqual(expected, s) {
21 changes: 17 additions & 4 deletions cmd/kube-batch/app/server.go
@@ -52,11 +52,24 @@ const (
apiVersion = "v1alpha1"
)

func buildConfig(master, kubeconfig string) (*rest.Config, error) {
func buildConfig(opt *options.ServerOption) (*rest.Config, error) {
var cfg *rest.Config
var err error

master := opt.Master
kubeconfig := opt.Kubeconfig
if master != "" || kubeconfig != "" {
return clientcmd.BuildConfigFromFlags(master, kubeconfig)
cfg, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
} else {
cfg, err = rest.InClusterConfig()
}
if err != nil {
return nil, err
}
return rest.InClusterConfig()
cfg.QPS = opt.KubeAPIQPS
cfg.Burst = opt.KubeAPIBurst

return cfg, nil
}

// Run the kubeBatch scheduler
@@ -65,7 +78,7 @@ func Run(opt *options.ServerOption) error {
version.PrintVersionAndExit(apiVersion)
}

config, err := buildConfig(opt.Master, opt.Kubeconfig)
config, err := buildConfig(opt)
if err != nil {
return err
}
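
Note: the two new fields are copied onto the rest.Config, so every clientset built from the config returned by buildConfig gets a rate limiter driven by --kube-api-qps, with short bursts allowed up to --kube-api-burst. A minimal, standalone sketch of that effect (the host and values are illustrative, not taken from this commit):

package main

import (
	"fmt"

	"k8s.io/client-go/rest"
)

func main() {
	// Stand-in for the config returned by buildConfig above.
	cfg := &rest.Config{Host: "https://127.0.0.1:6443"}

	// These fields correspond to --kube-api-qps and --kube-api-burst.
	cfg.QPS = 50.0  // sustained requests per second to the apiserver
	cfg.Burst = 100 // short-term ceiling above the sustained rate

	// A clientset created with kubernetes.NewForConfig(cfg) builds its
	// rate limiter from these two values.
	fmt.Printf("QPS=%v Burst=%d\n", cfg.QPS, cfg.Burst)
}
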
6 changes: 6 additions & 0 deletions cmd/kube-batch/main.go
@@ -18,8 +18,12 @@ package main
import (
"fmt"
"os"
"runtime"
"time"

// init pprof server
_ "net/http/pprof"

"github.com/golang/glog"
"github.com/spf13/pflag"

@@ -37,6 +41,8 @@
var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes")

func main() {
runtime.GOMAXPROCS(runtime.NumCPU())

s := options.NewServerOption()
s.AddFlags(pflag.CommandLine)
s.RegisterOptions()
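
Note: the blank net/http/pprof import registers the /debug/pprof/* handlers on http.DefaultServeMux, so the profiles become reachable wherever that mux is served (presumably the scheduler's --listen-address endpoint; that wiring is not in the loaded diffs). runtime.GOMAXPROCS(runtime.NumCPU()) matches the Go runtime's default since Go 1.5, so the call mainly makes the intent explicit. A minimal, standalone sketch of the pprof side:

package main

import (
	"log"
	"net/http"

	// Side effect only: registers /debug/pprof/* on http.DefaultServeMux.
	_ "net/http/pprof"
)

func main() {
	// With the default mux served, profiles are available at, for example,
	// http://localhost:8080/debug/pprof/profile (CPU) and /debug/pprof/heap.
	log.Fatal(http.ListenAndServe(":8080", nil))
}
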
6 changes: 5 additions & 1 deletion pkg/scheduler/actions/allocate/allocate.go
@@ -52,6 +52,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}
if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
glog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

if queue, found := ssn.Queues[job.Queue]; found {
queues.Push(queue)
@@ -150,7 +154,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
break
}

nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

node := util.SelectBestNode(nodeScores)
// Allocate idle resource to the task.
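
Note: util.PrioritizeNodes now also receives ssn.BatchNodeOrderFn, which scores all candidate nodes in a single call instead of being invoked once per node. Its real implementation is not in the loaded diffs; the sketch below is only a hypothetical illustration, with stand-in types, of how a batch score could be merged with per-node scores:

package main

import "fmt"

// Stand-ins for *api.TaskInfo and *api.NodeInfo.
type task struct{ name string }
type node struct{ name string }

type batchNodeOrderFn func(*task, []*node) (map[string]float64, error)
type nodeOrderFn func(*task, *node) (float64, error)

// prioritizeNodes is NOT the repository's util.PrioritizeNodes; it only shows
// summing a one-shot batch score with per-node scores into a final map.
func prioritizeNodes(t *task, nodes []*node, batchFn batchNodeOrderFn, perNodeFn nodeOrderFn) map[string]float64 {
	scores := make(map[string]float64, len(nodes))

	if batchFn != nil {
		if batchScores, err := batchFn(t, nodes); err == nil {
			for name, s := range batchScores {
				scores[name] += s
			}
		}
	}

	for _, n := range nodes {
		if s, err := perNodeFn(t, n); err == nil {
			scores[n.name] += s
		}
	}
	return scores
}

func main() {
	nodes := []*node{{name: "node-1"}, {name: "node-2"}}
	batch := func(*task, []*node) (map[string]float64, error) {
		return map[string]float64{"node-1": 10, "node-2": 4}, nil
	}
	perNode := func(*task, *node) (float64, error) { return 1, nil }

	fmt.Println(prioritizeNodes(&task{name: "t1"}, nodes, batch, perNode))
}
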
4 changes: 4 additions & 0 deletions pkg/scheduler/actions/backfill/backfill.go
@@ -47,6 +47,10 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}
if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
glog.V(4).Infof("Job <%s/%s> Queue <%s> skip backfill, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
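
Note: the same ssn.JobValid guard is added to allocate, backfill, preempt, and reclaim, so a job that fails plugin validation (for example a gang-style minimum-member check) is skipped before any per-node work is done for it. The framework side is not in the loaded diffs; the sketch below only illustrates, with stand-in types, the shape of a validation result like the vr.Pass/vr.Reason/vr.Message checked above, assuming plugins register such a function through a hook along the lines of ssn.AddJobValidFn:

package main

import "fmt"

// Stand-ins for api.JobInfo and the validation result used by the actions.
type jobInfo struct {
	Name      string
	MinMember int32
	Ready     int32
}

type validateResult struct {
	Pass    bool
	Reason  string
	Message string
}

// A gang-style check in the spirit of what a plugin could register; this is
// illustrative, not the repository's plugin code.
func validJob(j *jobInfo) *validateResult {
	if j.Ready < j.MinMember {
		return &validateResult{
			Pass:    false,
			Reason:  "NotEnoughPods",
			Message: fmt.Sprintf("only %d of %d member pods are ready to schedule", j.Ready, j.MinMember),
		}
	}
	return &validateResult{Pass: true}
}

func main() {
	if vr := validJob(&jobInfo{Name: "job-1", MinMember: 3, Ready: 1}); vr != nil && !vr.Pass {
		fmt.Printf("skip job-1: %s (%s)\n", vr.Reason, vr.Message)
	}
}
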
6 changes: 5 additions & 1 deletion pkg/scheduler/actions/preempt/preempt.go
@@ -56,6 +56,10 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}
if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
glog.V(4).Infof("Job <%s/%s> Queue <%s> skip preemption, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

if queue, found := ssn.Queues[job.Queue]; !found {
continue
@@ -186,7 +190,7 @@ func preempt(

predicateNodes := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

selectedNodes := util.SortNodes(nodeScores)
for _, node := range selectedNodes {
4 changes: 4 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim.go
@@ -57,6 +57,10 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}
if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
glog.V(4).Infof("Job <%s/%s> Queue <%s> skip reclaim, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

if queue, found := ssn.Queues[job.Queue]; !found {
glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
2 changes: 1 addition & 1 deletion pkg/scheduler/api/job_info.go
@@ -302,7 +302,7 @@ func (ji *JobInfo) Clone() *JobInfo {
NodesFitDelta: make(NodeResourceMap),

PDB: ji.PDB,
PodGroup: ji.PodGroup,
PodGroup: ji.PodGroup.DeepCopy(),

TaskStatusIndex: map[TaskStatus]tasksMap{},
Tasks: tasksMap{},
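
Note: Clone previously copied the PodGroup pointer, so mutating a cloned job's PodGroup also mutated the object held by the scheduler cache; deep-copying removes that aliasing. A small, generic illustration of the difference (the real field is a *v1alpha1.PodGroup with a generated DeepCopy):

package main

import "fmt"

// Stand-in for the shared PodGroup object.
type podGroup struct{ Phase string }

func (pg *podGroup) DeepCopy() *podGroup {
	out := *pg
	return &out
}

type jobInfo struct{ PodGroup *podGroup }

func main() {
	cached := &jobInfo{PodGroup: &podGroup{Phase: "Pending"}}

	aliased := &jobInfo{PodGroup: cached.PodGroup}           // old behaviour: shared pointer
	cloned := &jobInfo{PodGroup: cached.PodGroup.DeepCopy()} // new behaviour: independent copy

	aliased.PodGroup.Phase = "Running"
	fmt.Println(cached.PodGroup.Phase) // prints "Running": the cache was changed through the alias

	cloned.PodGroup.Phase = "Unknown"
	fmt.Println(cached.PodGroup.Phase) // still "Running": the clone no longer leaks back
}
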
3 changes: 3 additions & 0 deletions pkg/scheduler/api/types.go
@@ -132,6 +132,9 @@ type EvictableFn func(*TaskInfo, []*TaskInfo) []*TaskInfo
// NodeOrderFn is the func declaration used to get priority score for a node for a particular task.
type NodeOrderFn func(*TaskInfo, *NodeInfo) (float64, error)

// BatchNodeOrderFn is the func declaration used to get priority score for ALL nodes for a particular task.
type BatchNodeOrderFn func(*TaskInfo, []*NodeInfo) (map[string]float64, error)

// NodeMapFn is the func declaration used to get priority score for a node for a particular task.
type NodeMapFn func(*TaskInfo, *NodeInfo) (float64, error)

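
Note: BatchNodeOrderFn lets a plugin (or an extender) score every candidate node for a task in one call rather than being invoked once per node, and it is the hook the actions above now pass to util.PrioritizeNodes. A sketch of a function matching this signature; the import path is assumed from the upstream kube-batch layout and the scoring rule is illustrative, not one of the repository's plugins:

package plugin

import (
	"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
)

// allNodesScore matches api.BatchNodeOrderFn: it receives the task plus all
// candidate nodes and returns one score per node name.
func allNodesScore(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) {
	scores := make(map[string]float64, len(nodes))
	for _, n := range nodes {
		// Illustrative rule: favour nodes with more idle CPU.
		scores[n.Name] = n.Idle.MilliCPU
	}
	return scores, nil
}

A plugin would register such a function when its session opens, presumably via a new AddBatchNodeOrderFn hook added in the framework files that are not loaded on this page.
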
115 changes: 86 additions & 29 deletions pkg/scheduler/cache/cache.go
@@ -147,6 +147,31 @@ type defaultStatusUpdater struct {
kbclient *kbver.Clientset
}

// following the same logic as podutil.UpdatePodCondition
func podConditionHaveUpdate(status *v1.PodStatus, condition *v1.PodCondition) bool {
lastTransitionTime := metav1.Now()
// Try to find this pod condition.
_, oldCondition := podutil.GetPodCondition(status, condition.Type)

if oldCondition == nil {
// We are adding new pod condition.
return true
}
// We are updating an existing condition, so we need to check if it has changed.
if condition.Status == oldCondition.Status {
lastTransitionTime = oldCondition.LastTransitionTime
}

isEqual := condition.Status == oldCondition.Status &&
condition.Reason == oldCondition.Reason &&
condition.Message == oldCondition.Message &&
condition.LastProbeTime.Equal(&oldCondition.LastProbeTime) &&
lastTransitionTime.Equal(&oldCondition.LastTransitionTime)

// Return true if one of the fields have changed.
return !isEqual
}

// UpdatePodCondition will Update pod with podCondition
func (su *defaultStatusUpdater) UpdatePodCondition(pod *v1.Pod, condition *v1.PodCondition) (*v1.Pod, error) {
glog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status)
@@ -184,22 +209,35 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error {
}

func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string) *SchedulerCache {
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init kubeClient, with err: %v", err))
}
kbClient, err := kbver.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init kbClient, with err: %v", err))
}
eventClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init eventClient, with err: %v", err))
}

sc := &SchedulerCache{
Jobs: make(map[kbapi.JobID]*kbapi.JobInfo),
Nodes: make(map[string]*kbapi.NodeInfo),
Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo),
PriorityClasses: make(map[string]*v1beta1.PriorityClass),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeclient: kubernetes.NewForConfigOrDie(config),
kbclient: kbver.NewForConfigOrDie(config),
kubeclient: kubeClient,
kbclient: kbClient,
defaultQueue: defaultQueue,
schedulerName: schedulerName,
}

// Prepare event clients.
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: sc.kubeclient.CoreV1().Events("")})
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: eventClient.CoreV1().Events("")})
sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName})

sc.Binder = &defaultBinder{
@@ -460,22 +498,27 @@ func (sc *SchedulerCache) BindVolumes(task *api.TaskInfo) error {

// taskUnschedulable updates pod status of pending task
func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string) error {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
pod := task.Pod

pod := task.Pod.DeepCopy()

// The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in
// k8s core, so using the same string here.
// The reason field in PodCondition should be "Unschedulable"
sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message)
if _, err := sc.StatusUpdater.UpdatePodCondition(pod, &v1.PodCondition{
condition := &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: message,
}); err != nil {
return err
}

if podConditionHaveUpdate(&pod.Status, condition) {
pod = pod.DeepCopy()

// The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in
// k8s core, so using the same string here.
// The reason field in PodCondition should be "Unschedulable"
sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message)
if _, err := sc.StatusUpdater.UpdatePodCondition(pod, condition); err != nil {
return err
}
} else {
glog.V(4).Infof("task unscheduleable %s/%s, message: %s, skip by no condition update", pod.Namespace, pod.Name, message)
}

return nil
@@ -560,6 +603,30 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
snapshot.Queues[value.UID] = value.Clone()
}

var cloneJobLock sync.Mutex
var wg sync.WaitGroup

cloneJob := func(value *api.JobInfo) {
if value.PodGroup != nil {
value.Priority = sc.defaultPriority

priName := value.PodGroup.Spec.PriorityClassName
if priorityClass, found := sc.PriorityClasses[priName]; found {
value.Priority = priorityClass.Value
}

glog.V(4).Infof("The priority of job <%s/%s> is <%s/%d>",
value.Namespace, value.Name, priName, value.Priority)
}

clonedJob := value.Clone()

cloneJobLock.Lock()
snapshot.Jobs[value.UID] = clonedJob
cloneJobLock.Unlock()
wg.Done()
}

for _, value := range sc.Jobs {
// If no scheduling spec, does not handle it.
if value.PodGroup == nil && value.PDB == nil {
@@ -575,20 +642,10 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
continue
}

if value.PodGroup != nil {
value.Priority = sc.defaultPriority

priName := value.PodGroup.Spec.PriorityClassName
if priorityClass, found := sc.PriorityClasses[priName]; found {
value.Priority = priorityClass.Value
}

glog.V(4).Infof("The priority of job <%s/%s> is <%s/%d>",
value.Namespace, value.Name, priName, value.Priority)
}

snapshot.Jobs[value.UID] = value.Clone()
wg.Add(1)
go cloneJob(value)
}
wg.Wait()

glog.V(3).Infof("There are <%d> Jobs, <%d> Queues and <%d> Nodes in total for scheduling.",
len(snapshot.Jobs), len(snapshot.Queues), len(snapshot.Nodes))
@@ -658,8 +715,8 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {
}

// UpdateJobStatus update the status of job and its tasks.
func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo) (*kbapi.JobInfo, error) {
if !shadowPodGroup(job.PodGroup) {
func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*kbapi.JobInfo, error) {
if updatePG && !shadowPodGroup(job.PodGroup) {
pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
if err != nil {
return nil, err
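
Note: Snapshot now clones jobs concurrently, one goroutine per job, using a sync.WaitGroup to wait for completion and a mutex to guard the shared map (Go maps are not safe for concurrent writes). The same pattern in isolation, with a trivial copy standing in for the JobInfo.Clone call:

package main

import (
	"fmt"
	"sync"
)

type job struct{ id, name string }

func cloneConcurrently(jobs []*job) map[string]*job {
	snapshot := make(map[string]*job, len(jobs))

	var mu sync.Mutex
	var wg sync.WaitGroup

	for _, j := range jobs {
		wg.Add(1)
		go func(j *job) {
			defer wg.Done()

			c := *j // stand-in for the expensive JobInfo.Clone()

			mu.Lock()
			snapshot[j.id] = &c
			mu.Unlock()
		}(j)
	}
	wg.Wait()

	return snapshot
}

func main() {
	jobs := []*job{{id: "j1", name: "job-one"}, {id: "j2", name: "job-two"}}
	fmt.Println(len(cloneConcurrently(jobs))) // 2
}
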
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/interface.go
@@ -46,7 +46,7 @@ type Cache interface {
RecordJobStatusEvent(job *api.JobInfo)

// UpdateJobStatus puts job in backlog for a while.
UpdateJobStatus(job *api.JobInfo) (*api.JobInfo, error)
UpdateJobStatus(job *api.JobInfo, updatePG bool) (*api.JobInfo, error)

// AllocateVolumes allocates volume on the host to the task
AllocateVolumes(task *api.TaskInfo, hostname string) error
(Diffs for the remaining changed files were not loaded on this page.)
