Merge pull request volcano-sh#34 from lminzhw/scheduler_detailed_event
support scheduler detailed event
volcano-sh-bot authored Jun 28, 2019
2 parents a92572f + 8651f7a commit 31b04ec
Showing 11 changed files with 202 additions and 60 deletions.
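In short: a new file pkg/scheduler/api/unschedule_info.go adds FitError (the per-node reasons why a task does not fit) and FitErrors (the aggregate over all nodes). The allocate and backfill actions record these per task in job.NodesFitErrors, the gang plugin stores a job-level summary in job.JobFitErrors, and the scheduler cache uses both to emit a more precise PodGroup event and per-pod unschedulable conditions.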
8 changes: 3 additions & 5 deletions pkg/scheduler/actions/allocate/allocate.go
@@ -17,8 +17,6 @@ limitations under the License.
package allocate

import (
"fmt"

"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
@@ -88,8 +86,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
// ...
// }
if !task.InitResreq.LessEqual(node.Idle) && !task.InitResreq.LessEqual(node.Releasing) {
return fmt.Errorf("task <%s/%s> ResourceFit failed on node <%s>",
task.Namespace, task.Name, node.Name)
return api.NewFitError(task, node, api.NodeResourceFitFailed)
}

return ssn.PredicateFn(task, node)
@@ -149,8 +146,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
job.NodesFitDelta = make(api.NodeResourceMap)
}

predicateNodes := util.PredicateNodes(task, allNodes, predicateFn)
predicateNodes, fitErrors := util.PredicateNodes(task, allNodes, predicateFn)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}

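The matching change to util.PredicateNodes is in one of the files not shown on this page. Based only on the call sites above (two return values, the second stored in job.NodesFitErrors) and assuming the existing api.PredicateFn signature func(*TaskInfo, *NodeInfo) error, a minimal sketch of its assumed new shape could look like this — illustrative, not the actual diff:

package util

import "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"

// PredicateNodes runs the predicate against every candidate node and returns
// the nodes that passed plus the per-node failures, aggregated in a
// *api.FitErrors so the caller can store them in job.NodesFitErrors.
func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) {
    predicateNodes := []*api.NodeInfo{}
    fitErrors := api.NewFitErrors()

    for _, node := range nodes {
        if err := fn(task, node); err != nil {
            // remember why this node was rejected
            fitErrors.SetNodeError(node.Name, err)
            continue
        }
        predicateNodes = append(predicateNodes, node)
    }

    return predicateNodes, fitErrors
}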
11 changes: 11 additions & 0 deletions pkg/scheduler/actions/backfill/backfill.go
@@ -54,6 +54,9 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
allocated := false
fe := api.NewFitErrors()

// As task did not request resources, so it only need to meet predicates.
// TODO (k82cn): need to prioritize nodes to avoid pod hole.
for _, node := range ssn.Nodes {
@@ -62,16 +65,24 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {
if err := ssn.PredicateFn(task, node); err != nil {
glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
fe.SetNodeError(node.Name, err)
continue
}

glog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node.Name); err != nil {
glog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

allocated = true
break
}

if !allocated {
job.NodesFitErrors[task.UID] = fe
}
} else {
// TODO (k82cn): backfill for other case.
}
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/preempt/preempt.go
@@ -188,7 +188,7 @@ func preempt(

allNodes := util.GetNodeList(nodes)

predicateNodes := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)
predicateNodes, _ := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

32 changes: 12 additions & 20 deletions pkg/scheduler/api/job_info.go
@@ -139,6 +139,9 @@ type JobInfo struct {

NodesFitDelta NodeResourceMap

JobFitErrors string
NodesFitErrors map[TaskID]*FitErrors

// All tasks of the Job.
TaskStatusIndex map[TaskStatus]tasksMap
Tasks tasksMap
@@ -164,6 +167,8 @@ func NewJobInfo(uid JobID, tasks ...*TaskInfo) *JobInfo {
Allocated: EmptyResource(),
TotalRequest: EmptyResource(),

NodesFitErrors: make(map[TaskID]*FitErrors),

TaskStatusIndex: map[TaskStatus]tasksMap{},
Tasks: tasksMap{},
}
@@ -301,6 +306,8 @@ func (ji *JobInfo) Clone() *JobInfo {
TotalRequest: EmptyResource(),
NodesFitDelta: make(NodeResourceMap),

NodesFitErrors: make(map[TaskID]*FitErrors),

PDB: ji.PDB,
PodGroup: ji.PodGroup.DeepCopy(),

@@ -338,36 +345,21 @@ func (ji JobInfo) String() string {
// FitError returns detailed information on why a job's task failed to fit on
// each available node
func (ji *JobInfo) FitError() string {
if len(ji.NodesFitDelta) == 0 {
reasonMsg := fmt.Sprintf("0 nodes are available")
return reasonMsg
}

reasons := make(map[string]int)
for _, v := range ji.NodesFitDelta {
if v.Get(v1.ResourceCPU) < 0 {
reasons["cpu"]++
}
if v.Get(v1.ResourceMemory) < 0 {
reasons["memory"]++
}

for rName, rQuant := range v.ScalarResources {
if rQuant < 0 {
reasons[string(rName)]++
}
}
for status, taskMap := range ji.TaskStatusIndex {
reasons[fmt.Sprintf("%s", status)] += len(taskMap)
}
reasons["minAvailable"] = int(ji.MinAvailable)

sortReasonsHistogram := func() []string {
reasonStrings := []string{}
for k, v := range reasons {
reasonStrings = append(reasonStrings, fmt.Sprintf("%v insufficient %v", v, k))
reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
}
sort.Strings(reasonStrings)
return reasonStrings
}
reasonMsg := fmt.Sprintf("0/%v nodes are available, %v.", len(ji.NodesFitDelta), strings.Join(sortReasonsHistogram(), ", "))
reasonMsg := fmt.Sprintf("job is not ready, %v.", strings.Join(sortReasonsHistogram(), ", "))
return reasonMsg
}

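FitError() now summarizes the job's task-status histogram plus minAvailable rather than per-resource shortfalls. As an illustration, a job with MinAvailable=3, two Pending tasks and one Running task would yield:

job is not ready, 1 Running, 2 Pending, 3 minAvailable.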
6 changes: 6 additions & 0 deletions pkg/scheduler/api/job_info_test.go
@@ -81,6 +81,8 @@ func TestAddTaskInfo(t *testing.T) {
},
NodeSelector: make(map[string]string),
NodesFitDelta: make(NodeResourceMap),

NodesFitErrors: make(map[TaskID]*FitErrors),
},
},
}
@@ -147,6 +149,8 @@ func TestDeleteTaskInfo(t *testing.T) {
},
NodeSelector: make(map[string]string),
NodesFitDelta: make(NodeResourceMap),

NodesFitErrors: make(map[TaskID]*FitErrors),
},
},
{
@@ -172,6 +176,8 @@
},
NodeSelector: make(map[string]string),
NodesFitDelta: make(NodeResourceMap),

NodesFitErrors: make(map[TaskID]*FitErrors),
},
},
}
4 changes: 4 additions & 0 deletions pkg/scheduler/api/types.go
@@ -61,6 +61,10 @@ func (ts TaskStatus) String() string {
switch ts {
case Pending:
return "Pending"
case Allocated:
return "Allocated"
case Pipelined:
return "Pipelined"
case Binding:
return "Binding"
case Bound:
112 changes: 112 additions & 0 deletions pkg/scheduler/api/unschedule_info.go
@@ -0,0 +1,112 @@
package api

import (
"fmt"
"sort"
"strings"

"k8s.io/kubernetes/pkg/scheduler/algorithm"
)

const (
// NodePodNumberExceeded means pods in node exceed the allocatable pod number
NodePodNumberExceeded = "node(s) pod number exceeded"
// NodeResourceFitFailed means node could not fit the request of pod
NodeResourceFitFailed = "node(s) resource fit failed"

// AllNodeUnavailableMsg is the default error message
AllNodeUnavailableMsg = "all nodes are unavailable"
)

// FitErrors is a set of FitError on many nodes
type FitErrors struct {
nodes map[string]*FitError
err string
}

// NewFitErrors returns a new, empty FitErrors
func NewFitErrors() *FitErrors {
f := new(FitErrors)
f.nodes = make(map[string]*FitError)
return f
}

// SetError sets the common error message in FitErrors
func (f *FitErrors) SetError(err string) {
f.err = err
}

// SetNodeError sets the error for the given node in FitErrors
func (f *FitErrors) SetNodeError(nodeName string, err error) {
var fe *FitError
switch obj := err.(type) {
case *FitError:
obj.NodeName = nodeName
fe = obj
default:
fe = &FitError{
NodeName: nodeName,
Reasons: []string{obj.Error()},
}
}

f.nodes[nodeName] = fe
}

// Error returns the final error message
func (f *FitErrors) Error() string {
reasons := make(map[string]int)

for _, node := range f.nodes {
for _, reason := range node.Reasons {
reasons[reason]++
}
}

sortReasonsHistogram := func() []string {
reasonStrings := []string{}
for k, v := range reasons {
reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
}
sort.Strings(reasonStrings)
return reasonStrings
}
if f.err == "" {
f.err = AllNodeUnavailableMsg
}
reasonMsg := fmt.Sprintf(f.err+": %v.", strings.Join(sortReasonsHistogram(), ", "))
return reasonMsg
}

// FitError describes the reasons why a task could not fit on a node
type FitError struct {
taskNamespace string
taskName string
NodeName string
Reasons []string
}

// NewFitError returns a FitError built from the given messages
func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError {
fe := &FitError{
taskName: task.Name,
taskNamespace: task.Namespace,
NodeName: node.Name,
Reasons: message,
}
return fe
}

// NewFitErrorByReasons returns a FitError built from predicate failure reasons
func NewFitErrorByReasons(task *TaskInfo, node *NodeInfo, reasons ...algorithm.PredicateFailureReason) *FitError {
message := make([]string, 0, len(reasons))
for _, reason := range reasons {
message = append(message, reason.GetReason())
}
return NewFitError(task, node, message...)
}

// Error returns the final error message
func (f *FitError) Error() string {
return fmt.Sprintf("task %s/%s on node %s fit failed: %s", f.taskNamespace, f.taskName, f.NodeName, strings.Join(f.Reasons, ", "))
}
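A small usage sketch of the aggregation above (illustrative; task and node stand for existing *TaskInfo and *NodeInfo values):

fe := api.NewFitErrors()
// structured predicate failures keep the node name and their reasons
fe.SetNodeError("node-1", api.NewFitError(task, node, api.NodeResourceFitFailed))
// plain errors are wrapped into a FitError for that node
fe.SetNodeError("node-2", fmt.Errorf("node(s) didn't match node selector"))

// no common error was set, so Error() falls back to AllNodeUnavailableMsg:
// "all nodes are unavailable: 1 node(s) didn't match node selector, 1 node(s) resource fit failed."
fmt.Println(fe.Error())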
16 changes: 11 additions & 5 deletions pkg/scheduler/cache/cache.go
@@ -686,7 +686,10 @@ func (sc *SchedulerCache) String() string {

// RecordJobStatusEvent records related events according to job status.
func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {
jobErrMsg := job.FitError()
baseErrorMessage := job.JobFitErrors
if baseErrorMessage == "" {
baseErrorMessage = kbapi.AllNodeUnavailableMsg
}

if !shadowPodGroup(job.PodGroup) {
pgUnschedulable := job.PodGroup != nil &&
@@ -696,17 +699,20 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {

// If pending or unschedulable, record unschedulable event.
if pgUnschedulable || pdbUnschedulabe {
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError())
sc.Recorder.Eventf(job.PodGroup, v1.EventTypeWarning,
string(v1alpha1.PodGroupUnschedulableType), msg)
string(v1alpha1.PodGroupUnschedulableType), baseErrorMessage)
}
}

// Update podCondition for tasks Allocated and Pending before job discarded
for _, status := range []api.TaskStatus{api.Allocated, api.Pending} {
for _, taskInfo := range job.TaskStatusIndex[status] {
if err := sc.taskUnschedulable(taskInfo, jobErrMsg); err != nil {
msg := baseErrorMessage
fitError := job.NodesFitErrors[taskInfo.UID]
if fitError != nil {
msg = fitError.Error()
}
if err := sc.taskUnschedulable(taskInfo, msg); err != nil {
glog.Errorf("Failed to update unschedulable task status <%s/%s>: %v",
taskInfo.Namespace, taskInfo.Name, err)
}
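Net effect in the cache: the PodGroup warning event now carries the job-level message from job.JobFitErrors, while each Allocated or Pending task's pod condition gets its own per-node detail from job.NodesFitErrors when present, falling back to the job-level message otherwise.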
13 changes: 13 additions & 0 deletions pkg/scheduler/plugins/gang/gang.go
@@ -137,6 +137,7 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
unreadyTaskCount = job.MinAvailable - job.ReadyTaskNum()
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
job.MinAvailable-job.ReadyTaskNum(), len(job.Tasks), job.FitError())
job.JobFitErrors = msg

unScheduleJobCount++
metrics.UpdateUnscheduleTaskCount(job.Name, int(unreadyTaskCount))
@@ -155,6 +156,18 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
glog.Errorf("Failed to update job <%s/%s> condition: %v",
job.Namespace, job.Name, err)
}

// allocated tasks should follow the job fit error
for _, taskInfo := range job.TaskStatusIndex[api.Allocated] {
fitError := job.NodesFitErrors[taskInfo.UID]
if fitError != nil {
continue
}

fitError = api.NewFitErrors()
job.NodesFitErrors[taskInfo.UID] = fitError
fitError.SetError(msg)
}
}
}

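The gang plugin also propagates the job-level message to Allocated tasks that have no per-node failure record of their own, so their pods still explain why the gang as a whole was not ready.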
(2 more changed files are not shown here)
