Skip to content

Commit

Permalink
remove shadowPodgroup in scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyuqing4 committed Aug 3, 2019
1 parent 1a856a1 commit 58284a7
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 127 deletions.
18 changes: 18 additions & 0 deletions pkg/apis/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,21 @@ func DeleteConfigmap(job *vkv1.Job, kubeClients kubernetes.Interface, cmName str

return nil
}

// GeneratePodgroupName returns the podgroup name used for a normal pod.
//
// The name is vkbatchv1.PodgroupNamePrefix followed by the UID of the first
// owner reference that is marked as controller. When no owner reference is
// marked as controller, the pod's own UID is used instead.
func GeneratePodgroupName(pod *v1.Pod) string {
	// Fall back to the pod's own UID unless a controlling owner is found.
	suffix := string(pod.UID)

	for i := range pod.OwnerReferences {
		ref := &pod.OwnerReferences[i]
		if ref.Controller == nil || !*ref.Controller {
			continue
		}
		// Only the first controlling owner is considered.
		suffix = string(ref.UID)
		break
	}

	return vkbatchv1.PodgroupNamePrefix + suffix
}
20 changes: 1 addition & 19 deletions pkg/controllers/podgroup/pg_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
)
Expand Down Expand Up @@ -71,7 +70,7 @@ func (cc *Controller) updatePodAnnotations(pod *v1.Pod, pgName string) error {
}

func (cc *Controller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
pgName := generatePodgroupName(pod)
pgName := helpers.GeneratePodgroupName(pod)

if _, err := cc.pgLister.PodGroups(pod.Namespace).Get(pgName); err != nil {
if !apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -101,23 +100,6 @@ func (cc *Controller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
return cc.updatePodAnnotations(pod, pgName)
}

// generatePodgroupName builds the podgroup name for a normal pod by appending
// a UID to vkbatchv1.PodgroupNamePrefix: the UID of the first owner reference
// flagged as controller, or the pod's own UID when there is none.
func generatePodgroupName(pod *v1.Pod) string {
	for _, owner := range pod.OwnerReferences {
		if isController := owner.Controller; isController != nil && *isController {
			return vkbatchv1.PodgroupNamePrefix + string(owner.UID)
		}
	}

	// No controlling owner: key the podgroup on the pod itself.
	return vkbatchv1.PodgroupNamePrefix + string(pod.UID)
}

func newPGOwnerReferences(pod *v1.Pod) []metav1.OwnerReference {
if len(pod.OwnerReferences) != 0 {
for _, ownerReference := range pod.OwnerReferences {
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/api/pod_group_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ const (

//PodGroupVersionV1Alpha2 represents PodGroupVersion of V1Alpha2
PodGroupVersionV1Alpha2 string = "v1alpha2"
// PodPending means the pod group has been accepted by the system, but scheduler can not allocate
// PodGroupPending means the pod group has been accepted by the system, but scheduler can not allocate
// enough resources to it.
PodGroupPending PodGroupPhase = "Pending"

// PodRunning means `spec.minMember` pods of PodGroups has been in running phase.
// PodGroupRunning means `spec.minMember` pods of the PodGroup have been in running phase.
PodGroupRunning PodGroupPhase = "Running"

// PodGroupUnknown means part of `spec.minMember` pods are running but the other part can not
Expand Down
77 changes: 33 additions & 44 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,24 +507,8 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error {
}
}()

if !shadowPodGroup(job.PodGroup) {
if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 {
pg, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup)
if err != nil {
glog.Errorf("Error While converting api.PodGroup to v1alpha.PodGroup with error: %v", err)
return err
}
sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason)
} else if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 {
pg, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup)
if err != nil {
glog.Errorf("Error While converting api.PodGroup to v2alpha.PodGroup with error: %v", err)
return err
}
sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason)
} else {
return fmt.Errorf("Invalid PodGroup Version: %s", job.PodGroup.Version)
}
if err := sc.convertPodGroupInfo(job, v1.EventTypeNormal, "Evict", reason); err != nil {
return err
}

return nil
Expand Down Expand Up @@ -778,33 +762,16 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {
baseErrorMessage = kbapi.AllNodeUnavailableMsg
}

if !shadowPodGroup(job.PodGroup) {
pgUnschedulable := job.PodGroup != nil &&
(job.PodGroup.Status.Phase == api.PodGroupUnknown ||
job.PodGroup.Status.Phase == api.PodGroupPending)
pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0
pgUnschedulable := job.PodGroup != nil &&
(job.PodGroup.Status.Phase == api.PodGroupUnknown ||
job.PodGroup.Status.Phase == api.PodGroupPending)
pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0

// If pending or unschedulable, record unschedulable event.
if pgUnschedulable || pdbUnschedulabe {
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError())
if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 {
podGroup, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup)
if err != nil {
glog.Errorf("Error while converting PodGroup to v1alpha1.PodGroup with error: %v", err)
}
sc.Recorder.Eventf(podGroup, v1.EventTypeWarning,
string(v1alpha1.PodGroupUnschedulableType), msg)
}
// If pending or unschedulable, record unschedulable event.
if pgUnschedulable || pdbUnschedulabe {
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError())

if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 {
podGroup, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup)
if err != nil {
glog.Errorf("Error while converting PodGroup to v1alpha2.PodGroup with error: %v", err)
}
sc.Recorder.Eventf(podGroup, v1.EventTypeWarning,
string(v1alpha1.PodGroupUnschedulableType), msg)
}
}
sc.convertPodGroupInfo(job, v1.EventTypeWarning, string(v1alpha1.PodGroupUnschedulableType), msg)
}

