Skip to content

Commit

Permalink
Implement suspend semantics
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
  • Loading branch information
tenzen-y committed Jul 20, 2023
1 parent 72f2512 commit e4bf325
Show file tree
Hide file tree
Showing 51 changed files with 919 additions and 158 deletions.
2 changes: 2 additions & 0 deletions docs/api/kubeflow.org_v1_generated.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ RunPolicy encapsulates various runtime policies of the distributed training job,
| *`activeDeadlineSeconds`* __integer__ | Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer.
| *`backoffLimit`* __integer__ | Optional number of retries before marking this job failed.
| *`schedulingPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-schedulingpolicy[$$SchedulingPolicy$$]__ | SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
| *`suspend`* __boolean__ | suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job.
Defaults to false.
|===


Expand Down
4 changes: 4 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,10 @@
"description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
"$ref": "#/definitions/kubeflow.org.v1.SchedulingPolicy"
},
"suspend": {
"description": "suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job.\n\nDefaults to false.",
"type": "boolean"
},
"ttlSecondsAfterFinished": {
"description": "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.",
"type": "integer",
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7702,6 +7702,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mxjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7701,6 +7701,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_paddlejobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8212,6 +8212,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_pytorchjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8247,6 +8247,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_tfjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_xgboostjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
Expand Down
16 changes: 16 additions & 0 deletions pkg/apis/kubeflow.org/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ const (
// The training is complete without error.
JobSucceeded JobConditionType = "Succeeded"

// JobSuspended means the job has been suspended.
JobSuspended JobConditionType = "Suspended"

// JobFailed means one or more sub-resources (e.g. services/pods) of this job
// reached phase failed with no restarting.
// The training has failed its execution.
Expand Down Expand Up @@ -205,6 +208,19 @@ type RunPolicy struct {
// SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
// +optional
SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"`

// suspend specifies whether the Job controller should create Pods or not.
// If a Job is created with suspend set to true, no Pods are created by
// the Job controller. If a Job is suspended after creation (i.e. the
// flag goes from false to true), the Job controller will delete all
// active Pods and PodGroups associated with this Job.
// Users must design their workload to gracefully handle this.
// Suspending a Job will reset the StartTime field of the Job.
//
// Defaults to false.
// +kubebuilder:default:=false
// +optional
Suspend *bool `json:"suspend,omitempty"`
}

// SchedulingPolicy encapsulates various scheduling policies of the distributed training
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/kubeflow.org/v1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

103 changes: 78 additions & 25 deletions pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/kubeflow/training-operator/pkg/core"
commonutil "github.com/kubeflow/training-operator/pkg/util"
"github.com/kubeflow/training-operator/pkg/util/k8sutil"
trainutil "github.com/kubeflow/training-operator/pkg/util/train"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
Expand All @@ -38,28 +39,30 @@ import (
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)

func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job interface{}, pods []*corev1.Pod) error {
// DeletePodsAndServices deletes pods and services considering cleanPodPolicy.
// However, if the job doesn't have Succeeded or Failed condition, it ignores cleanPodPolicy.
func (jc *JobController) DeletePodsAndServices(runtimeObject runtime.Object, runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus, pods []*corev1.Pod) error {
if len(pods) == 0 {
return nil
}

// Delete nothing when the cleanPodPolicy is None.
if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone {
// Delete nothing when the cleanPodPolicy is None and the job has Succeeded or Failed condition.
if commonutil.IsFinished(jobStatus) && *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone {
return nil
}

for _, pod := range pods {
// Note that pending pod will turn into running once schedulable,
// not cleaning it may leave orphan running pod in the future,
// we should treat it equivalent to running phase here.
if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
if commonutil.IsFinished(jobStatus) && *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
continue
}
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
// Pod and service have the same name, thus the service could be deleted using pod's name.
if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
}
Expand Down Expand Up @@ -117,23 +120,9 @@ func (jc *JobController) ReconcileJobs(
}

oldStatus := jobStatus.DeepCopy()
if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) {
// If the Job is succeed or failed, delete all pods and services.
if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
return err
}

if jc.Config.EnableGangScheduling() {
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup")
if err := jc.DeletePodGroup(metaObject); err != nil {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", jobName)
}
}

if err := jc.CleanupJob(runPolicy, jobStatus, job); err != nil {
if commonutil.IsFinished(jobStatus) {
// If the Job is succeed or failed, delete all pods, services, and podGroup.
if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil {
return err
}

Expand All @@ -155,6 +144,44 @@ func (jc *JobController) ReconcileJobs(
return nil
}

if trainutil.IsJobSuspended(runPolicy) {
if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil {
return err
}
for rType := range jobStatus.ReplicaStatuses {
jobStatus.ReplicaStatuses[rType].Active = 0
}
msg := fmt.Sprintf("%s %s is suspended.", jobKind, jobName)
if commonutil.IsRunning(jobStatus) {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse,
commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
return err
}
}
// We add the suspended condition to the job only when the job doesn't have a suspended condition.
if !commonutil.IsSuspended(jobStatus) {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue,
commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
return err
}
}
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
if !reflect.DeepEqual(*oldStatus, jobStatus) {
return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
}
return nil
}
if commonutil.IsSuspended(jobStatus) {
msg := fmt.Sprintf("%s %s is resumed.", jobKind, jobName)
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse,
commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg); err != nil {
return err
}
now := metav1.Now()
jobStatus.StartTime = &now
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg)
}

// retrieve the previous number of retry
previousRetry := jc.WorkQueue.NumRequeues(jobKey)

Expand Down Expand Up @@ -205,7 +232,7 @@ func (jc *JobController) ReconcileJobs(

// If the Job exceeds backoff limit or is past active deadline
// delete all pods and services, then set the status to failed
if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
if err := jc.DeletePodsAndServices(runtimeObject, runPolicy, jobStatus, pods); err != nil {
return err
}

Expand All @@ -225,7 +252,7 @@ func (jc *JobController) ReconcileJobs(

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)

if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
log.Infof("Append job condition error: %v", err)
return err
}
Expand Down Expand Up @@ -344,6 +371,32 @@ func (jc *JobController) ReconcileJobs(
return nil
}

func (jc *JobController) CleanUpResources(
runPolicy *apiv1.RunPolicy,
runtimeObject runtime.Object,
metaObject metav1.Object,
jobStatus apiv1.JobStatus,
pods []*v1.Pod,
) error {
if err := jc.DeletePodsAndServices(runtimeObject, runPolicy, jobStatus, pods); err != nil {
return err
}
if jc.Config.EnableGangScheduling() {

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup")
if err := jc.DeletePodGroup(metaObject); err != nil {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", metaObject.GetName())
}
}
if err := jc.CleanupJob(runPolicy, jobStatus, runtimeObject); err != nil {
return err
}
return nil
}

// ResetExpectations reset the expectation for creates and deletes of pod/service to zero.
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error {
var allErrs error
Expand Down
Loading

0 comments on commit e4bf325

Please sign in to comment.