From ef9a89f831780bcd65ced7fbea4ab326133f1668 Mon Sep 17 00:00:00 2001 From: lminzhw Date: Thu, 23 May 2019 09:49:54 +0800 Subject: [PATCH 1/4] performance improvement --- cmd/kube-batch/app/options/options.go | 7 + cmd/kube-batch/app/server.go | 22 ++- pkg/scheduler/actions/allocate/allocate.go | 4 + pkg/scheduler/actions/backfill/backfill.go | 4 + pkg/scheduler/actions/preempt/preempt.go | 4 + pkg/scheduler/actions/reclaim/reclaim.go | 4 + pkg/scheduler/api/job_info.go | 2 +- pkg/scheduler/cache/cache.go | 7 +- pkg/scheduler/cache/interface.go | 2 +- pkg/scheduler/framework/job_updater.go | 122 +++++++++++++ pkg/scheduler/framework/session.go | 25 ++- pkg/scheduler/plugins/nodeorder/nodeorder.go | 77 ++++---- .../plugins/predicates/predicates.go | 52 +++++- pkg/scheduler/plugins/util/util.go | 165 +++++++++++++++--- 14 files changed, 407 insertions(+), 90 deletions(-) create mode 100644 pkg/scheduler/framework/job_updater.go diff --git a/cmd/kube-batch/app/options/options.go b/cmd/kube-batch/app/options/options.go index b0c97258e9..85f9e92ffc 100644 --- a/cmd/kube-batch/app/options/options.go +++ b/cmd/kube-batch/app/options/options.go @@ -28,6 +28,9 @@ const ( defaultSchedulerPeriod = time.Second defaultQueue = "default" defaultListenAddress = ":8080" + + defaultQPS = 50.0 + defaultBurst = 100 ) // ServerOption is the main context object for the controller manager. @@ -43,6 +46,8 @@ type ServerOption struct { PrintVersion bool ListenAddress string EnablePriorityClass bool + KubeAPIBurst int + KubeAPIQPS float32 } // ServerOpts server options @@ -71,6 +76,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.") fs.BoolVar(&s.EnablePriorityClass, "priority-class", true, "Enable PriorityClass to provide the capacity of preemption at pod group level; to disable it, set it false") + fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver") + fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver") } // CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled diff --git a/cmd/kube-batch/app/server.go b/cmd/kube-batch/app/server.go index 1fbe6f415e..600d7fd0a1 100644 --- a/cmd/kube-batch/app/server.go +++ b/cmd/kube-batch/app/server.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + _ "net/http/pprof" "os" "time" @@ -52,11 +53,24 @@ const ( apiVersion = "v1alpha1" ) -func buildConfig(master, kubeconfig string) (*rest.Config, error) { +func buildConfig(opt *options.ServerOption) (*rest.Config, error) { + var cfg *rest.Config + var err error + + master := opt.Master + kubeconfig := opt.Kubeconfig if master != "" || kubeconfig != "" { - return clientcmd.BuildConfigFromFlags(master, kubeconfig) + cfg, err = clientcmd.BuildConfigFromFlags(master, kubeconfig) + } else { + cfg, err = rest.InClusterConfig() + } + if err != nil { + return nil, err } - return rest.InClusterConfig() + cfg.QPS = opt.KubeAPIQPS + cfg.Burst = opt.KubeAPIBurst + + return cfg, nil } // Run the kubeBatch scheduler @@ -65,7 +79,7 @@ func Run(opt *options.ServerOption) error { version.PrintVersionAndExit(apiVersion) } - config, err := buildConfig(opt.Master, opt.Kubeconfig) + config, err := buildConfig(opt) if err != nil { return err } diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 18458d7cad..89a5c51083 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -52,6 +52,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { continue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + glog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } if queue, found := ssn.Queues[job.Queue]; found { queues.Push(queue) diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 618c8cedaf..4b1a38dd19 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -47,6 +47,10 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) { if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { continue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + glog.V(4).Infof("Job <%s/%s> Queue <%s> skip backfill, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } for _, task := range job.TaskStatusIndex[api.Pending] { if task.InitResreq.IsEmpty() { diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 4ca1b5f508..3af0dd674a 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -56,6 +56,10 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) { if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { continue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + glog.V(4).Infof("Job <%s/%s> Queue <%s> skip preemption, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } if queue, found := ssn.Queues[job.Queue]; !found { continue diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 520e8264a8..5545df516f 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -57,6 +57,10 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { continue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + glog.V(4).Infof("Job <%s/%s> Queue <%s> skip reclaim, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } if queue, found := ssn.Queues[job.Queue]; !found { glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>", diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index 2a9a6bfcf5..ce27db2545 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -302,7 +302,7 @@ func (ji *JobInfo) Clone() *JobInfo { NodesFitDelta: make(NodeResourceMap), PDB: ji.PDB, - PodGroup: ji.PodGroup, + PodGroup: ji.PodGroup.DeepCopy(), TaskStatusIndex: map[TaskStatus]tasksMap{}, Tasks: tasksMap{}, diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 322d3b4d02..7de275dc35 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -460,9 +460,6 @@ func (sc *SchedulerCache) BindVolumes(task *api.TaskInfo) error { // taskUnschedulable updates pod status of pending task func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string) error { - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - pod := task.Pod.DeepCopy() // The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in @@ -658,8 +655,8 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { } // UpdateJobStatus update the status of job and its tasks. -func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo) (*kbapi.JobInfo, error) { - if !shadowPodGroup(job.PodGroup) { +func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*kbapi.JobInfo, error) { + if updatePG && !shadowPodGroup(job.PodGroup) { pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup) if err != nil { return nil, err diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index b664ea20a6..9a2807d4e2 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -46,7 +46,7 @@ type Cache interface { RecordJobStatusEvent(job *api.JobInfo) // UpdateJobStatus puts job in backlog for a while. - UpdateJobStatus(job *api.JobInfo) (*api.JobInfo, error) + UpdateJobStatus(job *api.JobInfo, updatePG bool) (*api.JobInfo, error) // AllocateVolumes allocates volume on the host to the task AllocateVolumes(task *api.TaskInfo, hostname string) error diff --git a/pkg/scheduler/framework/job_updater.go b/pkg/scheduler/framework/job_updater.go new file mode 100644 index 0000000000..ab03cc992e --- /dev/null +++ b/pkg/scheduler/framework/job_updater.go @@ -0,0 +1,122 @@ +package framework + +import ( + "context" + "math/rand" + "reflect" + "time" + + "github.com/golang/glog" + + "k8s.io/client-go/util/workqueue" + + "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" +) + +const ( + jobUpdaterWorker = 16 + + jobConditionUpdateTime = time.Minute + jobConditionUpdateTimeJitter = 30 * time.Second +) + +// TimeJitterAfter means: new after old + duration + jitter +func TimeJitterAfter(new, old time.Time, duration, maxJitter time.Duration) bool { + var jitter int64 + if maxJitter > 0 { + jitter = rand.Int63n(int64(maxJitter)) + } + return new.After(old.Add(duration + time.Duration(jitter))) +} + +type jobUpdater struct { + ssn *Session + jobQueue []*api.JobInfo +} + +func newJobUpdater(ssn *Session) *jobUpdater { + queue := make([]*api.JobInfo, 0, len(ssn.Jobs)) + for _, job := range ssn.Jobs { + queue = append(queue, job) + } + + ju := &jobUpdater{ + ssn: ssn, + jobQueue: queue, + } + return ju +} + +func (ju *jobUpdater) UpdateAll() { + workqueue.ParallelizeUntil(context.TODO(), jobUpdaterWorker, len(ju.jobQueue), ju.updateJob) +} + +func isPodGroupConditionsUpdated(newCondition, oldCondition []v1alpha1.PodGroupCondition) bool { + if len(newCondition) != len(oldCondition) { + return true + } + + for index, newCond := range newCondition { + oldCond := oldCondition[index] + + newTime := newCond.LastTransitionTime + oldTime := oldCond.LastTransitionTime + if TimeJitterAfter(newTime.Time, oldTime.Time, jobConditionUpdateTime, jobConditionUpdateTimeJitter) { + return true + } + + // if newCond is not new enough, we treat it the same as the old one + newCond.LastTransitionTime = oldTime + + // comparing should ignore the TransitionID + newTransitionID := newCond.TransitionID + newCond.TransitionID = oldCond.TransitionID + + shouldUpdate := !reflect.DeepEqual(&newCond, &oldCond) + + newCond.LastTransitionTime = newTime + newCond.TransitionID = newTransitionID + if shouldUpdate { + return true + } + } + + return false +} + +func isPodGroupStatusUpdated(newStatus, oldStatus *v1alpha1.PodGroupStatus) bool { + newCondition := newStatus.Conditions + newStatus.Conditions = nil + oldCondition := oldStatus.Conditions + oldStatus.Conditions = nil + + shouldUpdate := !reflect.DeepEqual(newStatus, oldStatus) || isPodGroupConditionsUpdated(newCondition, oldCondition) + + newStatus.Conditions = newCondition + oldStatus.Conditions = oldCondition + + return shouldUpdate +} + +// updateJob update specified job +func (ju *jobUpdater) updateJob(index int) { + job := ju.jobQueue[index] + ssn := ju.ssn + + // If job is using PDB, ignore it. + // TODO(k82cn): remove it when removing PDB support + if job.PodGroup == nil { + ssn.cache.RecordJobStatusEvent(job) + return + } + + job.PodGroup.Status = jobStatus(ssn, job) + oldStatus, found := ssn.podGroupStatus[job.UID] + updatePG := !found || isPodGroupStatusUpdated(&job.PodGroup.Status, oldStatus) + + if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil { + glog.Errorf("Failed to update job <%s/%s>: %v", + job.Namespace, job.Name, err) + } +} diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 27d4ca2a61..313e691827 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -39,6 +39,8 @@ type Session struct { cache cache.Cache + podGroupStatus map[api.JobID]*v1alpha1.PodGroupStatus + Jobs map[api.JobID]*api.JobInfo Nodes map[string]*api.NodeInfo Queues map[api.QueueID]*api.QueueInfo @@ -68,6 +70,8 @@ func openSession(cache cache.Cache) *Session { UID: uuid.NewUUID(), cache: cache, + podGroupStatus: map[api.JobID]*v1alpha1.PodGroupStatus{}, + Jobs: map[api.JobID]*api.JobInfo{}, Nodes: map[string]*api.NodeInfo{}, Queues: map[api.QueueID]*api.QueueInfo{}, @@ -93,6 +97,11 @@ func openSession(cache cache.Cache) *Session { ssn.Jobs = snapshot.Jobs for _, job := range ssn.Jobs { + // only conditions will be updated periodically + if job.PodGroup != nil && job.PodGroup.Status.Conditions != nil { + ssn.podGroupStatus[job.UID] = job.PodGroup.Status.DeepCopy() + } + if vjr := ssn.JobValid(job); vjr != nil { if !vjr.Pass { jc := &v1alpha1.PodGroupCondition{ @@ -123,20 +132,8 @@ func openSession(cache cache.Cache) *Session { } func closeSession(ssn *Session) { - for _, job := range ssn.Jobs { - // If job is using PDB, ignore it. - // TODO(k82cn): remove it when removing PDB support - if job.PodGroup == nil { - ssn.cache.RecordJobStatusEvent(job) - continue - } - - job.PodGroup.Status = jobStatus(ssn, job) - if _, err := ssn.cache.UpdateJobStatus(job); err != nil { - glog.Errorf("Failed to update job <%s/%s>: %v", - job.Namespace, job.Name, err) - } - } + ju := newJobUpdater(ssn) + ju.UpdateAll() ssn.Jobs = nil ssn.Nodes = nil diff --git a/pkg/scheduler/plugins/nodeorder/nodeorder.go b/pkg/scheduler/plugins/nodeorder/nodeorder.go index 4441d33119..7bff35ce4f 100644 --- a/pkg/scheduler/plugins/nodeorder/nodeorder.go +++ b/pkg/scheduler/plugins/nodeorder/nodeorder.go @@ -56,19 +56,6 @@ func getInterPodAffinityScore(name string, interPodAffinityScore schedulerapi.Ho return 0 } -func generateNodeMapAndSlice(nodes map[string]*api.NodeInfo) (map[string]*cache.NodeInfo, []*v1.Node) { - var nodeMap map[string]*cache.NodeInfo - var nodeSlice []*v1.Node - nodeMap = make(map[string]*cache.NodeInfo) - for _, node := range nodes { - nodeInfo := cache.NewNodeInfo(node.Pods()...) - nodeInfo.SetNode(node.Node) - nodeMap[node.Name] = nodeInfo - nodeSlice = append(nodeSlice, node.Node) - } - return nodeMap, nodeSlice -} - type cachedNodeInfo struct { session *framework.Session } @@ -153,30 +140,60 @@ func calculateWeight(args framework.Arguments) priorityWeight { } func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { - nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { + var nodeMap map[string]*cache.NodeInfo + var nodeSlice []*v1.Node - weight := calculateWeight(pp.pluginArguments) + weight := calculateWeight(pp.pluginArguments) - pl := &util.PodLister{ - Session: ssn, - } + pl := util.NewPodLister(ssn) - nl := &util.NodeLister{ - Session: ssn, - } + nl := &util.NodeLister{ + Session: ssn, + } - cn := &cachedNodeInfo{ - session: ssn, - } + cn := &cachedNodeInfo{ + session: ssn, + } - var nodeMap map[string]*cache.NodeInfo - var nodeSlice []*v1.Node - var interPodAffinityScore schedulerapi.HostPriorityList + nodeMap, nodeSlice = util.GenerateNodeMapAndSlice(ssn.Nodes) + + // Register event handlers to update task info in PodLister & nodeMap + ssn.AddEventHandler(&framework.EventHandler{ + AllocateFunc: func(event *framework.Event) { + pod := pl.UpdateTask(event.Task, event.Task.NodeName) + + nodeName := event.Task.NodeName + node, found := nodeMap[nodeName] + if !found { + glog.Warningf("node order, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + } else { + node.AddPod(pod) + glog.V(4).Infof("node order, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName) + } + }, + DeallocateFunc: func(event *framework.Event) { + pod := pl.UpdateTask(event.Task, "") + + nodeName := event.Task.NodeName + node, found := nodeMap[nodeName] + if !found { + glog.Warningf("node order, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + } else { + node.RemovePod(pod) + glog.V(4).Infof("node order, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName) + } + }, + }) - nodeMap, nodeSlice = generateNodeMapAndSlice(ssn.Nodes) + nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { + var interPodAffinityScore schedulerapi.HostPriorityList - nodeInfo := cache.NewNodeInfo(node.Pods()...) - nodeInfo.SetNode(node.Node) + nodeInfo, found := nodeMap[node.Name] + if !found { + nodeInfo = cache.NewNodeInfo(node.Pods()...) + nodeInfo.SetNode(node.Node) + glog.Warningf("node order, generate node info for %s at NodeOrderFn is unexpected", node.Name) + } var score = 0.0 //TODO: Add ImageLocalityPriority Function once priorityMetadata is published diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index fa307bc3a1..cc720f3661 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -111,9 +111,39 @@ func enablePredicate(args framework.Arguments) predicateEnable { } func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { - pl := &util.PodLister{ - Session: ssn, - } + var nodeMap map[string]*cache.NodeInfo + + pl := util.NewPodLister(ssn) + + nodeMap, _ = util.GenerateNodeMapAndSlice(ssn.Nodes) + + // Register event handlers to update task info in PodLister & nodeMap + ssn.AddEventHandler(&framework.EventHandler{ + AllocateFunc: func(event *framework.Event) { + pod := pl.UpdateTask(event.Task, event.Task.NodeName) + + nodeName := event.Task.NodeName + node, found := nodeMap[nodeName] + if !found { + glog.Warningf("predicates, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + } else { + node.AddPod(pod) + glog.V(4).Infof("predicates, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName) + } + }, + DeallocateFunc: func(event *framework.Event) { + pod := pl.UpdateTask(event.Task, "") + + nodeName := event.Task.NodeName + node, found := nodeMap[nodeName] + if !found { + glog.Warningf("predicates, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName) + } else { + node.RemovePod(pod) + glog.V(4).Infof("predicates, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName) + } + }, + }) ni := &util.CachedNodeInfo{ Session: ssn, @@ -122,8 +152,12 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { predicate := enablePredicate(pp.pluginArguments) ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { - nodeInfo := cache.NewNodeInfo(node.Pods()...) - nodeInfo.SetNode(node.Node) + nodeInfo, found := nodeMap[node.Name] + if !found { + nodeInfo = cache.NewNodeInfo(node.Pods()...) + nodeInfo.SetNode(node.Node) + glog.Warningf("predicates, generate node info for %s at PredicateFn is unexpected", node.Name) + } if node.Allocatable.MaxTaskNum <= len(nodeInfo.Pods()) { return fmt.Errorf("node <%s> can not allow more task running on it", node.Name) @@ -247,8 +281,14 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { } } + var lister algorithm.PodLister + lister = pl + if !util.HaveAffinity(task.Pod) { + // pod without affinity will be only affected by pod with affinity + lister = pl.AffinityLister() + } // Pod Affinity/Anti-Affinity Predicate - podAffinityPredicate := predicates.NewPodAffinityPredicate(ni, pl) + podAffinityPredicate := predicates.NewPodAffinityPredicate(ni, lister) fit, _, err = podAffinityPredicate(task.Pod, nil, nodeInfo) if err != nil { return err diff --git a/pkg/scheduler/plugins/util/util.go b/pkg/scheduler/plugins/util/util.go index a8935afa23..76705848aa 100644 --- a/pkg/scheduler/plugins/util/util.go +++ b/pkg/scheduler/plugins/util/util.go @@ -19,9 +19,12 @@ package util import ( "fmt" + "github.com/golang/glog" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/scheduler/algorithm" + "k8s.io/kubernetes/pkg/scheduler/cache" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" @@ -30,11 +33,33 @@ import ( // PodLister is used in predicate and nodeorder plugin type PodLister struct { Session *framework.Session + + CachedPods map[api.TaskID]*v1.Pod + Tasks map[api.TaskID]*api.TaskInfo + TaskWithAffinity map[api.TaskID]*api.TaskInfo } -// List method is used to list all the pods -func (pl *PodLister) List(selector labels.Selector) ([]*v1.Pod, error) { - var pods []*v1.Pod +type PodAffinityLister struct { + pl *PodLister +} + +func HaveAffinity(pod *v1.Pod) bool { + affinity := pod.Spec.Affinity + return affinity != nil && + (affinity.NodeAffinity != nil || + affinity.PodAffinity != nil || + affinity.PodAntiAffinity != nil) +} + +func NewPodLister(ssn *framework.Session) *PodLister { + pl := &PodLister{ + Session: ssn, + + CachedPods: make(map[api.TaskID]*v1.Pod), + Tasks: make(map[api.TaskID]*api.TaskInfo), + TaskWithAffinity: make(map[api.TaskID]*api.TaskInfo), + } + for _, job := range pl.Session.Jobs { for status, tasks := range job.TaskStatusIndex { if !api.AllocatedStatus(status) { @@ -42,48 +67,130 @@ func (pl *PodLister) List(selector labels.Selector) ([]*v1.Pod, error) { } for _, task := range tasks { - if selector.Matches(labels.Set(task.Pod.Labels)) { - if task.NodeName != task.Pod.Spec.NodeName { - pod := task.Pod.DeepCopy() - pod.Spec.NodeName = task.NodeName - pods = append(pods, pod) - } else { - pods = append(pods, task.Pod) - } + pl.Tasks[task.UID] = task + if HaveAffinity(task.Pod) { + pl.TaskWithAffinity[task.UID] = task } } } } + return pl +} + +func (pl *PodLister) copyTaskPod(task *api.TaskInfo) *v1.Pod { + pod := task.Pod.DeepCopy() + pod.Spec.NodeName = task.NodeName + return pod +} + +// GetPod will get pod with proper nodeName, from cache or DeepCopy +// keeping this function read only to avoid concurrent panic of map +func (pl *PodLister) GetPod(task *api.TaskInfo) *v1.Pod { + if task.NodeName == task.Pod.Spec.NodeName { + return task.Pod + } + + pod, found := pl.CachedPods[task.UID] + if !found { + // we could not write the copied pod back into cache for read only + pod = pl.copyTaskPod(task) + glog.Warningf("DeepCopy for pod %s/%s at PodLister.GetPod is unexpected", pod.Namespace, pod.Name) + } + return pod +} + +// UpdateTask will update the pod nodeName in cache using nodeName +// NOT thread safe, please ensure UpdateTask is the only called function of PodLister at the same time. +func (pl *PodLister) UpdateTask(task *api.TaskInfo, nodeName string) *v1.Pod { + pod, found := pl.CachedPods[task.UID] + if !found { + pod = pl.copyTaskPod(task) + pl.CachedPods[task.UID] = pod + } + pod.Spec.NodeName = nodeName + + if !api.AllocatedStatus(task.Status) { + delete(pl.Tasks, task.UID) + if HaveAffinity(task.Pod) { + delete(pl.TaskWithAffinity, task.UID) + } + } else { + pl.Tasks[task.UID] = task + if HaveAffinity(task.Pod) { + pl.TaskWithAffinity[task.UID] = task + } + } + + return pod +} + +// List method is used to list all the pods +func (pl *PodLister) List(selector labels.Selector) ([]*v1.Pod, error) { + var pods []*v1.Pod + for _, task := range pl.Tasks { + pod := pl.GetPod(task) + if selector.Matches(labels.Set(pod.Labels)) { + pods = append(pods, pod) + } + } + return pods, nil } // FilteredList is used to list all the pods under filter condition -func (pl *PodLister) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { +func (pl *PodLister) filteredListWithTaskSet(taskSet map[api.TaskID]*api.TaskInfo, podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { var pods []*v1.Pod - for _, job := range pl.Session.Jobs { - for status, tasks := range job.TaskStatusIndex { - if !api.AllocatedStatus(status) { - continue - } - - for _, task := range tasks { - if podFilter(task.Pod) && selector.Matches(labels.Set(task.Pod.Labels)) { - if task.NodeName != task.Pod.Spec.NodeName { - pod := task.Pod.DeepCopy() - pod.Spec.NodeName = task.NodeName - pods = append(pods, pod) - } else { - pods = append(pods, task.Pod) - } - } - } + for _, task := range taskSet { + pod := pl.GetPod(task) + if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) { + pods = append(pods, pod) } } return pods, nil } +// FilteredList is used to list all the pods under filter condition +func (pl *PodLister) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { + return pl.filteredListWithTaskSet(pl.Tasks, podFilter, selector) +} + +// AffinityFilteredList is used to list all the pods with affinity under filter condition +func (pl *PodLister) AffinityFilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { + return pl.filteredListWithTaskSet(pl.TaskWithAffinity, podFilter, selector) +} + +func (pl *PodLister) AffinityLister() *PodAffinityLister { + pal := &PodAffinityLister{ + pl: pl, + } + return pal +} + +// List method is used to list all the pods +func (pal *PodAffinityLister) List(selector labels.Selector) ([]*v1.Pod, error) { + return pal.pl.List(selector) +} + +// FilteredList is used to list all the pods with affinity under filter condition +func (pal *PodAffinityLister) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { + return pal.pl.AffinityFilteredList(podFilter, selector) +} + +func GenerateNodeMapAndSlice(nodes map[string]*api.NodeInfo) (map[string]*cache.NodeInfo, []*v1.Node) { + var nodeMap map[string]*cache.NodeInfo + var nodeSlice []*v1.Node + nodeMap = make(map[string]*cache.NodeInfo) + for _, node := range nodes { + nodeInfo := cache.NewNodeInfo(node.Pods()...) + nodeInfo.SetNode(node.Node) + nodeMap[node.Name] = nodeInfo + nodeSlice = append(nodeSlice, node.Node) + } + return nodeMap, nodeSlice +} + // CachedNodeInfo is used in nodeorder and predicate plugin type CachedNodeInfo struct { Session *framework.Session From 70de67e41ccae7a490659df6551a0c8e4a2fe0b5 Mon Sep 17 00:00:00 2001 From: lminzhw Date: Tue, 28 May 2019 21:46:22 +0800 Subject: [PATCH 2/4] improve performance --- cmd/kube-batch/main.go | 3 + pkg/scheduler/actions/allocate/allocate.go | 2 +- pkg/scheduler/actions/preempt/preempt.go | 2 +- pkg/scheduler/api/types.go | 3 + pkg/scheduler/cache/cache.go | 89 +++++++++++++++----- pkg/scheduler/framework/session.go | 2 + pkg/scheduler/framework/session_plugins.go | 29 +++++++ pkg/scheduler/plugins/nodeorder/nodeorder.go | 26 ++++-- pkg/scheduler/util/scheduler_helper.go | 17 +++- 9 files changed, 139 insertions(+), 34 deletions(-) diff --git a/cmd/kube-batch/main.go b/cmd/kube-batch/main.go index 9b78e4a54d..b0fda7024e 100644 --- a/cmd/kube-batch/main.go +++ b/cmd/kube-batch/main.go @@ -18,6 +18,7 @@ package main import ( "fmt" "os" + "runtime" "time" "github.com/golang/glog" @@ -37,6 +38,8 @@ import ( var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes") func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + s := options.NewServerOption() s.AddFlags(pflag.CommandLine) s.RegisterOptions() diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 89a5c51083..c557dd9a07 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -154,7 +154,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { break } - nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) + nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) node := util.SelectBestNode(nodeScores) // Allocate idle resource to the task. diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 3af0dd674a..f409d0ada2 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -190,7 +190,7 @@ func preempt( predicateNodes := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn) - nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) + nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) selectedNodes := util.SortNodes(nodeScores) for _, node := range selectedNodes { diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index a88439e451..71842960fe 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -132,6 +132,9 @@ type EvictableFn func(*TaskInfo, []*TaskInfo) []*TaskInfo // NodeOrderFn is the func declaration used to get priority score for a node for a particular task. type NodeOrderFn func(*TaskInfo, *NodeInfo) (float64, error) +// BatchNodeOrderFn is the func declaration used to get priority score for ALL nodes for a particular task. +type BatchNodeOrderFn func(*TaskInfo, []*NodeInfo) (map[string]float64, error) + // NodeMapFn is the func declaration used to get priority score for a node for a particular task. type NodeMapFn func(*TaskInfo, *NodeInfo) (float64, error) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 7de275dc35..ea83a1530d 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -147,6 +147,31 @@ type defaultStatusUpdater struct { kbclient *kbver.Clientset } +// following the same logic as podutil.UpdatePodCondition +func podConditionHaveUpdate(status *v1.PodStatus, condition *v1.PodCondition) bool { + lastTransitionTime := metav1.Now() + // Try to find this pod condition. + _, oldCondition := podutil.GetPodCondition(status, condition.Type) + + if oldCondition == nil { + // We are adding new pod condition. + return true + } + // We are updating an existing condition, so we need to check if it has changed. + if condition.Status == oldCondition.Status { + lastTransitionTime = oldCondition.LastTransitionTime + } + + isEqual := condition.Status == oldCondition.Status && + condition.Reason == oldCondition.Reason && + condition.Message == oldCondition.Message && + condition.LastProbeTime.Equal(&oldCondition.LastProbeTime) && + lastTransitionTime.Equal(&oldCondition.LastTransitionTime) + + // Return true if one of the fields have changed. + return !isEqual +} + // UpdatePodCondition will Update pod with podCondition func (su *defaultStatusUpdater) UpdatePodCondition(pod *v1.Pod, condition *v1.PodCondition) (*v1.Pod, error) { glog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status) @@ -460,19 +485,27 @@ func (sc *SchedulerCache) BindVolumes(task *api.TaskInfo) error { // taskUnschedulable updates pod status of pending task func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string) error { - pod := task.Pod.DeepCopy() + pod := task.Pod - // The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in - // k8s core, so using the same string here. - // The reason field in PodCondition should be "Unschedulable" - sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message) - if _, err := sc.StatusUpdater.UpdatePodCondition(pod, &v1.PodCondition{ + condition := &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, Reason: v1.PodReasonUnschedulable, Message: message, - }); err != nil { - return err + } + + if podConditionHaveUpdate(&pod.Status, condition) { + pod = pod.DeepCopy() + + // The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in + // k8s core, so using the same string here. + // The reason field in PodCondition should be "Unschedulable" + sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message) + if _, err := sc.StatusUpdater.UpdatePodCondition(pod, condition); err != nil { + return err + } + } else { + glog.V(4).Infof("task unscheduleable %s/%s, message: %s, skip by no condition update", pod.Namespace, pod.Name, message) } return nil @@ -557,6 +590,30 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { snapshot.Queues[value.UID] = value.Clone() } + var cloneJobLock sync.Mutex + var wg sync.WaitGroup + + cloneJob := func(value *api.JobInfo) { + if value.PodGroup != nil { + value.Priority = sc.defaultPriority + + priName := value.PodGroup.Spec.PriorityClassName + if priorityClass, found := sc.PriorityClasses[priName]; found { + value.Priority = priorityClass.Value + } + + glog.V(4).Infof("The priority of job <%s/%s> is <%s/%d>", + value.Namespace, value.Name, priName, value.Priority) + } + + clonedJob := value.Clone() + + cloneJobLock.Lock() + snapshot.Jobs[value.UID] = clonedJob + cloneJobLock.Unlock() + wg.Done() + } + for _, value := range sc.Jobs { // If no scheduling spec, does not handle it. if value.PodGroup == nil && value.PDB == nil { @@ -572,20 +629,10 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { continue } - if value.PodGroup != nil { - value.Priority = sc.defaultPriority - - priName := value.PodGroup.Spec.PriorityClassName - if priorityClass, found := sc.PriorityClasses[priName]; found { - value.Priority = priorityClass.Value - } - - glog.V(4).Infof("The priority of job <%s/%s> is <%s/%d>", - value.Namespace, value.Name, priName, value.Priority) - } - - snapshot.Jobs[value.UID] = value.Clone() + wg.Add(1) + go cloneJob(value) } + wg.Wait() glog.V(3).Infof("There are <%d> Jobs, <%d> Queues and <%d> Nodes in total for scheduling.", len(snapshot.Jobs), len(snapshot.Queues), len(snapshot.Nodes)) diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 313e691827..311a096d28 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -54,6 +54,7 @@ type Session struct { taskOrderFns map[string]api.CompareFn predicateFns map[string]api.PredicateFn nodeOrderFns map[string]api.NodeOrderFn + batchNodeOrderFns map[string]api.BatchNodeOrderFn nodeMapFns map[string]api.NodeMapFn nodeReduceFns map[string]api.NodeReduceFn preemptableFns map[string]api.EvictableFn @@ -82,6 +83,7 @@ func openSession(cache cache.Cache) *Session { taskOrderFns: map[string]api.CompareFn{}, predicateFns: map[string]api.PredicateFn{}, nodeOrderFns: map[string]api.NodeOrderFn{}, + batchNodeOrderFns: map[string]api.BatchNodeOrderFn{}, nodeMapFns: map[string]api.NodeMapFn{}, nodeReduceFns: map[string]api.NodeReduceFn{}, preemptableFns: map[string]api.EvictableFn{}, diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index b7421b3d0c..8754c71a51 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -66,6 +66,11 @@ func (ssn *Session) AddNodeOrderFn(name string, pf api.NodeOrderFn) { ssn.nodeOrderFns[name] = pf } +// AddBatchNodeOrderFn add Batch Node order function +func (ssn *Session) AddBatchNodeOrderFn(name string, pf api.BatchNodeOrderFn) { + ssn.batchNodeOrderFns[name] = pf +} + // AddNodeMapFn add Node map function func (ssn *Session) AddNodeMapFn(name string, pf api.NodeMapFn) { ssn.nodeMapFns[name] = pf @@ -406,6 +411,30 @@ func (ssn *Session) NodeOrderFn(task *api.TaskInfo, node *api.NodeInfo) (float64 return priorityScore, nil } +// BatchNodeOrderFn invoke node order function of the plugins +func (ssn *Session) BatchNodeOrderFn(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) { + priorityScore := make(map[string]float64, len(nodes)) + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + if !isEnabled(plugin.EnabledNodeOrder) { + continue + } + pfn, found := ssn.batchNodeOrderFns[plugin.Name] + if !found { + continue + } + score, err := pfn(task, nodes) + if err != nil { + return nil, err + } + for nodeName, score := range score { + priorityScore[nodeName] += score + } + } + } + return priorityScore, nil +} + func isEnabled(enabled *bool) bool { return enabled != nil && *enabled } diff --git a/pkg/scheduler/plugins/nodeorder/nodeorder.go b/pkg/scheduler/plugins/nodeorder/nodeorder.go index 7bff35ce4f..30a9833529 100644 --- a/pkg/scheduler/plugins/nodeorder/nodeorder.go +++ b/pkg/scheduler/plugins/nodeorder/nodeorder.go @@ -186,8 +186,6 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { }) nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { - var interPodAffinityScore schedulerapi.HostPriorityList - nodeInfo, found := nodeMap[node.Name] if !found { nodeInfo = cache.NewNodeInfo(node.Pods()...) @@ -223,20 +221,30 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // If nodeAffinityWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score. score = score + float64(host.Score*weight.nodeAffinityWeight) + glog.V(4).Infof("Total Score for task %s/%s on node %s is: %f", task.Namespace, task.Name, node.Name, score) + return score, nil + } + ssn.AddNodeOrderFn(pp.Name(), nodeOrderFn) + + batchNodeOrderFn := func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) { + var interPodAffinityScore schedulerapi.HostPriorityList + mapFn := priorities.NewInterPodAffinityPriority(cn, nl, pl, v1.DefaultHardPodAffinitySymmetricWeight) - interPodAffinityScore, err = mapFn(task.Pod, nodeMap, nodeSlice) + interPodAffinityScore, err := mapFn(task.Pod, nodeMap, nodeSlice) if err != nil { glog.Warningf("Calculate Inter Pod Affinity Priority Failed because of Error: %v", err) - return 0, err + return nil, err } - hostScore := getInterPodAffinityScore(node.Name, interPodAffinityScore) - // If podAffinityWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score. - score = score + float64(hostScore*weight.podAffinityWeight) - glog.V(4).Infof("Total Score for that node is: %d", score) + score := make(map[string]float64, len(interPodAffinityScore)) + for _, host := range interPodAffinityScore { + score[host.Host] = float64(host.Score) * float64(weight.podAffinityWeight) + } + + glog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, score) return score, nil } - ssn.AddNodeOrderFn(pp.Name(), nodeOrderFn) + ssn.AddBatchNodeOrderFn(pp.Name(), batchNodeOrderFn) } func (pp *nodeOrderPlugin) OnSessionClose(ssn *framework.Session) { diff --git a/pkg/scheduler/util/scheduler_helper.go b/pkg/scheduler/util/scheduler_helper.go index 7429dd8a51..95eef11785 100644 --- a/pkg/scheduler/util/scheduler_helper.go +++ b/pkg/scheduler/util/scheduler_helper.go @@ -42,7 +42,7 @@ func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateF // TODO (k82cn): Enable eCache for performance improvement. if err := fn(task, node); err != nil { - glog.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", + glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) return } @@ -57,7 +57,7 @@ func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateF } // PrioritizeNodes returns a map whose key is node's score and value are corresponding nodes -func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, mapFn api.NodeOrderMapFn, reduceFn api.NodeOrderReduceFn) map[float64][]*api.NodeInfo { +func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, batchFn api.BatchNodeOrderFn, mapFn api.NodeOrderMapFn, reduceFn api.NodeOrderReduceFn) map[float64][]*api.NodeInfo { pluginNodeScoreMap := map[string]schedulerapi.HostPriorityList{} nodeOrderScoreMap := map[string]float64{} nodeScores := map[float64][]*api.NodeInfo{} @@ -90,11 +90,21 @@ func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, mapFn api.NodeOr glog.Errorf("Error in Calculating Priority for the node:%v", err) return nodeScores } + + batchNodeScore, err := batchFn(task, nodes) + if err != nil { + glog.Errorf("Error in Calculating batch Priority for the node, err %v", err) + return nodeScores + } + for _, node := range nodes { if score, found := reduceScores[node.Name]; found { if orderScore, ok := nodeOrderScoreMap[node.Name]; ok { score = score + orderScore } + if batchScore, ok := batchNodeScore[node.Name]; ok { + score = score + batchScore + } nodeScores[score] = append(nodeScores[score], node) } else { // If no plugin is applied to this node, the default is 0.0 @@ -102,6 +112,9 @@ func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, mapFn api.NodeOr if orderScore, ok := nodeOrderScoreMap[node.Name]; ok { score = score + orderScore } + if batchScore, ok := batchNodeScore[node.Name]; ok { + score = score + batchScore + } nodeScores[score] = append(nodeScores[score], node) } } From ede35a3c7137e049bef855474650b4b1811ca270 Mon Sep 17 00:00:00 2001 From: lminzhw Date: Mon, 3 Jun 2019 13:57:23 +0800 Subject: [PATCH 3/4] split eventClient from kubeClient --- pkg/scheduler/cache/cache.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index ea83a1530d..474b00f5a8 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -209,6 +209,19 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error { } func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string) *SchedulerCache { + kubeClient, err := kubernetes.NewForConfig(config) + if err != nil { + panic(fmt.Sprintf("failed init kubeClient, with err: %v", err)) + } + kbClient, err := kbver.NewForConfig(config) + if err != nil { + panic(fmt.Sprintf("failed init kbClient, with err: %v", err)) + } + eventClient, err := kubernetes.NewForConfig(config) + if err != nil { + panic(fmt.Sprintf("failed init eventClient, with err: %v", err)) + } + sc := &SchedulerCache{ Jobs: make(map[kbapi.JobID]*kbapi.JobInfo), Nodes: make(map[string]*kbapi.NodeInfo), @@ -216,15 +229,15 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s PriorityClasses: make(map[string]*v1beta1.PriorityClass), errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - kubeclient: kubernetes.NewForConfigOrDie(config), - kbclient: kbver.NewForConfigOrDie(config), + kubeclient: kubeClient, + kbclient: kbClient, defaultQueue: defaultQueue, schedulerName: schedulerName, } // Prepare event clients. broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: sc.kubeclient.CoreV1().Events("")}) + broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: eventClient.CoreV1().Events("")}) sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName}) sc.Binder = &defaultBinder{ From 65e615e84989979e170f5b689d9f9bfecfae07a0 Mon Sep 17 00:00:00 2001 From: lminzhw Date: Fri, 28 Jun 2019 09:42:49 +0800 Subject: [PATCH 4/4] fix golint error --- cmd/kube-batch/app/options/options_test.go | 2 ++ cmd/kube-batch/app/server.go | 1 - cmd/kube-batch/main.go | 3 +++ pkg/scheduler/plugins/util/util.go | 5 +++++ 4 files changed, 10 insertions(+), 1 deletion(-) diff --git a/cmd/kube-batch/app/options/options_test.go b/cmd/kube-batch/app/options/options_test.go index a1d51d154a..92c570a204 100644 --- a/cmd/kube-batch/app/options/options_test.go +++ b/cmd/kube-batch/app/options/options_test.go @@ -41,6 +41,8 @@ func TestAddFlags(t *testing.T) { SchedulePeriod: 5 * time.Minute, DefaultQueue: defaultQueue, ListenAddress: defaultListenAddress, + KubeAPIBurst: defaultBurst, + KubeAPIQPS: defaultQPS, } if !reflect.DeepEqual(expected, s) { diff --git a/cmd/kube-batch/app/server.go b/cmd/kube-batch/app/server.go index 600d7fd0a1..a2d7d493f3 100644 --- a/cmd/kube-batch/app/server.go +++ b/cmd/kube-batch/app/server.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "net/http" - _ "net/http/pprof" "os" "time" diff --git a/cmd/kube-batch/main.go b/cmd/kube-batch/main.go index b0fda7024e..cf419ebfb6 100644 --- a/cmd/kube-batch/main.go +++ b/cmd/kube-batch/main.go @@ -21,6 +21,9 @@ import ( "runtime" "time" + // init pprof server + _ "net/http/pprof" + "github.com/golang/glog" "github.com/spf13/pflag" diff --git a/pkg/scheduler/plugins/util/util.go b/pkg/scheduler/plugins/util/util.go index 76705848aa..6d5700a609 100644 --- a/pkg/scheduler/plugins/util/util.go +++ b/pkg/scheduler/plugins/util/util.go @@ -39,10 +39,12 @@ type PodLister struct { TaskWithAffinity map[api.TaskID]*api.TaskInfo } +// PodAffinityLister is used to list pod with affinity type PodAffinityLister struct { pl *PodLister } +// HaveAffinity checks pod have affinity or not func HaveAffinity(pod *v1.Pod) bool { affinity := pod.Spec.Affinity return affinity != nil && @@ -51,6 +53,7 @@ func HaveAffinity(pod *v1.Pod) bool { affinity.PodAntiAffinity != nil) } +// NewPodLister returns a PodLister generate from ssn func NewPodLister(ssn *framework.Session) *PodLister { pl := &PodLister{ Session: ssn, @@ -161,6 +164,7 @@ func (pl *PodLister) AffinityFilteredList(podFilter algorithm.PodFilter, selecto return pl.filteredListWithTaskSet(pl.TaskWithAffinity, podFilter, selector) } +// AffinityLister generate a PodAffinityLister following current PodLister func (pl *PodLister) AffinityLister() *PodAffinityLister { pal := &PodAffinityLister{ pl: pl, @@ -178,6 +182,7 @@ func (pal *PodAffinityLister) FilteredList(podFilter algorithm.PodFilter, select return pal.pl.AffinityFilteredList(podFilter, selector) } +// GenerateNodeMapAndSlice returns the nodeMap and nodeSlice generated from ssn func GenerateNodeMapAndSlice(nodes map[string]*api.NodeInfo) (map[string]*cache.NodeInfo, []*v1.Node) { var nodeMap map[string]*cache.NodeInfo var nodeSlice []*v1.Node