Skip to content

Commit

Permalink
integration tests for managedJobNamespaceSelector functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
dgrove-oss committed Dec 4, 2024
1 parent 50563f7 commit f17a525
Show file tree
Hide file tree
Showing 13 changed files with 129 additions and 24 deletions.
41 changes: 39 additions & 2 deletions test/integration/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -63,9 +62,15 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
ginkgo.BeforeAll(func() {
fwk.StartManager(ctx, cfg, managerSetup(
jobframework.WithManageJobsWithoutQueueName(true),
jobframework.WithManagedJobsNamespaceSelector(labels.Everything()),
jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns")),
jobframework.WithLabelKeysToCopy([]string{"toCopyKey"}),
))
unmanagedNamespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "unmanaged-ns",
},
}
gomega.Expect(k8sClient.Create(ctx, unmanagedNamespace)).To(gomega.Succeed())
})
ginkgo.AfterAll(func() {
fwk.StopManager(ctx)
Expand Down Expand Up @@ -283,6 +288,38 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
gomega.Expect(createdWorkload.Spec.QueueName).Should(gomega.Equal(jobQueueName))
})

ginkgo.It("Should not manage a job without a queue-name submittted to an unmanaged namespace", func() {
ginkgo.By("Creating an unsuspended job without a queue-name in unmanaged-ns")
job := testingjob.MakeJob(jobName, "unmanaged-ns").Suspend(false).Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())

ginkgo.By("The job is not suspended and a workload is not created")
lookupKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
childWorkload := &kueue.Workload{}
childWlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: job.Namespace}
gomega.Consistently(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, lookupKey, job)).Should(gomega.Succeed())
g.Expect(job.Spec.Suspend).Should(gomega.Equal(ptr.To(false)))
g.Expect(k8sClient.Get(ctx, childWlLookupKey, childWorkload)).Should(testing.BeNotFoundError())
}, util.ConsistentDuration, util.Interval).Should(gomega.Succeed())
})

ginkgo.It("Should manage a job without a queue-name submittted to managed namespace", func() {
ginkgo.By("Creating an unsuspended job without a queue-name in a")
job := testingjob.MakeJob(jobName, ns.Name).Suspend(false).Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())

ginkgo.By("The job is suspended and a workload is created")
lookupKey := types.NamespacedName{Name: job.Name, Namespace: ns.Name}
childWorkload := &kueue.Workload{}
childWlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: job.Namespace}
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, lookupKey, job)).Should(gomega.Succeed())
g.Expect(job.Spec.Suspend).Should(gomega.Equal(ptr.To(true)))
g.Expect(k8sClient.Get(ctx, childWlLookupKey, childWorkload)).Should(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.When("The parent job is managed by kueue", func() {
ginkgo.It("Should suspend a job if the parent workload does not exist", func() {
ginkgo.By("creating the parent job")
Expand Down
39 changes: 37 additions & 2 deletions test/integration/controller/jobs/jobset/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -57,7 +56,14 @@ const (

var _ = ginkgo.Describe("JobSet controller", ginkgo.Ordered, ginkgo.ContinueOnFailure, ginkgo.ContinueOnFailure, func() {
ginkgo.BeforeAll(func() {
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true), jobframework.WithManagedJobsNamespaceSelector(labels.Everything())))
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true),
jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns"))))
unmanagedNamespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "unmanaged-ns",
},
}
gomega.Expect(k8sClient.Create(ctx, unmanagedNamespace)).To(gomega.Succeed())
})
ginkgo.AfterAll(func() {
fwk.StopManager(ctx)
Expand Down Expand Up @@ -274,6 +280,35 @@ var _ = ginkgo.Describe("JobSet controller", ginkgo.Ordered, ginkgo.ContinueOnFa
util.ExpectWorkloadToFinish(ctx, k8sClient, wlLookupKey)
})

ginkgo.It("A jobset created in an unmanaged namespace is not suspended and a workload is not created", func() {
ginkgo.By("Creating an unsuspended job without a queue-name in unmanaged-ns")
jobSet := testingjobset.MakeJobSet(jobSetName, "unmanaged-ns").ReplicatedJobs(
testingjobset.ReplicatedJobRequirements{
Name: "replicated-job-1",
Replicas: 1,
Parallelism: 1,
Completions: 1,
}, testingjobset.ReplicatedJobRequirements{
Name: "replicated-job-2",
Replicas: 3,
Parallelism: 1,
Completions: 1,
},
).Suspend(false).
Obj()

gomega.Expect(k8sClient.Create(ctx, jobSet)).To(gomega.Succeed())
createdJobSet := &jobsetapi.JobSet{}
wlLookupKey := types.NamespacedName{Name: workloadjobset.GetWorkloadNameForJobSet(jobSet.Name, jobSet.UID), Namespace: ns.Name}
createdWorkload := &kueue.Workload{}

gomega.Consistently(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: jobSetName, Namespace: jobSet.Namespace}, createdJobSet)).Should(gomega.Succeed())
g.Expect(ptr.Deref(createdJobSet.Spec.Suspend, false)).Should(gomega.BeFalse())
g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(testing.BeNotFoundError())
}, util.ConsistentDuration, util.Interval).Should(gomega.Succeed())
})

