diff --git a/pkg/controllers/jobflow/constant.go b/pkg/controllers/jobflow/constant.go index 7b38d616aa..61de524f92 100755 --- a/pkg/controllers/jobflow/constant.go +++ b/pkg/controllers/jobflow/constant.go @@ -21,6 +21,6 @@ const ( Volcano = "volcano" // JobFlow kind of jobFlow JobFlow = "JobFlow" - // CreatedByJobTemplate the vcjob annotation of created by jobTemplate + // CreatedByJobTemplate the vcjob annotation and label of created by jobTemplate CreatedByJobTemplate = "volcano.sh/createdByJobTemplate" ) diff --git a/pkg/controllers/jobflow/jobflow_controller_action.go b/pkg/controllers/jobflow/jobflow_controller_action.go index 737a101ecd..da55071ca3 100755 --- a/pkg/controllers/jobflow/jobflow_controller_action.go +++ b/pkg/controllers/jobflow/jobflow_controller_action.go @@ -19,19 +19,18 @@ package jobflow import ( "context" "fmt" - "strings" "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" "k8s.io/klog" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "volcano.sh/apis/pkg/apis/batch/v1alpha1" v1alpha1flow "volcano.sh/apis/pkg/apis/flow/v1alpha1" - "volcano.sh/apis/pkg/apis/helpers" "volcano.sh/apis/pkg/client/clientset/versioned/scheme" "volcano.sh/volcano/pkg/controllers/jobflow/state" ) @@ -142,23 +141,12 @@ func (jf *jobflowcontroller) createJob(jobFlow *v1alpha1flow.JobFlow, flow v1alp // getAllJobStatus Get the information of all created jobs func (jf *jobflowcontroller) getAllJobStatus(jobFlow *v1alpha1flow.JobFlow) (*v1alpha1flow.JobFlowStatus, error) { - selector := labels.NewSelector() - - allJobList, err := jf.jobLister.Jobs(jobFlow.Namespace).List(selector) + jobList, err := jf.getAllJobsCreatedByJobFlow(jobFlow) if err != nil { klog.Error(err, "get jobList error") return nil, err } - jobListFilter := make([]*v1alpha1.Job, 0) - for _, job := range allJobList { - for _, reference := range job.OwnerReferences { - if reference.Kind == JobFlow && strings.Contains(reference.APIVersion, Volcano) && reference.Name == jobFlow.Name { - jobListFilter = append(jobListFilter, job) - } - } - } - statusListJobMap := map[v1alpha1.JobPhase][]string{ v1alpha1.Pending: make([]string, 0), v1alpha1.Running: make([]string, 0), @@ -171,7 +159,7 @@ func (jf *jobflowcontroller) getAllJobStatus(jobFlow *v1alpha1flow.JobFlow) (*v1 UnKnowJobs := make([]string, 0) conditions := make(map[string]v1alpha1flow.Condition) - for _, job := range jobListFilter { + for _, job := range jobList { if _, ok := statusListJobMap[job.Status.State.Phase]; ok { statusListJobMap[job.Status.State.Phase] = append(statusListJobMap[job.Status.State.Phase], job.Name) } else { @@ -188,7 +176,7 @@ func (jf *jobflowcontroller) getAllJobStatus(jobFlow *v1alpha1flow.JobFlow) (*v1 if jobFlow.Status.JobStatusList != nil { jobStatusList = jobFlow.Status.JobStatusList } - for _, job := range jobListFilter { + for _, job := range jobList { runningHistories := getRunningHistories(jobStatusList, job) endTimeStamp := metav1.Time{} if job.Status.RunningDuration != nil { @@ -285,25 +273,32 @@ func (jf *jobflowcontroller) loadJobTemplateAndSetJob(jobFlow *v1alpha1flow.JobF } func (jf *jobflowcontroller) deleteAllJobsCreatedByJobFlow(jobFlow *v1alpha1flow.JobFlow) error { - selector := labels.NewSelector() - jobList, err := jf.jobLister.Jobs(jobFlow.Namespace).List(selector) + jobList, err := jf.getAllJobsCreatedByJobFlow(jobFlow) if err != nil { return err } for _, job := range jobList { - if len(job.OwnerReferences) > 0 { - for _, reference := range job.OwnerReferences { - if reference.Kind == helpers.JobFlowKind.Kind && reference.Name == jobFlow.Name { - err := jf.vcClient.BatchV1alpha1().Jobs(jobFlow.Namespace).Delete(context.Background(), job.Name, metav1.DeleteOptions{}) - if err != nil { - klog.Errorf("Failed to delete job of JobFlow %v/%v: %v", - jobFlow.Namespace, jobFlow.Name, err) - return err - } - } - } + err := jf.vcClient.BatchV1alpha1().Jobs(jobFlow.Namespace).Delete(context.Background(), job.Name, metav1.DeleteOptions{}) + if err != nil { + klog.Errorf("Failed to delete job of JobFlow %v/%v: %v", + jobFlow.Namespace, jobFlow.Name, err) + return err } } return nil } + +func (jf *jobflowcontroller) getAllJobsCreatedByJobFlow(jobFlow *v1alpha1flow.JobFlow) ([]*v1alpha1.Job, error) { + var flowNames []string + for _, flow := range jobFlow.Spec.Flows { + flowNames = append(flowNames, GetTemplateString(jobFlow.Namespace, flow.Name)) + } + selector := labels.NewSelector() + r, err := labels.NewRequirement(CreatedByJobTemplate, selection.In, flowNames) + if err != nil { + return nil, err + } + selector = selector.Add(*r) + return jf.jobLister.Jobs(jobFlow.Namespace).List(selector) +} diff --git a/pkg/controllers/jobflow/jobflow_controller_action_test.go b/pkg/controllers/jobflow/jobflow_controller_action_test.go index bef72a052f..628b99971d 100755 --- a/pkg/controllers/jobflow/jobflow_controller_action_test.go +++ b/pkg/controllers/jobflow/jobflow_controller_action_test.go @@ -147,6 +147,7 @@ func TestSyncJobFlowFunc(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: getJobName(tt.args.jobFlow.Name, tt.args.jobTemplateList[i].Name), Namespace: tt.args.jobFlow.Namespace, + Labels: map[string]string{CreatedByJobTemplate: GetTemplateString(tt.args.jobFlow.Namespace, tt.args.jobTemplateList[i].Name)}, }, Spec: tt.args.jobTemplateList[i].Spec, Status: v1alpha1.JobStatus{