Skip to content

Commit

Permalink
Merge pull request volcano-sh#331 from k82cn/ka_328_3
Browse files: browse the repository at this point in the history
Do preemption within Queue.
  • Loading branch information
k82cn authored Aug 14, 2018
2 parents ccee1d5 + 0045067 commit 5d69b95
Showing 1 changed file with 106 additions and 94 deletions.
200 changes: 106 additions & 94 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,19 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
return !ssn.TaskOrderFn(l, r)
}

preemptors := util.NewPriorityQueue(ssn.JobOrderFn)
preemptees := util.NewPriorityQueue(jobRevOrderFn)
preemptorsMap := map[api.QueueID]*util.PriorityQueue{}
preempteesMap := map[api.QueueID]*util.PriorityQueue{}

preemptorTasks := map[api.JobID]*util.PriorityQueue{}
preempteeTasks := map[api.JobID]*util.PriorityQueue{}

var underRequest []*api.JobInfo
for _, job := range ssn.Jobs {
if len(job.TaskStatusIndex[api.Pending]) != 0 {
preemptors.Push(job)
if _, found := preemptorsMap[job.Queue]; !found {
preemptorsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
preemptorsMap[job.Queue].Push(job)
underRequest = append(underRequest, job)
preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
Expand All @@ -69,7 +72,11 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {

// If no running tasks in job, skip it as preemptee.
if len(job.TaskStatusIndex[api.Running]) != 0 {
preemptees.Push(job)
if _, found := preempteesMap[job.Queue]; !found {
preempteesMap[job.Queue] = util.NewPriorityQueue(jobRevOrderFn)
}
preempteesMap[job.Queue].Push(job)

preempteeTasks[job.UID] = util.NewPriorityQueue(taskRevOrderFn)
// TODO (k82cn): it's better to also include Binding/Bound tasks.
for _, task := range job.TaskStatusIndex[api.Running] {
Expand All @@ -81,124 +88,129 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
}
}

// Preemption between Jobs.
for {
// If no preemptors nor preemptees, no preemption.
if preemptors.Empty() || preemptees.Empty() {
glog.V(3).Infof("No preemptors nor preemptees, break.")
break
}

preemptorJob := preemptors.Pop().(*api.JobInfo)

// If not preemptor tasks, next job.
if preemptorTasks[preemptorJob.UID].Empty() {
glog.V(3).Infof("No preemptor task in job <%s/%s>.",
preemptorJob.Namespace, preemptorJob.Name)
continue
}

// Find the preemptee job:
// - More than one running tasks
// - Different job
var preempteeJob *api.JobInfo
for !preemptees.Empty() {
preempteeJob = preemptees.Pop().(*api.JobInfo)
// Preemption between Jobs within Queue.
for _, queue := range ssn.Queues {
for {
preemptors := preemptorsMap[queue.UID]
preemptees := preempteesMap[queue.UID]

// If found itself, then no preemptees anymore.
if preemptorJob.UID == preempteeJob.UID {
glog.V(3).Infof("Can not preempt itself <%s/%s>.",
preemptorJob.Namespace, preemptorJob.Name)
preempteeJob = nil
// If no preemptors nor preemptees, no preemption.
if (preemptors == nil || preemptors.Empty()) ||
(preemptees == nil || preemptees.Empty()) {
glog.V(3).Infof("No preemptors nor preemptees in Queue <%s>, break.", queue.Name)
break
}

if preempteeTasks[preempteeJob.UID].Empty() {
glog.V(3).Infof("No preemptable tasks in job <%s/%s>, next",
preempteeJob.Namespace, preempteeJob.Name)
preemptorJob := preemptors.Pop().(*api.JobInfo)

preempteeJob = nil
// If not preemptor tasks, next job.
if preemptorTasks[preemptorJob.UID].Empty() {
glog.V(3).Infof("No preemptor task in job <%s/%s>.",
preemptorJob.Namespace, preemptorJob.Name)
continue
}

// If found a preemptee job, break
break
}

if preempteeJob == nil {
glog.V(3).Infof("Can not found preemptee job for %v/%v",
preemptorJob.Namespace, preemptorJob.Name)
// Find the preemptee job:
// - More than one running tasks
// - Different job
var preempteeJob *api.JobInfo
for !preemptees.Empty() {
preempteeJob = preemptees.Pop().(*api.JobInfo)

// If found itself, then no preemptees anymore.
if preemptorJob.UID == preempteeJob.UID {
glog.V(3).Infof("Can not preempt itself <%s/%s>.",
preemptorJob.Namespace, preemptorJob.Name)
preempteeJob = nil
break
}

break
}
if preempteeTasks[preempteeJob.UID].Empty() {
glog.V(3).Infof("No preemptable tasks in job <%s/%s>, next",
preempteeJob.Namespace, preempteeJob.Name)

glog.V(3).Infof("The preemptor is %v/%v, the preemptee is %v/%v",
preemptorJob.Namespace, preemptorJob.Name,
preempteeJob.Namespace, preempteeJob.Name)
preempteeJob = nil
continue
}

preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)
preemptee := preempteeTasks[preempteeJob.UID].Pop().(*api.TaskInfo)
// If found a preemptee job, break
break
}

preempted := false
if preempteeJob == nil {
glog.V(3).Infof("Can not found preemptee job for %v/%v",
preemptorJob.Namespace, preemptorJob.Name)

if ssn.Preemptable(preemptor, preemptee) {
if err := ssn.Preempt(preemptor, preemptee); err != nil {
glog.Errorf("Failed to evict task %v/%v for task %v/%v: %v",
preemptee.Namespace, preemptee.Name,
preemptor.Namespace, preemptor.Name, err)
} else {
preempted = true
break
}
} else {
glog.V(3).Infof("Can not preempt task <%v/%v> for task <%v/%v>",
preemptee.Namespace, preemptee.Name,
preemptor.Namespace, preemptor.Name)
}

// If preempted resource, put it back to the queue.
if preempted {
preemptors.Push(preemptorJob)
} else {
// If the preemptee is not preempted, push it back for other to preempt.
preempteeTasks[preempteeJob.UID].Push(preemptee)
}
glog.V(3).Infof("The preemptor is %v/%v, the preemptee is %v/%v",
preemptorJob.Namespace, preemptorJob.Name,
preempteeJob.Namespace, preempteeJob.Name)

preemptees.Push(preempteeJob)
}
preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)
preemptee := preempteeTasks[preempteeJob.UID].Pop().(*api.TaskInfo)

// Preemption between Task within Job.
for _, job := range underRequest {
for {
if _, found := preempteeTasks[job.UID]; !found {
break
}
preempted := false

if _, found := preemptorTasks[job.UID]; !found {
break
if ssn.Preemptable(preemptor, preemptee) {
if err := ssn.Preempt(preemptor, preemptee); err != nil {
glog.Errorf("Failed to evict task %v/%v for task %v/%v: %v",
preemptee.Namespace, preemptee.Name,
preemptor.Namespace, preemptor.Name, err)
} else {
preempted = true
}
} else {
glog.V(3).Infof("Can not preempt task <%v/%v> for task <%v/%v>",
preemptee.Namespace, preemptee.Name,
preemptor.Namespace, preemptor.Name)
}

if preemptorTasks[job.UID].Empty() || preempteeTasks[job.UID].Empty() {
break
// If preempted resource, put it back to the queue.
if preempted {
preemptors.Push(preemptorJob)
} else {
// If the preemptee is not preempted, push it back for other to preempt.
preempteeTasks[preempteeJob.UID].Push(preemptee)
}

preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo)
preemptee := preempteeTasks[job.UID].Pop().(*api.TaskInfo)
preemptees.Push(preempteeJob)
}

if ssn.TaskCompareFns(preemptor, preemptee) < 0 {
if err := ssn.Preempt(preemptor, preemptee); err != nil {
glog.Errorf("Failed to rebalance tasks in job <%v/%v>: %v",
job.Namespace, job.Name, err)
// Preemption between Task within Job.
for _, job := range underRequest {
for {
if _, found := preempteeTasks[job.UID]; !found {
break
}
// If preempted, continue to check next pair.
continue
}

// If no preemption, next job.
break
if _, found := preemptorTasks[job.UID]; !found {
break
}

if preemptorTasks[job.UID].Empty() || preempteeTasks[job.UID].Empty() {
break
}

preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo)
preemptee := preempteeTasks[job.UID].Pop().(*api.TaskInfo)

if ssn.TaskCompareFns(preemptor, preemptee) < 0 {
if err := ssn.Preempt(preemptor, preemptee); err != nil {
glog.Errorf("Failed to rebalance tasks in job <%v/%v>: %v",
job.Namespace, job.Name, err)
break
}
// If preempted, continue to check next pair.
continue
}

// If no preemption, next job.
break
}
}
}

}

// UnInitialize is a no-op: its body is empty, so calling it does nothing.
func (alloc *preemptAction) UnInitialize() {}

0 comments on commit 5d69b95

Please sign in to comment.