Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge pr from release-1.6 #2542

Merged
merged 5 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type ServerOption struct {
// For dependent tasks, there is a detection cycle inside volcano
// It indicates how often to detect the status of dependent tasks
DetectionPeriodOfDependsOntask time.Duration
// To determine whether to inherit the owner's annotations for pods when creating a podgroup
InheritOwnerAnnotations bool
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -82,6 +84,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default")
fs.DurationVar(&s.DetectionPeriodOfDependsOntask, "detection-period-of-dependson-task", defaultDetectionPeriodOfDependsOntask, "It indicates how often to detect the status of dependent tasks."+
"e.g. --detection-period-of-dependson-task=1s")
fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default")
}

// CheckOptionOrDie checks the LockObjectNamespace.
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx c
controllerOpt.KubeClient = kubeclientset.NewForConfigOrDie(config)
controllerOpt.VolcanoClient = vcclientset.NewForConfigOrDie(config)
controllerOpt.SharedInformerFactory = informers.NewSharedInformerFactory(controllerOpt.KubeClient, 0)
controllerOpt.InheritOwnerAnnotations = opt.InheritOwnerAnnotations

return func(ctx context.Context) {
framework.ForeachController(func(c framework.Controller) {
Expand Down
11 changes: 7 additions & 4 deletions installer/helm/chart/volcano/templates/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ rules:
verbs: ["list", "watch", "update"]
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["list", "watch"]
verbs: ["list", "watch", "update"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["list", "watch"]
resources: ["namespaces", "services", "replicationcontrollers"]
verbs: ["list", "watch", "get"]
- apiGroups: [""]
resources: ["resourcequotas"]
verbs: ["list", "watch"]
Expand Down Expand Up @@ -72,6 +72,9 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "delete", "update"]
- apiGroups: ["apps"]
resources: ["daemonsets", "replicasets", "statefulsets"]
verbs: ["list", "watch", "get"]

---
kind: ClusterRoleBinding
Expand Down Expand Up @@ -138,7 +141,7 @@ metadata:
prometheus.io/port: "8080"
prometheus.io/scrape: "true"
name: {{ .Release.Name }}-scheduler-service
namespace: {{ .Release.Namespace }}
namespace: {{ .Release.Namespace }}
spec:
ports:
- port: 8080
Expand Down
11 changes: 7 additions & 4 deletions installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8568,10 +8568,10 @@ rules:
verbs: ["list", "watch", "update"]
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["list", "watch"]
verbs: ["list", "watch", "update"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["list", "watch"]
resources: ["namespaces", "services", "replicationcontrollers"]
verbs: ["list", "watch", "get"]
- apiGroups: [""]
resources: ["resourcequotas"]
verbs: ["list", "watch"]
Expand Down Expand Up @@ -8599,6 +8599,9 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "delete", "update"]
- apiGroups: ["apps"]
resources: ["daemonsets", "replicasets", "statefulsets"]
verbs: ["list", "watch", "get"]
---
# Source: volcano/templates/scheduler.yaml
kind: ClusterRoleBinding
Expand All @@ -8623,7 +8626,7 @@ metadata:
prometheus.io/port: "8080"
prometheus.io/scrape: "true"
name: volcano-scheduler-service
namespace: volcano-system
namespace: volcano-system
spec:
ports:
- port: 8080
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type ControllerOption struct {
SchedulerNames []string
WorkerNum uint32
MaxRequeueNum int

InheritOwnerAnnotations bool
}

// Controller is the interface of all controllers.
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/podgroup/pg_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type pgcontroller struct {
queue workqueue.RateLimitingInterface

schedulerNames []string

// To determine whether to inherit the owner's annotations for pods when creating a podgroup
inheritOwnerAnnotations bool
}

func (pg *pgcontroller) Name() string {
Expand All @@ -77,6 +80,7 @@ func (pg *pgcontroller) Initialize(opt *framework.ControllerOption) error {

pg.schedulerNames = make([]string, len(opt.SchedulerNames))
copy(pg.schedulerNames, opt.SchedulerNames)
pg.inheritOwnerAnnotations = opt.InheritOwnerAnnotations

pg.informerFactory = opt.SharedInformerFactory
pg.podInformer = opt.SharedInformerFactory.Core().V1().Pods()
Expand Down
29 changes: 17 additions & 12 deletions pkg/controllers/podgroup/pg_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ func (pg *pgcontroller) getAnnotationsFromUpperRes(kind string, name string, nam
}
}

// inheritUpperAnnotations copies scheduling-prefixed annotations from the
// pod's owner resources onto the PodGroup's annotations. It is a no-op when
// annotation inheritance has been disabled via configuration.
func (pg *pgcontroller) inheritUpperAnnotations(pod *v1.Pod, obj *scheduling.PodGroup) {
	if !pg.inheritOwnerAnnotations {
		return
	}
	for _, owner := range pod.OwnerReferences {
		// Skip malformed owner references that lack a kind or a name.
		if owner.Kind == "" || owner.Name == "" {
			continue
		}
		upperAnnotations := pg.getAnnotationsFromUpperRes(owner.Kind, owner.Name, pod.Namespace)
		for key, value := range upperAnnotations {
			// Only annotations under the scheduling prefix are inherited.
			if strings.HasPrefix(key, scheduling.AnnotationPrefix) {
				obj.Annotations[key] = value
			}
		}
	}
}

func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
pgName := helpers.GeneratePodgroupName(pod)

Expand Down Expand Up @@ -157,18 +173,7 @@ func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
},
}

// Inherit annotations from upper resources.
for _, reference := range pod.OwnerReferences {
if reference.Kind != "" && reference.Name != "" {
var upperAnnotations = pg.getAnnotationsFromUpperRes(reference.Kind, reference.Name, pod.Namespace)
for k, v := range upperAnnotations {
if strings.HasPrefix(k, scheduling.AnnotationPrefix) {
obj.Annotations[k] = v
}
}
}
}

pg.inheritUpperAnnotations(pod, obj)
// Individual annotations on pods would overwrite annotations inherited from upper resources.
if queueName, ok := pod.Annotations[scheduling.QueueNameAnnotationKey]; ok {
obj.Spec.Queue = queueName
Expand Down
21 changes: 17 additions & 4 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,15 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
sc.informerFactory = informerFactory
mySchedulerPodName, c := getMultiSchedulerInfo()

// explicitly register informers to the factory, otherwise resource listers cannot get anything
// even with no error returned. The `Namespace` informer is used by the `InterPodAffinity` plugin;
// the `SelectorSpread` and `PodTopologySpread` plugins use the following four so far.
informerFactory.Core().V1().Namespaces().Informer()
informerFactory.Core().V1().Services().Informer()
informerFactory.Core().V1().ReplicationControllers().Informer()
informerFactory.Apps().V1().ReplicaSets().Informer()
informerFactory.Apps().V1().StatefulSets().Informer()

// create informer for node information
sc.nodeInformer = informerFactory.Core().V1().Nodes()
sc.nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
Expand Down Expand Up @@ -990,22 +999,26 @@ func (sc *SchedulerCache) processBindTask() {

func (sc *SchedulerCache) BindTask() {
klog.V(5).Infof("batch bind task count %d", len(sc.bindCache))
successfulTasks := make([]*schedulingapi.TaskInfo, 0)
for _, task := range sc.bindCache {
if err := sc.VolumeBinder.BindVolumes(task, task.PodVolumes); err != nil {
klog.Errorf("task %s/%s bind Volumes failed: %#v", task.Namespace, task.Name, err)
sc.VolumeBinder.RevertVolumes(task, task.PodVolumes)
sc.resyncTask(task)
return
} else {
successfulTasks = append(successfulTasks, task)
klog.V(5).Infof("task %s/%s bind Volumes done", task.Namespace, task.Name)
}
}

bindTasks := make([]*schedulingapi.TaskInfo, len(sc.bindCache))
copy(bindTasks, sc.bindCache)
bindTasks := make([]*schedulingapi.TaskInfo, len(successfulTasks))
copy(bindTasks, successfulTasks)
if err := sc.Bind(bindTasks); err != nil {
klog.Errorf("failed to bind task count %d: %#v", len(bindTasks), err)
return
}

for _, task := range sc.bindCache {
for _, task := range successfulTasks {
metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time))
}

Expand Down
16 changes: 5 additions & 11 deletions pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,24 +411,18 @@ func (sc *SchedulerCache) AddOrUpdateCSINode(obj interface{}) {
return
}

var csiNodeStatus *schedulingapi.CSINodeStatusInfo
var found bool
csiNodeStatus := &schedulingapi.CSINodeStatusInfo{
CSINodeName: csiNode.Name,
DriverStatus: make(map[string]bool),
}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
// update nodeVolumeCount

if csiNodeStatus, found = sc.CSINodesStatus[csiNode.Name]; !found {
csiNodeStatus = &schedulingapi.CSINodeStatusInfo{
CSINodeName: csiNode.Name,
DriverStatus: make(map[string]bool),
}
sc.CSINodesStatus[csiNode.Name] = csiNodeStatus
}

for i := range csiNode.Spec.Drivers {
d := csiNode.Spec.Drivers[i]
csiNodeStatus.DriverStatus[d.Name] = d.Allocatable != nil && d.Allocatable.Count != nil
}
sc.CSINodesStatus[csiNode.Name] = csiNodeStatus
}

func (sc *SchedulerCache) UpdateCSINode(oldObj, newObj interface{}) {
Expand Down
Loading