Skip to content

Commit

Permalink
Do not create jobs until the PodGroup (pg) is Inqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxuzhonghu committed Dec 23, 2019
1 parent c4dbaf3 commit 672cb8c
Showing 1 changed file with 46 additions and 6 deletions.
52 changes: 46 additions & 6 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
return nil
}

func (cc *Controller) createJob(job *batch.Job) (*batch.Job, error) {
func (cc *Controller) initiateJob(job *batch.Job) (*batch.Job, error) {
klog.V(3).Infof("Starting to initiate Job <%s/%s>", job.Namespace, job.Name)
defer klog.V(3).Infof("Finished Job <%s/%s> initiate", job.Namespace, job.Name)

klog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)
job, err := cc.initJobStatus(job)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.JobStatusError),
Expand Down Expand Up @@ -185,9 +189,46 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return nil
}

var err error
if job, err = cc.createJob(job); err != nil {
return err
// Skip job initiation if job is already accepted
if job.Status.State.Phase == "" {
var err error
if job, err = cc.initiateJob(job); err != nil {
return err
}
}

var syncTask bool
if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); pg != nil {
if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {
syncTask = true
}
}

if !syncTask {
if updateStatus != nil {
if updateStatus(&job.Status) {
job.Status.State.LastTransitionTime = metav1.Now()
}
}
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
klog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if e := cc.cache.Update(newJob); e != nil {
klog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
newJob.Namespace, newJob.Name, e)
return e
}
return nil
}

// Skip job task sync if it is pending
if job.Status.State.Phase == batch.Pending {
klog.Infof("Job <%s/%s> is pending, skip pod sync.",
job.Namespace, job.Name)
return nil
}

var running, pending, terminating, succeeded, failed, unknown int32
Expand Down Expand Up @@ -297,7 +338,6 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
fmt.Sprintf("Error deleting pods: %+v", deletionErrs))
return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
}

job.Status = batch.JobStatus{
State: job.Status.State,

Expand Down Expand Up @@ -422,7 +462,7 @@ func (cc *Controller) createPVC(job *batch.Job, vcName string, volumeClaim *v1.P

func (cc *Controller) createPodGroupIfNotExist(job *batch.Job) error {
// If PodGroup does not exist, create one for Job.
if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil {
if pg, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil {
if !apierrors.IsNotFound(err) {
klog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
Expand Down

0 comments on commit 672cb8c

Please sign in to comment.