diff --git a/pkg/scheduler/util/scheduler_helper.go b/pkg/scheduler/util/scheduler_helper.go index 7d72999aa84..c74e6541cfe 100644 --- a/pkg/scheduler/util/scheduler_helper.go +++ b/pkg/scheduler/util/scheduler_helper.go @@ -206,6 +206,9 @@ func GetMinInt(vals ...int) int { // ConvertRes2ResList convert resource type from api.Resource in scheduler to v1.ResourceList in yaml func ConvertRes2ResList(res *api.Resource) v1.ResourceList { var rl = v1.ResourceList{} + if res == nil { + return rl + } rl[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(res.MilliCPU), resource.DecimalSI) rl[v1.ResourceMemory] = *resource.NewQuantity(int64(res.Memory), resource.BinarySI) for resourceName, f := range res.ScalarResources { diff --git a/pkg/scheduler/util/scheduler_helper_test.go b/pkg/scheduler/util/scheduler_helper_test.go index 7bda57ecfb9..a39303dc3d1 100644 --- a/pkg/scheduler/util/scheduler_helper_test.go +++ b/pkg/scheduler/util/scheduler_helper_test.go @@ -20,6 +20,9 @@ import ( "reflect" "testing" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "volcano.sh/volcano/pkg/scheduler/api" ) @@ -99,3 +102,38 @@ func TestGetMinInt(t *testing.T) { } } } + +func TestConvertRes2ResList(t *testing.T) { + cases := []struct { + name string + Res *api.Resource + Expect v1.ResourceList + }{ + { + name: "mutiResources", + Res: &api.Resource{ + MilliCPU: 4000, + Memory: 4000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1, "scalar.test/scalar2": 2}, + }, + Expect: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(4000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(4000, resource.BinarySI), + "scalar.test/scalar1": *resource.NewMilliQuantity(1, resource.DecimalSI), + "scalar.test/scalar2": *resource.NewMilliQuantity(2, resource.DecimalSI), + }, + }, + { + name: "null Resources", + Res: nil, + Expect: v1.ResourceList{}, + }, + } + + for _, test := range cases { + result := ConvertRes2ResList(test.Res) + if !reflect.DeepEqual(result, test.Expect) { + t.Errorf("Failed test case #%s, expected: %#v, got %#v", test.name, test.Expect, result) + } + } +} \ No newline at end of file diff --git a/test/e2e/jobseq/queue.go b/test/e2e/jobseq/queue.go index d2edb735bb0..62fdccb2d47 100644 --- a/test/e2e/jobseq/queue.go +++ b/test/e2e/jobseq/queue.go @@ -18,14 +18,17 @@ package jobseq import ( "context" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1helper "k8s.io/kubernetes/pkg/scheduler/util" busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1" schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/pkg/cli/util" - + "volcano.sh/volcano/pkg/scheduler/api" e2eutil "volcano.sh/volcano/test/e2e/util" ) @@ -65,4 +68,130 @@ var _ = Describe("Queue E2E Test", func() { Expect(err).NotTo(HaveOccurred(), "Wait for reopen queue %s failed", q1) }) + + It("Queue status allocated Check", func() { + q1 := "q-test-allocated" + ctx := e2eutil.InitTestContext(e2eutil.Options{ + Queues: []string{q1}, + }) + defer e2eutil.CleanupTestContext(ctx) + + err := e2eutil.WaitQueueStatus(func() (bool, error) { + queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) + return queue.Status.State == schedulingv1beta1.QueueStateOpen, nil + }) + Expect(err).NotTo(HaveOccurred(), "Wait for queue %s open failed", q1) + + job1 := e2eutil.CreateJob(ctx, &e2eutil.JobSpec{ + Name: "job1", + Queue: q1, + Tasks: []e2eutil.TaskSpec{ + { + Img: e2eutil.DefaultBusyBoxImage, + Req: e2eutil.HalfCPU, + Command: "sleep 300s && exit 3", + Rep: 2, + }, + { + Img: e2eutil.DefaultBusyBoxImage, + Req: e2eutil.CPU1Mem1, + Rep: 1, + Command: "sleep 300s && exit 3", + }, + }, + }) + + err = e2eutil.WaitJobReady(ctx, job1) + Expect(err).NotTo(HaveOccurred()) + + By("queue allocated check 1") + expectAllocated := &api.Resource{ + MilliCPU: 2000, + Memory: 1000, + } + err = e2eutil.WaitQueueStatus(func() (bool, error) { + queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) + allocated := ConvertResource(queue.Status.Allocated) + allocated.Memory /= 1<<30 // Convert to million + + return allocated.Equal(expectAllocated, api.Zero), nil + }) + Expect(err).NotTo(HaveOccurred(), "Wait for queue %s allocated check 1 failed", q1) + + job1.Spec.Tasks[0].Replicas = 1 + job1.Spec.Tasks[0].MinAvailable = &job1.Spec.Tasks[0].Replicas + job1.Spec.Tasks[1].Replicas = 2 + job1.Spec.Tasks[1].MinAvailable = &job1.Spec.Tasks[1].Replicas + err = e2eutil.UpdateJob(ctx, job1) + Expect(err).NotTo(HaveOccurred()) + + err = e2eutil.WaitJobReady(ctx, job1) + Expect(err).NotTo(HaveOccurred()) + + By("queue allocated check 2") + expectAllocated = &api.Resource{ + MilliCPU: 2500, + Memory: 2000, + } + err = e2eutil.WaitQueueStatus(func() (bool, error) { + queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) + allocated := ConvertResource(queue.Status.Allocated) + allocated.Memory /= 1024*1024*1024 // Convert to million + + return allocated.Equal(expectAllocated, api.Zero), nil + }) + Expect(err).NotTo(HaveOccurred(), "Wait for queue %s allocated check 2 failed", q1) + + job1.Spec.Tasks[0].Replicas = 0 + job1.Spec.Tasks[0].MinAvailable = &job1.Spec.Tasks[0].Replicas + job1.Spec.Tasks[1].Replicas = 0 + job1.Spec.Tasks[1].MinAvailable = &job1.Spec.Tasks[1].Replicas + job1.Spec.MinAvailable = 0 + err = e2eutil.UpdateJob(ctx, job1) + Expect(err).NotTo(HaveOccurred()) + + err = e2eutil.WaitJobReady(ctx, job1) + Expect(err).NotTo(HaveOccurred()) + + By("queue allocated check 3") + expectAllocated = &api.Resource{} + err = e2eutil.WaitQueueStatus(func() (bool, error) { + queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) + allocated := ConvertResource(queue.Status.Allocated) + allocated.Memory /= 1<<30 // Convert to million + + return allocated.Equal(expectAllocated, api.Zero), nil + }) + Expect(err).NotTo(HaveOccurred(), "Wait for queue %s allocated check 3 failed", q1) + }) }) + +// ConvertResource creates a new resource object from resource list +func ConvertResource(rl v1.ResourceList) *api.Resource { + r := api.EmptyResource() + for rName, rQuant := range rl { + switch rName { + case v1.ResourceCPU: + r.MilliCPU += float64(rQuant.MilliValue()) + case v1.ResourceMemory: + r.Memory += float64(rQuant.MilliValue()) // diff + case v1.ResourcePods: + r.MaxTaskNum += int(rQuant.Value()) + case v1.ResourceEphemeralStorage: + r.AddScalar(rName, float64(rQuant.MilliValue())) + default: + if api.IsCountQuota(rName) { + continue + } + //NOTE: When converting this back to k8s resource, we need record the format as well as / 1000 + if v1helper.IsScalarResourceName(rName) { + r.AddScalar(rName, float64(rQuant.MilliValue())) + } + } + } + return r +} \ No newline at end of file