Skip to content

Commit

Permalink
improve: add rwlock to fakebinder and fakeevictor
Browse files Browse the repository at this point in the history
Signed-off-by: lowang-bh <lhui_wang@163.com>
  • Loading branch information
lowang-bh committed May 6, 2024
1 parent 85cc961 commit 5fa8a3a
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 51 deletions.
16 changes: 7 additions & 9 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestAllocate(t *testing.T) {
Queues: []*schedulingv1.Queue{
util.BuildQueue("c1", 1, nil),
},
Bind: map[string]string{
BindMap: map[string]string{
"c1/p1": "n1",
"c1/p2": "n1",
},
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestAllocate(t *testing.T) {
util.BuildQueue("c1", 1, nil),
util.BuildQueue("c2", 1, nil),
},
Bind: map[string]string{
BindMap: map[string]string{
"c2/pg2-p-1": "n1",
"c1/pg1-p-1": "n1",
},
Expand All @@ -128,7 +128,7 @@ func TestAllocate(t *testing.T) {
util.BuildQueue("c1", 1, nil),
util.BuildQueue("c2", 1, nil),
},
Bind: map[string]string{
BindMap: map[string]string{
"c1/p2": "n1",
},
BindsNum: 1,
Expand Down Expand Up @@ -276,10 +276,7 @@ func TestAllocateWithDynamicPVC(t *testing.T) {
}

fakeVolumeBinder := util.NewFakeVolumeBinder(kubeClient)
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string, 10),
}
binder := util.NewFakeBinder(10)
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
Expand Down Expand Up @@ -324,8 +321,9 @@ func TestAllocateWithDynamicPVC(t *testing.T) {
defer framework.CloseSession(ssn)

allocate.Execute(ssn)
if !reflect.DeepEqual(test.expectedBind, binder.Binds) {
t.Errorf("expected: %v, got %v ", test.expectedBind, binder.Binds)
bindResults := binder.Binds()
if !reflect.DeepEqual(test.expectedBind, bindResults) {
t.Errorf("expected: %v, got %v ", test.expectedBind, bindResults)
}
if !reflect.DeepEqual(test.expectedActions, fakeVolumeBinder.Actions) {
t.Errorf("expected: %v, got %v ", test.expectedActions, fakeVolumeBinder.Actions)
Expand Down
18 changes: 6 additions & 12 deletions pkg/scheduler/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,9 @@ func TestSchedulerCache_Bind_NodeWithSufficientResources(t *testing.T) {
owner := buildOwnerReference("j1")

cache := &SchedulerCache{
Jobs: make(map[api.JobID]*api.JobInfo),
Nodes: make(map[string]*api.NodeInfo),
Binder: &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
},
Jobs: make(map[api.JobID]*api.JobInfo),
Nodes: make(map[string]*api.NodeInfo),
Binder: util.NewFakeBinder(0),
BindFlowChannel: make(chan *api.TaskInfo, 5000),
}

Expand All @@ -166,12 +163,9 @@ func TestSchedulerCache_Bind_NodeWithInsufficientResources(t *testing.T) {
owner := buildOwnerReference("j1")

cache := &SchedulerCache{
Jobs: make(map[api.JobID]*api.JobInfo),
Nodes: make(map[string]*api.NodeInfo),
Binder: &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
},
Jobs: make(map[api.JobID]*api.JobInfo),
Nodes: make(map[string]*api.NodeInfo),
Binder: util.NewFakeBinder(0),
BindFlowChannel: make(chan *api.TaskInfo, 5000),
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/scheduler/plugins/nodegroup/nodegroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,7 @@ func TestNodeGroup(t *testing.T) {

for i, test := range tests {
t.Run(fmt.Sprintf("case %v %v", i, test.name), func(t *testing.T) {
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
}
binder := util.NewFakeBinder(0)
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/predicates/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestEventHandler(t *testing.T) {
PriClass: []*schedulingv1.PriorityClass{p1, p2},
PodGroups: []*schedulingv1beta1.PodGroup{pg1, pg2},
Queues: []*schedulingv1beta1.Queue{queue1},
Bind: map[string]string{ // podKey -> node
BindMap: map[string]string{ // podKey -> node
"ns1/worker-3": "node1",
},
BindsNum: 1,
Expand Down
5 changes: 1 addition & 4 deletions pkg/scheduler/plugins/proportion/proportion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,7 @@ func TestProportion(t *testing.T) {

for _, test := range tests {
// initialize schedulerCache
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
}
binder := util.NewFakeBinder(0)
recorder := record.NewFakeRecorder(100)
go func() {
for {
Expand Down
40 changes: 25 additions & 15 deletions pkg/scheduler/uthelper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ type TestCommonStruct struct {
Queues []*vcapisv1.Queue
PriClass []*schedulingv1.PriorityClass
ResourceQuotas []*v1.ResourceQuota
Bind map[string]string // bind results: ns/podName -> nodeName
PipeLined map[string][]string // pipelined results: map[jobID][]{nodename}
BindMap map[string]string // bind results: ns/podName -> nodeName
PipeLined map[string][]string // pipelined results: map[jobID][]{nodeName}
Evicted []string // evicted pods list of ns/podName
Status map[api.JobID]scheduling.PodGroupPhase // final status
BindsNum int // binds events numbers
Expand All @@ -71,13 +71,8 @@ var _ Interface = &TestCommonStruct{}

// RegisterSession open session with tiers and configuration, and mock schedulerCache with self-defined FakeBinder and FakeEvictor
func (test *TestCommonStruct) RegisterSession(tiers []conf.Tier, config []conf.Configuration) *framework.Session {
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
}
evictor := &util.FakeEvictor{
Channel: make(chan string),
}
binder := util.NewFakeBinder(0)
evictor := util.NewFakeEvictor(0)
stsUpdator := &util.FakeStatusUpdater{}
test.binder = binder
test.evictor = evictor
Expand Down Expand Up @@ -148,8 +143,8 @@ func (test *TestCommonStruct) CheckAll(caseIndex int) (err error) {

// CheckBind check expected bind result
func (test *TestCommonStruct) CheckBind(caseIndex int) error {
if test.BindsNum != len(test.Bind) {
return fmt.Errorf("invalid setting for binding check: want bind count %d, want bind result length %d", test.BindsNum, len(test.Bind))
if test.BindsNum != len(test.BindMap) {
return fmt.Errorf("invalid setting for binding check: want bind count %d, want bind result length %d", test.BindsNum, len(test.BindMap))
}
binder := test.binder.(*util.FakeBinder)
for i := 0; i < test.BindsNum; i++ {
Expand All @@ -160,11 +155,19 @@ func (test *TestCommonStruct) CheckBind(caseIndex int) error {
}
}

if len(test.Bind) != len(binder.Binds) {
return fmt.Errorf("case %d(%s) check bind: \nwant: %v\n got %v ", caseIndex, test.Name, test.Bind, binder.Binds)
// in case expected test.BindsNum is 0, but actually there is a binding and wait the binding goroutine to run
select {
case <-time.After(50 * time.Millisecond):
case key := <-binder.Channel:
return fmt.Errorf("unexpect binding %s in case %d(%s)", key, caseIndex, test.Name)
}

binds := binder.Binds()
if len(test.BindMap) != len(binds) {
return fmt.Errorf("case %d(%s) check bind: \nwant: %v\n got %v ", caseIndex, test.Name, test.BindMap, binds)
}
for key, value := range test.Bind {
got := binder.Binds[key]
for key, value := range test.BindMap {
got := binds[key]
if value != got {
return fmt.Errorf("case %d(%s) check bind: \nwant: %v->%v\n got: %v->%v ", caseIndex, test.Name, key, value, key, got)
}
Expand All @@ -186,6 +189,13 @@ func (test *TestCommonStruct) CheckEvict(caseIndex int) error {
}
}

// in case expected test.EvictNum is 0, but actually there is an evicting and wait the evicting goroutine to run
select {
case <-time.After(50 * time.Millisecond):
case key := <-evictor.Channel:
return fmt.Errorf("unexpect evicted %s in case %d(%s)", key, caseIndex, test.Name)
}

evicts := evictor.Evicts()
if len(test.Evicted) != len(evicts) {
return fmt.Errorf("case %d(%s) check evict: \nwant: %v\n got %v ", caseIndex, test.Name, test.Evicted, evicts)
Expand Down
53 changes: 47 additions & 6 deletions pkg/scheduler/util/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,18 +271,44 @@ func BuildPriorityClass(name string, value int32) *schedulingv1.PriorityClass {

// FakeBinder is used as fake binder
type FakeBinder struct {
sync.Mutex
Binds map[string]string
sync.RWMutex
binds map[string]string
Channel chan string
}

// NewFakeBinder returns a instance of FakeBinder
func NewFakeBinder(buffer int) *FakeBinder {
return &FakeBinder{
binds: make(map[string]string, buffer),
Channel: make(chan string, buffer),
}
}

// Binds returns the binding results
func (fb *FakeBinder) Binds() map[string]string {
fb.RLock()
defer fb.RUnlock()
ret := make(map[string]string, len(fb.binds))
for k, v := range fb.binds {
ret[k] = v
}
return ret
}

// Length returns the number of bindings
func (fb *FakeBinder) Length() int {
fb.RLock()
defer fb.RUnlock()
return len(fb.binds)
}

// Bind used by fake binder struct to bind pods
func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, error) {
fb.Lock()
defer fb.Unlock()
for _, p := range tasks {
key := fmt.Sprintf("%v/%v", p.Namespace, p.Name)
fb.Binds[key] = p.NodeName
fb.binds[key] = p.NodeName
fb.Channel <- key // need to wait binding pod because Bind process is asynchronous
}

Expand All @@ -291,18 +317,33 @@ func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInf

// FakeEvictor is used as fake evictor
type FakeEvictor struct {
sync.Mutex
sync.RWMutex
evicts []string
Channel chan string
}

// NewFakeEvictor returns a new FakeEvictor instance
func NewFakeEvictor(buffer int) *FakeEvictor {
return &FakeEvictor{
evicts: make([]string, 0, buffer),
Channel: make(chan string, buffer),
}
}

// Evicts returns copy of evicted pods.
func (fe *FakeEvictor) Evicts() []string {
fe.Lock()
defer fe.Unlock()
fe.RLock()
defer fe.RUnlock()
return append([]string{}, fe.evicts...)
}

// Length returns the number of evicts
func (fe *FakeEvictor) Length() int {
fe.RLock()
defer fe.RUnlock()
return len(fe.evicts)
}

// Evict is used by fake evictor to evict pods
func (fe *FakeEvictor) Evict(p *v1.Pod, reason string) error {
fe.Lock()
Expand Down
66 changes: 66 additions & 0 deletions pkg/scheduler/util/test_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright 2024 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewFakeBinder(t *testing.T) {
tests := []struct {
fkBinder *FakeBinder
cap, lenth int
}{
{
fkBinder: NewFakeBinder(0),
cap: 0, lenth: 0,
},
{
fkBinder: NewFakeBinder(10),
cap: 10, lenth: 0,
},
}
for _, test := range tests {
assert.Equal(t, test.cap, cap(test.fkBinder.Channel))
assert.Equal(t, test.lenth, test.fkBinder.Length())
assert.Equal(t, test.lenth, len(test.fkBinder.Binds()))
}
}

func TestNewFakeEvictor(t *testing.T) {
tests := []struct {
fkEvictor *FakeEvictor
cap, lenth int
}{
{
fkEvictor: NewFakeEvictor(0),
cap: 0, lenth: 0,
},
{
fkEvictor: NewFakeEvictor(10),
cap: 10, lenth: 0,
},
}
for _, test := range tests {
assert.Equal(t, test.cap, cap(test.fkEvictor.Channel))
assert.Equal(t, test.cap, cap(test.fkEvictor.evicts))
assert.Equal(t, test.lenth, test.fkEvictor.Length())
assert.Equal(t, test.lenth, len(test.fkEvictor.Evicts()))
}
}

0 comments on commit 5fa8a3a

Please sign in to comment.