diff --git a/pkg/apis/scheduling/v1alpha1/types.go b/pkg/apis/scheduling/v1alpha1/types.go
index e4d0004c08..6670498812 100644
--- a/pkg/apis/scheduling/v1alpha1/types.go
+++ b/pkg/apis/scheduling/v1alpha1/types.go
@@ -36,6 +36,10 @@ const (
     // PodGroupUnknown means part of `spec.minMember` pods are running but the other part can not
     // be scheduled, e.g. not enough resource; scheduler will wait for related controller to recover it.
     PodGroupUnknown PodGroupPhase = "Unknown"
+
+    // PodGroupInqueue means controllers can start to create pods;
+    // it is a transitional state between PodGroupPending and PodGroupRunning.
+    PodGroupInqueue PodGroupPhase = "Inqueue"
 )
 
 type PodGroupConditionType string
@@ -123,6 +127,11 @@ type PodGroupSpec struct {
     // default.
     // +optional
     PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,3,opt,name=priorityClassName"`
+
+    // MinResources defines the minimal resources needed to run all members/tasks
+    // of the pod group; if there are not enough resources to start all tasks,
+    // the scheduler will not start any of them.
+    MinResources *v1.ResourceList `json:"minResources,omitempty" protobuf:"bytes,4,opt,name=minResources"`
 }
 
 // PodGroupStatus represents the current state of a pod group.
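For orientation, a PodGroup that opts into the new admission gate would set the new field like this — a minimal sketch in Go (object name and quantities are illustrative, not part of the patch):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
)

func main() {
	// Ask the scheduler to admit this group only when 4 CPUs and 8Gi of
	// memory are idle cluster-wide; until then it stays Pending.
	minRes := v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("4"),
		v1.ResourceMemory: resource.MustParse("8Gi"),
	}

	pg := v1alpha1.PodGroup{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
		Spec: v1alpha1.PodGroupSpec{
			MinMember:    4,
			MinResources: &minRes,
		},
	}

	fmt.Printf("%s/%s wants at least %v\n", pg.Namespace, pg.Name, *pg.Spec.MinResources)
}
```

Until the enqueue action decides the cluster can hold `minResources`, the group stays `Pending`, and the guards added to the actions below ensure no action tries to schedule its tasks.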
diff --git a/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go
index 3bc3811730..d0b60967f9 100644
--- a/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go
@@ -21,6 +21,8 @@ limitations under the License.
 package v1alpha1
 
 import (
+    v1 "k8s.io/api/core/v1"
+    resource "k8s.io/apimachinery/pkg/api/resource"
     runtime "k8s.io/apimachinery/pkg/runtime"
 )
 
@@ -29,7 +31,7 @@ func (in *PodGroup) DeepCopyInto(out *PodGroup) {
     *out = *in
     out.TypeMeta = in.TypeMeta
     in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
-    out.Spec = in.Spec
+    in.Spec.DeepCopyInto(&out.Spec)
     in.Status.DeepCopyInto(&out.Status)
     return
 }
@@ -105,6 +107,17 @@ func (in *PodGroupList) DeepCopyObject() runtime.Object {
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *PodGroupSpec) DeepCopyInto(out *PodGroupSpec) {
     *out = *in
+    if in.MinResources != nil {
+        in, out := &in.MinResources, &out.MinResources
+        *out = new(v1.ResourceList)
+        if **in != nil {
+            in, out := *in, *out
+            *out = make(map[v1.ResourceName]resource.Quantity, len(*in))
+            for key, val := range *in {
+                (*out)[key] = val.DeepCopy()
+            }
+        }
+    }
     return
 }
 
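The switch from `out.Spec = in.Spec` to `in.Spec.DeepCopyInto(&out.Spec)` matters because `MinResources` is a pointer to a map: a plain struct copy would alias the underlying `ResourceList` between the original and the copy. A small self-contained check of what the regenerated code guarantees (hypothetical test code, not part of the patch):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
)

func main() {
	orig := v1alpha1.PodGroupSpec{
		MinMember:    2,
		MinResources: &v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")},
	}

	// Mutate the copy's map; the original must not change.
	cp := orig.DeepCopy()
	(*cp.MinResources)[v1.ResourceCPU] = resource.MustParse("16")

	cpu := (*orig.MinResources)[v1.ResourceCPU]
	fmt.Println(cpu.String()) // "2" — the map was deep-copied, not aliased
}
```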
diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go
index df28d6aaca..973db99cc5 100644
--- a/pkg/scheduler/actions/allocate/allocate.go
+++ b/pkg/scheduler/actions/allocate/allocate.go
@@ -21,6 +21,7 @@ import (
     "github.com/golang/glog"
 
+    "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
@@ -48,6 +49,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
     jobsMap := map[api.QueueID]*util.PriorityQueue{}
 
     for _, job := range ssn.Jobs {
+        if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
+            continue
+        }
+
         if queue, found := ssn.Queues[job.Queue]; found {
             queues.Push(queue)
         } else {
diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go
index 920270351d..618c8cedaf 100644
--- a/pkg/scheduler/actions/backfill/backfill.go
+++ b/pkg/scheduler/actions/backfill/backfill.go
@@ -19,6 +19,7 @@ package backfill
 import (
     "github.com/golang/glog"
 
+    "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
 )
@@ -43,6 +44,10 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {
 
     // TODO (k82cn): When backfill, it's also need to balance between Queues.
     for _, job := range ssn.Jobs {
+        if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
+            continue
+        }
+
         for _, task := range job.TaskStatusIndex[api.Pending] {
             if task.InitResreq.IsEmpty() {
                 // As task did not request resources, so it only need to meet predicates.
diff --git a/pkg/scheduler/actions/enqueue/enqueue.go b/pkg/scheduler/actions/enqueue/enqueue.go
new file mode 100644
index 0000000000..70ac372914
--- /dev/null
+++ b/pkg/scheduler/actions/enqueue/enqueue.go
@@ -0,0 +1,128 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package enqueue
+
+import (
+    "github.com/golang/glog"
+
+    "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
+    "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
+    "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
+    "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
+)
+
+type enqueueAction struct {
+    ssn *framework.Session
+}
+
+func New() *enqueueAction {
+    return &enqueueAction{}
+}
+
+func (enqueue *enqueueAction) Name() string {
+    return "enqueue"
+}
+
+func (enqueue *enqueueAction) Initialize() {}
+
+func (enqueue *enqueueAction) Execute(ssn *framework.Session) {
+    glog.V(3).Infof("Enter Enqueue ...")
+    defer glog.V(3).Infof("Leaving Enqueue ...")
+
+    queues := util.NewPriorityQueue(ssn.QueueOrderFn)
+    queueMap := map[api.QueueID]*api.QueueInfo{}
+
+    jobsMap := map[api.QueueID]*util.PriorityQueue{}
+
+    for _, job := range ssn.Jobs {
+        if queue, found := ssn.Queues[job.Queue]; !found {
+            glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
+                job.Queue, job.Namespace, job.Name)
+            continue
+        } else {
+            if _, existed := queueMap[queue.UID]; !existed {
+                glog.V(3).Infof("Added Queue <%s> for Job <%s/%s>",
+                    queue.Name, job.Namespace, job.Name)
+
+                queueMap[queue.UID] = queue
+                queues.Push(queue)
+            }
+        }
+
+        if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
+            if _, found := jobsMap[job.Queue]; !found {
+                jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
+            }
+            glog.V(3).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
+            jobsMap[job.Queue].Push(job)
+        }
+    }
+
+    glog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap))
+
+    emptyRes := api.EmptyResource()
+    nodesIdleRes := api.EmptyResource()
+    for _, node := range ssn.Nodes {
+        nodesIdleRes.Add(node.Allocatable.Clone().Multi(1.2).Sub(node.Used))
+    }
+
+    for {
+        if queues.Empty() {
+            break
+        }
+
+        if nodesIdleRes.Less(emptyRes) {
+            glog.V(3).Infof("Node idle resource <%v> is overused, stop enqueueing.", nodesIdleRes)
+            break
+        }
+
+        queue := queues.Pop().(*api.QueueInfo)
+
+        // Pick the highest-priority pending job in this queue.
+        jobs, found := jobsMap[queue.UID]
+        if !found || jobs.Empty() {
+            continue
+        }
+        job := jobs.Pop().(*api.JobInfo)
+
+        inqueue := false
+        if len(job.TaskStatusIndex[api.Pending]) != 0 {
+            inqueue = true
+        } else {
+            if job.PodGroup.Spec.MinResources == nil {
+                inqueue = true
+            } else {
+                pgResource := api.NewResource(*job.PodGroup.Spec.MinResources)
+
+                if pgResource.LessEqual(nodesIdleRes) {
+                    nodesIdleRes.Sub(pgResource)
+                    inqueue = true
+                }
+            }
+        }
+
+        if inqueue {
+            job.PodGroup.Status.Phase = v1alpha1.PodGroupInqueue
+            ssn.Jobs[job.UID] = job
+        }
+
+        // Push the queue back until it has no pending jobs left.
+        queues.Push(queue)
+    }
+}
+
+func (enqueue *enqueueAction) UnInitialize() {}
diff --git a/pkg/scheduler/actions/factory.go b/pkg/scheduler/actions/factory.go
index 827533b226..9f5cd6ad79 100644
--- a/pkg/scheduler/actions/factory.go
+++ b/pkg/scheduler/actions/factory.go
@@ -21,6 +21,7 @@ import (
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/backfill"
+    "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/enqueue"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/reclaim"
 )
@@ -30,4 +31,5 @@ func init() {
     framework.RegisterAction(allocate.New())
     framework.RegisterAction(backfill.New())
     framework.RegisterAction(preempt.New())
+    framework.RegisterAction(enqueue.New())
 }
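Registering the action only makes it available; it runs when named in the scheduler configuration. Assuming the usual kube-batch `--scheduler-conf` layout, enabling the gate could look like the sketch below (the tier/plugin names are illustrative, not part of the patch); note that `enqueue` should run before `allocate` so that newly admitted groups are schedulable in the same session:

```yaml
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
  - name: priority
  - name: gang
```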
diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go
index f76571dcd4..435696f228 100644
--- a/pkg/scheduler/actions/preempt/preempt.go
+++ b/pkg/scheduler/actions/preempt/preempt.go
@@ -21,6 +21,7 @@ import (
     "github.com/golang/glog"
 
+    "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
@@ -52,6 +53,10 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
     queues := map[api.QueueID]*api.QueueInfo{}
 
     for _, job := range ssn.Jobs {
+        if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
+            continue
+        }
+
         if queue, found := ssn.Queues[job.Queue]; !found {
             continue
         } else if _, existed := queues[queue.UID]; !existed {
diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go
index 83ae3a1af8..520e8264a8 100644
--- a/pkg/scheduler/actions/reclaim/reclaim.go
+++ b/pkg/scheduler/actions/reclaim/reclaim.go
@@ -19,6 +19,7 @@ package reclaim
 import (
     "github.com/golang/glog"
+    "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
     "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
 )
@@ -53,6 +54,10 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
     var underRequest []*api.JobInfo
 
     for _, job := range ssn.Jobs {
+        if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
+            continue
+        }
+
         if queue, found := ssn.Queues[job.Queue]; !found {
             glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
                 job.Queue, job.Namespace, job.Name)
diff --git a/pkg/scheduler/api/resource_info.go b/pkg/scheduler/api/resource_info.go
index 6c3caab742..582d955576 100644
--- a/pkg/scheduler/api/resource_info.go
+++ b/pkg/scheduler/api/resource_info.go
@@ -20,6 +20,8 @@ import (
     "fmt"
     "math"
 
+    "k8s.io/apimachinery/pkg/api/resource"
+
     v1 "k8s.io/api/core/v1"
     v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 )
@@ -324,3 +326,15 @@ func (r *Resource) SetScalar(name v1.ResourceName, quantity float64) {
     }
     r.ScalarResources[name] = quantity
 }
+
+// Convert2K8sResource converts a scheduler-internal Resource back into a v1.ResourceList.
+func (r *Resource) Convert2K8sResource() *v1.ResourceList {
+    list := v1.ResourceList{
+        v1.ResourceCPU:    *resource.NewMilliQuantity(int64(r.MilliCPU), resource.DecimalSI),
+        v1.ResourceMemory: *resource.NewQuantity(int64(r.Memory), resource.BinarySI),
+    }
+    for name, value := range r.ScalarResources {
+        list[name] = *resource.NewQuantity(int64(value), resource.BinarySI)
+    }
+    return &list
+}
diff --git
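`Convert2K8sResource` is the rough inverse of `api.NewResource`, useful for publishing scheduler-internal quantities back onto API objects. A usage sketch (hypothetical `main`, values illustrative):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
)

func main() {
	// Round-trip: v1.ResourceList -> internal Resource -> v1.ResourceList.
	r := api.NewResource(v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("2"),
		v1.ResourceMemory: resource.MustParse("4Gi"),
	})

	list := r.Convert2K8sResource()
	fmt.Println(list.Cpu().String(), list.Memory().String()) // "2" "4Gi"
}
```

Note the round trip goes through `float64` and truncates to `int64`, so fractional scalar quantities may be rounded on the way back.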
a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go
index c1b4d049b3..e6ac75ddea 100644
--- a/pkg/scheduler/framework/session.go
+++ b/pkg/scheduler/framework/session.go
@@ -171,7 +171,7 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) v1alpha1.PodGroupStatus {
         // If there're enough allocated resource, it's running
         if int32(allocated) > jobInfo.PodGroup.Spec.MinMember {
             status.Phase = v1alpha1.PodGroupRunning
-        } else {
+        } else if jobInfo.PodGroup.Status.Phase != v1alpha1.PodGroupInqueue {
             status.Phase = v1alpha1.PodGroupPending
         }
     }
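Putting it together, the intended lifecycle is Pending → Inqueue (set by the enqueue action) → Running (set by `jobStatus` once enough members are allocated), with the new `else if` preventing an admitted group from regressing to Pending before its pods exist. A hypothetical controller-side consumer of the new phase, matching the comment on `PodGroupInqueue` above (function and messages are illustrative, not part of the patch):

```go
package main

import (
	"fmt"

	"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
)

// syncPodGroup is a sketch of a controller reconcile step: hold off creating
// a group's pods while the PodGroup is still Pending, and start creating them
// once the enqueue action has moved it to Inqueue.
func syncPodGroup(pg *v1alpha1.PodGroup) {
	if pg.Status.Phase == v1alpha1.PodGroupPending {
		fmt.Println("not admitted yet; skip pod creation")
		return
	}
	fmt.Println("admitted; create the group's pods")
}

func main() {
	pg := &v1alpha1.PodGroup{}
	pg.Status.Phase = v1alpha1.PodGroupInqueue
	syncPodGroup(pg)
}
```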