Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add terminalState to jobset status #594

Merged
merged 6 commits into from
Jun 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type JobSetConditionType string

// These are built-in conditions of a JobSet.
const (
// JobSetRunning means the job is running.
JobSetRunning JobSetConditionType = "Running"
ahg-g marked this conversation as resolved.
Show resolved Hide resolved
// JobSetCompleted means the job has completed its execution.
JobSetCompleted JobSetConditionType = "Completed"
// JobSetFailed means the job has failed its execution.
Expand Down Expand Up @@ -134,6 +136,10 @@ type JobSetStatus struct {
// RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts.
RestartsCountTowardsMax int32 `json:"restartsCountTowardsMax,omitempty"`

// Phase of the JobSet.
// +kubebuilder:default="Running"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be in the status, not the spec, and we can't default it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently it is set in the JobSet status:

// JobSetStatus defines the observed state of JobSet
type JobSetStatus struct {
	// +optional
	// +listType=map
	// +listMapKey=type
	Conditions []metav1.Condition `json:"conditions,omitempty"`

         ....
	// Phase of the JobSet.
	// +kubebuilder:default="Running"
	Phase string `json:"phase,omitempty"`

	
}

Phase string `json:"phase,omitempty"`
ahg-g marked this conversation as resolved.
Show resolved Hide resolved

// ReplicatedJobsStatus tracks the number of JobsReady for each replicatedJob.
// +optional
// +listType=map
Expand Down Expand Up @@ -169,6 +175,7 @@ type ReplicatedJobStatus struct {
// +k8s:openapi-gen=true
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Phase",JSONPath=".status.phase",type=string,description="Phase of the JobSet"
// +kubebuilder:printcolumn:name="Restarts",JSONPath=".status.restarts",type=string,description="Number of restarts"
// +kubebuilder:printcolumn:name="Completed",type="string",priority=0,JSONPath=".status.conditions[?(@.type==\"Completed\")].status"
// +kubebuilder:printcolumn:name="Suspended",type="string",JSONPath=".spec.suspend",description="JobSet suspended"
Expand Down
7 changes: 7 additions & 0 deletions api/jobset/v1alpha2/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ spec:
scope: Namespaced
versions:
- additionalPrinterColumns:
- description: Phase of the JobSet
jsonPath: .status.phase
name: Phase
type: string
- description: Number of restarts
jsonPath: .status.restarts
name: Restarts
Expand Down Expand Up @@ -8487,6 +8491,10 @@ spec:
x-kubernetes-list-map-keys:
- type
x-kubernetes-list-type: map
phase:
default: Running
description: Phase of the JobSet.
type: string
replicatedJobsStatus:
description: ReplicatedJobsStatus track the number of JobsReady for
each replicatedJob.
Expand Down
4 changes: 4 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@
],
"x-kubernetes-list-type": "map"
},
"phase": {
"description": "Phase of the JobSet.",
"type": "string"
},
"replicatedJobsStatus": {
"description": "ReplicatedJobsStatus track the number of JobsReady for each replicatedJob.",
"type": "array",
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/failure_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func makeFailedConditionOpts(reason, msg string) *conditionOpts {
Reason: reason,
Message: msg,
},
phase: string(jobset.JobSetFailed),
eventType: corev1.EventTypeWarning,
}
}
Expand Down
31 changes: 22 additions & 9 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd

// Calculate JobsReady and update statuses for each ReplicatedJob.
rjobStatuses := r.calculateReplicatedJobStatuses(ctx, js, ownedJobs)
updateReplicatedJobsStatuses(ctx, js, rjobStatuses, updateStatusOpts)
updateReplicatedJobsStatuses(js, rjobStatuses, updateStatusOpts)

// If JobSet is already completed or failed, clean up active child jobs and requeue if TTLSecondsAfterFinished is set.
if jobSetFinished(js) {
Expand Down Expand Up @@ -185,7 +185,7 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd

// If any jobs have succeeded, execute the JobSet success policy.
if len(ownedJobs.successful) > 0 {
if completed := executeSuccessPolicy(ctx, js, ownedJobs, updateStatusOpts); completed {
if completed := executeSuccessPolicy(js, ownedJobs, updateStatusOpts); completed {
return ctrl.Result{}, nil
}
}
Expand Down Expand Up @@ -304,7 +304,7 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet)
}

