Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add restart strategy #686

Merged
merged 11 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,31 @@ type FailurePolicy struct {
// A restart is achieved by recreating all active child jobs.
MaxRestarts int32 `json:"maxRestarts,omitempty"`

// RestartStrategy defines the strategy to use when restarting the JobSet.
// Defaults to Recreate.
// +optional
// +kubebuilder:default=Recreate
Copy link
Contributor

@ahg-g ahg-g Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do validation here as follows:

// +kubebuilder:validation:Enum:=

Also, if the user was running a jobset and at the same time upgraded the controller, I suspect that this field will not exist for this job. I think this needs to be a reference that we explicitly default in the webhook and in the controller code (default the field if not set) to ensure we don't break running jobsets.

Copy link
Contributor Author

@nstogner nstogner Oct 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the enum validation. Here is the behavior as-is when the CRD is updated (default is applied when JobSet is read after update of CRD):

(base) ➜  jobset.blocking-recreate git:(restart-strategy) kind create cluster
using podman due to KIND_EXPERIMENTAL_PROVIDER
enabling experimental podman provider
Creating cluster "kind" ...
 ✓ Ensuring node image (kindest/node:v1.29.2) 🖼 
 ✓ Preparing nodes 📦  
 ✓ Writing configuration 📜 
 ✓ Starting control-plane 🕹️ 
 ✓ Installing CNI 🔌 
 ✓ Installing StorageClass 💾 
Set kubectl context to "kind-kind"
You can now use your cluster with:

kubectl cluster-info --context kind-kind

Have a nice day! 👋
(base) ➜  jobset.blocking-recreate git:(restart-strategy) git checkout main
Switched to branch 'main'
Your branch is up to date with 'origin/main'.
(base) ➜  jobset.blocking-recreate git:(main) kubectl apply --server-side -k ./config/components/crd
# Warning: 'patchesStrategicMerge' is deprecated. Please use 'patches' instead. Run 'kustomize edit fix' to update your Kustomization automatically.
customresourcedefinition.apiextensions.k8s.io/jobsets.jobset.x-k8s.io serverside-applied
(base) ➜  jobset.blocking-recreate git:(main) kubectl apply -f ./examples/simple/max-restarts.yaml
jobset.jobset.x-k8s.io/max-restarts created
(base) ➜  jobset.blocking-recreate git:(main) kubectl get jobsets -oyaml | grep -A 3 failurePolicy
    failurePolicy:
      maxRestarts: 3
    replicatedJobs:
    - name: leader
(base) ➜  jobset.blocking-recreate git:(main) git checkout -
Switched to branch 'restart-strategy'
Your branch is up to date with 'nstogner/restart-strategy'.
(base) ➜  jobset.blocking-recreate git:(restart-strategy) kubectl apply --server-side -k ./config/components/crd
# Warning: 'patchesStrategicMerge' is deprecated. Please use 'patches' instead. Run 'kustomize edit fix' to update your Kustomization automatically.
customresourcedefinition.apiextensions.k8s.io/jobsets.jobset.x-k8s.io serverside-applied
(base) ➜  jobset.blocking-recreate git:(restart-strategy) kubectl get jobsets -oyaml | grep -A 3 failurePolicy
    failurePolicy:
      maxRestarts: 3
      restartStrategy: Recreate # <----- Default is applied
    replicatedJobs:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this will not break existing JobSets... WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sg, can you add the validation, it is not on the last commit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I see you added it on the enum definition itself, make sense

RestartStrategy JobSetRestartStrategy `json:"restartStrategy,omitempty"`

// List of failure policy rules for this JobSet.
// For a given Job failure, the rules will be evaluated in order,
// and only the first matching rule will be executed.
// If no matching rule is found, the RestartJobSet action is applied.
Rules []FailurePolicyRule `json:"rules,omitempty"`
}

// +kubebuilder:validation:Enum=Recreate;BlockingRecreate
type JobSetRestartStrategy string

