Skip to content

Commit

Permalink
remove shadowPodGroup in scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyuqing4 committed Aug 5, 2019
1 parent 1a856a1 commit 0e73ac3
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 241 deletions.
4 changes: 2 additions & 2 deletions pkg/scheduler/api/pod_group_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ const (

//PodGroupVersionV1Alpha2 represents PodGroupVersion of V1Alpha2
PodGroupVersionV1Alpha2 string = "v1alpha2"
// PodPending means the pod group has been accepted by the system, but scheduler can not allocate
// PodGroupPending means the pod group has been accepted by the system, but scheduler can not allocate
// enough resources to it.
PodGroupPending PodGroupPhase = "Pending"

// PodRunning means `spec.minMember` pods of PodGroups has been in running phase.
// PodGroupRunning means `spec.minMember` pods of PodGroups has been in running phase.
PodGroupRunning PodGroupPhase = "Running"

// PodGroupUnknown means part of `spec.minMember` pods are running but the other part can not
Expand Down
75 changes: 30 additions & 45 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/volumebinder"

"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/apis/scheduling/v1alpha1"
"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
kbver "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/client/clientset/versioned/scheme"
Expand Down Expand Up @@ -507,24 +506,9 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error {
}
}()

if !shadowPodGroup(job.PodGroup) {
if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 {
pg, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup)
if err != nil {
glog.Errorf("Error While converting api.PodGroup to v1alpha.PodGroup with error: %v", err)
return err
}
sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason)
} else if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 {
pg, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup)
if err != nil {
glog.Errorf("Error While converting api.PodGroup to v2alpha.PodGroup with error: %v", err)
return err
}
sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason)
} else {
return fmt.Errorf("Invalid PodGroup Version: %s", job.PodGroup.Version)
}
if err := sc.convertPodGroupInfo(job); err != nil {
glog.Errorf("Error While converting api.PodGroup %v", err)
return err
}

return nil
Expand Down Expand Up @@ -778,32 +762,15 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {
baseErrorMessage = kbapi.AllNodeUnavailableMsg
}

if !shadowPodGroup(job.PodGroup) {
pgUnschedulable := job.PodGroup != nil &&
(job.PodGroup.Status.Phase == api.PodGroupUnknown ||
job.PodGroup.Status.Phase == api.PodGroupPending)
pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0
pgUnschedulable := job.PodGroup != nil &&
(job.PodGroup.Status.Phase == api.PodGroupUnknown ||
job.PodGroup.Status.Phase == api.PodGroupPending)
pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0

// If pending or unschedulable, record unschedulable event.
if pgUnschedulable || pdbUnschedulabe {
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError())
if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 {
podGroup, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup)
if err != nil {
glog.Errorf("Error while converting PodGroup to v1alpha1.PodGroup with error: %v", err)
}
sc.Recorder.Eventf(podGroup, v1.EventTypeWarning,
string(v1alpha1.PodGroupUnschedulableType), msg)
}

if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 {
podGroup, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup)
if err != nil {
glog.Errorf("Error while converting PodGroup to v1alpha2.PodGroup with error: %v", err)
}
sc.Recorder.Eventf(podGroup, v1.EventTypeWarning,
string(v1alpha1.PodGroupUnschedulableType), msg)
}
// If pending or unschedulable, record unschedulable event.
if pgUnschedulable || pdbUnschedulabe {
if err := sc.convertPodGroupInfo(job); err != nil {
glog.Errorf("Error While converting api.PodGroup %v", err)
}
}

