Skip to content

Commit

Permalink
Merge pull request #3164 from lowang-bh/cherry-pick
Browse files Browse the repository at this point in the history
[cherry-pick for release-1.8]backfill add score process
  • Loading branch information
volcano-sh-bot authored Nov 15, 2023
2 parents 89148fe + 26a1747 commit ee8724a
Showing 1 changed file with 39 additions and 29 deletions.
68 changes: 39 additions & 29 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ func (backfill *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Backfill ...")
defer klog.V(5).Infof("Leaving Backfill ...")

predicatFunc := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, err
}

// predicateHelper.PredicateNodes will print the log if predicate failed, so don't print log anymore here
if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
err := fmt.Errorf(statusSets.Message()) // should not include variables in api node errors
return nil, err
}
return nil, nil
}

// TODO (k82cn): When backfill, it's also need to balance between Queues.
for _, job := range ssn.Jobs {
if job.IsPending() {
Expand All @@ -55,53 +70,48 @@ func (backfill *Action) Execute(ssn *framework.Session) {
continue
}

ph := util.NewPredicateHelper()

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

// As task did not request resources, so it only need to meet predicates.
// TODO (k82cn): need to prioritize nodes to avoid pod hole.
for _, node := range ssn.Nodes {
// TODO (k82cn): predicates did not consider pod number for now, there'll
// be ping-pong case here.
// Only nodes whose status is success after predicate filtering can be scheduled.
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
fe.SetNodeError(node.Name, err)
continue
}

if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
err := fmt.Errorf("%s", statusSets.Message())
fe.SetNodeError(node.Name, err)
continue
}
predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
node := predicateNodes[0]
if len(predicateNodes) > 1 {
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
}
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true
break
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true

if !allocated {
job.NodesFitErrors[task.UID] = fe
}
Expand Down

0 comments on commit ee8724a

Please sign in to comment.