From f4c4c0dd5b19bc804ef8beba7ff7fc777bc255aa Mon Sep 17 00:00:00 2001 From: xilinxing Date: Thu, 25 Aug 2022 16:02:48 +0800 Subject: [PATCH 1/5] add a flag to control whether to inherit owner annotations when a podgroup is created Signed-off-by: xilinxing --- cmd/controller-manager/app/options/options.go | 3 ++ cmd/controller-manager/app/server.go | 1 + pkg/controllers/framework/interface.go | 2 ++ pkg/controllers/podgroup/pg_controller.go | 4 +++ .../podgroup/pg_controller_handler.go | 29 +++++++++++-------- 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index f8586288c5..0266644d71 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -57,6 +57,8 @@ type ServerOption struct { // For dependent tasks, there is a detection cycle inside volcano // It indicates how often to detect the status of dependent tasks DetectionPeriodOfDependsOntask time.Duration + // To determine whether to inherit the owner's annotations for pods when creating a podgroup + InheritOwnerAnnotations bool } // NewServerOption creates a new CMServer with a default config. @@ -82,6 +84,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default") fs.DurationVar(&s.DetectionPeriodOfDependsOntask, "detection-period-of-dependson-task", defaultDetectionPeriodOfDependsOntask, "It indicates how often to detect the status of dependent tasks."+ "e.g. --detection-period-of-dependson-task=1s") + fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inheriting owner annotations for pods when creating a podgroup; it is enabled by default") } // CheckOptionOrDie checks the LockObjectNamespace. diff --git a/cmd/controller-manager/app/server.go b/cmd/controller-manager/app/server.go index 34747dc9a6..5ee8b2cc4f 100644 --- a/cmd/controller-manager/app/server.go +++ b/cmd/controller-manager/app/server.go @@ -126,6 +126,7 @@ func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx c controllerOpt.KubeClient = kubeclientset.NewForConfigOrDie(config) controllerOpt.VolcanoClient = vcclientset.NewForConfigOrDie(config) controllerOpt.SharedInformerFactory = informers.NewSharedInformerFactory(controllerOpt.KubeClient, 0) + controllerOpt.InheritOwnerAnnotations = opt.InheritOwnerAnnotations return func(ctx context.Context) { framework.ForeachController(func(c framework.Controller) { diff --git a/pkg/controllers/framework/interface.go b/pkg/controllers/framework/interface.go index 65ea13f0a6..cc594ef644 100644 --- a/pkg/controllers/framework/interface.go +++ b/pkg/controllers/framework/interface.go @@ -31,6 +31,8 @@ type ControllerOption struct { SchedulerNames []string WorkerNum uint32 MaxRequeueNum int + + InheritOwnerAnnotations bool } // Controller is the interface of all controllers.
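Usage note (not part of the patch): with the default of true, PodGroups created by the podgroup controller keep inheriting the owner's scheduling annotations as before; setting the flag to false turns that behavior off. A minimal sketch of passing the flag via the controller-manager container args, assuming the usual volcano-controllers Deployment layout (container name and surrounding args are illustrative):

  containers:
    - name: volcano-controllers
      args:
        - --enable-healthz=true
        - --inherit-owner-annotations=false   # new flag from this patch; defaults to true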
diff --git a/pkg/controllers/podgroup/pg_controller.go b/pkg/controllers/podgroup/pg_controller.go index 8651ab24eb..fa3d43a6b4 100644 --- a/pkg/controllers/podgroup/pg_controller.go +++ b/pkg/controllers/podgroup/pg_controller.go @@ -62,6 +62,9 @@ type pgcontroller struct { queue workqueue.RateLimitingInterface schedulerNames []string + + // To determine whether to inherit the owner's annotations for pods when creating a podgroup + inheritOwnerAnnotations bool } func (pg *pgcontroller) Name() string { @@ -77,6 +80,7 @@ func (pg *pgcontroller) Initialize(opt *framework.ControllerOption) error { pg.schedulerNames = make([]string, len(opt.SchedulerNames)) copy(pg.schedulerNames, opt.SchedulerNames) + pg.inheritOwnerAnnotations = opt.InheritOwnerAnnotations pg.informerFactory = opt.SharedInformerFactory pg.podInformer = opt.SharedInformerFactory.Core().V1().Pods() diff --git a/pkg/controllers/podgroup/pg_controller_handler.go b/pkg/controllers/podgroup/pg_controller_handler.go index 931fba7bb7..0af19957ca 100644 --- a/pkg/controllers/podgroup/pg_controller_handler.go +++ b/pkg/controllers/podgroup/pg_controller_handler.go @@ -129,6 +129,22 @@ func (pg *pgcontroller) getAnnotationsFromUpperRes(kind string, name string, nam } } +// Inherit annotations from upper resources. +func (pg *pgcontroller) inheritUpperAnnotations(pod *v1.Pod, obj *scheduling.PodGroup) { + if pg.inheritOwnerAnnotations { + for _, reference := range pod.OwnerReferences { + if reference.Kind != "" && reference.Name != "" { + var upperAnnotations = pg.getAnnotationsFromUpperRes(reference.Kind, reference.Name, pod.Namespace) + for k, v := range upperAnnotations { + if strings.HasPrefix(k, scheduling.AnnotationPrefix) { + obj.Annotations[k] = v + } + } + } + } + } +} + func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error { pgName := helpers.GeneratePodgroupName(pod) @@ -157,18 +173,7 @@ func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error { }, } - // Inherit annotations from upper resources. - for _, reference := range pod.OwnerReferences { - if reference.Kind != "" && reference.Name != "" { - var upperAnnotations = pg.getAnnotationsFromUpperRes(reference.Kind, reference.Name, pod.Namespace) - for k, v := range upperAnnotations { - if strings.HasPrefix(k, scheduling.AnnotationPrefix) { - obj.Annotations[k] = v - } - } - } - } - + pg.inheritUpperAnnotations(pod, obj) // Individual annotations on pods would overwrite annotations inherited from upper resources.
if queueName, ok := pod.Annotations[scheduling.QueueNameAnnotationKey]; ok { obj.Spec.Queue = queueName From 2d8d6458c825471338eacc6865e64b2233432be0 Mon Sep 17 00:00:00 2001 From: Zhe Jin Date: Thu, 1 Sep 2022 17:53:47 +0800 Subject: [PATCH 2/5] fix scheduler panic issue Signed-off-by: Zhe Jin --- pkg/scheduler/cache/cache.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 1e3b62c744..3b17254ae5 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -990,22 +990,26 @@ func (sc *SchedulerCache) processBindTask() { func (sc *SchedulerCache) BindTask() { klog.V(5).Infof("batch bind task count %d", len(sc.bindCache)) + successfulTasks := make([]*schedulingapi.TaskInfo, 0) for _, task := range sc.bindCache { if err := sc.VolumeBinder.BindVolumes(task, task.PodVolumes); err != nil { klog.Errorf("task %s/%s bind Volumes failed: %#v", task.Namespace, task.Name, err) sc.VolumeBinder.RevertVolumes(task, task.PodVolumes) sc.resyncTask(task) - return + } else { + successfulTasks = append(successfulTasks, task) + klog.V(5).Infof("task %s/%s bind Volumes done", task.Namespace, task.Name) } } - bindTasks := make([]*schedulingapi.TaskInfo, len(sc.bindCache)) - copy(bindTasks, sc.bindCache) + bindTasks := make([]*schedulingapi.TaskInfo, len(successfulTasks)) + copy(bindTasks, successfulTasks) if err := sc.Bind(bindTasks); err != nil { + klog.Errorf("failed to bind task count %d: %#v", len(bindTasks), err) return } - for _, task := range sc.bindCache { + for _, task := range successfulTasks { metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time)) } From 8adec9427d02e07731681dd7e0bc786f8d82eb0c Mon Sep 17 00:00:00 2001 From: xilinxing Date: Wed, 21 Sep 2022 11:22:38 +0800 Subject: [PATCH 3/5] merge #2500(add selector spread plugin) and #2487(add podTopologySpread plugin) conflict; Signed-off-by: shaoqiu <516595344@qq.com> --- .../chart/volcano/templates/scheduler.yaml | 9 +- installer/volcano-development.yaml | 9 +- pkg/scheduler/cache/cache.go | 9 + pkg/scheduler/plugins/nodeorder/nodeorder.go | 78 +++++- .../node/topology/helpers.go | 57 +++++ .../plugins/selectorspread/selector_spread.go | 234 ++++++++++++++++++ vendor/modules.txt | 2 + 7 files changed, 391 insertions(+), 7 deletions(-) create mode 100644 vendor/k8s.io/component-helpers/node/topology/helpers.go create mode 100644 vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go diff --git a/installer/helm/chart/volcano/templates/scheduler.yaml b/installer/helm/chart/volcano/templates/scheduler.yaml index 650390bff5..6d0a234f70 100644 --- a/installer/helm/chart/volcano/templates/scheduler.yaml +++ b/installer/helm/chart/volcano/templates/scheduler.yaml @@ -43,8 +43,8 @@ rules: resources: ["persistentvolumes"] verbs: ["list", "watch"] - apiGroups: [""] - resources: ["namespaces"] - verbs: ["list", "watch"] + resources: ["namespaces", "services", "replicationcontrollers"] + verbs: ["list", "watch", "get"] - apiGroups: [""] resources: ["resourcequotas"] verbs: ["list", "watch"] @@ -72,6 +72,9 @@ rules: - apiGroups: [""] resources: ["configmaps"] verbs: ["get", "create", "delete", "update"] + - apiGroups: ["apps"] + resources: ["daemonsets", "replicasets", "statefulsets"] + verbs: ["list", "watch", "get"] --- kind: ClusterRoleBinding @@ -138,7 +141,7 @@ metadata: prometheus.io/port: "8080" prometheus.io/scrape: "true" name: {{ .Release.Name }}-scheduler-service - 
namespace: {{ .Release.Namespace }} + namespace: {{ .Release.Namespace }} spec: ports: - port: 8080 diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml index 1dda9064a0..9507decf3e 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -8570,8 +8570,8 @@ rules: resources: ["persistentvolumes"] verbs: ["list", "watch"] - apiGroups: [""] - resources: ["namespaces"] - verbs: ["list", "watch"] + resources: ["namespaces", "services", "replicationcontrollers"] + verbs: ["list", "watch", "get"] - apiGroups: [""] resources: ["resourcequotas"] verbs: ["list", "watch"] @@ -8599,6 +8599,9 @@ rules: - apiGroups: [""] resources: ["configmaps"] verbs: ["get", "create", "delete", "update"] + - apiGroups: ["apps"] + resources: ["daemonsets", "replicasets", "statefulsets"] + verbs: ["list", "watch", "get"] --- # Source: volcano/templates/scheduler.yaml kind: ClusterRoleBinding @@ -8623,7 +8626,7 @@ metadata: prometheus.io/port: "8080" prometheus.io/scrape: "true" name: volcano-scheduler-service - namespace: volcano-system + namespace: volcano-system spec: ports: - port: 8080 diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 3b17254ae5..8cb2fd05cd 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -489,6 +489,15 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu sc.informerFactory = informerFactory mySchedulerPodName, c := getMultiSchedulerInfo() + // explicitly register informers to the factory, otherwise resource listers cannot get anything + // even with no error returned. The `Namespace` informer is used by the `InterPodAffinity` plugin; + // the `SelectorSpread` and `PodTopologySpread` plugins use the following four so far. + informerFactory.Core().V1().Namespaces().Informer() + informerFactory.Core().V1().Services().Informer() + informerFactory.Core().V1().ReplicationControllers().Informer() + informerFactory.Apps().V1().ReplicaSets().Informer() + informerFactory.Apps().V1().StatefulSets().Informer() + // create informer for node information sc.nodeInformer = informerFactory.Core().V1().Nodes() sc.nodeInformer.Informer().AddEventHandlerWithResyncPeriod( diff --git a/pkg/scheduler/plugins/nodeorder/nodeorder.go b/pkg/scheduler/plugins/nodeorder/nodeorder.go index 5454eab2cd..2db8179036 100644 --- a/pkg/scheduler/plugins/nodeorder/nodeorder.go +++ b/pkg/scheduler/plugins/nodeorder/nodeorder.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "volcano.sh/volcano/pkg/scheduler/api" @@ -61,6 +62,8 @@ const ( ImageLocalityWeight = "imagelocality.weight" // PodTopologySpreadWeight is the key for providing Pod Topology Spread Priority Weight in YAML PodTopologySpreadWeight = "podtopologyspread.weight" + // SelectorSpreadWeight is the key for providing Selector Spread Priority Weight in YAML + selectorSpreadWeight = "selectorspread.weight" ) type nodeOrderPlugin struct { @@ -86,6 +89,7 @@ type priorityWeight struct { taintTolerationWeight int imageLocalityWeight int podTopologySpreadWeight int + selectorSpreadWeight int } // calculateWeight from the provided arguments.
@@ -128,6 +132,7 @@ func calculateWeight(args framework.Arguments) priorityWeight { taintTolerationWeight: 1, imageLocalityWeight: 1, podTopologySpreadWeight: 2, // be consistent with kubernetes default setting. + selectorSpreadWeight: 0, } // Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct. @@ -153,6 +158,10 @@ func calculateWeight(args framework.Arguments) priorityWeight { // Checks whether podtopologyspread.weight is provided or not, if given, modifies the value in weight struct. args.GetInt(&weight.podTopologySpreadWeight, PodTopologySpreadWeight) + + // Checks whether selectorspread.weight is provided or not, if given, modifies the value in weight struct. + args.GetInt(&weight.selectorSpreadWeight, selectorSpreadWeight) + return weight } @@ -260,6 +269,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // If imageLocalityWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score. nodeScore += float64(score) * float64(weight.imageLocalityWeight) + klog.V(4).Infof("Image Locality score: %f", nodeScore) } // NodeResourcesLeastAllocated @@ -272,6 +282,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // If leastReqWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score. nodeScore += float64(score) * float64(weight.leastReqWeight) + klog.V(4).Infof("Least Request score: %f", nodeScore) } // NodeResourcesMostAllocated @@ -284,6 +295,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // If mostRequestedWeight is provided, host.Score is multiplied with weight, it's 0 by default nodeScore += float64(score) * float64(weight.mostReqWeight) + klog.V(4).Infof("Most Request score: %f", nodeScore) } // NodeResourcesBalancedAllocation @@ -296,6 +308,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // If balancedResourceWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score. nodeScore += float64(score) * float64(weight.balancedResourceWeight) + klog.V(4).Infof("Balanced Request score: %f", nodeScore) } // NodeAffinity @@ -309,6 +322,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { // TODO: should we normalize the score // If nodeAffinityWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score. 
nodeScore += float64(score) * float64(weight.nodeAffinityWeight) + klog.V(4).Infof("Node Affinity score: %f", nodeScore) } klog.V(4).Infof("Total Score for task %s/%s on node %s is: %f", task.Namespace, task.Name, node.Name, nodeScore) @@ -329,6 +343,9 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { p, _ = podtopologyspread.New(ptsArgs, handle) podTopologySpread := p.(*podtopologyspread.PodTopologySpread) + p, _ = selectorspread.New(nil, handle) + selectorSpread := p.(*selectorspread.SelectorSpread) + batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) { // InterPodAffinity state := k8sframework.NewCycleState() @@ -353,8 +370,13 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { return nil, err } + selectorSpreadScores, err := selectorSpreadScore(selectorSpread, state, task.Pod, nodes, weight.selectorSpreadWeight) + if err != nil { + return nil, err + } + for _, node := range nodes { - nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + podTopologySpreadScores[node.Name] + nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name] + podTopologySpreadScores[node.Name] + selectorSpreadScores[node.Name] } klog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, nodeScores) @@ -542,5 +564,59 @@ func podTopologySpreadScore( return nodeScores, nil } +func selectorSpreadScore( + selectorSpread *selectorspread.SelectorSpread, + cycleState *k8sframework.CycleState, + pod *v1.Pod, + nodes []*v1.Node, + selectorSpreadWeight int, +) (map[string]float64, error) { + preScoreStatus := selectorSpread.PreScore(context.TODO(), cycleState, pod, nodes) + if !preScoreStatus.IsSuccess() { + return nil, preScoreStatus.AsError() + } + nodeScoreList := make(k8sframework.NodeScoreList, len(nodes)) + // size of errCh should be no less than parallelization number, see interPodAffinityScore. + workerNum := 16 + errCh := make(chan error, workerNum) + parallelizeContext, parallelizeCancel := context.WithCancel(context.TODO()) + workqueue.ParallelizeUntil(parallelizeContext, workerNum, len(nodes), func(index int) { + nodeName := nodes[index].Name + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s, status := selectorSpread.Score(ctx, cycleState, pod, nodeName) + if !status.IsSuccess() { + parallelizeCancel() + errCh <- fmt.Errorf("calculate selector spread priority failed %v", status.Message()) + return + } + nodeScoreList[index] = k8sframework.NodeScore{ + Name: nodeName, + Score: s, + } + }) + + select { + case err := <-errCh: + return nil, err + default: + } + selectorSpread.NormalizeScore(context.TODO(), cycleState, pod, nodeScoreList) + + nodeScores := make(map[string]float64, len(nodes)) + for i, nodeScore := range nodeScoreList { + // return error if score plugin returns invalid score. 
+ if nodeScore.Score > k8sframework.MaxNodeScore || nodeScore.Score < k8sframework.MinNodeScore { + return nil, fmt.Errorf("selector spread returns an invalid score %v for node %s", nodeScore.Score, nodeScore.Name) + } + nodeScore.Score *= int64(selectorSpreadWeight) + nodeScoreList[i] = nodeScore + nodeScores[nodeScore.Name] = float64(nodeScore.Score) + } + + klog.V(4).Infof("selector spread Score for task %s/%s is: %v", pod.Namespace, pod.Name, nodeScores) + return nodeScores, nil +} + func (pp *nodeOrderPlugin) OnSessionClose(ssn *framework.Session) { } diff --git a/vendor/k8s.io/component-helpers/node/topology/helpers.go b/vendor/k8s.io/component-helpers/node/topology/helpers.go new file mode 100644 index 0000000000..18c838cca5 --- /dev/null +++ b/vendor/k8s.io/component-helpers/node/topology/helpers.go @@ -0,0 +1,57 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topology + +import ( + "k8s.io/api/core/v1" +) + +// GetZoneKey is a helper function that builds a string identifier that is unique per failure-zone; +// it returns empty-string for no zone. +// Since there are currently two separate zone keys: +// * "failure-domain.beta.kubernetes.io/zone" +// * "topology.kubernetes.io/zone" +// GetZoneKey will first check failure-domain.beta.kubernetes.io/zone and if not exists, will then check +// topology.kubernetes.io/zone +func GetZoneKey(node *v1.Node) string { + labels := node.Labels + if labels == nil { + return "" + } + + // TODO: "failure-domain.beta..." names are deprecated, but will + // stick around a long time due to existing on old extant objects like PVs. + // Maybe one day we can stop considering them (see #88493). + zone, ok := labels[v1.LabelFailureDomainBetaZone] + if !ok { + zone, _ = labels[v1.LabelTopologyZone] + } + + region, ok := labels[v1.LabelFailureDomainBetaRegion] + if !ok { + region, _ = labels[v1.LabelTopologyRegion] + } + + if region == "" && zone == "" { + return "" + } + + // We include the null character just in case region or failureDomain has a colon + // (We do assume there's no null characters in a region or failureDomain) + // As a nice side-benefit, the null character is not printed by fmt.Print or glog + return region + ":\x00:" + zone +} diff --git a/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go new file mode 100644 index 0000000000..c50e0bfaf8 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go @@ -0,0 +1,234 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package selectorspread + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + appslisters "k8s.io/client-go/listers/apps/v1" + corelisters "k8s.io/client-go/listers/core/v1" + utilnode "k8s.io/component-helpers/node/topology" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" +) + +// SelectorSpread is a plugin that calculates selector spread priority. +type SelectorSpread struct { + sharedLister framework.SharedLister + services corelisters.ServiceLister + replicationControllers corelisters.ReplicationControllerLister + replicaSets appslisters.ReplicaSetLister + statefulSets appslisters.StatefulSetLister +} + +var _ framework.PreScorePlugin = &SelectorSpread{} +var _ framework.ScorePlugin = &SelectorSpread{} + +const ( + // Name is the name of the plugin used in the plugin registry and configurations. + Name = names.SelectorSpread + // preScoreStateKey is the key in CycleState to SelectorSpread pre-computed data for Scoring. + preScoreStateKey = "PreScore" + Name + + // When zone information is present, give 2/3 of the weighting to zone spreading, 1/3 to node spreading + // TODO: Any way to justify this weighting? + zoneWeighting float64 = 2.0 / 3.0 +) + +// Name returns name of the plugin. It is used in logs, etc. +func (pl *SelectorSpread) Name() string { + return Name +} + +// preScoreState computed at PreScore and used at Score. +type preScoreState struct { + selector labels.Selector +} + +// Clone implements the mandatory Clone interface. We don't really copy the data since +// there is no need for that. +func (s *preScoreState) Clone() framework.StateData { + return s +} + +// skipSelectorSpread returns true if the pod's TopologySpreadConstraints are specified. +// Note that this doesn't take into account default constraints defined for +// the PodTopologySpread plugin. +func skipSelectorSpread(pod *v1.Pod) bool { + return len(pod.Spec.TopologySpreadConstraints) != 0 +} + +// Score invoked at the Score extension point. +// The "score" returned in this function is the matching number of pods on the `nodeName`, +// it is normalized later. +func (pl *SelectorSpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { + if skipSelectorSpread(pod) { + return 0, nil + } + + c, err := state.Read(preScoreStateKey) + if err != nil { + return 0, framework.AsStatus(fmt.Errorf("reading %q from cycleState: %w", preScoreStateKey, err)) + } + + s, ok := c.(*preScoreState) + if !ok { + return 0, framework.AsStatus(fmt.Errorf("cannot convert saved state to selectorspread.preScoreState")) + } + + nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err)) + } + + count := countMatchingPods(pod.Namespace, s.selector, nodeInfo) + return int64(count), nil +} + +// NormalizeScore invoked after scoring all nodes. 
+// For this plugin, it calculates the score of each node +// based on the number of existing matching pods on the node +// where zone information is included on the nodes, it favors nodes +// in zones with fewer existing matching pods. +func (pl *SelectorSpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { + if skipSelectorSpread(pod) { + return nil + } + + countsByZone := make(map[string]int64, 10) + maxCountByZone := int64(0) + maxCountByNodeName := int64(0) + + for i := range scores { + if scores[i].Score > maxCountByNodeName { + maxCountByNodeName = scores[i].Score + } + nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name) + if err != nil { + return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err)) + } + zoneID := utilnode.GetZoneKey(nodeInfo.Node()) + if zoneID == "" { + continue + } + countsByZone[zoneID] += scores[i].Score + } + + for zoneID := range countsByZone { + if countsByZone[zoneID] > maxCountByZone { + maxCountByZone = countsByZone[zoneID] + } + } + + haveZones := len(countsByZone) != 0 + + maxCountByNodeNameFloat64 := float64(maxCountByNodeName) + maxCountByZoneFloat64 := float64(maxCountByZone) + MaxNodeScoreFloat64 := float64(framework.MaxNodeScore) + + for i := range scores { + // initializing to the default/max node score of maxPriority + fScore := MaxNodeScoreFloat64 + if maxCountByNodeName > 0 { + fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-scores[i].Score) / maxCountByNodeNameFloat64) + } + // If there is zone information present, incorporate it + if haveZones { + nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name) + if err != nil { + return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err)) + } + + zoneID := utilnode.GetZoneKey(nodeInfo.Node()) + if zoneID != "" { + zoneScore := MaxNodeScoreFloat64 + if maxCountByZone > 0 { + zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64) + } + fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore) + } + } + scores[i].Score = int64(fScore) + } + return nil +} + +// ScoreExtensions of the Score plugin. +func (pl *SelectorSpread) ScoreExtensions() framework.ScoreExtensions { + return pl +} + +// PreScore builds and writes cycle state used by Score and NormalizeScore. +func (pl *SelectorSpread) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status { + if skipSelectorSpread(pod) { + return nil + } + selector := helper.DefaultSelector( + pod, + pl.services, + pl.replicationControllers, + pl.replicaSets, + pl.statefulSets, + ) + state := &preScoreState{ + selector: selector, + } + cycleState.Write(preScoreStateKey, state) + return nil +} + +// New initializes a new plugin and returns it. 
+func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { + sharedLister := handle.SnapshotSharedLister() + if sharedLister == nil { + return nil, fmt.Errorf("SnapshotSharedLister is nil") + } + sharedInformerFactory := handle.SharedInformerFactory() + if sharedInformerFactory == nil { + return nil, fmt.Errorf("SharedInformerFactory is nil") + } + return &SelectorSpread{ + sharedLister: sharedLister, + services: sharedInformerFactory.Core().V1().Services().Lister(), + replicationControllers: sharedInformerFactory.Core().V1().ReplicationControllers().Lister(), + replicaSets: sharedInformerFactory.Apps().V1().ReplicaSets().Lister(), + statefulSets: sharedInformerFactory.Apps().V1().StatefulSets().Lister(), + }, nil +} + +// countMatchingPods counts pods based on namespace and matching all selectors +func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *framework.NodeInfo) int { + if len(nodeInfo.Pods) == 0 || selector.Empty() { + return 0 + } + count := 0 + for _, p := range nodeInfo.Pods { + // Ignore pods being deleted for spreading purposes + // Similar to how it is done for SelectorSpreadPriority + if namespace == p.Pod.Namespace && p.Pod.DeletionTimestamp == nil { + if selector.Matches(labels.Set(p.Pod.Labels)) { + count++ + } + } + } + return count +} diff --git a/vendor/modules.txt b/vendor/modules.txt index a46fb9f71a..8e05b763d4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -911,6 +911,7 @@ k8s.io/component-base/metrics/testutil k8s.io/component-base/version # k8s.io/component-helpers v0.23.0 => k8s.io/component-helpers v0.23.0 ## explicit; go 1.16 +k8s.io/component-helpers/node/topology k8s.io/component-helpers/node/util/sysctl k8s.io/component-helpers/scheduling/corev1 k8s.io/component-helpers/scheduling/corev1/nodeaffinity @@ -996,6 +997,7 @@ k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread +k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics From 6e0beacfe390901c29f94c58560aa2052ff3724c Mon Sep 17 00:00:00 2001 From: xilinxing Date: Mon, 26 Sep 2022 19:43:50 +0800 Subject: [PATCH 4/5] merge #2506(add volume binding plugin) into scheduler/plugins/nodeorder; Signed-off-by: shaoqiu <516595344@qq.com> --- .../chart/volcano/templates/scheduler.yaml | 2 +- installer/volcano-development.yaml | 2 +- pkg/scheduler/plugins/nodeorder/nodeorder.go | 31 +++++++++++++++++++ .../plugins/predicates/predicates.go | 23 +++++++++++++- 4 files changed, 55 insertions(+), 3 deletions(-) diff --git a/installer/helm/chart/volcano/templates/scheduler.yaml b/installer/helm/chart/volcano/templates/scheduler.yaml index 6d0a234f70..95768f55c3 100644 --- a/installer/helm/chart/volcano/templates/scheduler.yaml +++ b/installer/helm/chart/volcano/templates/scheduler.yaml @@ -41,7 +41,7 @@ rules: verbs: ["list", "watch", "update"] - apiGroups: [""] resources: ["persistentvolumes"] - verbs: ["list", "watch"] + verbs: ["list", "watch", "update"] - apiGroups: [""] resources: ["namespaces", "services", "replicationcontrollers"] verbs: ["list", "watch", "get"] diff --git a/installer/volcano-development.yaml 
b/installer/volcano-development.yaml index 9507decf3e..3477705655 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -8568,7 +8568,7 @@ rules: verbs: ["list", "watch", "update"] - apiGroups: [""] resources: ["persistentvolumes"] - verbs: ["list", "watch"] + verbs: ["list", "watch", "update"] - apiGroups: [""] resources: ["namespaces", "services", "replicationcontrollers"] verbs: ["list", "watch", "get"] diff --git a/pkg/scheduler/plugins/nodeorder/nodeorder.go b/pkg/scheduler/plugins/nodeorder/nodeorder.go index 2db8179036..7beb25c580 100644 --- a/pkg/scheduler/plugins/nodeorder/nodeorder.go +++ b/pkg/scheduler/plugins/nodeorder/nodeorder.go @@ -21,6 +21,7 @@ import ( "fmt" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilFeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/util/workqueue" "k8s.io/klog" @@ -35,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/framework" @@ -64,6 +66,8 @@ const ( PodTopologySpreadWeight = "podtopologyspread.weight" // SelectorSpreadWeight is the key for providing Selector Spread Priority Weight in YAML selectorSpreadWeight = "selectorspread.weight" + // VolumeBinding is the key for providing Volume Binding Priority Weight in YAML + volumeBindingWeight = "volumebinding.weight" ) type nodeOrderPlugin struct { @@ -90,6 +94,7 @@ type priorityWeight struct { imageLocalityWeight int podTopologySpreadWeight int selectorSpreadWeight int + volumeBindingWeight int } // calculateWeight from the provided arguments. @@ -133,6 +138,7 @@ func calculateWeight(args framework.Arguments) priorityWeight { imageLocalityWeight: 1, podTopologySpreadWeight: 2, // be consistent with kubernetes default setting. selectorSpreadWeight: 0, + volumeBindingWeight: 1, } // Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct. @@ -162,6 +168,9 @@ func calculateWeight(args framework.Arguments) priorityWeight { // Checks whether selectorspread.weight is provided or not, if given, modifies the value in weight struct. args.GetInt(&weight.selectorSpreadWeight, selectorSpreadWeight) + // Checks whether volumebinding.weight is provided or not, if given, modifies the value in weight struct. + args.GetInt(&weight.volumeBindingWeight, volumeBindingWeight) + return weight } @@ -256,6 +265,15 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { p, _ = imagelocality.New(nil, handle) imageLocality := p.(*imagelocality.ImageLocality) + // 6. 
VolumeBinding + volumeBindingArgs := &config.VolumeBindingArgs{ + TypeMeta: metav1.TypeMeta{}, + BindTimeoutSeconds: volumebinding.DefaultBindTimeoutSeconds, + Shape: nil, + } + p, _ = volumebinding.New(volumeBindingArgs, handle, fts) + volumeBinding := p.(*volumebinding.VolumeBinding) + nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { var nodeScore = 0.0 @@ -325,6 +343,19 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof("Node Affinity score: %f", nodeScore) } + // VolumeBinding + if weight.volumeBindingWeight != 0 { + score, status := volumeBinding.Score(context.TODO(), state, task.Pod, node.Name) + if !status.IsSuccess() { + klog.Warningf("Volume Binding Priority Failed because of Error: %v", status.AsError()) + return 0, status.AsError() + } + + // If volumeBindingWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score. + nodeScore += float64(score) * float64(weight.volumeBindingWeight) + klog.V(4).Infof("Volume Binding score: %f", nodeScore) + } + klog.V(4).Infof("Total Score for task %s/%s on node %s is: %f", task.Namespace, task.Name, node.Name, nodeScore) return nodeScore, nil } diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index 4747e6a4e6..0528173e83 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone" "volcano.sh/volcano/pkg/scheduler/api" @@ -316,7 +317,15 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { // 6. NodeVolumeLimits plugin, _ = nodevolumelimits.NewCSI(nil, handle, features) nodeVolumeLimitsCSIFilter := plugin.(*nodevolumelimits.CSILimits) - // 7. VolumeZone + // 7. VolumeBinding + volumeBindingArgs := &config.VolumeBindingArgs{ + TypeMeta: metav1.TypeMeta{}, + BindTimeoutSeconds: volumebinding.DefaultBindTimeoutSeconds, + Shape: nil, + } + plugin, _ = volumebinding.New(volumeBindingArgs, handle, features) + volumebindingFilter := plugin.(*volumebinding.VolumeBinding) + // 8. VolumeZone plugin, _ = volumezone.New(nil, handle) volumeZoneFilter := plugin.(*volumezone.VolumeZone) // 8. 
PodTopologySpread @@ -405,6 +414,18 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { return fmt.Errorf("plugin %s predicates failed %s", nodeVolumeLimitsCSIFilter.Name(), status.Message()) } + // Check VolumeBinding: handle the case where immediate claims are still unbound + status = volumebindingFilter.PreFilter(context.TODO(), state, task.Pod) + if !status.IsSuccess() { + return fmt.Errorf("plugin %s pre-predicates failed %s", volumebindingFilter.Name(), status.Message()) + } + + // handle both bound (check node affinity) and unbound (find available PVs and check node affinity) PVCs + status = volumebindingFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) + if !status.IsSuccess() { + return fmt.Errorf("plugin %s predicates failed %s", volumebindingFilter.Name(), status.Message()) + } + // Check VolumeZone status = volumeZoneFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) if !status.IsSuccess() { From 9773a7b168cf6f44e1129e3d32b7abf2536e7960 Mon Sep 17 00:00:00 2001 From: xilinxing Date: Tue, 27 Sep 2022 20:12:38 +0800 Subject: [PATCH 5/5] bugfix: the csinode driver status fails to update The existing implementation still keeps a deleted driver's status as true; this PR fixes it. Signed-off-by: xilinxing --- pkg/scheduler/cache/event_handlers.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 8b724ce8f2..13f67f2f70 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -411,24 +411,18 @@ func (sc *SchedulerCache) AddOrUpdateCSINode(obj interface{}) { return } - var csiNodeStatus *schedulingapi.CSINodeStatusInfo - var found bool + csiNodeStatus := &schedulingapi.CSINodeStatusInfo{ + CSINodeName: csiNode.Name, + DriverStatus: make(map[string]bool), + } sc.Mutex.Lock() defer sc.Mutex.Unlock() - // update nodeVolumeCount - - if csiNodeStatus, found = sc.CSINodesStatus[csiNode.Name]; !found { - csiNodeStatus = &schedulingapi.CSINodeStatusInfo{ - CSINodeName: csiNode.Name, - DriverStatus: make(map[string]bool), - } - sc.CSINodesStatus[csiNode.Name] = csiNodeStatus - } for i := range csiNode.Spec.Drivers { d := csiNode.Spec.Drivers[i] csiNodeStatus.DriverStatus[d.Name] = d.Allocatable != nil && d.Allocatable.Count != nil } + sc.CSINodesStatus[csiNode.Name] = csiNodeStatus } func (sc *SchedulerCache) UpdateCSINode(oldObj, newObj interface{}) {
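Usage note for the new nodeorder weights (not part of the patches): calculateWeight reads selectorspread.weight and volumebinding.weight from the plugin arguments, so both scores can be tuned in the scheduler configuration. A minimal sketch, assuming the usual volcano-scheduler ConfigMap layout; the tier layout and other values shown are illustrative:

  actions: "enqueue, allocate, backfill"
  tiers:
    - plugins:
        - name: nodeorder
          arguments:
            selectorspread.weight: 1   # defaults to 0, i.e. selector spread scoring is off unless enabled
            volumebinding.weight: 1    # defaults to 1, matching calculateWeight in patch 4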