Merge pull request #3212 from Lily922/volumelimit-1.8
[cherry-pick for release-1.8] Support preemption when the number of attached volumes on a node reaches the upper limit
volcano-sh-bot authored Nov 22, 2023
2 parents 5683dbe + d15a234 commit 43e368b
Showing 19 changed files with 339 additions and 132 deletions.
3 changes: 3 additions & 0 deletions cmd/scheduler/app/options/options.go
@@ -42,6 +42,7 @@ const (
defaultMinNodesToFind = 100
defaultPercentageOfNodesToFind = 100
defaultLockObjectNamespace = "volcano-system"
+defaultNodeWorkers = 20
)

// ServerOption is the main context object for the controller manager.
@@ -77,6 +78,7 @@ type ServerOption struct {

NodeSelector []string
EnableCacheDumper bool
+NodeWorkerThreads uint32
}

type DecryptFunc func(c *ServerOption) error
@@ -131,6 +133,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.EnableMetrics, "enable-metrics", false, "Enable the metrics function; it is false by default")
fs.StringSliceVar(&s.NodeSelector, "node-selector", nil, "volcano only work with the labeled node, like: --node-selector=volcano.sh/role:train --node-selector=volcano.sh/role:serving")
fs.BoolVar(&s.EnableCacheDumper, "cache-dumper", true, "Enable the cache dumper, it's true by default")
+fs.Uint32Var(&s.NodeWorkerThreads, "node-worker-threads", defaultNodeWorkers, "The number of threads syncing node operations.")
}

// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled.
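
The new --node-worker-threads flag (default 20, from defaultNodeWorkers) controls how many goroutines the scheduler cache uses to sync node state. A minimal sketch of how the flag would be parsed, assuming the package's NewServerOption constructor, which is not shown in this diff:

package main

import (
	"fmt"

	"github.com/spf13/pflag"

	"volcano.sh/volcano/cmd/scheduler/app/options"
)

func main() {
	// Register the scheduler flags on a fresh flag set, which is what AddFlags above wires up.
	fs := pflag.NewFlagSet("vc-scheduler", pflag.ExitOnError)
	s := options.NewServerOption() // assumption: this constructor exists in the options package
	s.AddFlags(fs)

	// Omitting --node-worker-threads keeps the default of 20 (defaultNodeWorkers).
	_ = fs.Parse([]string{"--node-worker-threads=8"})
	fmt.Println(s.NodeWorkerThreads) // 8
}

The options_test.go change below asserts exactly that default.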
1 change: 1 addition & 0 deletions cmd/scheduler/app/options/options_test.go
@@ -57,6 +57,7 @@ func TestAddFlags(t *testing.T) {
PercentageOfNodesToFind: defaultPercentageOfNodesToFind,
EnableLeaderElection: true,
LockObjectNamespace: defaultLockObjectNamespace,
+NodeWorkerThreads: defaultNodeWorkers,
}

if !reflect.DeepEqual(expected, s) {
7 changes: 1 addition & 6 deletions cmd/scheduler/app/server.go
@@ -74,12 +74,7 @@ func Run(opt *options.ServerOption) error {
}
}

-sched, err := scheduler.NewScheduler(config,
-opt.SchedulerNames,
-opt.SchedulerConf,
-opt.SchedulePeriod,
-opt.DefaultQueue,
-opt.NodeSelector)
+sched, err := scheduler.NewScheduler(config, opt)
if err != nil {
panic(err)
}
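
NewScheduler now receives the whole ServerOption instead of six positional arguments, so new settings such as NodeWorkerThreads reach the scheduler (and, per the cache.go change below, cache.New) without another signature change. A toy, self-contained illustration of that options-struct pattern; the types and field set here are stand-ins, not Volcano's actual Scheduler:

package main

import "fmt"

// serverOption is a stand-in for options.ServerOption; the real struct carries
// many more fields (SchedulerConf, SchedulePeriod, ...).
type serverOption struct {
	SchedulerNames    []string
	DefaultQueue      string
	NodeSelector      []string
	NodeWorkerThreads uint32
}

type scheduler struct {
	defaultQueue string
	nodeWorkers  uint32
}

// newScheduler picks the fields it needs from the option struct instead of
// taking each value as a separate parameter.
func newScheduler(opt *serverOption) *scheduler {
	return &scheduler{defaultQueue: opt.DefaultQueue, nodeWorkers: opt.NodeWorkerThreads}
}

func main() {
	opt := &serverOption{DefaultQueue: "default", NodeWorkerThreads: 20}
	fmt.Printf("%+v\n", *newScheduler(opt))
}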
4 changes: 2 additions & 2 deletions pkg/scheduler/actions/allocate/allocate_test.go
@@ -269,7 +269,7 @@ func TestAllocate(t *testing.T) {
}

for _, node := range test.nodes {
-schedulerCache.AddNode(node)
+schedulerCache.AddOrUpdateNode(node)
}
for _, pod := range test.pods {
schedulerCache.AddPod(pod)
@@ -455,7 +455,7 @@ func TestAllocateWithDynamicPVC(t *testing.T) {
schedulerCache.AddPod(pod)
}
for _, node := range test.nodes {
-schedulerCache.AddNode(node)
+schedulerCache.AddOrUpdateNode(node)
}

trueValue := true
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/preempt/preempt_test.go
@@ -314,7 +314,7 @@ func TestPreempt(t *testing.T) {
Value: 10,
}
for _, node := range test.nodes {
-schedulerCache.AddNode(node)
+schedulerCache.AddOrUpdateNode(node)
}
for _, pod := range test.pods {
schedulerCache.AddPod(pod)
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/reclaim/reclaim_test.go
@@ -148,7 +148,7 @@ func TestReclaim(t *testing.T) {
Value: 10,
}
for _, node := range test.nodes {
-schedulerCache.AddNode(node)
+schedulerCache.AddOrUpdateNode(node)
}
for _, pod := range test.pods {
schedulerCache.AddPod(pod)
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/shuffle/shuffle_test.go
@@ -163,7 +163,7 @@ func TestShuffle(t *testing.T) {
}

for _, node := range test.nodes {
-schedulerCache.AddNode(node)
+schedulerCache.AddOrUpdateNode(node)
}
for _, q := range test.queues {
schedulerCache.AddQueueV1beta1(q)
48 changes: 44 additions & 4 deletions pkg/scheduler/cache/cache.go
@@ -83,8 +83,8 @@ func init() {
}

// New returns a Cache implementation.
-func New(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string) Cache {
-return newSchedulerCache(config, schedulerNames, defaultQueue, nodeSelectors)
+func New(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32) Cache {
+return newSchedulerCache(config, schedulerNames, defaultQueue, nodeSelectors, nodeWorkers)
}

// SchedulerCache cache for the kube batch
@@ -134,6 +134,7 @@ type SchedulerCache struct {
NamespaceCollection map[string]*schedulingapi.NamespaceCollection

errTasks workqueue.RateLimitingInterface
+nodeQueue workqueue.RateLimitingInterface
DeletedJobs workqueue.RateLimitingInterface

informerFactory informers.SharedInformerFactory
@@ -145,6 +146,8 @@ type SchedulerCache struct {

// A map from image name to its imageState.
imageStates map[string]*imageState

+nodeWorkers uint32
}

type imageState struct {
@@ -387,7 +390,7 @@ func (pgb *podgroupBinder) Bind(job *schedulingapi.JobInfo, cluster string) (*sc
return job, nil
}

-func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string) *SchedulerCache {
+func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32) *SchedulerCache {
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init kubeClient, with err: %v", err))
@@ -435,6 +438,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
Queues: make(map[schedulingapi.QueueID]*schedulingapi.QueueInfo),
PriorityClasses: make(map[string]*schedulingv1.PriorityClass),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
DeletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeClient: kubeClient,
vcClient: vcClient,
@@ -446,7 +450,8 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo),
imageStates: make(map[string]*imageState),

-NodeList: []string{},
+NodeList: []string{},
+nodeWorkers: nodeWorkers,
}
if len(nodeSelectors) > 0 {
for _, nodeSelectorLabel := range nodeSelectors {
@@ -698,6 +703,11 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
sc.informerFactory.Start(stopCh)
sc.vcInformerFactory.Start(stopCh)
+sc.WaitForCacheSync(stopCh)
+for i := 0; i < int(sc.nodeWorkers); i++ {
+go wait.Until(sc.runNodeWorker, 0, stopCh)
+}
+
// Re-sync error tasks.
go wait.Until(sc.processResyncTask, 0, stopCh)

@@ -983,6 +993,36 @@ func (sc *SchedulerCache) processResyncTask() {
}
}

+func (sc *SchedulerCache) runNodeWorker() {
+for sc.processSyncNode() {
+}
+}
+
+func (sc *SchedulerCache) processSyncNode() bool {
+obj, shutdown := sc.nodeQueue.Get()
+if shutdown {
+return false
+}
+defer sc.nodeQueue.Done(obj)
+
+nodeName, ok := obj.(string)
+if !ok {
+klog.Errorf("failed to convert %v to string", obj)
+return true
+}
+
+klog.V(5).Infof("started sync node %s", nodeName)
+err := sc.SyncNode(nodeName)
+if err == nil {
+sc.nodeQueue.Forget(nodeName)
+return true
+}
+
+klog.Errorf("Failed to sync node <%s>, retry it.", nodeName)
+sc.nodeQueue.AddRateLimited(nodeName)
+return true
+}
+
// AddBindTask add task to be bind to a cache which consumes by go runtime
func (sc *SchedulerCache) AddBindTask(taskInfo *schedulingapi.TaskInfo) error {
klog.V(5).Infof("add bind task %v/%v", taskInfo.Namespace, taskInfo.Name)
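
The hunks above show only the consumer side of the new nodeQueue; the producer side (node event handlers enqueueing node names, presumably from AddOrUpdateNode) is not part of this excerpt. A self-contained sketch of the same workqueue pattern, with an illustrative syncNode standing in for SchedulerCache.SyncNode:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Rate-limited queue of node names, like SchedulerCache.nodeQueue.
	nodeQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	stopCh := make(chan struct{})

	// Stand-in for SchedulerCache.SyncNode.
	syncNode := func(name string) error {
		fmt.Println("synced node", name)
		return nil
	}

	// Mirrors processSyncNode: pop a name, sync it, requeue with backoff on failure.
	processSyncNode := func() bool {
		obj, shutdown := nodeQueue.Get()
		if shutdown {
			return false
		}
		defer nodeQueue.Done(obj)

		name := obj.(string)
		if err := syncNode(name); err != nil {
			nodeQueue.AddRateLimited(name)
			return true
		}
		nodeQueue.Forget(name)
		return true
	}

	// Mirrors Run: start a fixed number of workers (--node-worker-threads, default 20).
	const nodeWorkers = 3
	for i := 0; i < nodeWorkers; i++ {
		go wait.Until(func() {
			for processSyncNode() {
			}
		}, 0, stopCh)
	}

	// Producer side: a node informer event handler would enqueue names like this.
	nodeQueue.Add("n1")

	time.Sleep(100 * time.Millisecond)
	nodeQueue.ShutDown()
	close(stopCh)
}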
19 changes: 7 additions & 12 deletions pkg/scheduler/cache/cache_test.go
@@ -33,7 +33,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

"volcano.sh/volcano/pkg/scheduler/api"
volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
"volcano.sh/volcano/pkg/scheduler/util"
@@ -163,7 +162,7 @@ func TestSchedulerCache_Bind_NodeWithSufficientResources(t *testing.T) {
cache.AddPod(pod)

node := buildNode("n1", buildResourceList("2000m", "10G"))
-cache.AddNode(node)
+cache.AddOrUpdateNode(node)

task := api.NewTaskInfo(pod)
task.Job = "j1"
@@ -195,7 +194,7 @@ func TestSchedulerCache_Bind_NodeWithInsufficientResources(t *testing.T) {
cache.AddPod(pod)

node := buildNode("n1", buildResourceList("2000m", "10G"))
-cache.AddNode(node)
+cache.AddOrUpdateNode(node)

task := api.NewTaskInfo(pod)
task.Job = "j1"
@@ -307,7 +306,7 @@ func TestNodeOperation(t *testing.T) {
}

for _, n := range test.nodes {
-cache.AddNode(n)
+cache.AddOrUpdateNode(n)
}

if !reflect.DeepEqual(cache, test.expected) {
@@ -316,7 +315,7 @@ func TestNodeOperation(t *testing.T) {
}

// delete node
-cache.DeleteNode(test.deletedNode)
+cache.RemoveNode(test.deletedNode.Name)
if !reflect.DeepEqual(cache, test.delExpect) {
t.Errorf("case %d: \n expected %v, \n got %v \n",
i, test.delExpect, cache)
@@ -344,6 +343,7 @@ func TestBindTasks(t *testing.T) {
pvInformer: informerFactory.Core().V1().PersistentVolumes(),
scInformer: informerFactory.Storage().V1().StorageClasses(),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

sc.Binder = &DefaultBinder{}
@@ -367,12 +367,6 @@ func TestBindTasks(t *testing.T) {
DeleteFunc: sc.DeletePod,
},
)
-sc.nodeInformer.Informer().AddEventHandler(
-cache.ResourceEventHandlerFuncs{
-AddFunc: sc.AddNode,
-UpdateFunc: sc.UpdateNode,
-},
-)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go wait.Until(sc.processBindTask, time.Millisecond*5, ctx.Done())
@@ -383,9 +377,10 @@ func TestBindTasks(t *testing.T) {

// make sure pod exist when calling fake client binding
fakeKube.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
-fakeKube.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
+// set node in cache directly
+sc.AddOrUpdateNode(node)

task := api.NewTaskInfo(pod)
task.NodeName = "n1"