Skip to content

Commit

Permalink
Propagate schedulingGates set on PodTemplate when resuming JobSet (#705)
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo authored Nov 12, 2024
1 parent 1fe81df commit 82dbee1
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,10 @@ func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, repl
job.Spec.Template.Spec.Tolerations,
replicatedJobPodTemplate.Spec.Tolerations,
)
job.Spec.Template.Spec.SchedulingGates = collections.MergeSlices(
job.Spec.Template.Spec.SchedulingGates,
replicatedJobPodTemplate.Spec.SchedulingGates,
)
} else {
log.Error(nil, "job missing ReplicatedJobName label")
}
Expand Down
72 changes: 72 additions & 0 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
"sigs.k8s.io/jobset/pkg/util/testing"
Expand Down Expand Up @@ -198,6 +199,77 @@ var _ = ginkgo.Describe("JobSet", func() {
})
})

// This test shows that when a JobSet is resumed it allows to add a
// scheduling gate and propagates it down to Pods. This scenario is needed
// for the integration with Kueue, to support TopologyAwareScheduling (TAS),
// which adds the kueue.x-k8s.io/topology scheduling gate to control
// assignment of Pods to the topology domains.
ginkgo.When("JobSet is resumed is propagates scheduling gates to Pods", func() {

ginkgo.It("should allow to add schedulingGates to PodTemplate while resuming", func() {
ctx := context.Background()
js := sleepTestJobSet(ns, 1).Obj()
jsKey := types.NamespacedName{Name: js.Name, Namespace: js.Namespace}
const (
schedulingGateName = "example.com/gate"
)

ginkgo.By("Create a suspended JobSet", func() {
js.Spec.Suspend = ptr.To(true)
gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed())
})

ginkgo.By("Resume the JobSet and set schedulingGates", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
js.Spec.Suspend = ptr.To(false)
podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
podTemplate.Spec.SchedulingGates = append(podTemplate.Spec.SchedulingGates, corev1.PodSchedulingGate{
Name: schedulingGateName,
})
g.Expect(k8sClient.Update(ctx, js)).Should(gomega.Succeed())
}, timeout, interval).Should(gomega.Succeed())
})

// In this test the number of expected Pods equals the number of
// expected Jobs as the Jobs don't set completions or parallelism,
// so 1 Pod per Job is implied.
expectedPods := util.NumExpectedJobs(js)
ginkgo.By("Await for the expected number of gated pods created", func() {
gomega.Eventually(func(g gomega.Gomega) {
list := &corev1.PodList{}
g.Expect(k8sClient.List(ctx, list, client.InNamespace(js.Namespace))).Should(gomega.Succeed())
gatedCount := 0
for _, p := range list.Items {
if len(p.Spec.SchedulingGates) == 1 && p.Spec.SchedulingGates[0].Name == schedulingGateName {
gatedCount++
}
}
g.Expect(gatedCount).Should(gomega.Equal(expectedPods),
fmt.Sprintf("expected %v gated pods, got: %v, found items: %v", expectedPods, gatedCount, list.Items))
}, timeout, interval).Should(gomega.Succeed())
})

ginkgo.By("Ungate all of the pods to let the Job run and complete", func() {
gomega.Eventually(func(g gomega.Gomega) {
list := &corev1.PodList{}
g.Expect(k8sClient.List(ctx, list, client.InNamespace(js.Namespace))).Should(gomega.Succeed())
for i := range list.Items {
p := &list.Items[i]
if len(p.Spec.SchedulingGates) == 1 && p.Spec.SchedulingGates[0].Name == schedulingGateName {
p.Spec.SchedulingGates = nil
g.Expect(k8sClient.Update(ctx, p)).Should(gomega.Succeed())
}
}
}, timeout, interval).Should(gomega.Succeed())
})

ginkgo.By("Await for the JobSet to complete successfully", func() {
util.JobSetCompleted(ctx, k8sClient, js, timeout)
})
})
})

}) // end of Describe

// getPingCommand returns ping command for 4 hostnames
Expand Down

0 comments on commit 82dbee1

Please sign in to comment.