const (
// Recreate Jobs on a Job-by-Job basis.
Recreate JobSetRestartStrategy = "Recreate"

// BlockingRecreate ensures that all Jobs (and Pods) from a previous iteration are deleted before
// creating new Jobs.
BlockingRecreate JobSetRestartStrategy = "BlockingRecreate"
)

type SuccessPolicy struct {
// Operator determines either All or Any of the selected jobs should succeed to consider the JobSet successful
// +kubebuilder:validation:Enum=All;Any
Expand Down
7 changes: 7 additions & 0 deletions api/jobset/v1alpha2/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 15 additions & 2 deletions client-go/applyconfiguration/jobset/v1alpha2/failurepolicy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ spec:
A restart is achieved by recreating all active child jobs.
format: int32
type: integer
restartStrategy:
default: Recreate
description: |-
RestartStrategy defines the strategy to use when restarting the JobSet.
Defaults to Recreate.
enum:
- Recreate
- BlockingRecreate
type: string
rules:
description: |-
List of failure policy rules for this JobSet.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module sigs.k8s.io/jobset

go 1.23
go 1.23.0

require (
github.com/google/go-cmp v0.6.0
Expand Down
4 changes: 4 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
"type": "integer",
"format": "int32"
},
"restartStrategy": {
"description": "RestartStrategy defines the strategy to use when restarting the JobSet. Defaults to Recreate.",
"type": "string"
},
"rules": {
"description": "List of failure policy rules for this JobSet. For a given Job failure, the rules will be evaluated in order, and only the first matching rule will be executed. If no matching rule is found, the RestartJobSet action is applied.",
"type": "array",
Expand Down
22 changes: 15 additions & 7 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ type childJobs struct {
successful []*batchv1.Job
failed []*batchv1.Job

// Jobs marked for deletion are mutually exclusive with the set of jobs in active, successful, and failed.
delete []*batchv1.Job
// Jobs from a previous restart (marked for deletion) are mutually exclusive
// with the set of jobs in active, successful, and failed.
previous []*batchv1.Job
}

// statusUpdateOpts tracks if a JobSet status update should be performed at the end of the reconciliation
Expand Down Expand Up @@ -169,8 +170,8 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd
return ctrl.Result{}, nil
}

// Delete any jobs marked for deletion.
if err := r.deleteJobs(ctx, ownedJobs.delete); err != nil {
// Delete all jobs from a previous restart that are marked for deletion.
if err := r.deleteJobs(ctx, ownedJobs.previous); err != nil {
log.Error(err, "deleting jobs")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -281,11 +282,11 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet)
jobRestarts, err := strconv.Atoi(job.Labels[constants.RestartsKey])
if err != nil {
log.Error(err, fmt.Sprintf("invalid value for label %s, must be integer", constants.RestartsKey))
ownedJobs.delete = append(ownedJobs.delete, &childJobList.Items[i])
ownedJobs.previous = append(ownedJobs.previous, &childJobList.Items[i])
return nil, err
}
if int32(jobRestarts) < js.Status.Restarts {
ownedJobs.delete = append(ownedJobs.delete, &childJobList.Items[i])
ownedJobs.previous = append(ownedJobs.previous, &childJobList.Items[i])
continue
}

Expand Down Expand Up @@ -637,6 +638,13 @@ func executeSuccessPolicy(js *jobset.JobSet, ownedJobs *childJobs, updateStatusO

func constructJobsFromTemplate(js *jobset.JobSet, rjob *jobset.ReplicatedJob, ownedJobs *childJobs) []*batchv1.Job {
var jobs []*batchv1.Job
// If the JobSet is using the BlockingRecreate failure policy, we should not create any new jobs until
// all the jobs slated for deletion (i.e. from the last restart index) have been deleted.
useBlockingRecreate := js.Spec.FailurePolicy != nil && js.Spec.FailurePolicy.RestartStrategy == jobset.BlockingRecreate
if len(ownedJobs.previous) > 0 && useBlockingRecreate {
return jobs
}

for jobIdx := 0; jobIdx < int(rjob.Replicas); jobIdx++ {
jobName := placement.GenJobName(js.Name, rjob.Name, jobIdx)
if create := shouldCreateJob(jobName, ownedJobs); !create {
Expand Down Expand Up @@ -700,7 +708,7 @@ func shouldCreateJob(jobName string, ownedJobs *childJobs) bool {
// TODO: maybe we can use a job map here so we can do O(1) lookups
// to check if the job already exists, rather than a linear scan
// through all the jobs owned by the jobset.
for _, job := range collections.Concat(ownedJobs.active, ownedJobs.successful, ownedJobs.failed, ownedJobs.delete) {
for _, job := range collections.Concat(ownedJobs.active, ownedJobs.successful, ownedJobs.failed, ownedJobs.previous) {
if jobName == job.Name {
return false
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestConstructJobsFromTemplate(t *testing.T) {
Replicas(2).
Obj()).Obj(),
ownedJobs: &childJobs{
delete: []*batchv1.Job{
previous: []*batchv1.Job{
testutils.MakeJob("test-jobset-replicated-job-0", ns).Obj(),
},
},
Expand All @@ -275,6 +275,23 @@ func TestConstructJobsFromTemplate(t *testing.T) {
Suspend(false).Obj(),
},
},
{
name: "job creation blocked until all previous jobs no longer exist",
js: testutils.MakeJobSet(jobSetName, ns).
FailurePolicy(&jobset.FailurePolicy{
RestartStrategy: jobset.BlockingRecreate,
}).
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(2).
Obj()).Obj(),
ownedJobs: &childJobs{
previous: []*batchv1.Job{
testutils.MakeJob("test-jobset-replicated-job-0", ns).Obj(),
},
},
want: nil,
},
{
name: "multiple replicated jobs",
js: testutils.MakeJobSet(jobSetName, ns).
Expand Down
1 change: 1 addition & 0 deletions sdk/python/docs/JobsetV1alpha2FailurePolicy.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**max_restarts** | **int** | MaxRestarts defines the limit on the number of JobSet restarts. A restart is achieved by recreating all active child jobs. | [optional]
**restart_strategy** | **str** | RestartStrategy defines the strategy to use when restarting the JobSet. Defaults to Recreate. | [optional]
**rules** | [**list[JobsetV1alpha2FailurePolicyRule]**](JobsetV1alpha2FailurePolicyRule.md) | List of failure policy rules for this JobSet. For a given Job failure, the rules will be evaluated in order, and only the first matching rule will be executed. If no matching rule is found, the RestartJobSet action is applied. | [optional]

[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
Expand Down
30 changes: 29 additions & 1 deletion sdk/python/jobset/models/jobset_v1alpha2_failure_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,31 @@ class JobsetV1alpha2FailurePolicy(object):
"""
openapi_types = {
'max_restarts': 'int',
'restart_strategy': 'str',
'rules': 'list[JobsetV1alpha2FailurePolicyRule]'
}

attribute_map = {
'max_restarts': 'maxRestarts',
'restart_strategy': 'restartStrategy',
'rules': 'rules'
}

def __init__(self, max_restarts=None, rules=None, local_vars_configuration=None): # noqa: E501
def __init__(self, max_restarts=None, restart_strategy=None, rules=None, local_vars_configuration=None): # noqa: E501
"""JobsetV1alpha2FailurePolicy - a model defined in OpenAPI""" # noqa: E501
if local_vars_configuration is None:
local_vars_configuration = Configuration()
self.local_vars_configuration = local_vars_configuration

self._max_restarts = None
self._restart_strategy = None
self._rules = None
self.discriminator = None

if max_restarts is not None:
self.max_restarts = max_restarts
if restart_strategy is not None:
self.restart_strategy = restart_strategy
if rules is not None:
self.rules = rules

Expand All @@ -80,6 +85,29 @@ def max_restarts(self, max_restarts):

self._max_restarts = max_restarts

@property
def restart_strategy(self):
"""Gets the restart_strategy of this JobsetV1alpha2FailurePolicy. # noqa: E501

RestartStrategy defines the strategy to use when restarting the JobSet. Defaults to Recreate. # noqa: E501

:return: The restart_strategy of this JobsetV1alpha2FailurePolicy. # noqa: E501
:rtype: str
"""
return self._restart_strategy

@restart_strategy.setter
def restart_strategy(self, restart_strategy):
"""Sets the restart_strategy of this JobsetV1alpha2FailurePolicy.

RestartStrategy defines the strategy to use when restarting the JobSet. Defaults to Recreate. # noqa: E501

:param restart_strategy: The restart_strategy of this JobsetV1alpha2FailurePolicy. # noqa: E501
:type: str
"""

self._restart_strategy = restart_strategy

@property
def rules(self):
"""Gets the rules of this JobsetV1alpha2FailurePolicy. # noqa: E501
Expand Down
1 change: 1 addition & 0 deletions sdk/python/test/test_jobset_v1alpha2_failure_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def make_instance(self, include_optional):
if include_optional :
return JobsetV1alpha2FailurePolicy(
max_restarts = 56,
restart_strategy = '0',
rules = [
jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule(
action = '0',
Expand Down
1 change: 1 addition & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def make_instance(self, include_optional):
replicated_job = '0', ),
failure_policy = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy(
max_restarts = 56,
restart_strategy = '0',
rules = [
jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule(
action = '0',
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def make_instance(self, include_optional):
replicated_job = '0', ),
failure_policy = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy(
max_restarts = 56,
restart_strategy = '0',
rules = [
jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule(
action = '0',
Expand Down Expand Up @@ -113,6 +114,7 @@ def make_instance(self, include_optional):
replicated_job = '0', ),
failure_policy = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy(
max_restarts = 56,
restart_strategy = '0',
rules = [
jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule(
action = '0',
Expand Down
1 change: 1 addition & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def make_instance(self, include_optional):
replicated_job = '0', ),
failure_policy = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy(
max_restarts = 56,
restart_strategy = '0',
rules = [
jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule(
action = '0',
Expand Down
2 changes: 1 addition & 1 deletion site/content/en/docs/tasks/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Here we have some simple examples demonstrating core JobSet features.
Success Policy allows one to specify when to mark a JobSet as success.
This example showcases an example of using the success policy to mark the JobSet as successful if the worker replicated job completes.

- [Failure Policy with Max Restarts](https://github.com/kubernetes-sigs/jobset/blob/release-0.5/examples/simple/max-restarts.yaml) demonstrates an example of utilizing `failurePolicy`. Failure Policy allows one to control how many restarts a JobSet can do before declaring the JobSet as failed.
- [Failure Policy](https://github.com/kubernetes-sigs/jobset/blob/release-0.5/examples/simple/failure-policy.yaml) demonstrates an example of utilizing `failurePolicy`. Failure Policy allows one to control how many restarts a JobSet can do before declaring the JobSet as failed. The strategy used when restarting can also be specified (i.e. whether to first delete all Jobs, or recreate on a one-by-one basis).

- [Exclusive Job Placement](https://github.com/kubernetes-sigs/jobset/blob/release-0.5/examples/simple/exclusive-placement.yaml) demonstrates how you can configure a JobSet to have a 1:1 mapping between each child Job and a particular topology domain, such as a datacenter rack or zone. This means that all the pods belonging to a child job will be colocated in the same topology domain, while pods from other jobs will not be allowed to run within this domain. This gives the child job exclusive access to computer resources in this domain.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
apiVersion: jobset.x-k8s.io/v1alpha2
kind: JobSet
metadata:
name: max-restarts
name: failure-policy
spec:
# On failure, restart all jobs up to 3 times.
failurePolicy:
# Wait for all Jobs to be fully deleted before recreating any.
# Defaults to "Recreate" which restarts Jobs individually.
restartStrategy: BlockingRecreate
# On failure, restart all jobs up to 3 times.
maxRestarts: 3
replicatedJobs:
- name: leader
Expand Down
Loading