Expand All @@ -825,7 +792,7 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {

// UpdateJobStatus update the status of job and its tasks.
func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*kbapi.JobInfo, error) {
if updatePG && !shadowPodGroup(job.PodGroup) {
if updatePG {
pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
if err != nil {
return nil, err
Expand All @@ -837,3 +804,21 @@ func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*k

return job, nil
}

// convertPodGroupInfo validates that the job's PodGroup converts cleanly to
// the concrete API type named by its Version field. The converted object is
// discarded; only conversion errors are surfaced to the caller (callers
// prepend their own "Error While converting api.PodGroup" context).
// Returns an error for a failed conversion or an unrecognized version.
func (sc *SchedulerCache) convertPodGroupInfo(job *kbapi.JobInfo) error {
	// A switch over the version tag is clearer than an if/else chain and
	// makes the unknown-version default explicit.
	switch job.PodGroup.Version {
	case api.PodGroupVersionV1Alpha1:
		if _, err := api.ConvertPodGroupInfoToV1alpha1(job.PodGroup); err != nil {
			// Fixed message: the target version is v1alpha1 (was "v1alpha").
			return fmt.Errorf("to v1alpha1.PodGroup with error: %v", err)
		}
	case api.PodGroupVersionV1Alpha2:
		if _, err := api.ConvertPodGroupInfoToV1alpha2(job.PodGroup); err != nil {
			// Fixed message: the target version is v1alpha2 (was "v2alpha").
			return fmt.Errorf("to v1alpha2.PodGroup with error: %v", err)
		}
	default:
		return fmt.Errorf("invalid PodGroup Version: %s", job.PodGroup.Version)
	}

	return nil
}
135 changes: 1 addition & 134 deletions pkg/scheduler/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,139 +125,6 @@ func buildOwnerReference(owner string) metav1.OwnerReference {
}
}

// TestAddPod verifies that SchedulerCache.AddPod builds the expected Jobs and
// Nodes maps: pods sharing an owner reference are grouped into one job backed
// by a shadow PodGroup, and a pod already bound to a node is recorded as a
// task on that node.
func TestAddPod(t *testing.T) {

owner := buildOwnerReference("j1")

// case 1:
// p1 is pending and unbound; p2 is running and already bound to node n1.
// Both share owner j1, so the cache should fold them into a single job.
pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner}, make(map[string]string))
pi1 := api.NewTaskInfo(pod1)
pi1.Job = "j1" // The job name is set by cache.
pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner}, make(map[string]string))
pi2 := api.NewTaskInfo(pod2)
pi2.Job = "j1" // The job name is set by cache.

// Expected job: both tasks under j1, with a shadow PodGroup derived from pod1.
j1 := api.NewJobInfo(api.JobID("j1"), pi1, pi2)
pg := createShadowPodGroup(pod1)
j1.SetPodGroup(pg)

// Expected node: the bound task pi2 appears on n1.
node1 := buildNode("n1", buildResourceList("2000m", "10G"))
ni1 := api.NewNodeInfo(node1)
ni1.AddTask(pi2)

tests := []struct {
pods []*v1.Pod
nodes []*v1.Node
expected *SchedulerCache
}{
{
pods: []*v1.Pod{pod1, pod2},
nodes: []*v1.Node{node1},
expected: &SchedulerCache{
Nodes: map[string]*api.NodeInfo{
"n1": ni1,
},
Jobs: map[api.JobID]*api.JobInfo{
"j1": j1,
},
},
},
}

for i, test := range tests {
cache := &SchedulerCache{
Jobs: make(map[api.JobID]*api.JobInfo),
Nodes: make(map[string]*api.NodeInfo),
}

// Nodes are added before pods here; TestAddNode uses the reverse order.
// NOTE(review): presumably intentional to cover both arrival orders —
// confirm.
for _, n := range test.nodes {
cache.AddNode(n)
}

for _, p := range test.pods {
cache.AddPod(p)
}

if !cacheEqual(cache, test.expected) {
t.Errorf("case %d: \n expected %v, \n got %v \n",
i, test.expected, cache)
}
}
}

// TestAddNode verifies that SchedulerCache.AddNode builds the expected Nodes
// map and attaches already-bound tasks to the node, with each differently-owned
// pod grouped into its own job backed by a shadow PodGroup.
func TestAddNode(t *testing.T) {
owner1 := buildOwnerReference("j1")
owner2 := buildOwnerReference("j2")

// case 1
// p1 (owner j1) is pending and unbound; p2 (owner j2) is running on n1.
node1 := buildNode("n1", buildResourceList("2000m", "10G"))
pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner1}, make(map[string]string))
pi1 := api.NewTaskInfo(pod1)
pi1.Job = "j1" // The job name is set by cache.

pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner2}, make(map[string]string))
pi2 := api.NewTaskInfo(pod2)
pi2.Job = "j2" // The job name is set by cache.

// Expected node: the bound task pi2 appears on n1.
ni1 := api.NewNodeInfo(node1)
ni1.AddTask(pi2)

