Skip to content

Commit

Permalink
Merge pull request #3090 from xiaoanyunfei/improve/list_jobflow
Browse files Browse the repository at this point in the history
improve list job of jobflow
  • Loading branch information
volcano-sh-bot authored Dec 18, 2023
2 parents 7e23a7c + 6b6eb7e commit a2e6dab
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pkg/controllers/jobflow/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ const (
Volcano = "volcano"
// JobFlow kind of jobFlow
JobFlow = "JobFlow"
// CreatedByJobTemplate the vcjob annotation of created by jobTemplate
// CreatedByJobTemplate the vcjob annotation and label of created by jobTemplate
CreatedByJobTemplate = "volcano.sh/createdByJobTemplate"
)
53 changes: 24 additions & 29 deletions pkg/controllers/jobflow/jobflow_controller_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@ package jobflow
import (
"context"
"fmt"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"volcano.sh/apis/pkg/apis/batch/v1alpha1"
v1alpha1flow "volcano.sh/apis/pkg/apis/flow/v1alpha1"
"volcano.sh/apis/pkg/apis/helpers"
"volcano.sh/apis/pkg/client/clientset/versioned/scheme"
"volcano.sh/volcano/pkg/controllers/jobflow/state"
)
Expand Down Expand Up @@ -142,23 +141,12 @@ func (jf *jobflowcontroller) createJob(jobFlow *v1alpha1flow.JobFlow, flow v1alp

// getAllJobStatus Get the information of all created jobs
func (jf *jobflowcontroller) getAllJobStatus(jobFlow *v1alpha1flow.JobFlow) (*v1alpha1flow.JobFlowStatus, error) {
selector := labels.NewSelector()

allJobList, err := jf.jobLister.Jobs(jobFlow.Namespace).List(selector)
jobList, err := jf.getAllJobsCreatedByJobFlow(jobFlow)
if err != nil {
klog.Error(err, "get jobList error")
return nil, err
}

jobListFilter := make([]*v1alpha1.Job, 0)
for _, job := range allJobList {
for _, reference := range job.OwnerReferences {
if reference.Kind == JobFlow && strings.Contains(reference.APIVersion, Volcano) && reference.Name == jobFlow.Name {
jobListFilter = append(jobListFilter, job)
}
}
}

statusListJobMap := map[v1alpha1.JobPhase][]string{
v1alpha1.Pending: make([]string, 0),
v1alpha1.Running: make([]string, 0),
Expand All @@ -171,7 +159,7 @@ func (jf *jobflowcontroller) getAllJobStatus(jobFlow *v1alpha1flow.JobFlow) (*v1

UnKnowJobs := make([]string, 0)
conditions := make(map[string]v1alpha1flow.Condition)
for _, job := range jobListFilter {
for _, job := range jobList {
if _, ok := statusListJobMap[job.Status.State.Phase]; ok {
statusListJobMap[job.Status.State.Phase] = append(statusListJobMap[job.Status.State.Phase], job.Name)
} else {
Expand All @@ -188,7 +176,7 @@ func (jf *jobflowcontroller) getAllJobStatus(jobFlow *v1alpha1flow.JobFlow) (*v1
if jobFlow.Status.JobStatusList != nil {
jobStatusList = jobFlow.Status.JobStatusList
}
for _, job := range jobListFilter {
for _, job := range jobList {
runningHistories := getRunningHistories(jobStatusList, job)
endTimeStamp := metav1.Time{}
if job.Status.RunningDuration != nil {
Expand Down Expand Up @@ -285,25 +273,32 @@ func (jf *jobflowcontroller) loadJobTemplateAndSetJob(jobFlow *v1alpha1flow.JobF
}

func (jf *jobflowcontroller) deleteAllJobsCreatedByJobFlow(jobFlow *v1alpha1flow.JobFlow) error {
selector := labels.NewSelector()
jobList, err := jf.jobLister.Jobs(jobFlow.Namespace).List(selector)
jobList, err := jf.getAllJobsCreatedByJobFlow(jobFlow)
if err != nil {
return err
}

for _, job := range jobList {
if len(job.OwnerReferences) > 0 {
for _, reference := range job.OwnerReferences {
if reference.Kind == helpers.JobFlowKind.Kind && reference.Name == jobFlow.Name {
err := jf.vcClient.BatchV1alpha1().Jobs(jobFlow.Namespace).Delete(context.Background(), job.Name, metav1.DeleteOptions{})
if err != nil {
klog.Errorf("Failed to delete job of JobFlow %v/%v: %v",
jobFlow.Namespace, jobFlow.Name, err)
return err
}
}
}
err := jf.vcClient.BatchV1alpha1().Jobs(jobFlow.Namespace).Delete(context.Background(), job.Name, metav1.DeleteOptions{})
if err != nil {
klog.Errorf("Failed to delete job of JobFlow %v/%v: %v",
jobFlow.Namespace, jobFlow.Name, err)
return err
}
}
return nil
}

func (jf *jobflowcontroller) getAllJobsCreatedByJobFlow(jobFlow *v1alpha1flow.JobFlow) ([]*v1alpha1.Job, error) {
var flowNames []string
for _, flow := range jobFlow.Spec.Flows {
flowNames = append(flowNames, GetTemplateString(jobFlow.Namespace, flow.Name))
}
selector := labels.NewSelector()
r, err := labels.NewRequirement(CreatedByJobTemplate, selection.In, flowNames)
if err != nil {
return nil, err
}
selector = selector.Add(*r)
return jf.jobLister.Jobs(jobFlow.Namespace).List(selector)
}
1 change: 1 addition & 0 deletions pkg/controllers/jobflow/jobflow_controller_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func TestSyncJobFlowFunc(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: getJobName(tt.args.jobFlow.Name, tt.args.jobTemplateList[i].Name),
Namespace: tt.args.jobFlow.Namespace,
Labels: map[string]string{CreatedByJobTemplate: GetTemplateString(tt.args.jobFlow.Namespace, tt.args.jobTemplateList[i].Name)},
},
Spec: tt.args.jobTemplateList[i].Spec,
Status: v1alpha1.JobStatus{
Expand Down

0 comments on commit a2e6dab

Please sign in to comment.