From 0e73ac311d5763260fe08bcec2aed3955439f05b Mon Sep 17 00:00:00 2001 From: "wangyuqing (C)" Date: Sat, 3 Aug 2019 15:13:46 +0800 Subject: [PATCH] remove shadowPodgroup in scheduler --- pkg/scheduler/api/pod_group_info.go | 4 +- pkg/scheduler/cache/cache.go | 75 ++++++-------- pkg/scheduler/cache/cache_test.go | 135 +------------------------- pkg/scheduler/cache/event_handlers.go | 19 +--- pkg/scheduler/cache/util.go | 40 +------- test/e2e/predicates.go | 12 +-- 6 files changed, 44 insertions(+), 241 deletions(-) diff --git a/pkg/scheduler/api/pod_group_info.go b/pkg/scheduler/api/pod_group_info.go index fa19443d40..7637f4e3c1 100644 --- a/pkg/scheduler/api/pod_group_info.go +++ b/pkg/scheduler/api/pod_group_info.go @@ -44,11 +44,11 @@ const ( //PodGroupVersionV1Alpha2 represents PodGroupVersion of V1Alpha2 PodGroupVersionV1Alpha2 string = "v1alpha2" - // PodPending means the pod group has been accepted by the system, but scheduler can not allocate + // PodGroupPending means the pod group has been accepted by the system, but scheduler can not allocate // enough resources to it. PodGroupPending PodGroupPhase = "Pending" - // PodRunning means `spec.minMember` pods of PodGroups has been in running phase. + // PodGroupRunning means `spec.minMember` pods of PodGroups has been in running phase. PodGroupRunning PodGroupPhase = "Running" // PodGroupUnknown means part of `spec.minMember` pods are running but the other part can not diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 20cf9c9e10..002be6b43b 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -45,7 +45,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/volumebinder" "volcano.sh/volcano/cmd/scheduler/app/options" - "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" kbver "volcano.sh/volcano/pkg/client/clientset/versioned" "volcano.sh/volcano/pkg/client/clientset/versioned/scheme" @@ -507,24 +506,9 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error { } }() - if !shadowPodGroup(job.PodGroup) { - if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 { - pg, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup) - if err != nil { - glog.Errorf("Error While converting api.PodGroup to v1alpha.PodGroup with error: %v", err) - return err - } - sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason) - } else if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 { - pg, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup) - if err != nil { - glog.Errorf("Error While converting api.PodGroup to v2alpha.PodGroup with error: %v", err) - return err - } - sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason) - } else { - return fmt.Errorf("Invalid PodGroup Version: %s", job.PodGroup.Version) - } + if err := sc.convertPodGroupInfo(job); err != nil { + glog.Errorf("Error While converting api.PodGroup %v", err) + return err } return nil @@ -778,32 +762,15 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { baseErrorMessage = kbapi.AllNodeUnavailableMsg } - if !shadowPodGroup(job.PodGroup) { - pgUnschedulable := job.PodGroup != nil && - (job.PodGroup.Status.Phase == api.PodGroupUnknown || - job.PodGroup.Status.Phase == api.PodGroupPending) - pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0 + pgUnschedulable := job.PodGroup != nil && + (job.PodGroup.Status.Phase == api.PodGroupUnknown || + job.PodGroup.Status.Phase == api.PodGroupPending) + pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0 - // If pending or unschedulable, record unschedulable event. - if pgUnschedulable || pdbUnschedulabe { - msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError()) - if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 { - podGroup, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup) - if err != nil { - glog.Errorf("Error while converting PodGroup to v1alpha1.PodGroup with error: %v", err) - } - sc.Recorder.Eventf(podGroup, v1.EventTypeWarning, - string(v1alpha1.PodGroupUnschedulableType), msg) - } - - if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 { - podGroup, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup) - if err != nil { - glog.Errorf("Error while converting PodGroup to v1alpha2.PodGroup with error: %v", err) - } - sc.Recorder.Eventf(podGroup, v1.EventTypeWarning, - string(v1alpha1.PodGroupUnschedulableType), msg) - } + // If pending or unschedulable, record unschedulable event. + if pgUnschedulable || pdbUnschedulabe { + if err := sc.convertPodGroupInfo(job); err != nil { + glog.Errorf("Error While converting api.PodGroup %v", err) } } @@ -825,7 +792,7 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { // UpdateJobStatus update the status of job and its tasks. func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*kbapi.JobInfo, error) { - if updatePG && !shadowPodGroup(job.PodGroup) { + if updatePG { pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup) if err != nil { return nil, err @@ -837,3 +804,21 @@ func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*k return job, nil } + +func (sc *SchedulerCache) convertPodGroupInfo(job *kbapi.JobInfo) error { + if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 { + _, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup) + if err != nil { + return fmt.Errorf("to v1alpha.PodGroup with error: %v", err) + } + } else if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 { + _, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup) + if err != nil { + return fmt.Errorf("to v2alpha.PodGroup with error: %v", err) + } + } else { + return fmt.Errorf("invalid PodGroup Version: %s", job.PodGroup.Version) + } + + return nil +} diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go index 93fc7a898a..dfcfd7c30c 100644 --- a/pkg/scheduler/cache/cache_test.go +++ b/pkg/scheduler/cache/cache_test.go @@ -125,139 +125,6 @@ func buildOwnerReference(owner string) metav1.OwnerReference { } } -func TestAddPod(t *testing.T) { - - owner := buildOwnerReference("j1") - - // case 1: - pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), - []metav1.OwnerReference{owner}, make(map[string]string)) - pi1 := api.NewTaskInfo(pod1) - pi1.Job = "j1" // The job name is set by cache. - pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), - []metav1.OwnerReference{owner}, make(map[string]string)) - pi2 := api.NewTaskInfo(pod2) - pi2.Job = "j1" // The job name is set by cache. - - j1 := api.NewJobInfo(api.JobID("j1"), pi1, pi2) - pg := createShadowPodGroup(pod1) - j1.SetPodGroup(pg) - - node1 := buildNode("n1", buildResourceList("2000m", "10G")) - ni1 := api.NewNodeInfo(node1) - ni1.AddTask(pi2) - - tests := []struct { - pods []*v1.Pod - nodes []*v1.Node - expected *SchedulerCache - }{ - { - pods: []*v1.Pod{pod1, pod2}, - nodes: []*v1.Node{node1}, - expected: &SchedulerCache{ - Nodes: map[string]*api.NodeInfo{ - "n1": ni1, - }, - Jobs: map[api.JobID]*api.JobInfo{ - "j1": j1, - }, - }, - }, - } - - for i, test := range tests { - cache := &SchedulerCache{ - Jobs: make(map[api.JobID]*api.JobInfo), - Nodes: make(map[string]*api.NodeInfo), - } - - for _, n := range test.nodes { - cache.AddNode(n) - } - - for _, p := range test.pods { - cache.AddPod(p) - } - - if !cacheEqual(cache, test.expected) { - t.Errorf("case %d: \n expected %v, \n got %v \n", - i, test.expected, cache) - } - } -} - -func TestAddNode(t *testing.T) { - owner1 := buildOwnerReference("j1") - owner2 := buildOwnerReference("j2") - - // case 1 - node1 := buildNode("n1", buildResourceList("2000m", "10G")) - pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), - []metav1.OwnerReference{owner1}, make(map[string]string)) - pi1 := api.NewTaskInfo(pod1) - pi1.Job = "j1" // The job name is set by cache. - - pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), - []metav1.OwnerReference{owner2}, make(map[string]string)) - pi2 := api.NewTaskInfo(pod2) - pi2.Job = "j2" // The job name is set by cache. - - ni1 := api.NewNodeInfo(node1) - ni1.AddTask(pi2) - - j1 := api.NewJobInfo("j1") - pg1 := createShadowPodGroup(pod1) - j1.SetPodGroup(pg1) - - j2 := api.NewJobInfo("j2") - pg2 := createShadowPodGroup(pod2) - j2.SetPodGroup(pg2) - - j1.AddTaskInfo(pi1) - j2.AddTaskInfo(pi2) - - tests := []struct { - pods []*v1.Pod - nodes []*v1.Node - expected *SchedulerCache - }{ - { - pods: []*v1.Pod{pod1, pod2}, - nodes: []*v1.Node{node1}, - expected: &SchedulerCache{ - Nodes: map[string]*api.NodeInfo{ - "n1": ni1, - }, - Jobs: map[api.JobID]*api.JobInfo{ - "j1": j1, - "j2": j2, - }, - }, - }, - } - - for i, test := range tests { - cache := &SchedulerCache{ - Nodes: make(map[string]*api.NodeInfo), - Jobs: make(map[api.JobID]*api.JobInfo), - } - - for _, p := range test.pods { - cache.AddPod(p) - } - - for _, n := range test.nodes { - cache.AddNode(n) - } - - if !cacheEqual(cache, test.expected) { - t.Errorf("case %d: \n expected %v, \n got %v \n", - i, test.expected, cache) - } - } -} - func TestGetOrCreateJob(t *testing.T) { owner1 := buildOwnerReference("j1") owner2 := buildOwnerReference("j2") @@ -292,7 +159,7 @@ func TestGetOrCreateJob(t *testing.T) { }, { task: pi2, - gotJob: true, + gotJob: false, }, { task: pi3, diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 0b7e0f98bb..49c1e96efe 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -46,23 +46,12 @@ func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo { if pi.Pod.Spec.SchedulerName != sc.schedulerName { glog.V(4).Infof("Pod %s/%s will not not scheduled by %s, skip creating PodGroup and Job for it", pi.Pod.Namespace, pi.Pod.Name, sc.schedulerName) - return nil } - pb := createShadowPodGroup(pi.Pod) - pi.Job = kbapi.JobID(pb.Name) - - if _, found := sc.Jobs[pi.Job]; !found { - job := kbapi.NewJobInfo(pi.Job) - job.SetPodGroup(pb) - // Set default queue for shadow podgroup. - job.Queue = kbapi.QueueID(sc.defaultQueue) + return nil + } - sc.Jobs[pi.Job] = job - } - } else { - if _, found := sc.Jobs[pi.Job]; !found { - sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job) - } + if _, found := sc.Jobs[pi.Job]; !found { + sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job) } return sc.Jobs[pi.Job] diff --git a/pkg/scheduler/cache/util.go b/pkg/scheduler/cache/util.go index 2e1340d74e..1c88dca9e8 100644 --- a/pkg/scheduler/cache/util.go +++ b/pkg/scheduler/cache/util.go @@ -17,47 +17,9 @@ limitations under the License. package cache import ( - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "volcano.sh/volcano/pkg/apis/utils" - "volcano.sh/volcano/pkg/scheduler/api" -) - -const ( - shadowPodGroupKey = "volcano/shadow-pod-group" + "k8s.io/api/core/v1" ) -func shadowPodGroup(pg *api.PodGroup) bool { - if pg == nil { - return true - } - - _, found := pg.Annotations[shadowPodGroupKey] - - return found -} - -func createShadowPodGroup(pod *v1.Pod) *api.PodGroup { - jobID := api.JobID(utils.GetController(pod)) - if len(jobID) == 0 { - jobID = api.JobID(pod.UID) - } - - return &api.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: pod.Namespace, - Name: string(jobID), - Annotations: map[string]string{ - shadowPodGroupKey: string(jobID), - }, - }, - Spec: api.PodGroupSpec{ - MinMember: 1, - }, - } -} - // responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler. func responsibleForPod(pod *v1.Pod, schedulerName string) bool { return schedulerName == pod.Spec.SchedulerName diff --git a/test/e2e/predicates.go b/test/e2e/predicates.go index 64f12784de..1a106587d8 100644 --- a/test/e2e/predicates.go +++ b/test/e2e/predicates.go @@ -84,7 +84,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "na-job", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: slot, min: 1, rep: rep, @@ -130,7 +130,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "pa-job", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: slot, min: rep, rep: rep, @@ -177,7 +177,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "pa-job", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: slot, min: 2, rep: 2, @@ -222,7 +222,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "tt-job", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: oneCPU, min: 1, rep: 1, @@ -270,7 +270,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "tt-job", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: oneCPU, min: 1, rep: 1, @@ -283,7 +283,7 @@ var _ = Describe("Predicates E2E Test", func() { name: "tt-job-no-toleration", tasks: []taskSpec{ { - img: "nginx", + img: defaultNginxImage, req: oneCPU, min: 1, rep: 1,