diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index c557dd9a07..ba3bafa92c 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -17,8 +17,6 @@ limitations under the License. package allocate import ( - "fmt" - "github.com/golang/glog" "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" @@ -88,8 +86,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { // ... // } if !task.InitResreq.LessEqual(node.Idle) && !task.InitResreq.LessEqual(node.Releasing) { - return fmt.Errorf("task <%s/%s> ResourceFit failed on node <%s>", - task.Namespace, task.Name, node.Name) + return api.NewFitError(task, node, api.NodeResourceFitFailed) } return ssn.PredicateFn(task, node) @@ -149,8 +146,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { job.NodesFitDelta = make(api.NodeResourceMap) } - predicateNodes := util.PredicateNodes(task, allNodes, predicateFn) + predicateNodes, fitErrors := util.PredicateNodes(task, allNodes, predicateFn) if len(predicateNodes) == 0 { + job.NodesFitErrors[task.UID] = fitErrors break } diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 4b1a38dd19..6bd970e788 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -54,6 +54,9 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) { for _, task := range job.TaskStatusIndex[api.Pending] { if task.InitResreq.IsEmpty() { + allocated := false + fe := api.NewFitErrors() + // As task did not request resources, so it only need to meet predicates. // TODO (k82cn): need to prioritize nodes to avoid pod hole. for _, node := range ssn.Nodes { @@ -62,16 +65,24 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) { if err := ssn.PredicateFn(task, node); err != nil { glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) + fe.SetNodeError(node.Name, err) continue } glog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) if err := ssn.Allocate(task, node.Name); err != nil { glog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) + fe.SetNodeError(node.Name, err) continue } + + allocated = true break } + + if !allocated { + job.NodesFitErrors[task.UID] = fe + } } else { // TODO (k82cn): backfill for other case. } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index f409d0ada2..5081c94c09 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -188,7 +188,7 @@ func preempt( allNodes := util.GetNodeList(nodes) - predicateNodes := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn) + predicateNodes, _ := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn) nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index ce27db2545..3f4ee7e325 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -139,6 +139,9 @@ type JobInfo struct { NodesFitDelta NodeResourceMap + JobFitErrors string + NodesFitErrors map[TaskID]*FitErrors + // All tasks of the Job. 
TaskStatusIndex map[TaskStatus]tasksMap Tasks tasksMap @@ -164,6 +167,8 @@ func NewJobInfo(uid JobID, tasks ...*TaskInfo) *JobInfo { Allocated: EmptyResource(), TotalRequest: EmptyResource(), + NodesFitErrors: make(map[TaskID]*FitErrors), + TaskStatusIndex: map[TaskStatus]tasksMap{}, Tasks: tasksMap{}, } @@ -301,6 +306,8 @@ func (ji *JobInfo) Clone() *JobInfo { TotalRequest: EmptyResource(), NodesFitDelta: make(NodeResourceMap), + NodesFitErrors: make(map[TaskID]*FitErrors), + PDB: ji.PDB, PodGroup: ji.PodGroup.DeepCopy(), @@ -338,36 +345,21 @@ func (ji JobInfo) String() string { // FitError returns detailed information on why a job's task failed to fit on // each available node func (ji *JobInfo) FitError() string { - if len(ji.NodesFitDelta) == 0 { - reasonMsg := fmt.Sprintf("0 nodes are available") - return reasonMsg - } - reasons := make(map[string]int) - for _, v := range ji.NodesFitDelta { - if v.Get(v1.ResourceCPU) < 0 { - reasons["cpu"]++ - } - if v.Get(v1.ResourceMemory) < 0 { - reasons["memory"]++ - } - - for rName, rQuant := range v.ScalarResources { - if rQuant < 0 { - reasons[string(rName)]++ - } - } + for status, taskMap := range ji.TaskStatusIndex { + reasons[fmt.Sprintf("%s", status)] += len(taskMap) } + reasons["minAvailable"] = int(ji.MinAvailable) sortReasonsHistogram := func() []string { reasonStrings := []string{} for k, v := range reasons { - reasonStrings = append(reasonStrings, fmt.Sprintf("%v insufficient %v", v, k)) + reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k)) } sort.Strings(reasonStrings) return reasonStrings } - reasonMsg := fmt.Sprintf("0/%v nodes are available, %v.", len(ji.NodesFitDelta), strings.Join(sortReasonsHistogram(), ", ")) + reasonMsg := fmt.Sprintf("job is not ready, %v.", strings.Join(sortReasonsHistogram(), ", ")) return reasonMsg } diff --git a/pkg/scheduler/api/job_info_test.go b/pkg/scheduler/api/job_info_test.go index b053360c37..629a293e62 100644 --- a/pkg/scheduler/api/job_info_test.go +++ b/pkg/scheduler/api/job_info_test.go @@ -81,6 +81,8 @@ func TestAddTaskInfo(t *testing.T) { }, NodeSelector: make(map[string]string), NodesFitDelta: make(NodeResourceMap), + + NodesFitErrors: make(map[TaskID]*FitErrors), }, }, } @@ -147,6 +149,8 @@ func TestDeleteTaskInfo(t *testing.T) { }, NodeSelector: make(map[string]string), NodesFitDelta: make(NodeResourceMap), + + NodesFitErrors: make(map[TaskID]*FitErrors), }, }, { @@ -172,6 +176,8 @@ func TestDeleteTaskInfo(t *testing.T) { }, NodeSelector: make(map[string]string), NodesFitDelta: make(NodeResourceMap), + + NodesFitErrors: make(map[TaskID]*FitErrors), }, }, } diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index 71842960fe..1cff29aeb2 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -61,6 +61,10 @@ func (ts TaskStatus) String() string { switch ts { case Pending: return "Pending" + case Allocated: + return "Allocated" + case Pipelined: + return "Pipelined" case Binding: return "Binding" case Bound: diff --git a/pkg/scheduler/api/unschedule_info.go b/pkg/scheduler/api/unschedule_info.go new file mode 100644 index 0000000000..2275f13ecb --- /dev/null +++ b/pkg/scheduler/api/unschedule_info.go @@ -0,0 +1,112 @@ +package api + +import ( + "fmt" + "sort" + "strings" + + "k8s.io/kubernetes/pkg/scheduler/algorithm" +) + +const ( + // NodePodNumberExceeded means pods in node exceed the allocatable pod number + NodePodNumberExceeded = "node(s) pod number exceeded" + // NodeResourceFitFailed means node could not fit the request of pod + 
NodeResourceFitFailed = "node(s) resource fit failed" + + // AllNodeUnavailableMsg is the default error message + AllNodeUnavailableMsg = "all nodes are unavailable" +) + +// FitErrors is set of FitError on many nodes +type FitErrors struct { + nodes map[string]*FitError + err string +} + +// NewFitErrors returns an FitErrors +func NewFitErrors() *FitErrors { + f := new(FitErrors) + f.nodes = make(map[string]*FitError) + return f +} + +// SetError set the common error message in FitErrors +func (f *FitErrors) SetError(err string) { + f.err = err +} + +// SetNodeError set the node error in FitErrors +func (f *FitErrors) SetNodeError(nodeName string, err error) { + var fe *FitError + switch obj := err.(type) { + case *FitError: + obj.NodeName = nodeName + fe = obj + default: + fe = &FitError{ + NodeName: nodeName, + Reasons: []string{obj.Error()}, + } + } + + f.nodes[nodeName] = fe +} + +// Error returns the final error message +func (f *FitErrors) Error() string { + reasons := make(map[string]int) + + for _, node := range f.nodes { + for _, reason := range node.Reasons { + reasons[reason]++ + } + } + + sortReasonsHistogram := func() []string { + reasonStrings := []string{} + for k, v := range reasons { + reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k)) + } + sort.Strings(reasonStrings) + return reasonStrings + } + if f.err == "" { + f.err = AllNodeUnavailableMsg + } + reasonMsg := fmt.Sprintf(f.err+": %v.", strings.Join(sortReasonsHistogram(), ", ")) + return reasonMsg +} + +// FitError describe the reason why task could not fit that node +type FitError struct { + taskNamespace string + taskName string + NodeName string + Reasons []string +} + +// NewFitError return FitError by message +func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError { + fe := &FitError{ + taskName: task.Name, + taskNamespace: task.Namespace, + NodeName: node.Name, + Reasons: message, + } + return fe +} + +// NewFitErrorByReasons return FitError by reasons +func NewFitErrorByReasons(task *TaskInfo, node *NodeInfo, reasons ...algorithm.PredicateFailureReason) *FitError { + message := make([]string, 0, len(reasons)) + for _, reason := range reasons { + message = append(message, reason.GetReason()) + } + return NewFitError(task, node, message...) +} + +// Error returns the final error message +func (f *FitError) Error() string { + return fmt.Sprintf("task %s/%s on node %s fit failed: %s", f.taskNamespace, f.taskName, f.NodeName, strings.Join(f.Reasons, ", ")) +} diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 474b00f5a8..fdcb209e89 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -686,7 +686,10 @@ func (sc *SchedulerCache) String() string { // RecordJobStatusEvent records related events according to job status. func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { - jobErrMsg := job.FitError() + baseErrorMessage := job.JobFitErrors + if baseErrorMessage == "" { + baseErrorMessage = kbapi.AllNodeUnavailableMsg + } if !shadowPodGroup(job.PodGroup) { pgUnschedulable := job.PodGroup != nil && @@ -696,17 +699,20 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { // If pending or unschedulable, record unschedulable event. 
if pgUnschedulable || pdbUnschedulabe { - msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", - len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError()) sc.Recorder.Eventf(job.PodGroup, v1.EventTypeWarning, - string(v1alpha1.PodGroupUnschedulableType), msg) + string(v1alpha1.PodGroupUnschedulableType), baseErrorMessage) } } // Update podCondition for tasks Allocated and Pending before job discarded for _, status := range []api.TaskStatus{api.Allocated, api.Pending} { for _, taskInfo := range job.TaskStatusIndex[status] { - if err := sc.taskUnschedulable(taskInfo, jobErrMsg); err != nil { + msg := baseErrorMessage + fitError := job.NodesFitErrors[taskInfo.UID] + if fitError != nil { + msg = fitError.Error() + } + if err := sc.taskUnschedulable(taskInfo, msg); err != nil { glog.Errorf("Failed to update unschedulable task status <%s/%s>: %v", taskInfo.Namespace, taskInfo.Name, err) } diff --git a/pkg/scheduler/plugins/gang/gang.go b/pkg/scheduler/plugins/gang/gang.go index f1c707b09f..d203517a49 100644 --- a/pkg/scheduler/plugins/gang/gang.go +++ b/pkg/scheduler/plugins/gang/gang.go @@ -137,6 +137,7 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) { unreadyTaskCount = job.MinAvailable - job.ReadyTaskNum() msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", job.MinAvailable-job.ReadyTaskNum(), len(job.Tasks), job.FitError()) + job.JobFitErrors = msg unScheduleJobCount++ metrics.UpdateUnscheduleTaskCount(job.Name, int(unreadyTaskCount)) @@ -155,6 +156,18 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) { glog.Errorf("Failed to update job <%s/%s> condition: %v", job.Namespace, job.Name, err) } + + // allocated task should follow the job fit error + for _, taskInfo := range job.TaskStatusIndex[api.Allocated] { + fitError := job.NodesFitErrors[taskInfo.UID] + if fitError != nil { + continue + } + + fitError = api.NewFitErrors() + job.NodesFitErrors[taskInfo.UID] = fitError + fitError.SetError(msg) + } } } diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index cc720f3661..14dfbdab42 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -160,7 +160,9 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { } if node.Allocatable.MaxTaskNum <= len(nodeInfo.Pods()) { - return fmt.Errorf("node <%s> can not allow more task running on it", node.Name) + glog.V(4).Infof("NodePodNumber predicates Task <%s/%s> on Node <%s> failed", + task.Namespace, task.Name, node.Name) + return api.NewFitError(task, node, api.NodePodNumberExceeded) } // CheckNodeCondition Predicate @@ -173,12 +175,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> are not available to schedule task <%s/%s>: %s", - node.Name, task.Namespace, task.Name, formatReason(reasons)) + return api.NewFitErrorByReasons(task, node, reasons...) 
} // CheckNodeUnschedulable Predicate - fit, _, err = predicates.CheckNodeUnschedulablePredicate(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.CheckNodeUnschedulablePredicate(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -187,12 +188,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("task <%s/%s> node <%s> set to unschedulable", - task.Namespace, task.Name, node.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } // NodeSelector Predicate - fit, _, err = predicates.PodMatchNodeSelector(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.PodMatchNodeSelector(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -201,12 +201,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> didn't match task <%s/%s> node selector", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } // HostPorts Predicate - fit, _, err = predicates.PodFitsHostPorts(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.PodFitsHostPorts(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -215,12 +214,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> didn't have available host ports for task <%s/%s>", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } // Toleration/Taint Predicate - fit, _, err = predicates.PodToleratesNodeTaints(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.PodToleratesNodeTaints(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -229,13 +227,12 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("task <%s/%s> does not tolerate node <%s> taints", - task.Namespace, task.Name, node.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } if predicate.memoryPressureEnable { // CheckNodeMemoryPressurePredicate - fit, _, err = predicates.CheckNodeMemoryPressurePredicate(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.CheckNodeMemoryPressurePredicate(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -244,14 +241,13 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> are not available to schedule task <%s/%s> due to Memory Pressure", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } } if predicate.diskPressureEnable { // CheckNodeDiskPressurePredicate - fit, _, err = predicates.CheckNodeDiskPressurePredicate(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.CheckNodeDiskPressurePredicate(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -260,14 +256,13 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> are not available to schedule task <%s/%s> due to Disk Pressure", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) 
} } if predicate.pidPressureEnable { // CheckNodePIDPressurePredicate - fit, _, err = predicates.CheckNodePIDPressurePredicate(task.Pod, nil, nodeInfo) + fit, reasons, err = predicates.CheckNodePIDPressurePredicate(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -276,8 +271,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("node <%s> are not available to schedule task <%s/%s> due to PID Pressure", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } } @@ -289,7 +283,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { } // Pod Affinity/Anti-Affinity Predicate podAffinityPredicate := predicates.NewPodAffinityPredicate(ni, lister) - fit, _, err = podAffinityPredicate(task.Pod, nil, nodeInfo) + fit, reasons, err = podAffinityPredicate(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -298,8 +292,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("task <%s/%s> affinity/anti-affinity failed on node <%s>", - node.Name, task.Namespace, task.Name) + return api.NewFitErrorByReasons(task, node, reasons...) } return nil diff --git a/pkg/scheduler/util/scheduler_helper.go b/pkg/scheduler/util/scheduler_helper.go index 95eef11785..3932082b4c 100644 --- a/pkg/scheduler/util/scheduler_helper.go +++ b/pkg/scheduler/util/scheduler_helper.go @@ -31,10 +31,14 @@ import ( ) // PredicateNodes returns nodes that fit task -func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) []*api.NodeInfo { +func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) { var predicateNodes []*api.NodeInfo var workerLock sync.Mutex + + var errorLock sync.Mutex + fe := api.NewFitErrors() + checkNode := func(index int) { node := nodes[index] glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>", @@ -44,6 +48,9 @@ func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateF if err := fn(task, node); err != nil { glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) + errorLock.Lock() + fe.SetNodeError(node.Name, err) + errorLock.Unlock() return } @@ -53,7 +60,7 @@ func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateF } workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode) - return predicateNodes + return predicateNodes, fe } // PrioritizeNodes returns a map whose key is node's score and value are corresponding nodes
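
Note for reviewers: the message produced by the new api.FitErrors type is a reason histogram, not a per-node dump. Below is a minimal standalone sketch of that aggregation (simplified, hypothetical names — not the actual kube-batch api package): per-node failure reasons are counted, joined in sorted order, and prefixed with the optional header, falling back to AllNodeUnavailableMsg when no header was set.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// fitErrors mirrors the aggregation behaviour of api.FitErrors (simplified).
type fitErrors struct {
	nodes map[string][]string // node name -> failure reasons
	err   string              // optional header message
}

func newFitErrors() *fitErrors {
	return &fitErrors{nodes: map[string][]string{}}
}

func (f *fitErrors) setNodeError(node string, reasons ...string) {
	f.nodes[node] = reasons
}

// message counts how many nodes reported each reason and joins them sorted,
// e.g. "all nodes are unavailable: 2 node(s) resource fit failed, ...".
func (f *fitErrors) message() string {
	hist := map[string]int{}
	for _, reasons := range f.nodes {
		for _, r := range reasons {
			hist[r]++
		}
	}
	parts := make([]string, 0, len(hist))
	for reason, count := range hist {
		parts = append(parts, fmt.Sprintf("%v %v", count, reason))
	}
	sort.Strings(parts)
	header := f.err
	if header == "" {
		header = "all nodes are unavailable"
	}
	return fmt.Sprintf("%s: %s.", header, strings.Join(parts, ", "))
}

func main() {
	fe := newFitErrors()
	fe.setNodeError("node-1", "node(s) pod number exceeded")
	fe.setNodeError("node-2", "node(s) resource fit failed")
	fe.setNodeError("node-3", "node(s) resource fit failed")
	fmt.Println(fe.message())
	// all nodes are unavailable: 1 node(s) pod number exceeded, 2 node(s) resource fit failed.
}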
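
Similarly, a rough sketch of the concurrency pattern behind the new PredicateNodes signature (self-contained, hypothetical names; the real code uses workqueue.ParallelizeUntil and api.FitErrors): predicate checks run in parallel, passing nodes are appended under one lock, and per-node failures are recorded under a separate errorLock so the caller can attach them to job.NodesFitErrors when no node fits.

package main

import (
	"fmt"
	"sync"
)

type nodeInfo struct {
	name    string
	idleCPU int
}

// predicateNodes is a simplified stand-in for util.PredicateNodes: it returns
// the nodes that pass the predicate plus a map of per-node failure messages.
func predicateNodes(nodes []nodeInfo, fits func(nodeInfo) error) ([]nodeInfo, map[string]error) {
	var fitLock, errLock sync.Mutex
	var fitNodes []nodeInfo
	nodeErrs := map[string]error{}

	var wg sync.WaitGroup
	for _, n := range nodes {
		wg.Add(1)
		go func(n nodeInfo) {
			defer wg.Done()
			if err := fits(n); err != nil {
				// Collected like fe.SetNodeError(node.Name, err) in the real code.
				errLock.Lock()
				nodeErrs[n.name] = err
				errLock.Unlock()
				return
			}
			fitLock.Lock()
			fitNodes = append(fitNodes, n)
			fitLock.Unlock()
		}(n)
	}
	wg.Wait()
	return fitNodes, nodeErrs
}

func main() {
	nodes := []nodeInfo{{"node-1", 4}, {"node-2", 0}, {"node-3", 2}}
	fit, errs := predicateNodes(nodes, func(n nodeInfo) error {
		if n.idleCPU < 2 {
			return fmt.Errorf("node %s: insufficient cpu", n.name)
		}
		return nil
	})
	fmt.Println("fit nodes:", len(fit), "failed nodes:", len(errs))
}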