Skip to content

Commit

Permalink
Merge pull request volcano-sh#2 from volcano-sh/volcano-master
Browse files Browse the repository at this point in the history
Added enqueue action
  • Loading branch information
k82cn authored Apr 29, 2019
2 parents 5e9977d + b880cec commit 2e229b9
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 3 deletions.
2 changes: 1 addition & 1 deletion config/kube-batch-conf.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
actions: "reclaim, allocate, backfill, preempt"
actions: "enqueue, reclaim, allocate, backfill, preempt"
tiers:
- plugins:
- name: priority
Expand Down
2 changes: 2 additions & 0 deletions hack/.golint_failures
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ pkg/apis/scheduling/v1alpha1
pkg/apis/utils
pkg/scheduler/actions/allocate
pkg/scheduler/actions/backfill
pkg/scheduler/actions/enqueue
pkg/scheduler/actions/preempt
pkg/scheduler/actions/reclaim
pkg/scheduler/api
test/e2e
9 changes: 9 additions & 0 deletions pkg/apis/scheduling/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const (
// PodGroupUnknown means part of `spec.minMember` pods are running but the other part can not
// be scheduled, e.g. not enough resource; scheduler will wait for related controller to recover it.
PodGroupUnknown PodGroupPhase = "Unknown"

// PodGroupInqueue means controllers can start to create pods;
// it is a new state between PodGroupPending and PodGroupRunning.
PodGroupInqueue PodGroupPhase = "Inqueue"
)

type PodGroupConditionType string
Expand Down Expand Up @@ -123,6 +127,11 @@ type PodGroupSpec struct {
// default.
// +optional
PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,3,opt,name=priorityClassName"`

// MinResources defines the minimal resources of members/tasks required to run the pod group;
// if there are not enough resources to start all tasks, the scheduler
// will not start any of them.
MinResources *v1.ResourceList `json:"minResources,omitempty" protobuf:"bytes,4,opt,name=minResources"`
}

// PodGroupStatus represents the current state of a pod group.
Expand Down
15 changes: 14 additions & 1 deletion pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
Expand Down Expand Up @@ -48,6 +49,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
jobsMap := map[api.QueueID]*util.PriorityQueue{}

for _, job := range ssn.Jobs {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}

if queue, found := ssn.Queues[job.Queue]; found {
queues.Push(queue)
} else {
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package backfill
import (
"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
)
Expand All @@ -43,6 +44,10 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {

// TODO (k82cn): When backfilling, it also needs to balance between Queues.
for _, job := range ssn.Jobs {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
// As the task did not request resources, it only needs to meet predicates.
Expand Down
128 changes: 128 additions & 0 deletions pkg/scheduler/actions/enqueue/enqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package enqueue

import (
"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
)

// enqueueAction implements the "enqueue" scheduling action.
type enqueueAction struct {
// ssn is a scheduling session reference; no method in this file assigns
// or reads it (Execute receives its session as an argument instead).
ssn *framework.Session
}

// New returns a pointer to a freshly allocated, zero-valued enqueue action.
func New() *enqueueAction {
	return new(enqueueAction)
}

// Name returns the fixed identifier of this action, "enqueue".
func (enqueue *enqueueAction) Name() string {
	const actionName = "enqueue"
	return actionName
}

// Initialize is a no-op; the enqueue action needs no set-up.
func (enqueue *enqueueAction) Initialize() {}

// Execute promotes PodGroups from the Pending phase to the Inqueue phase.
//
// It works in three steps:
//
//  1. Every job in ssn.Jobs whose queue exists in ssn.Queues has that queue
//     pushed (once) onto a priority queue ordered by ssn.QueueOrderFn. Jobs
//     whose PodGroup is Pending are additionally pushed onto a per-queue
//     priority queue ordered by ssn.JobOrderFn. Jobs referring to an unknown
//     queue are logged and skipped.
//  2. A cluster-wide resource budget is accumulated over ssn.Nodes as
//     Allocatable.Clone().Multi(1.2).Sub(Used) per node.
//  3. Queues are popped one at a time; for each, the next pending job is
//     popped and marked Inqueue when it already has pending tasks, when it
//     declares no MinResources, or when its MinResources fit in the remaining
//     budget (in which case they are subtracted from it). A queue is pushed
//     back after each processed job and dropped once it has no jobs left.
//
// The only state written on the session is the PodGroup phase of the selected
// jobs (and the corresponding ssn.Jobs entry).
func (enqueue *enqueueAction) Execute(ssn *framework.Session) {
	glog.V(3).Infof("Enter Enqueue ...")
	defer glog.V(3).Infof("Leaving Enqueue ...")

	queues := util.NewPriorityQueue(ssn.QueueOrderFn)
	// queueMap de-duplicates queues so each one is pushed onto `queues` once.
	queueMap := map[api.QueueID]*api.QueueInfo{}

	// jobsMap holds, per queue, the pending jobs ordered by ssn.JobOrderFn.
	jobsMap := map[api.QueueID]*util.PriorityQueue{}

	for _, job := range ssn.Jobs {
		queue, found := ssn.Queues[job.Queue]
		if !found {
			glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
				job.Queue, job.Namespace, job.Name)
			continue
		}

		if _, existed := queueMap[queue.UID]; !existed {
			glog.V(3).Infof("Added Queue <%s> for Job <%s/%s>",
				queue.Name, job.Namespace, job.Name)

			queueMap[queue.UID] = queue
			queues.Push(queue)
		}

		// Only PodGroups still in the Pending phase are candidates.
		if job.PodGroup.Status.Phase != v1alpha1.PodGroupPending {
			continue
		}

		if _, found := jobsMap[job.Queue]; !found {
			jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
		}
		glog.V(3).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
		jobsMap[job.Queue].Push(job)
	}

	glog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap))

	emptyRes := api.EmptyResource()
	nodesIdleRes := api.EmptyResource()
	for _, node := range ssn.Nodes {
		// NOTE(review): the 1.2 factor presumably allows admitting somewhat
		// more than the nodes' allocatable capacity — confirm the intent.
		nodesIdleRes.Add(node.Allocatable.Clone().Multi(1.2).Sub(node.Used))
	}

	for !queues.Empty() {
		if nodesIdleRes.Less(emptyRes) {
			// The original format string had a verb but no argument, which
			// printed "%!s(MISSING)"; log the remaining budget instead.
			glog.V(3).Infof("Node idle resource <%v> is overused, ignore it.", nodesIdleRes)
			break
		}

		queue := queues.Pop().(*api.QueueInfo)

		// Take the next job of this queue; a queue with no pending job left
		// is not pushed back, which is what eventually drains `queues`.
		jobs, found := jobsMap[queue.UID]
		if !found || jobs.Empty() {
			continue
		}
		job := jobs.Pop().(*api.JobInfo)

		inqueue := false
		switch {
		case len(job.TaskStatusIndex[api.Pending]) != 0:
			// Pending tasks already exist for this job: admit it without
			// charging the budget.
			inqueue = true
		case job.PodGroup.Spec.MinResources == nil:
			// No minimal resource requirement declared: admit it.
			inqueue = true
		default:
			pgResource := api.NewResource(*job.PodGroup.Spec.MinResources)

			// Admit only if the requirement fits, and reserve it.
			if pgResource.LessEqual(nodesIdleRes) {
				nodesIdleRes.Sub(pgResource)
				inqueue = true
			}
		}

		if inqueue {
			job.PodGroup.Status.Phase = v1alpha1.PodGroupInqueue
			ssn.Jobs[job.UID] = job
		}

		// Push the queue back so its remaining jobs are considered too.
		queues.Push(queue)
	}
}

