From 7886910db954eddb0f21a290f18fd931b9b390e7 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre <105610547+danielvegamyhre@users.noreply.github.com> Date: Fri, 16 Aug 2024 13:18:39 -0700 Subject: [PATCH] Add new job-id annotation to assign globally unique job index to each job (#650) * add job-id annotation * update unit tests * change name to job global index --- api/jobset/v1alpha2/jobset_types.go | 11 ++- pkg/controllers/jobset_controller.go | 31 ++++++++ pkg/controllers/jobset_controller_test.go | 90 +++++++++++++++++++++++ 3 files changed, 130 insertions(+), 2 deletions(-) diff --git a/api/jobset/v1alpha2/jobset_types.go b/api/jobset/v1alpha2/jobset_types.go index 8d80339c..01f2f7fe 100644 --- a/api/jobset/v1alpha2/jobset_types.go +++ b/api/jobset/v1alpha2/jobset_types.go @@ -24,8 +24,15 @@ const ( ReplicatedJobReplicas string = "jobset.sigs.k8s.io/replicatedjob-replicas" // ReplicatedJobNameKey is used to index into a Jobs labels and retrieve the name of the parent ReplicatedJob ReplicatedJobNameKey string = "jobset.sigs.k8s.io/replicatedjob-name" - JobIndexKey string = "jobset.sigs.k8s.io/job-index" - JobKey string = "jobset.sigs.k8s.io/job-key" + // JobIndexKey is a label/annotation set to the index of the Job replica within its parent replicatedJob. + // For each replicatedJob, this value will range from 0 to replicas-1, where `replicas` + // is equal to jobset.spec.replicatedJobs[*].replicas. + JobIndexKey string = "jobset.sigs.k8s.io/job-index" + // JobGlobalIndexKey is a label/annotation set to an integer that is unique across the entire JobSet. + // For each JobSet, this value will range from 0 to N-1, where N=total number of jobs in the jobset. + JobGlobalIndexKey string = "jobset.sigs.k8s.io/job-global-index" + // JobKey holds the SHA256 hash of the namespaced job name, which can be used to uniquely identify the job. + JobKey string = "jobset.sigs.k8s.io/job-key" // ExclusiveKey is an annotation that can be set on the JobSet or on a ReplicatedJob template. // If set at the JobSet level, all child jobs from all ReplicatedJobs will be scheduled using exclusive // job placement per topology group (defined as the label value). diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 20f62f52..f41ea3ba 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -730,6 +730,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R labels[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas)) labels[jobset.JobIndexKey] = strconv.Itoa(jobIdx) labels[jobset.JobKey] = jobHashKey(js.Namespace, jobName) + labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjob.Name, jobIdx) // Set annotations on the object. annotations := collections.CloneMap(obj.GetAnnotations()) @@ -739,6 +740,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R annotations[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas)) annotations[jobset.JobIndexKey] = strconv.Itoa(jobIdx) annotations[jobset.JobKey] = jobHashKey(js.Namespace, jobName) + annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjob.Name, jobIdx) // Apply coordinator annotation/label if a coordinator is defined in the JobSet spec. if js.Spec.Coordinator != nil { @@ -1032,3 +1034,32 @@ func exclusiveConditions(cond1, cond2 metav1.Condition) bool { func coordinatorEndpoint(js *jobset.JobSet) string { return fmt.Sprintf("%s-%s-%d-%d.%s", js.Name, js.Spec.Coordinator.ReplicatedJob, js.Spec.Coordinator.JobIndex, js.Spec.Coordinator.PodIndex, GetSubdomain(js)) } + +// globalJobIndex determines the job global index for a given job. The job global index is a unique +// global index for the job, with values ranging from 0 to N-1, +// where N=total number of jobs in the jobset. The job global index is calculated by +// iterating through the replicatedJobs in the order, as defined in the JobSet +// spec, keeping a cumulative sum of total replicas seen so far, then when we +// arrive at the parent replicatedJob of the target job, we add the local job +// index to our running sum of total jobs seen so far, in order to arrive at +// the final job global index value. +// +// Below is a diagram illustrating how job global indexs differ from job indexes. +// +// | my-jobset | +// | replicated job A | replicated job B | +// | job index 0 | job index 1 | job index 0 | job index 1 | +// | global index 0 | global index 2 | global index 3 | global index 4 | +// +// Returns an empty string if the parent replicated Job does not exist, +// although this should never happen in practice. +func globalJobIndex(js *jobset.JobSet, replicatedJobName string, jobIdx int) string { + currTotalJobs := 0 + for _, rjob := range js.Spec.ReplicatedJobs { + if rjob.Name == replicatedJobName { + return strconv.Itoa(currTotalJobs + jobIdx) + } + currTotalJobs += int(rjob.Replicas) + } + return "" +} diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index cbaaa36e..4711d7ff 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -686,6 +686,15 @@ func TestConstructJobsFromTemplate(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + // Here we update the expected Jobs with certain features which require + // direct access to the JobSet object itself to calculate. For example, + // the `jobset.sigs.k8s.io/job-global-index` annotation requires access to the + // full JobSet spec to calculate a unique ID for each Job. + for _, expectedJob := range tc.want { + addJobGlobalIndex(t, tc.js, expectedJob) + } + + // Now get the actual output of constructJobsFromTemplate, and diff the results. var got []*batchv1.Job for _, rjob := range tc.js.Spec.ReplicatedJobs { jobs := constructJobsFromTemplate(tc.js, &rjob, tc.ownedJobs) @@ -699,6 +708,26 @@ func TestConstructJobsFromTemplate(t *testing.T) { } } +// addJobGlobalIndex modifies the Job object in place by adding +// the `jobset.sigs.k8s.io/job-global-index` label/annotation to both the +// Job itself and the Job template spec.` +func addJobGlobalIndex(t *testing.T, js *jobset.JobSet, job *batchv1.Job) { + t.Helper() + + rjobName := job.Annotations[jobset.ReplicatedJobNameKey] + jobIdx, err := strconv.Atoi(job.Annotations[jobset.JobIndexKey]) + if err != nil { + t.Fatalf("invalid test case: %v", err) + } + // Job label/annotation + job.Labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx) + job.Annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx) + + // Job template spec label/annotation + job.Spec.Template.Labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx) + job.Spec.Template.Annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx) +} + func TestUpdateConditions(t *testing.T) { var ( jobSetName = "test-jobset" @@ -1381,3 +1410,64 @@ func TestCreateHeadlessSvcIfNecessary(t *testing.T) { }) } } + +func TestGlobalJobIndex(t *testing.T) { + tests := []struct { + name string + jobSet *jobset.JobSet + replicatedJob string + jobIdx int + expectedJobGlobalIndex string + }{ + { + name: "single replicated job", + jobSet: &jobset.JobSet{ + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + {Name: "rjob", Replicas: 3}, + }, + }, + }, + replicatedJob: "rjob", + jobIdx: 1, + expectedJobGlobalIndex: "1", + }, + { + name: "multiple replicated jobs", + jobSet: &jobset.JobSet{ + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + {Name: "rjob1", Replicas: 2}, + {Name: "rjob2", Replicas: 4}, + {Name: "rjob3", Replicas: 1}, + }, + }, + }, + replicatedJob: "rjob2", + jobIdx: 3, + expectedJobGlobalIndex: "5", + }, + { + name: "replicated job not found", + jobSet: &jobset.JobSet{ + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + {Name: "rjob1", Replicas: 2}, + }, + }, + }, + replicatedJob: "rjob2", + jobIdx: 0, + expectedJobGlobalIndex: "", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + actualJobGlobalIndex := globalJobIndex(tc.jobSet, tc.replicatedJob, tc.jobIdx) + if diff := cmp.Diff(tc.expectedJobGlobalIndex, actualJobGlobalIndex); diff != "" { + t.Errorf("unexpected global job index (-want/+got): %s", diff) + } + }) + } +}