ginkgo.It("Should finish the preemption when the jobset becomes inactive", func() {
jobSet := testingjobset.MakeJobSet(jobSetName, ns.Name).ReplicatedJobs(
testingjobset.ReplicatedJobRequirements{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -58,7 +57,8 @@ const (

var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailure, ginkgo.ContinueOnFailure, func() {
ginkgo.BeforeAll(func() {
fwk.StartManager(ctx, cfg, managerSetup(false, jobframework.WithManageJobsWithoutQueueName(true), jobframework.WithManagedJobsNamespaceSelector(labels.Everything())))
fwk.StartManager(ctx, cfg, managerSetup(false, jobframework.WithManageJobsWithoutQueueName(true),
jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns"))))
})
ginkgo.AfterAll(func() {
fwk.StopManager(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

Expand Down Expand Up @@ -54,7 +53,8 @@ const (

var _ = ginkgo.Describe("Job controller", framework.RedundantSpec, ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
ginkgo.BeforeAll(func() {
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true), jobframework.WithManagedJobsNamespaceSelector(labels.Everything())))
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true),
jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns"))))
})

ginkgo.AfterAll(func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

Expand Down Expand Up @@ -55,7 +54,8 @@ const (

var _ = ginkgo.Describe("Job controller", framework.RedundantSpec, ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
ginkgo.BeforeAll(func() {
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true), jobframework.WithManagedJobsNamespaceSelector(labels.Everything())))
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true),
jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns"))))
})

ginkgo.AfterAll(func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -57,7 +56,8 @@ const (

var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
ginkgo.BeforeAll(func() {
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true), jobframework.WithManagedJobsNamespaceSelector(labels.Everything())))
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true),
jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns"))))
})

ginkgo.AfterAll(func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
schedulingv1 "k8s.io/api/scheduling/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -54,7 +53,8 @@ const (

var _ = ginkgo.Describe("RayCluster controller", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
ginkgo.BeforeAll(func() {
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true), jobframework.WithManagedJobsNamespaceSelector(labels.Everything())))
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true),
jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns"))))
})
ginkgo.AfterAll(func() {
fwk.StopManager(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
schedulingv1 "k8s.io/api/scheduling/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -61,7 +60,8 @@ func setInitStatus(name, namespace string) {

var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
ginkgo.BeforeAll(func() {
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true), jobframework.WithManagedJobsNamespaceSelector(labels.Everything())))
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true),
jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns"))))
})

ginkgo.AfterAll(func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

Expand Down Expand Up @@ -55,7 +54,8 @@ const (

var _ = ginkgo.Describe("Job controller", framework.RedundantSpec, ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
ginkgo.BeforeAll(func() {
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true), jobframework.WithManagedJobsNamespaceSelector(labels.Everything())))
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true),
jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns"))))
})

ginkgo.AfterAll(func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

Expand Down Expand Up @@ -55,7 +54,8 @@ const (

var _ = ginkgo.Describe("Job controller", framework.RedundantSpec, ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
ginkgo.BeforeAll(func() {
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true), jobframework.WithManagedJobsNamespaceSelector(labels.Everything())))
fwk.StartManager(ctx, cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true),
jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns"))))
})

ginkgo.AfterAll(func() {
Expand Down
21 changes: 19 additions & 2 deletions test/integration/webhook/jobs/job_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/utils/ptr"
Expand All @@ -48,9 +47,15 @@ var _ = ginkgo.Describe("Job Webhook With manageJobsWithoutQueueName enabled", g
fwk.StartManager(ctx, cfg, managerSetup(
job.SetupWebhook,
jobframework.WithManageJobsWithoutQueueName(true),
jobframework.WithManagedJobsNamespaceSelector(labels.Everything()),
jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns")),
jobframework.WithKubeServerVersion(serverVersionFetcher),
))
unmanagedNamespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "unmanaged-ns",
},
}
gomega.Expect(k8sClient.Create(ctx, unmanagedNamespace)).To(gomega.Succeed())
})
ginkgo.BeforeEach(func() {
ns = &corev1.Namespace{
Expand Down Expand Up @@ -87,6 +92,18 @@ var _ = ginkgo.Describe("Job Webhook With manageJobsWithoutQueueName enabled", g
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.It("Should not suspend a Job with no queue name specified in an unmanaged namespace", func() {
job := testingjob.MakeJob("job-without-queue-name-unmanaged", "unmanaged-ns").Suspend(false).Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())

lookupKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
createdJob := &batchv1.Job{}
gomega.Consistently(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
g.Expect(createdJob.Spec.Suspend).Should(gomega.Equal(ptr.To(false)))
}, util.ConsistentDuration, util.Interval).Should(gomega.Succeed())
})

ginkgo.It("Should not update unsuspend Job successfully when adding queue name", func() {
job := testingjob.MakeJob("job-without-queue-name", ns.Name).Suspend(false).Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
Expand Down
3 changes: 1 addition & 2 deletions test/integration/webhook/jobs/raycluster_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -101,7 +100,7 @@ var _ = ginkgo.Describe("RayCluster Webhook", func() {
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)

return nil
}, jobframework.WithManageJobsWithoutQueueName(true), jobframework.WithManagedJobsNamespaceSelector(labels.Everything())))
}, jobframework.WithManageJobsWithoutQueueName(true), jobframework.WithManagedJobsNamespaceSelector(util.NewNamespaceSelectorExcluding("unmanaged-ns"))))
})
ginkgo.BeforeEach(func() {
ns = &corev1.Namespace{
Expand Down
17 changes: 17 additions & 0 deletions test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -823,3 +824,19 @@ func CreateNodes(ctx context.Context, c client.Client, nodes []corev1.Node) {
}, Timeout, Interval).Should(gomega.Succeed())
}
}

func NewNamespaceSelectorExcluding(unmanaged ...string) labels.Selector {
unmanaged = append(unmanaged, "kube-system", "kueue_system")
ls := &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: unmanaged,
},
},
}
sel, err := metav1.LabelSelectorAsSelector(ls)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return sel
}

0 comments on commit f17a525

Please sign in to comment.