From 1d38cb8610b6b0331a38ec28d9119ac496cc14e9 Mon Sep 17 00:00:00 2001 From: lminzhw Date: Tue, 11 Jun 2019 21:45:56 +0800 Subject: [PATCH] modify state jump logic. modify UT --- pkg/controllers/job/state/aborting.go | 4 +- pkg/controllers/job/state/inqueue.go | 25 ++++------ pkg/controllers/job/state/pending.go | 25 ++++------ pkg/controllers/job/state/running.go | 32 +++--------- test/e2e/job_error_handling.go | 4 +- test/e2e/mpi.go | 2 +- test/e2e/util.go | 72 +++++++++++++++++++++++++-- 7 files changed, 99 insertions(+), 65 deletions(-) diff --git a/pkg/controllers/job/state/aborting.go b/pkg/controllers/job/state/aborting.go index fbf2b5e7648..8c8fdeedd0a 100644 --- a/pkg/controllers/job/state/aborting.go +++ b/pkg/controllers/job/state/aborting.go @@ -30,10 +30,10 @@ type abortingState struct { func (ps *abortingState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: - // Already in Restarting phase, just sync it return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { + status.State.Phase = vkv1.Restarting status.RetryCount++ - return false + return true }) default: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { diff --git a/pkg/controllers/job/state/inqueue.go b/pkg/controllers/job/state/inqueue.go index 1cd55fcba15..b8d6d5805cc 100644 --- a/pkg/controllers/job/state/inqueue.go +++ b/pkg/controllers/job/state/inqueue.go @@ -29,31 +29,24 @@ func (ps *inqueueState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool { - phase := vkv1.Pending - if status.Terminating != 0 { - phase = vkv1.Restarting - status.RetryCount++ - } - status.State.Phase = phase + status.State.Phase = vkv1.Restarting + status.RetryCount++ return true }) case vkv1.AbortJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - phase := vkv1.Pending - if status.Terminating != 0 { - phase = vkv1.Aborting - } - status.State.Phase = phase + status.State.Phase = vkv1.Aborting return true }) case vkv1.CompleteJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - phase := vkv1.Completed - if status.Terminating != 0 { - phase = vkv1.Completing - } - status.State.Phase = phase + status.State.Phase = vkv1.Completing + return true + }) + case vkv1.TerminateJobAction: + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { + status.State.Phase = vkv1.Terminating return true }) default: diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index 38fa2e08a43..b3c63396fd9 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -29,31 +29,24 @@ func (ps *pendingState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool { - phase := vkv1.Pending - if status.Terminating != 0 { - phase = vkv1.Restarting - status.RetryCount++ - } - status.State.Phase = phase + status.RetryCount++ + status.State.Phase = vkv1.Restarting return true }) case vkv1.AbortJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - phase := vkv1.Pending - if status.Terminating != 0 { - phase = vkv1.Aborting - } - status.State.Phase = phase + status.State.Phase = vkv1.Aborting return true }) case vkv1.CompleteJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - phase := vkv1.Completed - if status.Terminating != 0 { - phase = vkv1.Completing - } - status.State.Phase = phase + status.State.Phase = vkv1.Completing + return true + }) + case vkv1.TerminateJobAction: + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { + status.State.Phase = vkv1.Terminating return true }) case vkv1.EnqueueAction: diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index e25b3a77af7..bdcf18090fb 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -29,39 +29,23 @@ func (ps *runningState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool { - if status.Terminating != 0 { - status.State.Phase = vkv1.Restarting - status.RetryCount++ - return true - } - return false + status.State.Phase = vkv1.Restarting + status.RetryCount++ + return true }) case vkv1.AbortJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - if status.Terminating != 0 { - status.State.Phase = vkv1.Aborting - return true - } - - return false + status.State.Phase = vkv1.Aborting + return true }) case vkv1.TerminateJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - if status.Terminating != 0 { - status.State.Phase = vkv1.Terminating - return true - } - - return false + status.State.Phase = vkv1.Terminating + return true }) case vkv1.CompleteJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - phase := vkv1.Completed - if status.Terminating != 0 { - phase = vkv1.Completing - } - - status.State.Phase = phase + status.State.Phase = vkv1.Completing return true }) default: diff --git a/test/e2e/job_error_handling.go b/test/e2e/job_error_handling.go index 1a875cb851a..73287a9e71d 100644 --- a/test/e2e/job_error_handling.go +++ b/test/e2e/job_error_handling.go @@ -436,7 +436,7 @@ var _ = Describe("Job Error Handling", func() { By("create job") job := createJob(context, &jobSpec{ - name: "any-restart-job", + name: "any-complete-job", policies: []vkv1.LifecyclePolicy{ { Action: vkv1.CompleteJobAction, @@ -463,7 +463,7 @@ var _ = Describe("Job Error Handling", func() { By("job scheduled, then task 'completed_task' finished and job finally complete") // job phase: pending -> running -> completing -> completed - err := waitJobStates(context, job, []vkv1.JobPhase{ + err := waitJobPhases(context, job, []vkv1.JobPhase{ vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Completing, vkv1.Completed}) Expect(err).NotTo(HaveOccurred()) diff --git a/test/e2e/mpi.go b/test/e2e/mpi.go index 0fc57b9953a..0c4be548ef6 100644 --- a/test/e2e/mpi.go +++ b/test/e2e/mpi.go @@ -70,7 +70,7 @@ mpiexec --allow-run-as-root --hostfile /etc/volcano/mpiworker.host -np 2 mpi_hel job := createJob(context, spec) - err := waitJobStates(context, job, []vkv1.JobPhase{ + err := waitJobPhases(context, job, []vkv1.JobPhase{ vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed}) Expect(err).NotTo(HaveOccurred()) }) diff --git a/test/e2e/util.go b/test/e2e/util.go index 7fc26e2a55a..5a55cfe02d5 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -462,12 +462,76 @@ func jobEvicted(ctx *context, job *vkv1.Job, time time.Time) wait.ConditionFunc } func waitJobPhases(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error { - for _, phase := range phases { - err := waitJobPhase(ctx, job, phase) - if err != nil { - return err + w, err := ctx.vkclient.BatchV1alpha1().Jobs(job.Namespace).Watch(metav1.ListOptions{}) + if err != nil { + return err + } + defer w.Stop() + + var additionalError error + total := int32(0) + for _, task := range job.Spec.Tasks { + total += task.Replicas + } + + ch := w.ResultChan() + index := 0 + timeout := time.After(oneMinute) + + for index < len(phases) { + select { + case event, open := <-ch: + if !open { + return fmt.Errorf("watch channel should be always open") + } + + newJob := event.Object.(*vkv1.Job) + phase := phases[index] + if newJob.Name != job.Name || newJob.Namespace != job.Namespace { + continue + } + + if newJob.Status.State.Phase != phase { + additionalError = fmt.Errorf( + "expected job '%s' to be in status %s, actual get %s", + job.Name, phase, newJob.Status.State.Phase) + continue + } + var flag = false + switch phase { + case vkv1.Pending: + flag = (newJob.Status.Pending+newJob.Status.Succeeded+ + newJob.Status.Failed+newJob.Status.Running) == 0 || + (total-newJob.Status.Terminating >= newJob.Status.MinAvailable) + case vkv1.Terminating, vkv1.Aborting, vkv1.Restarting, vkv1.Completing: + flag = newJob.Status.Terminating > 0 + case vkv1.Terminated, vkv1.Aborted, vkv1.Completed: + flag = newJob.Status.Pending == 0 && + newJob.Status.Running == 0 && + newJob.Status.Terminating == 0 + case vkv1.Running: + flag = newJob.Status.Running >= newJob.Spec.MinAvailable + case vkv1.Inqueue: + flag = newJob.Status.Pending > 0 + default: + return fmt.Errorf("unknown phase %s", phase) + } + + if !flag { + additionalError = fmt.Errorf( + "expected job '%s' to be in status %s, actual detail status %s", + job.Name, phase, getJobStatusDetail(newJob)) + continue + } + + index++ + timeout = time.After(oneMinute) + + case <-timeout: + return fmt.Errorf("[Wait time out]: %s", additionalError) } } + return nil }