// Update podCondition for tasks Allocated and Pending before job discarded
Expand All @@ -825,7 +792,7 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {

// UpdateJobStatus updates the status of job and its tasks.
func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*kbapi.JobInfo, error) {
if updatePG && !shadowPodGroup(job.PodGroup) {
if updatePG {
pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
if err != nil {
return nil, err
Expand All @@ -837,3 +804,25 @@ func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*k

return job, nil
}

// convertPodGroupInfo records an event against the job's PodGroup.
//
// The scheduler-internal PodGroup is converted to the versioned API object
// selected by job.PodGroup.Version (v1alpha1 or v1alpha2) and the event is
// emitted on that object through sc.Recorder with the given eventtype,
// reason and message.
//
// A job without a PodGroup is skipped and nil is returned. An error is
// returned when the conversion fails or when the PodGroup version is neither
// v1alpha1 nor v1alpha2.
func (sc *SchedulerCache) convertPodGroupInfo(job *kbapi.JobInfo, eventtype, reason, message string) error {
	// RecordJobStatusEvent calls this when either the PodGroup or the PDB
	// condition holds, so job.PodGroup may be nil here; there is nothing to
	// attach the event to in that case.
	if job.PodGroup == nil {
		return nil
	}

	switch job.PodGroup.Version {
	case api.PodGroupVersionV1Alpha1:
		pg, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup)
		if err != nil {
			glog.Errorf("Error While converting api.PodGroup to v1alpha1.PodGroup with error: %v", err)
			return err
		}
		// Event rather than Eventf: message is already fully formatted and
		// must not be interpreted as a format string.
		sc.Recorder.Event(pg, eventtype, reason, message)
	case api.PodGroupVersionV1Alpha2:
		pg, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup)
		if err != nil {
			glog.Errorf("Error While converting api.PodGroup to v1alpha2.PodGroup with error: %v", err)
			return err
		}
		sc.Recorder.Event(pg, eventtype, reason, message)
	default:
		return fmt.Errorf("Invalid PodGroup Version: %s", job.PodGroup.Version)
	}

	return nil
}
30 changes: 14 additions & 16 deletions pkg/scheduler/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ func TestAddPod(t *testing.T) {
pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner}, make(map[string]string))
pi1 := api.NewTaskInfo(pod1)
pi1.Job = "j1" // The job name is set by cache.
pg1 := createShadowPodGroup(pod1)
pi1.Job = getJobID(pg1) // The job name is set by cache.
pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner}, make(map[string]string))
pi2 := api.NewTaskInfo(pod2)
pi2.Job = "j1" // The job name is set by cache.
pg2 := createShadowPodGroup(pod2)
pi2.Job = getJobID(pg2) // The job name is set by cache.

j1 := api.NewJobInfo(api.JobID("j1"), pi1, pi2)
pg := createShadowPodGroup(pod1)
j1.SetPodGroup(pg)
j1 := api.NewJobInfo(api.JobID("c1/podgroup-j1"), pi1, pi2)

node1 := buildNode("n1", buildResourceList("2000m", "10G"))
ni1 := api.NewNodeInfo(node1)
Expand All @@ -160,7 +160,7 @@ func TestAddPod(t *testing.T) {
"n1": ni1,
},
Jobs: map[api.JobID]*api.JobInfo{
"j1": j1,
"c1/podgroup-j1": j1,
},
},
},
Expand Down Expand Up @@ -196,23 +196,21 @@ func TestAddNode(t *testing.T) {
pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner1}, make(map[string]string))
pi1 := api.NewTaskInfo(pod1)
pi1.Job = "j1" // The job name is set by cache.
pg1 := createShadowPodGroup(pod1)
pi1.Job = getJobID(pg1) // The job name is set by cache.

pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner2}, make(map[string]string))
pi2 := api.NewTaskInfo(pod2)
pi2.Job = "j2" // The job name is set by cache.
pg2 := createShadowPodGroup(pod2)
pi2.Job = getJobID(pg2) // The job name is set by cache.