// UnInitialize is a no-op; the enqueue action holds nothing to release.
func (enqueue *enqueueAction) UnInitialize() {}
2 changes: 2 additions & 0 deletions pkg/scheduler/actions/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/backfill"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/enqueue"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/reclaim"
)
Expand All @@ -30,4 +31,5 @@ func init() {
framework.RegisterAction(allocate.New())
framework.RegisterAction(backfill.New())
framework.RegisterAction(preempt.New())
framework.RegisterAction(enqueue.New())
}
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
Expand Down Expand Up @@ -52,6 +53,10 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
queues := map[api.QueueID]*api.QueueInfo{}

for _, job := range ssn.Jobs {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}

if queue, found := ssn.Queues[job.Queue]; !found {
continue
} else if _, existed := queues[queue.UID]; !existed {
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package reclaim
import (
"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
Expand Down Expand Up @@ -53,6 +54,10 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {

var underRequest []*api.JobInfo
for _, job := range ssn.Jobs {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}

if queue, found := ssn.Queues[job.Queue]; !found {
glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
job.Queue, job.Namespace, job.Name)
Expand Down
13 changes: 13 additions & 0 deletions pkg/scheduler/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"math"

"k8s.io/apimachinery/pkg/api/resource"

v1 "k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
)
Expand Down Expand Up @@ -324,3 +326,14 @@ func (r *Resource) SetScalar(name v1.ResourceName, quantity float64) {
}
r.ScalarResources[name] = quantity
}

// Convert2K8sResource builds a new v1.ResourceList from r and returns a
// pointer to it.
//
// MilliCPU is written under v1.ResourceCPU as a milli-quantity (DecimalSI),
// Memory under v1.ResourceMemory as a plain quantity (BinarySI), and every
// entry of ScalarResources under its own name as a plain quantity (BinarySI).
// All values are truncated to int64.
//
// NOTE(review): scalar values are emitted as whole units via NewQuantity;
// confirm this matches the unit in which ScalarResources is stored.
func (r *Resource) Convert2K8sResource() *v1.ResourceList {
	converted := make(v1.ResourceList, len(r.ScalarResources)+2)

	converted[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(r.MilliCPU), resource.DecimalSI)
	converted[v1.ResourceMemory] = *resource.NewQuantity(int64(r.Memory), resource.BinarySI)

	for scalarName, amount := range r.ScalarResources {
		converted[scalarName] = *resource.NewQuantity(int64(amount), resource.BinarySI)
	}

	return &converted
}
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) v1alpha1.PodGroupStatus {
// If there're enough allocated resource, it's running
if int32(allocated) > jobInfo.PodGroup.Spec.MinMember {
status.Phase = v1alpha1.PodGroupRunning
} else {
} else if jobInfo.PodGroup.Status.Phase != v1alpha1.PodGroupInqueue {
status.Phase = v1alpha1.PodGroupPending
}
}
Expand Down

0 comments on commit 2e229b9

Please sign in to comment.