// Expected jobs: one per owner, each with its own shadow PodGroup.
j1 := api.NewJobInfo("j1")
pg1 := createShadowPodGroup(pod1)
j1.SetPodGroup(pg1)

j2 := api.NewJobInfo("j2")
pg2 := createShadowPodGroup(pod2)
j2.SetPodGroup(pg2)

j1.AddTaskInfo(pi1)
j2.AddTaskInfo(pi2)

tests := []struct {
pods []*v1.Pod
nodes []*v1.Node
expected *SchedulerCache
}{
{
pods: []*v1.Pod{pod1, pod2},
nodes: []*v1.Node{node1},
expected: &SchedulerCache{
Nodes: map[string]*api.NodeInfo{
"n1": ni1,
},
Jobs: map[api.JobID]*api.JobInfo{
"j1": j1,
"j2": j2,
},
},
},
}

for i, test := range tests {
cache := &SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
}

// Pods are added before nodes here (the reverse of TestAddPod).
// NOTE(review): presumably to exercise out-of-order arrival — confirm.
for _, p := range test.pods {
cache.AddPod(p)
}

for _, n := range test.nodes {
cache.AddNode(n)
}

if !cacheEqual(cache, test.expected) {
t.Errorf("case %d: \n expected %v, \n got %v \n",
i, test.expected, cache)
}
}
}

func TestGetOrCreateJob(t *testing.T) {
owner1 := buildOwnerReference("j1")
owner2 := buildOwnerReference("j2")
Expand Down Expand Up @@ -292,7 +159,7 @@ func TestGetOrCreateJob(t *testing.T) {
},
{
task: pi2,
gotJob: true,
gotJob: false,
},
{
task: pi3,
Expand Down
19 changes: 4 additions & 15 deletions pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,12 @@ func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo {
if pi.Pod.Spec.SchedulerName != sc.schedulerName {
glog.V(4).Infof("Pod %s/%s will not not scheduled by %s, skip creating PodGroup and Job for it",
pi.Pod.Namespace, pi.Pod.Name, sc.schedulerName)
return nil
}
pb := createShadowPodGroup(pi.Pod)
pi.Job = kbapi.JobID(pb.Name)

if _, found := sc.Jobs[pi.Job]; !found {
job := kbapi.NewJobInfo(pi.Job)
job.SetPodGroup(pb)
// Set default queue for shadow podgroup.
job.Queue = kbapi.QueueID(sc.defaultQueue)
return nil
}

sc.Jobs[pi.Job] = job
}
} else {
if _, found := sc.Jobs[pi.Job]; !found {
sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job)
}
if _, found := sc.Jobs[pi.Job]; !found {
sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job)
}

return sc.Jobs[pi.Job]
Expand Down
40 changes: 1 addition & 39 deletions pkg/scheduler/cache/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,9 @@ limitations under the License.
package cache

import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"volcano.sh/volcano/pkg/apis/utils"
"volcano.sh/volcano/pkg/scheduler/api"
)

const (
shadowPodGroupKey = "volcano/shadow-pod-group"
"k8s.io/api/core/v1"
)

// shadowPodGroup reports whether pg is a shadow pod group, i.e. one carrying
// the shadow annotation rather than a real user-created PodGroup. A nil
// PodGroup is treated as a shadow group.
func shadowPodGroup(pg *api.PodGroup) bool {
	if pg == nil {
		return true
	}
	_, isShadow := pg.Annotations[shadowPodGroupKey]
	return isShadow
}

// createShadowPodGroup builds an in-memory PodGroup for a pod that has no
// real PodGroup. The group is named after the pod's controller (falling back
// to the pod UID when no controller is found), tagged with the shadow
// annotation, and requires a single member.
func createShadowPodGroup(pod *v1.Pod) *api.PodGroup {
	groupName := api.JobID(utils.GetController(pod))
	if len(groupName) == 0 {
		// No controller owner reference: fall back to the pod's own UID.
		groupName = api.JobID(pod.UID)
	}

	pg := &api.PodGroup{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: pod.Namespace,
			Name:      string(groupName),
			Annotations: map[string]string{
				shadowPodGroupKey: string(groupName),
			},
		},
	}
	pg.Spec.MinMember = 1
	return pg
}

// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
return schedulerName == pod.Spec.SchedulerName
Expand Down
Loading

0 comments on commit 0e73ac3

Please sign in to comment.