Skip to content

Commit

Permalink
Add queue-allocated test;
Browse files Browse the repository at this point in the history
  • Loading branch information
waiterQ authored and jiangkaihua committed Dec 12, 2022
1 parent 77330a3 commit 83312e4
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/scheduler/util/scheduler_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ func GetMinInt(vals ...int) int {
// ConvertRes2ResList convert resource type from api.Resource in scheduler to v1.ResourceList in yaml
func ConvertRes2ResList(res *api.Resource) v1.ResourceList {
var rl = v1.ResourceList{}
if res == nil {
return rl
}
rl[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(res.MilliCPU), resource.DecimalSI)
rl[v1.ResourceMemory] = *resource.NewQuantity(int64(res.Memory), resource.BinarySI)
for resourceName, f := range res.ScalarResources {
Expand Down
38 changes: 38 additions & 0 deletions pkg/scheduler/util/scheduler_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"reflect"
"testing"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"volcano.sh/volcano/pkg/scheduler/api"
)

Expand Down Expand Up @@ -99,3 +102,38 @@ func TestGetMinInt(t *testing.T) {
}
}
}

func TestConvertRes2ResList(t *testing.T) {
cases := []struct {
name string
Res *api.Resource
Expect v1.ResourceList
}{
{
name: "mutiResources",
Res: &api.Resource{
MilliCPU: 4000,
Memory: 4000,
ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1, "scalar.test/scalar2": 2},
},
Expect: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(4000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(4000, resource.BinarySI),
"scalar.test/scalar1": *resource.NewMilliQuantity(1, resource.DecimalSI),
"scalar.test/scalar2": *resource.NewMilliQuantity(2, resource.DecimalSI),
},
},
{
name: "null Resources",
Res: nil,
Expect: v1.ResourceList{},
},
}

for _, test := range cases {
result := ConvertRes2ResList(test.Res)
if !reflect.DeepEqual(result, test.Expect) {
t.Errorf("Failed test case #%s, expected: %#v, got %#v", test.name, test.Expect, result)
}
}
}
131 changes: 130 additions & 1 deletion test/e2e/jobseq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package jobseq

import (
"context"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1helper "k8s.io/kubernetes/pkg/scheduler/util"

busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/cli/util"

"volcano.sh/volcano/pkg/scheduler/api"
e2eutil "volcano.sh/volcano/test/e2e/util"
)

Expand Down Expand Up @@ -65,4 +68,130 @@ var _ = Describe("Queue E2E Test", func() {
Expect(err).NotTo(HaveOccurred(), "Wait for reopen queue %s failed", q1)

})

It("Queue status allocated Check", func() {
q1 := "q-test-allocated"
ctx := e2eutil.InitTestContext(e2eutil.Options{
Queues: []string{q1},
})
defer e2eutil.CleanupTestContext(ctx)

err := e2eutil.WaitQueueStatus(func() (bool, error) {
queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1)
return queue.Status.State == schedulingv1beta1.QueueStateOpen, nil
})
Expect(err).NotTo(HaveOccurred(), "Wait for queue %s open failed", q1)

job1 := e2eutil.CreateJob(ctx, &e2eutil.JobSpec{
Name: "job1",
Queue: q1,
Tasks: []e2eutil.TaskSpec{
{
Img: e2eutil.DefaultBusyBoxImage,
Req: e2eutil.HalfCPU,
Command: "sleep 300s && exit 3",
Rep: 2,
},
{
Img: e2eutil.DefaultBusyBoxImage,
Req: e2eutil.CPU1Mem1,
Rep: 1,
Command: "sleep 300s && exit 3",
},
},
})

err = e2eutil.WaitJobReady(ctx, job1)
Expect(err).NotTo(HaveOccurred())

By("queue allocated check 1")
expectAllocated := &api.Resource{
MilliCPU: 2000,
Memory: 1000,
}
err = e2eutil.WaitQueueStatus(func() (bool, error) {
queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1)
allocated := ConvertResource(queue.Status.Allocated)
allocated.Memory /= 1<<30 // Convert to million

return allocated.Equal(expectAllocated, api.Zero), nil
})
Expect(err).NotTo(HaveOccurred(), "Wait for queue %s allocated check 1 failed", q1)

job1.Spec.Tasks[0].Replicas = 1
job1.Spec.Tasks[0].MinAvailable = &job1.Spec.Tasks[0].Replicas
job1.Spec.Tasks[1].Replicas = 2
job1.Spec.Tasks[1].MinAvailable = &job1.Spec.Tasks[1].Replicas
err = e2eutil.UpdateJob(ctx, job1)
Expect(err).NotTo(HaveOccurred())

err = e2eutil.WaitJobReady(ctx, job1)
Expect(err).NotTo(HaveOccurred())

By("queue allocated check 2")
expectAllocated = &api.Resource{
MilliCPU: 2500,
Memory: 2000,
}
err = e2eutil.WaitQueueStatus(func() (bool, error) {
queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1)
allocated := ConvertResource(queue.Status.Allocated)
allocated.Memory /= 1024*1024*1024 // Convert to million

return allocated.Equal(expectAllocated, api.Zero), nil
})
Expect(err).NotTo(HaveOccurred(), "Wait for queue %s allocated check 2 failed", q1)

job1.Spec.Tasks[0].Replicas = 0
job1.Spec.Tasks[0].MinAvailable = &job1.Spec.Tasks[0].Replicas
job1.Spec.Tasks[1].Replicas = 0
job1.Spec.Tasks[1].MinAvailable = &job1.Spec.Tasks[1].Replicas
job1.Spec.MinAvailable = 0
err = e2eutil.UpdateJob(ctx, job1)
Expect(err).NotTo(HaveOccurred())

err = e2eutil.WaitJobReady(ctx, job1)
Expect(err).NotTo(HaveOccurred())

By("queue allocated check 3")
expectAllocated = &api.Resource{}
err = e2eutil.WaitQueueStatus(func() (bool, error) {
queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1)
allocated := ConvertResource(queue.Status.Allocated)
allocated.Memory /= 1<<30 // Convert to million

return allocated.Equal(expectAllocated, api.Zero), nil
})
Expect(err).NotTo(HaveOccurred(), "Wait for queue %s allocated check 3 failed", q1)
})
})

// ConvertResource creates a new resource object from resource list
func ConvertResource(rl v1.ResourceList) *api.Resource {
r := api.EmptyResource()
for rName, rQuant := range rl {
switch rName {
case v1.ResourceCPU:
r.MilliCPU += float64(rQuant.MilliValue())
case v1.ResourceMemory:
r.Memory += float64(rQuant.MilliValue()) // diff
case v1.ResourcePods:
r.MaxTaskNum += int(rQuant.Value())
case v1.ResourceEphemeralStorage:
r.AddScalar(rName, float64(rQuant.MilliValue()))
default:
if api.IsCountQuota(rName) {
continue
}
//NOTE: When converting this back to k8s resource, we need record the format as well as / 1000
if v1helper.IsScalarResourceName(rName) {
r.AddScalar(rName, float64(rQuant.MilliValue()))
}
}
}
return r
}

0 comments on commit 83312e4

Please sign in to comment.