Skip to content

Commit

Permalink
Add new job-id annotation to assign globally unique job index to each…
Browse files Browse the repository at this point in the history
… job (#650)

* add job-id annotation

* update unit tests

* change name to job global index
  • Loading branch information
danielvegamyhre authored Aug 16, 2024
1 parent ec39730 commit 7886910
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 2 deletions.
11 changes: 9 additions & 2 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,15 @@ const (
ReplicatedJobReplicas string = "jobset.sigs.k8s.io/replicatedjob-replicas"
// ReplicatedJobNameKey is used to index into a Jobs labels and retrieve the name of the parent ReplicatedJob
ReplicatedJobNameKey string = "jobset.sigs.k8s.io/replicatedjob-name"
JobIndexKey string = "jobset.sigs.k8s.io/job-index"
JobKey string = "jobset.sigs.k8s.io/job-key"
// JobIndexKey is a label/annotation set to the index of the Job replica within its parent replicatedJob.
// For each replicatedJob, this value will range from 0 to replicas-1, where `replicas`
// is equal to jobset.spec.replicatedJobs[*].replicas.
JobIndexKey string = "jobset.sigs.k8s.io/job-index"
// JobGlobalIndexKey is a label/annotation set to an integer that is unique across the entire JobSet.
// For each JobSet, this value will range from 0 to N-1, where N=total number of jobs in the jobset.
JobGlobalIndexKey string = "jobset.sigs.k8s.io/job-global-index"
// JobKey holds the SHA256 hash of the namespaced job name, which can be used to uniquely identify the job.
JobKey string = "jobset.sigs.k8s.io/job-key"
// ExclusiveKey is an annotation that can be set on the JobSet or on a ReplicatedJob template.
// If set at the JobSet level, all child jobs from all ReplicatedJobs will be scheduled using exclusive
// job placement per topology group (defined as the label value).
Expand Down
31 changes: 31 additions & 0 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
labels[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
labels[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
labels[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjob.Name, jobIdx)

// Set annotations on the object.
annotations := collections.CloneMap(obj.GetAnnotations())
Expand All @@ -739,6 +740,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
annotations[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
annotations[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
annotations[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjob.Name, jobIdx)

// Apply coordinator annotation/label if a coordinator is defined in the JobSet spec.
if js.Spec.Coordinator != nil {
Expand Down Expand Up @@ -1032,3 +1034,32 @@ func exclusiveConditions(cond1, cond2 metav1.Condition) bool {
func coordinatorEndpoint(js *jobset.JobSet) string {
return fmt.Sprintf("%s-%s-%d-%d.%s", js.Name, js.Spec.Coordinator.ReplicatedJob, js.Spec.Coordinator.JobIndex, js.Spec.Coordinator.PodIndex, GetSubdomain(js))
}

// globalJobIndex determines the job global index for a given job. The job global index is a unique
// global index for the job, with values ranging from 0 to N-1,
// where N=total number of jobs in the jobset. The job global index is calculated by
// iterating through the replicatedJobs in the order, as defined in the JobSet
// spec, keeping a cumulative sum of total replicas seen so far, then when we
// arrive at the parent replicatedJob of the target job, we add the local job
// index to our running sum of total jobs seen so far, in order to arrive at
// the final job global index value.
//
// Below is a diagram illustrating how job global indexs differ from job indexes.
//
// | my-jobset |
// | replicated job A | replicated job B |
// | job index 0 | job index 1 | job index 0 | job index 1 |
// | global index 0 | global index 2 | global index 3 | global index 4 |
//
// Returns an empty string if the parent replicated Job does not exist,
// although this should never happen in practice.
func globalJobIndex(js *jobset.JobSet, replicatedJobName string, jobIdx int) string {
currTotalJobs := 0
for _, rjob := range js.Spec.ReplicatedJobs {
if rjob.Name == replicatedJobName {
return strconv.Itoa(currTotalJobs + jobIdx)
}
currTotalJobs += int(rjob.Replicas)
}
return ""
}
90 changes: 90 additions & 0 deletions pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,15 @@ func TestConstructJobsFromTemplate(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Here we update the expected Jobs with certain features which require
// direct access to the JobSet object itself to calculate. For example,
// the `jobset.sigs.k8s.io/job-global-index` annotation requires access to the
// full JobSet spec to calculate a unique ID for each Job.
for _, expectedJob := range tc.want {
addJobGlobalIndex(t, tc.js, expectedJob)
}

// Now get the actual output of constructJobsFromTemplate, and diff the results.
var got []*batchv1.Job
for _, rjob := range tc.js.Spec.ReplicatedJobs {
jobs := constructJobsFromTemplate(tc.js, &rjob, tc.ownedJobs)
Expand All @@ -699,6 +708,26 @@ func TestConstructJobsFromTemplate(t *testing.T) {
}
}

// addJobGlobalIndex modifies the Job object in place by adding
// the `jobset.sigs.k8s.io/job-global-index` label/annotation to both the
// Job itself and the Job template spec.`
func addJobGlobalIndex(t *testing.T, js *jobset.JobSet, job *batchv1.Job) {
t.Helper()

rjobName := job.Annotations[jobset.ReplicatedJobNameKey]
jobIdx, err := strconv.Atoi(job.Annotations[jobset.JobIndexKey])
if err != nil {
t.Fatalf("invalid test case: %v", err)
}
// Job label/annotation
job.Labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)
job.Annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)

// Job template spec label/annotation
job.Spec.Template.Labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)
job.Spec.Template.Annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)
}

func TestUpdateConditions(t *testing.T) {
var (
jobSetName = "test-jobset"
Expand Down Expand Up @@ -1381,3 +1410,64 @@ func TestCreateHeadlessSvcIfNecessary(t *testing.T) {
})
}
}

func TestGlobalJobIndex(t *testing.T) {
tests := []struct {
name string
jobSet *jobset.JobSet
replicatedJob string
jobIdx int
expectedJobGlobalIndex string
}{
{
name: "single replicated job",
jobSet: &jobset.JobSet{
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{Name: "rjob", Replicas: 3},
},
},
},
replicatedJob: "rjob",
jobIdx: 1,
expectedJobGlobalIndex: "1",
},
{
name: "multiple replicated jobs",
jobSet: &jobset.JobSet{
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{Name: "rjob1", Replicas: 2},
{Name: "rjob2", Replicas: 4},
{Name: "rjob3", Replicas: 1},
},
},
},
replicatedJob: "rjob2",
jobIdx: 3,
expectedJobGlobalIndex: "5",
},
{
name: "replicated job not found",
jobSet: &jobset.JobSet{
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{Name: "rjob1", Replicas: 2},
},
},
},
replicatedJob: "rjob2",
jobIdx: 0,
expectedJobGlobalIndex: "",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actualJobGlobalIndex := globalJobIndex(tc.jobSet, tc.replicatedJob, tc.jobIdx)
if diff := cmp.Diff(tc.expectedJobGlobalIndex, actualJobGlobalIndex); diff != "" {
t.Errorf("unexpected global job index (-want/+got): %s", diff)
}
})
}
}

0 comments on commit 7886910

Please sign in to comment.