ni1 := api.NewNodeInfo(node1)
ni1.AddTask(pi2)

j1 := api.NewJobInfo("j1")
pg1 := createShadowPodGroup(pod1)
j1.SetPodGroup(pg1)
j1 := api.NewJobInfo("c1/podgroup-j1")

j2 := api.NewJobInfo("j2")
pg2 := createShadowPodGroup(pod2)
j2.SetPodGroup(pg2)
j2 := api.NewJobInfo("c1/podgroup-j2")

j1.AddTaskInfo(pi1)
j2.AddTaskInfo(pi2)
Expand All @@ -230,8 +228,8 @@ func TestAddNode(t *testing.T) {
"n1": ni1,
},
Jobs: map[api.JobID]*api.JobInfo{
"j1": j1,
"j2": j2,
"c1/podgroup-j1": j1,
"c1/podgroup-j2": j2,
},
},
},
Expand Down
17 changes: 4 additions & 13 deletions pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,11 @@ func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo {
return nil
}
pb := createShadowPodGroup(pi.Pod)
pi.Job = kbapi.JobID(pb.Name)

if _, found := sc.Jobs[pi.Job]; !found {
job := kbapi.NewJobInfo(pi.Job)
job.SetPodGroup(pb)
// Set default queue for shadow podgroup.
job.Queue = kbapi.QueueID(sc.defaultQueue)
pi.Job = getJobID(pb)
}

sc.Jobs[pi.Job] = job
}
} else {
if _, found := sc.Jobs[pi.Job]; !found {
sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job)
}
if _, found := sc.Jobs[pi.Job]; !found {
sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job)
}

return sc.Jobs[pi.Job]
Expand Down
31 changes: 4 additions & 27 deletions pkg/scheduler/cache/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,20 @@ limitations under the License.
package cache

import (
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"volcano.sh/volcano/pkg/apis/utils"
"volcano.sh/volcano/pkg/apis/helpers"
"volcano.sh/volcano/pkg/scheduler/api"
)

const (
shadowPodGroupKey = "volcano/shadow-pod-group"
)

// shadowPodGroup returns true when pg is nil or when pg's annotations contain
// the shadowPodGroupKey key; otherwise it returns false.
func shadowPodGroup(pg *api.PodGroup) bool {
	if pg != nil {
		_, annotated := pg.Annotations[shadowPodGroupKey]
		return annotated
	}

	// A missing pod group is treated the same as a shadow one.
	return true
}

func createShadowPodGroup(pod *v1.Pod) *api.PodGroup {
jobID := api.JobID(utils.GetController(pod))
if len(jobID) == 0 {
jobID = api.JobID(pod.UID)
}
pgName := helpers.GeneratePodgroupName(pod)

return &api.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: pod.Namespace,
Name: string(jobID),
Annotations: map[string]string{
shadowPodGroupKey: string(jobID),
},
},
Spec: api.PodGroupSpec{
MinMember: 1,
Name: pgName,
},
}
}
Expand Down
12 changes: 6 additions & 6 deletions test/e2e/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var _ = Describe("Predicates E2E Test", func() {
name: "na-job",
tasks: []taskSpec{
{
img: "nginx",
img: defaultNginxImage,
req: slot,
min: 1,
rep: rep,
Expand Down Expand Up @@ -130,7 +130,7 @@ var _ = Describe("Predicates E2E Test", func() {
name: "pa-job",
tasks: []taskSpec{
{
img: "nginx",
img: defaultNginxImage,
req: slot,
min: rep,
rep: rep,
Expand Down Expand Up @@ -177,7 +177,7 @@ var _ = Describe("Predicates E2E Test", func() {
name: "pa-job",
tasks: []taskSpec{
{
img: "nginx",
img: defaultNginxImage,
req: slot,
min: 2,
rep: 2,
Expand Down Expand Up @@ -222,7 +222,7 @@ var _ = Describe("Predicates E2E Test", func() {
name: "tt-job",
tasks: []taskSpec{
{
img: "nginx",
img: defaultNginxImage,
req: oneCPU,
min: 1,
rep: 1,
Expand Down Expand Up @@ -270,7 +270,7 @@ var _ = Describe("Predicates E2E Test", func() {
name: "tt-job",
tasks: []taskSpec{
{
img: "nginx",
img: defaultNginxImage,
req: oneCPU,
min: 1,
rep: 1,
Expand All @@ -283,7 +283,7 @@ var _ = Describe("Predicates E2E Test", func() {
name: "tt-job-no-toleration",
tasks: []taskSpec{
{
img: "nginx",
img: defaultNginxImage,
req: oneCPU,
min: 1,
rep: 1,
Expand Down

0 comments on commit 58284a7

Please sign in to comment.