// updateReplicatedJobsStatuses updates the replicatedJob statuses if they have changed.
func updateReplicatedJobsStatuses(ctx context.Context, js *jobset.JobSet, statuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) {
func updateReplicatedJobsStatuses(js *jobset.JobSet, statuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) {
// If replicated job statuses haven't changed, there's nothing to do here.
if replicatedJobStatusesEqual(js.Status.ReplicatedJobsStatus, statuses) {
return
Expand Down Expand Up @@ -630,7 +630,7 @@ func (r *JobSetReconciler) createHeadlessSvcIfNecessary(ctx context.Context, js
// executeSuccessPolicy checks the completed jobs against the jobset success policy
// and updates the jobset status to completed if the success policy conditions are met.
// Returns a boolean value indicating if the jobset was completed or not.
func executeSuccessPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool {
func executeSuccessPolicy(js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool {
if numJobsMatchingSuccessPolicy(js, ownedJobs.successful) >= numJobsExpectedToSucceed(js) {
setJobSetCompletedCondition(js, updateStatusOpts)
return true
Expand Down Expand Up @@ -867,14 +867,15 @@ func enqueueEvent(updateStatusOpts *statusUpdateOpts, event *eventParams) {
// function parameters for setCondition
type conditionOpts struct {
eventType string
phase string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to pass this around, we should be able to compute the phase from the condition in the status update function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we compute the phase only from the conditions, we would need to exclude the other JobSetConditionType values. I think it would be more convenient to use one more field, such as phase or terminalState.

For example, these conditions are not ones this field needs to care about:

// These are built-in conditions of a JobSet.
const (
	...
	// JobSetSuspended means the job is suspended.
	JobSetSuspended JobSetConditionType = "Suspended"
	// JobSetStartupPolicyInProgress means the StartupPolicy is in progress.
	JobSetStartupPolicyInProgress JobSetConditionType = "StartupPolicyInProgress"
	// JobSetStartupPolicyCompleted means the StartupPolicy has completed.
	JobSetStartupPolicyCompleted JobSetConditionType = "StartupPolicyCompleted"
)

condition *metav1.Condition
}

// setCondition will add a new condition to the JobSet status (or update an existing one),
// setCondition will add a new condition and phase to the JobSet status (or update an existing one),
// and enqueue an event for emission if the status update succeeds at the end of the reconcile.
func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts *statusUpdateOpts) {
// Return early if no status update is required for this condition.
if !updateCondition(js, condOpts) {
// Return early if no status update is required for this condition and phase.
if !updateConditionAndPhase(js, condOpts) {
return
}

Expand All @@ -897,12 +898,14 @@ func setCondition(js *jobset.JobSet, condOpts *conditionOpts, updateStatusOpts *
enqueueEvent(updateStatusOpts, event)
}

// updateCondition accepts a given condition and does one of the following:
// updateConditionAndPhase accepts a condition and a phase, and does the following:
// 1. If an identical condition already exists, do nothing and return false (indicating
// no change was made).
// 2. If a condition of the same type exists but with a different status, update
// the condition in place and return true (indicating a condition change was made).
func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool {
// 3. If the specified phase is non-empty and differs from the current phase of the JobSet,
// update the JobSet Status Phase and return true.
func updateConditionAndPhase(js *jobset.JobSet, opts *conditionOpts) bool {
if opts == nil || opts.condition == nil {
return false
}
Expand Down Expand Up @@ -941,6 +944,13 @@ func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool {
js.Status.Conditions = append(js.Status.Conditions, newCond)
shouldUpdate = true
}

// Update the JobSet Status Phase if necessary.
if opts.phase != "" && js.Status.Phase != opts.phase {
js.Status.Phase = opts.phase
shouldUpdate = true
}

return shouldUpdate
}

Expand Down Expand Up @@ -970,6 +980,7 @@ func makeCompletedConditionsOpts() *conditionOpts {
Reason: constants.AllJobsCompletedReason,
Message: constants.AllJobsCompletedMessage,
},
phase: string(jobset.JobSetCompleted),
}
}

Expand All @@ -984,6 +995,7 @@ func makeSuspendedConditionOpts() *conditionOpts {
Reason: constants.JobSetSuspendedReason,
Message: constants.JobSetSuspendedMessage,
},
phase: string(jobset.JobSetSuspended),
}
}

Expand All @@ -998,6 +1010,7 @@ func makeResumedConditionOpts() *conditionOpts {
Reason: constants.JobSetResumedReason,
Message: constants.JobSetResumedMessage,
},
phase: string(jobset.JobSetRunning),
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ func TestUpdateConditions(t *testing.T) {
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).Obj(),
Obj()).Phase(jobset.JobSetRunning).Obj(),
opts: makeCompletedConditionsOpts(),
expectedUpdate: true,
},
Expand All @@ -706,7 +706,7 @@ func TestUpdateConditions(t *testing.T) {
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).Obj(),
Obj()).Phase(jobset.JobSetRunning).Obj(),
opts: makeSuspendedConditionOpts(),
expectedUpdate: true,
},
Expand All @@ -716,7 +716,7 @@ func TestUpdateConditions(t *testing.T) {
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).
Obj()).Phase(jobset.JobSetRunning).
Conditions([]metav1.Condition{
// JobSet is currently suspended.
{
Expand All @@ -736,7 +736,7 @@ func TestUpdateConditions(t *testing.T) {
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).
Obj()).Phase(jobset.JobSetCompleted).
Conditions([]metav1.Condition{
// JobSet is completed.
{
Expand All @@ -752,7 +752,7 @@ func TestUpdateConditions(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
gotUpdate := updateCondition(tc.js, tc.opts)
gotUpdate := updateConditionAndPhase(tc.js, tc.opts)
if gotUpdate != tc.expectedUpdate {
t.Errorf("updateCondition return mismatch (want: %v, got %v)", tc.expectedUpdate, gotUpdate)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (j *JobSetWrapper) FailedCondition(failedAt metav1.Time) *JobSetWrapper {
return j
}

// Phase stores the given phase, converted to its string form, in the wrapped
// JobSet's Status.Phase and returns the wrapper so that calls can be chained.
func (j *JobSetWrapper) Phase(phase jobset.JobSetConditionType) *JobSetWrapper {
	status := &j.Status
	status.Phase = string(phase)
	return j
}

func (j *JobSetWrapper) DeletionTimestamp(deletionTimestamp *metav1.Time) *JobSetWrapper {
j.ObjectMeta.DeletionTimestamp = deletionTimestamp
return j
Expand Down
1 change: 1 addition & 0 deletions sdk/python/docs/JobsetV1alpha2JobSetStatus.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ JobSetStatus defines the observed state of JobSet
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**conditions** | [**list[V1Condition]**](V1Condition.md) | | [optional]
**phase** | **str** | Phase of the JobSet. | [optional]
**replicated_jobs_status** | [**list[JobsetV1alpha2ReplicatedJobStatus]**](JobsetV1alpha2ReplicatedJobStatus.md) | ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. | [optional]
**restarts** | **int** | Restarts tracks the number of times the JobSet has restarted (i.e. recreated in case of RecreateAll policy). | [optional]
**restarts_count_towards_max** | **int** | RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. | [optional]
Expand Down
30 changes: 29 additions & 1 deletion sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,37 @@ class JobsetV1alpha2JobSetStatus(object):
"""
openapi_types = {
'conditions': 'list[V1Condition]',
'phase': 'str',
'replicated_jobs_status': 'list[JobsetV1alpha2ReplicatedJobStatus]',
'restarts': 'int',
'restarts_count_towards_max': 'int'
}

attribute_map = {
'conditions': 'conditions',
'phase': 'phase',
'replicated_jobs_status': 'replicatedJobsStatus',
'restarts': 'restarts',
'restarts_count_towards_max': 'restartsCountTowardsMax'
}

def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, local_vars_configuration=None): # noqa: E501
def __init__(self, conditions=None, phase=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, local_vars_configuration=None): # noqa: E501
"""JobsetV1alpha2JobSetStatus - a model defined in OpenAPI""" # noqa: E501
if local_vars_configuration is None:
local_vars_configuration = Configuration()
self.local_vars_configuration = local_vars_configuration

self._conditions = None
self._phase = None
self._replicated_jobs_status = None
self._restarts = None
self._restarts_count_towards_max = None
self.discriminator = None

if conditions is not None:
self.conditions = conditions
if phase is not None:
self.phase = phase
if replicated_jobs_status is not None:
self.replicated_jobs_status = replicated_jobs_status
if restarts is not None:
Expand Down Expand Up @@ -88,6 +93,29 @@ def conditions(self, conditions):

self._conditions = conditions

@property
def phase(self):
"""Gets the phase of this JobsetV1alpha2JobSetStatus. # noqa: E501

Phase of the JobSet. # noqa: E501

:return: The phase of this JobsetV1alpha2JobSetStatus. # noqa: E501
:rtype: str
"""
return self._phase

@phase.setter
def phase(self, phase):
"""Sets the phase of this JobsetV1alpha2JobSetStatus.

Phase of the JobSet. # noqa: E501

:param phase: The phase of this JobsetV1alpha2JobSetStatus. # noqa: E501
:type: str
"""

self._phase = phase

@property
def replicated_jobs_status(self):
"""Gets the replicated_jobs_status of this JobsetV1alpha2JobSetStatus. # noqa: E501
Expand Down
1 change: 1 addition & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def make_instance(self, include_optional):
conditions = [
None
],
phase = '0',
replicated_jobs_status = [
jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus(
active = 56,
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def make_instance(self, include_optional):
conditions = [
None
],
phase = '0',
replicated_jobs_status = [
jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus(
active = 56,
Expand Down Expand Up @@ -136,6 +137,7 @@ def make_instance(self, include_optional):
conditions = [
None
],
phase = '0',
replicated_jobs_status = [
jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus(
active = 56,
Expand Down
1 change: 1 addition & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def make_instance(self, include_optional):
conditions = [
None
],
phase = '0',
replicated_jobs_status = [
jobset.models.jobset_v1alpha2_replicated_job_status.JobsetV1alpha2ReplicatedJobStatus(
active = 56,
Expand Down
1 change: 1 addition & 0 deletions test/integration/controller/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1612,6 +1612,7 @@ var _ = ginkgo.Describe("JobSet controller", func() {
LastTransitionTime: metav1.Now(),
},
},
Phase: string(jobset.JobSetRunning),
Restarts: 1,
ReplicatedJobsStatus: []jobset.ReplicatedJobStatus{
{
Expand Down