diff --git a/cmd/controllers/app/server.go b/cmd/controllers/app/server.go index 8d0e93a7363..57e85b1cf6e 100644 --- a/cmd/controllers/app/server.go +++ b/cmd/controllers/app/server.go @@ -40,6 +40,7 @@ import ( "volcano.sh/volcano/cmd/controllers/app/options" vkclient "volcano.sh/volcano/pkg/client/clientset/versioned" + "volcano.sh/volcano/pkg/controllers/garbagecollector" "volcano.sh/volcano/pkg/controllers/job" "volcano.sh/volcano/pkg/controllers/queue" ) @@ -83,10 +84,12 @@ func Run(opt *options.ServerOption) error { jobController := job.NewJobController(kubeClient, kbClient, vkClient) queueController := queue.NewQueueController(kubeClient, kbClient) + garbageCollector := garbagecollector.New(vkClient) run := func(ctx context.Context) { go jobController.Run(ctx.Done()) go queueController.Run(ctx.Done()) + go garbageCollector.Run(ctx.Done()) <-ctx.Done() } diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index a9406dd3656..2c922647517 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -70,6 +70,15 @@ type JobSpec struct { // Defaults to 3. // +optional MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,8,opt,name=maxRetry"` + + // ttlSecondsAfterFinished limits the lifetime of a Job that has finished + // execution (either Completed or Failed). If this field is set, + // ttlSecondsAfterFinished after the Job finishes, it is eligible to be + // automatically deleted. If this field is unset, + // the Job won't be automatically deleted. If this field is set to zero, + // the Job becomes eligible to be deleted immediately after it finishes. + // +optional + TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty" protobuf:"varint,9,opt,name=ttlSecondsAfterFinished"` } // VolumeSpec defines the specification of Volume, e.g. PVC @@ -225,6 +234,10 @@ type JobState struct { // Human-readable message indicating details about last transition. // +optional Message string `json:"message,omitempty" protobuf:"bytes,3,opt,name=message"` + + // Last time the condition transit from one phase to another. + // +optional + LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty" protobuf:"bytes,4,opt,name=lastTransitionTime"` } // JobStatus represents the current status of a Job diff --git a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go index ab368e414e7..7fd00a4301a 100644 --- a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go @@ -126,6 +126,11 @@ func (in *JobSpec) DeepCopyInto(out *JobSpec) { (*out)[key] = outVal } } + if in.TTLSecondsAfterFinished != nil { + in, out := &in.TTLSecondsAfterFinished, &out.TTLSecondsAfterFinished + *out = new(int32) + **out = **in + } return } @@ -142,6 +147,7 @@ func (in *JobSpec) DeepCopy() *JobSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JobState) DeepCopyInto(out *JobState) { *out = *in + in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) return } @@ -158,7 +164,7 @@ func (in *JobState) DeepCopy() *JobState { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *JobStatus) DeepCopyInto(out *JobStatus) { *out = *in - out.State = in.State + in.State.DeepCopyInto(&out.State) if in.ControlledResources != nil { in, out := &in.ControlledResources, &out.ControlledResources *out = make(map[string]string, len(*in)) diff --git a/pkg/controllers/garbagecollector/garbagecollector.go b/pkg/controllers/garbagecollector/garbagecollector.go new file mode 100644 index 00000000000..25d3dc242f6 --- /dev/null +++ b/pkg/controllers/garbagecollector/garbagecollector.go @@ -0,0 +1,283 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package garbagecollector + +import ( + "fmt" + "time" + + "github.com/golang/glog" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/controller" + + "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + vkver "volcano.sh/volcano/pkg/client/clientset/versioned" + vkinfoext "volcano.sh/volcano/pkg/client/informers/externalversions" + vkbatchinfo "volcano.sh/volcano/pkg/client/informers/externalversions/batch/v1alpha1" + vkbatchlister "volcano.sh/volcano/pkg/client/listers/batch/v1alpha1" +) + +// GarbageCollector runs reflectors to watch for changes of managed API +// objects. Currently it only watches Jobs. Triggered by Job creation +// and updates, it enqueues Jobs that have non-nil `.spec.ttlSecondsAfterFinished` +// to the `queue`. The GarbageCollector has workers who consume `queue`, check whether +// the Job TTL has expired or not; if the Job TTL hasn't expired, it will add the +// Job to the queue after the TTL is expected to expire; if the TTL has expired, the +// worker will send requests to the API server to delete the Jobs accordingly. +// This is implemented outside of Job controller for separation of concerns, and +// because it will be extended to handle other finishable resource types. +type GarbageCollector struct { + vkClient vkver.Interface + + jobInformer vkbatchinfo.JobInformer + + // A store of jobs + jobLister vkbatchlister.JobLister + jobSynced func() bool + + // queues that need to be updated. + queue workqueue.RateLimitingInterface +} + +// New creates an instance of GarbageCollector +func New(vkClient vkver.Interface) *GarbageCollector { + jobInformer := vkinfoext.NewSharedInformerFactory(vkClient, 0).Batch().V1alpha1().Jobs() + + gb := &GarbageCollector{ + vkClient: vkClient, + jobInformer: jobInformer, + jobLister: jobInformer.Lister(), + jobSynced: jobInformer.Informer().HasSynced, + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + } + jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: gb.addJob, + UpdateFunc: gb.updateJob, + }) + return gb +} + +// Run starts the worker to clean up Jobs. 
+func (gb *GarbageCollector) Run(stopCh <-chan struct{}) { + defer gb.queue.ShutDown() + + glog.Infof("Starting garbage collector") + defer glog.Infof("Shutting down garbage collector") + + go gb.jobInformer.Informer().Run(stopCh) + if !controller.WaitForCacheSync("garbage collector", stopCh, gb.jobSynced) { + return + } + + go wait.Until(gb.worker, time.Second, stopCh) + + <-stopCh +} + +func (gb *GarbageCollector) addJob(obj interface{}) { + job := obj.(*v1alpha1.Job) + glog.V(4).Infof("Adding job %s/%s", job.Namespace, job.Name) + + if job.DeletionTimestamp == nil && needsCleanup(job) { + gb.enqueue(job) + } +} + +func (gb *GarbageCollector) updateJob(old, cur interface{}) { + job := cur.(*v1alpha1.Job) + glog.V(4).Infof("Updating job %s/%s", job.Namespace, job.Name) + + if job.DeletionTimestamp == nil && needsCleanup(job) { + gb.enqueue(job) + } +} + +func (gb *GarbageCollector) enqueue(job *v1alpha1.Job) { + glog.V(4).Infof("Add job %s/%s to cleanup", job.Namespace, job.Name) + key, err := controller.KeyFunc(job) + if err != nil { + glog.Errorf("couldn't get key for object %#v: %v", job, err) + return + } + + gb.queue.Add(key) +} + +func (gb *GarbageCollector) enqueueAfter(job *v1alpha1.Job, after time.Duration) { + key, err := controller.KeyFunc(job) + if err != nil { + glog.Errorf("couldn't get key for object %#v: %v", job, err) + return + } + + gb.queue.AddAfter(key, after) +} + +func (gb *GarbageCollector) worker() { + for gb.processNextWorkItem() { + } +} + +func (gb *GarbageCollector) processNextWorkItem() bool { + key, quit := gb.queue.Get() + if quit { + return false + } + defer gb.queue.Done(key) + + err := gb.processJob(key.(string)) + gb.handleErr(err, key) + + return true +} + +func (gb *GarbageCollector) handleErr(err error, key interface{}) { + if err == nil { + gb.queue.Forget(key) + return + } + + glog.Errorf("error cleaning up Job %v, will retry: %v", key, err) + gb.queue.AddRateLimited(key) +} + +// processJob will check the Job's state and TTL and delete the Job when it +// finishes and its TTL after finished has expired. If the Job hasn't finished or +// its TTL hasn't expired, it will be added to the queue after the TTL is expected +// to expire. +// This function is not meant to be invoked concurrently with the same key. +func (gb *GarbageCollector) processJob(key string) error { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + + glog.V(4).Infof("Checking if Job %s/%s is ready for cleanup", namespace, name) + // Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up. + job, err := gb.jobLister.Jobs(namespace).Get(name) + if errors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + + if expired, err := gb.processTTL(job); err != nil { + return err + } else if !expired { + return nil + } + + // The Job's TTL is assumed to have expired, but the Job TTL might be stale. + // Before deleting the Job, do a final sanity check. + // If TTL is modified before we do this check, we cannot be sure if the TTL truly expires. + // The latest Job may have a different UID, but it's fine because the checks will be run again. + fresh, err := gb.vkClient.BatchV1alpha1().Jobs(namespace).Get(name, metav1.GetOptions{}) + if errors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + // Use the latest Job TTL to see if the TTL truly expires. 
+ if expired, err := gb.processTTL(fresh); err != nil { + return err + } else if !expired { + return nil + } + // Cascade deletes the Jobs if TTL truly expires. + policy := metav1.DeletePropagationForeground + options := &metav1.DeleteOptions{ + PropagationPolicy: &policy, + Preconditions: &metav1.Preconditions{UID: &fresh.UID}, + } + glog.V(4).Infof("Cleaning up Job %s/%s", namespace, name) + return gb.vkClient.BatchV1alpha1().Jobs(fresh.Namespace).Delete(fresh.Name, options) +} + +// processTTL checks whether a given Job's TTL has expired, and add it to the queue after the TTL is expected to expire +// if the TTL will expire later. +func (gb *GarbageCollector) processTTL(job *v1alpha1.Job) (expired bool, err error) { + // We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up. + if job.DeletionTimestamp != nil || !needsCleanup(job) { + return false, nil + } + + now := time.Now() + t, err := timeLeft(job, &now) + if err != nil { + return false, err + } + + // TTL has expired + if *t <= 0 { + return true, nil + } + + gb.enqueueAfter(job, *t) + return false, nil +} + +// needsCleanup checks whether a Job has finished and has a TTL set. +func needsCleanup(j *v1alpha1.Job) bool { + return j.Spec.TTLSecondsAfterFinished != nil && isJobFinished(j) +} + +func isJobFinished(job *v1alpha1.Job) bool { + return job.Status.State.Phase == v1alpha1.Completed || + job.Status.State.Phase == v1alpha1.Failed || + job.Status.State.Phase == v1alpha1.Terminated +} + +func getFinishAndExpireTime(j *v1alpha1.Job) (*time.Time, *time.Time, error) { + if !needsCleanup(j) { + return nil, nil, fmt.Errorf("Job %s/%s should not be cleaned up", j.Namespace, j.Name) + } + finishAt, err := jobFinishTime(j) + if err != nil { + return nil, nil, err + } + finishAtUTC := finishAt.UTC() + expireAtUTC := finishAtUTC.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second) + return &finishAtUTC, &expireAtUTC, nil +} + +func timeLeft(j *v1alpha1.Job, since *time.Time) (*time.Duration, error) { + finishAt, expireAt, err := getFinishAndExpireTime(j) + if err != nil { + return nil, err + } + if finishAt.UTC().After(since.UTC()) { + glog.Warningf("Warning: Found Job %s/%s finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", j.Namespace, j.Name) + } + remaining := expireAt.UTC().Sub(since.UTC()) + glog.V(4).Infof("Found Job %s/%s finished at %v, remaining TTL %v since %v, TTL will expire at %v", j.Namespace, j.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC()) + return &remaining, nil +} + +// jobFinishTime takes an already finished Job and returns the time it finishes. 
+func jobFinishTime(finishedJob *v1alpha1.Job) (metav1.Time, error) { + if finishedJob.Status.State.LastTransitionTime.IsZero() { + return metav1.Time{}, fmt.Errorf("unable to find the time when the Job %s/%s finished", finishedJob.Namespace, finishedJob.Name) + } + return finishedJob.Status.State.LastTransitionTime, nil +} diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 129524bc46a..94017c11b6a 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -104,7 +104,9 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt } if updateStatus != nil { - updateStatus(&job.Status) + if updateStatus(&job.Status) { + job.Status.State.LastTransitionTime = metav1.Now() + } } // Update Job status @@ -165,7 +167,9 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.Update } if updateStatus != nil { - updateStatus(&job.Status) + if updateStatus(&job.Status) { + job.Status.State.LastTransitionTime = metav1.Now() + } } if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil { @@ -316,7 +320,9 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt } if updateStatus != nil { - updateStatus(&job.Status) + if updateStatus(&job.Status) { + job.Status.State.LastTransitionTime = metav1.Now() + } } if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil { diff --git a/pkg/controllers/job/state/aborted.go b/pkg/controllers/job/state/aborted.go index 104170615b4..953ee392d55 100644 --- a/pkg/controllers/job/state/aborted.go +++ b/pkg/controllers/job/state/aborted.go @@ -28,9 +28,10 @@ type abortedState struct { func (as *abortedState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: - return KillJob(as.job, func(status *vkv1.JobStatus) { + return KillJob(as.job, func(status *vkv1.JobStatus) bool { status.State.Phase = vkv1.Restarting status.RetryCount++ + return true }) default: return KillJob(as.job, nil) diff --git a/pkg/controllers/job/state/aborting.go b/pkg/controllers/job/state/aborting.go index cae21cb466d..8d123b6ada5 100644 --- a/pkg/controllers/job/state/aborting.go +++ b/pkg/controllers/job/state/aborting.go @@ -17,6 +17,8 @@ limitations under the License. 
package state import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -29,19 +31,20 @@ func (ps *abortingState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: // Already in Restarting phase, just sync it - return KillJob(ps.job, func(status *vkv1.JobStatus) { - status.State.Phase = vkv1.Restarting + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { status.RetryCount++ + return false }) default: - return KillJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { // If any "alive" pods, still in Aborting phase - phase := vkv1.Aborted if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { - phase = vkv1.Aborting + return false + } else { + status.State.Phase = vkv1.Aborted + status.State.LastTransitionTime = metav1.Now() + return true } - - status.State.Phase = phase }) } } diff --git a/pkg/controllers/job/state/completing.go b/pkg/controllers/job/state/completing.go index 1fb48ab0402..cb9f7074bd0 100644 --- a/pkg/controllers/job/state/completing.go +++ b/pkg/controllers/job/state/completing.go @@ -26,13 +26,13 @@ type completingState struct { } func (ps *completingState) Execute(action vkv1.Action) error { - return KillJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { // If any "alive" pods, still in Completing phase - phase := vkv1.Completed if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { - phase = vkv1.Completing + return false + } else { + status.State.Phase = vkv1.Completed + return true } - - status.State.Phase = phase }) } diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index a24f6055437..f33cb753f76 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -21,7 +21,7 @@ import ( "volcano.sh/volcano/pkg/controllers/apis" ) -type UpdateStatusFn func(status *vkv1.JobStatus) +type UpdateStatusFn func(status *vkv1.JobStatus) (jobPhaseChanged bool) type ActionFn func(job *apis.JobInfo, fn UpdateStatusFn) error var ( diff --git a/pkg/controllers/job/state/inqueue.go b/pkg/controllers/job/state/inqueue.go index f15415a8622..44cbfa514f0 100644 --- a/pkg/controllers/job/state/inqueue.go +++ b/pkg/controllers/job/state/inqueue.go @@ -28,43 +28,41 @@ type inqueueState struct { func (ps *inqueueState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { phase := vkv1.Pending if status.Terminating != 0 { phase = vkv1.Restarting status.RetryCount++ } - status.State.Phase = phase + return true }) case vkv1.AbortJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { phase := vkv1.Pending if status.Terminating != 0 { phase = vkv1.Aborting } - status.State.Phase = phase + return true }) case vkv1.CompleteJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { phase := vkv1.Completed if status.Terminating != 0 { phase = vkv1.Completing } - status.State.Phase = phase + return true }) default: - return SyncJob(ps.job, func(status *vkv1.JobStatus) { - phase := vkv1.Inqueue - + return SyncJob(ps.job, func(status *vkv1.JobStatus) bool { if 
ps.job.Job.Spec.MinAvailable <= status.Running+status.Succeeded+status.Failed { - phase = vkv1.Running + status.State.Phase = vkv1.Running + return true } - - status.State.Phase = phase + return false }) } return nil diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index f3bb4b501fc..9cead263a57 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -28,36 +28,36 @@ type pendingState struct { func (ps *pendingState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { phase := vkv1.Pending if status.Terminating != 0 { phase = vkv1.Restarting status.RetryCount++ } - status.State.Phase = phase + return true }) case vkv1.AbortJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { phase := vkv1.Pending if status.Terminating != 0 { phase = vkv1.Aborting } - status.State.Phase = phase + return true }) case vkv1.CompleteJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { phase := vkv1.Completed if status.Terminating != 0 { phase = vkv1.Completing } - status.State.Phase = phase + return true }) case vkv1.EnqueueAction: - return SyncJob(ps.job, func(status *vkv1.JobStatus) { + return SyncJob(ps.job, func(status *vkv1.JobStatus) bool { phase := vkv1.Inqueue if ps.job.Job.Spec.MinAvailable <= status.Running+status.Succeeded+status.Failed { @@ -65,10 +65,12 @@ func (ps *pendingState) Execute(action vkv1.Action) error { } status.State.Phase = phase + return true }) default: - return CreateJob(ps.job, func(status *vkv1.JobStatus) { + return CreateJob(ps.job, func(status *vkv1.JobStatus) bool { status.State.Phase = vkv1.Pending + return true }) } } diff --git a/pkg/controllers/job/state/restarting.go b/pkg/controllers/job/state/restarting.go index a58dbd78111..b11fb460a23 100644 --- a/pkg/controllers/job/state/restarting.go +++ b/pkg/controllers/job/state/restarting.go @@ -26,9 +26,7 @@ type restartingState struct { } func (ps *restartingState) Execute(action vkv1.Action) error { - return KillJob(ps.job, func(status *vkv1.JobStatus) { - phase := vkv1.Restarting - + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { // Get the maximum number of retries. maxRetry := DefaultMaxRetry if ps.job.Job.Spec.MaxRetry != 0 { @@ -37,7 +35,8 @@ func (ps *restartingState) Execute(action vkv1.Action) error { if status.RetryCount >= maxRetry { // Failed is the phase that the job is restarted failed reached the maximum number of retries. 
- phase = vkv1.Failed + status.State.Phase = vkv1.Failed + return true } else { total := int32(0) for _, task := range ps.job.Job.Spec.Tasks { @@ -45,11 +44,12 @@ func (ps *restartingState) Execute(action vkv1.Action) error { } if total-status.Terminating >= status.MinAvailable { - phase = vkv1.Pending + status.State.Phase = vkv1.Pending + return true } } - status.State.Phase = phase + return false }) } diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index b2204ba7275..e19fecc151b 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -28,50 +28,50 @@ type runningState struct { func (ps *runningState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) { - phase := vkv1.Running + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { if status.Terminating != 0 { - phase = vkv1.Restarting + status.State.Phase = vkv1.Restarting status.RetryCount++ + return true } - - status.State.Phase = phase + return false }) case vkv1.AbortJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) { - phase := vkv1.Running + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { if status.Terminating != 0 { - phase = vkv1.Aborting + status.State.Phase = vkv1.Aborting + return true } - status.State.Phase = phase + return false }) case vkv1.TerminateJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) { - phase := vkv1.Running + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { if status.Terminating != 0 { - phase = vkv1.Terminating + status.State.Phase = vkv1.Terminating + return true } - status.State.Phase = phase + return false }) case vkv1.CompleteJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { phase := vkv1.Completed if status.Terminating != 0 { phase = vkv1.Completing } status.State.Phase = phase + return true }) default: - return SyncJob(ps.job, func(status *vkv1.JobStatus) { - phase := vkv1.Running + return SyncJob(ps.job, func(status *vkv1.JobStatus) bool { if status.Succeeded+status.Failed == TotalTasks(ps.job.Job) { - phase = vkv1.Completed + status.State.Phase = vkv1.Completed + return true } - status.State.Phase = phase + return false }) } } diff --git a/pkg/controllers/job/state/terminating.go b/pkg/controllers/job/state/terminating.go index a818988797f..5ceddecb785 100644 --- a/pkg/controllers/job/state/terminating.go +++ b/pkg/controllers/job/state/terminating.go @@ -26,13 +26,13 @@ type terminatingState struct { } func (ps *terminatingState) Execute(action vkv1.Action) error { - return KillJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) bool { // If any "alive" pods, still in Terminating phase - phase := vkv1.Terminated if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { - phase = vkv1.Terminating + return false + } else { + status.State.Phase = vkv1.Terminated + return true } - - status.State.Phase = phase }) } diff --git a/test/e2e/util.go b/test/e2e/util.go index 0e4bbbe4ac2..53e274a9f7d 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -278,6 +278,8 @@ type jobSpec struct { min int32 plugins map[string][]string volumes []vkv1.VolumeSpec + // ttl seconds after job finished + ttl *int32 } func getNS(context *context, job *jobSpec) string { @@ -305,9 +307,10 @@ func createJobInner(context *context, jobSpec *jobSpec) (*vkv1.Job, error) { 
Namespace: ns, }, Spec: vkv1.JobSpec{ - Policies: jobSpec.policies, - Queue: jobSpec.queue, - Plugins: jobSpec.plugins, + Policies: jobSpec.policies, + Queue: jobSpec.queue, + Plugins: jobSpec.plugins, + TTLSecondsAfterFinished: jobSpec.ttl, }, } @@ -672,6 +675,35 @@ func createReplicaSet(context *context, name string, rep int32, img string, req return deployment } +func waitJobCleanedUp(ctx *context, job *vkv1.Job) error { + var additionalError error + err := wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) { + job, err := ctx.vkclient.BatchV1alpha1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + return false, nil + } + if job != nil { + additionalError = fmt.Errorf("job %s/%s still exist", job.Namespace, job.Name) + return false, nil + } + + pg, err := ctx.kbclient.SchedulingV1alpha1().PodGroups(job.Namespace).Get(job.Name, metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + return false, nil + } + if pg != nil { + additionalError = fmt.Errorf("pdgroup %s/%s still exist", job.Namespace, job.Name) + return false, nil + } + + return true, nil + }) + if err != nil && strings.Contains(err.Error(), timeOutMessage) { + return fmt.Errorf("[Wait time out]: %s", additionalError) + } + return err +} + func deleteReplicaSet(ctx *context, name string) error { foreground := metav1.DeletePropagationForeground return ctx.kubeclient.AppsV1().ReplicaSets(ctx.namespace).Delete(name, &metav1.DeleteOptions{
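
For reference, a minimal sketch of how a client opts a Job into automatic cleanup through the new spec field. Only TTLSecondsAfterFinished comes from this patch; the job name, namespace and MinAvailable value are illustrative. A nil pointer keeps today's behaviour (no automatic deletion), zero makes the finished Job eligible for deletion immediately, and a positive value keeps it for that many seconds after it completes, fails or is terminated.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)

func jobWithTTL(ttlSeconds int32) *vkv1.Job {
	return &vkv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example", // illustrative
			Namespace: "default", // illustrative
		},
		Spec: vkv1.JobSpec{
			MinAvailable: 1, // illustrative
			// nil: never auto-deleted; 0: delete right after the Job finishes;
			// >0: keep the finished Job around for this many seconds.
			TTLSecondsAfterFinished: &ttlSeconds,
		},
	}
}

func main() {
	job := jobWithTTL(600)
	fmt.Printf("%s/%s becomes eligible for deletion %ds after it finishes\n",
		job.Namespace, job.Name, *job.Spec.TTLSecondsAfterFinished)
}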
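
The reason UpdateStatusFn now returns a jobPhaseChanged bool: killJob/createJob/syncJob stamp State.LastTransitionTime only when a callback reports a real phase transition, which is exactly the timestamp jobFinishTime() later reads to start the TTL clock. A hedged re-illustration of that calling convention follows; applyStatusUpdate and markCompletedIfQuiet are stand-in names, not part of the patch.

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
	"volcano.sh/volcano/pkg/controllers/job/state"
)

// applyStatusUpdate mirrors the pattern used in the controller actions:
// LastTransitionTime is refreshed only when the callback actually changed the
// phase, so resyncs that leave the phase alone do not restart the TTL countdown.
func applyStatusUpdate(job *vkv1.Job, updateStatus state.UpdateStatusFn) {
	if updateStatus != nil && updateStatus(&job.Status) {
		job.Status.State.LastTransitionTime = metav1.Now()
	}
}

// markCompletedIfQuiet is shaped like the completingState callback: it flips the
// phase to Completed, and reports a transition, only once no pods are pending,
// running or terminating.
func markCompletedIfQuiet(status *vkv1.JobStatus) bool {
	if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
		return false
	}
	status.State.Phase = vkv1.Completed
	return true
}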
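
The expiry decision in processTTL/timeLeft reduces to expireAt = finishAt + TTL and remaining = expireAt - now, with finishAt taken from the new LastTransitionTime field. A small worked example of the same arithmetic (the timestamps are made up):

package main

import (
	"fmt"
	"time"
)

func main() {
	// LastTransitionTime of the finished state and the configured TTL.
	finishAt := time.Date(2019, time.May, 1, 10, 0, 0, 0, time.UTC)
	ttl := int32(300) // spec.ttlSecondsAfterFinished

	expireAt := finishAt.Add(time.Duration(ttl) * time.Second)
	now := time.Date(2019, time.May, 1, 10, 2, 0, 0, time.UTC)
	remaining := expireAt.Sub(now)

	// 10:00:00 + 300s = 10:05:00, so at 10:02:00 three minutes remain and the
	// collector re-enqueues the key instead of deleting the Job.
	fmt.Println(expireAt, remaining) // 2019-05-01 10:05:00 +0000 UTC 3m0s
}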
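
When the TTL has not expired yet, the collector does not poll; it pushes the key back with AddAfter so a worker wakes up roughly when the TTL does expire. A stripped-down sketch of that delayed-requeue pattern with a plain string key (the key value is illustrative):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same queue construction as GarbageCollector.New.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	defer queue.ShutDown()

	remaining := 2 * time.Second // what timeLeft() would return for an unexpired Job
	queue.AddAfter("default/example-job", remaining)

	key, _ := queue.Get() // blocks until roughly `remaining` has elapsed
	fmt.Println("ready for another processJob pass:", key)
	queue.Done(key)
}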
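
Finally, the collector needs nothing beyond a Volcano clientset, which is why server.go can wire it next to the job and queue controllers with a single New/Run pair. A hedged standalone sketch of the same wiring; the kubeconfig handling and the NewForConfigOrDie constructor are the standard client-gen pieces, assumed rather than taken from this patch:

package main

import (
	"k8s.io/client-go/tools/clientcmd"

	vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
	"volcano.sh/volcano/pkg/controllers/garbagecollector"
)

func main() {
	// Illustrative kubeconfig path; the controller manager builds its REST
	// config from its own options instead.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}

	gc := garbagecollector.New(vkclient.NewForConfigOrDie(cfg))

	stopCh := make(chan struct{}) // close(stopCh) to shut the collector down
	gc.Run(stopCh)                // blocks; server.go launches this in a goroutine instead
}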