Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add podgroup controller #401

Merged
merged 1 commit into from
Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions cmd/controllers/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
)

const (
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
defaultSchedulerName = "volcano"
)

// ServerOption is the main context object for the controller manager.
Expand All @@ -38,8 +39,9 @@ type ServerOption struct {
KubeAPIQPS float32
PrintVersion bool
// WorkerThreads is the number of threads syncing job operations
// concurrently. Larger number = faster job updating,but more CPU load.
// concurrently. Larger number = faster job updating, but more CPU load.
WorkerThreads uint32
SchedulerName string
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -60,6 +62,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
"Larger number = faster job updating, but more CPU load")
fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name")
}

// CheckOptionOrDie checks the LockObjectNamespace
Expand Down
4 changes: 3 additions & 1 deletion cmd/controllers/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ limitations under the License.
package options

import (
"github.com/spf13/pflag"
"reflect"
"testing"

"github.com/spf13/pflag"
)

func TestAddFlags(t *testing.T) {
Expand All @@ -40,6 +41,7 @@ func TestAddFlags(t *testing.T) {
KubeAPIBurst: 200,
PrintVersion: false,
WorkerThreads: defaultWorkers,
SchedulerName: defaultSchedulerName,
}

if !reflect.DeepEqual(expected, s) {
Expand Down
3 changes: 3 additions & 0 deletions cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/garbagecollector"
"volcano.sh/volcano/pkg/controllers/job"
"volcano.sh/volcano/pkg/controllers/podgroup"
"volcano.sh/volcano/pkg/controllers/queue"
)

Expand Down Expand Up @@ -88,11 +89,13 @@ func Run(opt *options.ServerOption) error {
jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads)
queueController := queue.NewQueueController(kubeClient, kbClient)
garbageCollector := garbagecollector.New(vkClient)
pgController := podgroup.NewPodgroupController(kubeClient, kbClient, opt.SchedulerName)

run := func(ctx context.Context) {
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
go garbageCollector.Run(ctx.Done())
go pgController.Run(ctx.Done())
<-ctx.Done()
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/batch/v1alpha1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ const (
JobVersion = "volcano.sh/job-version"
// JobTypeKey job type key used in labels
JobTypeKey = "volcano.sh/job-type"
// PodgroupNamePrefix podgroup name prefix
PodgroupNamePrefix = "podgroup-"
)
145 changes: 145 additions & 0 deletions pkg/controllers/podgroup/pg_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
Copyright 2019 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podgroup

import (
"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
kbver "volcano.sh/volcano/pkg/client/clientset/versioned"
kbinfoext "volcano.sh/volcano/pkg/client/informers/externalversions"
kbinfo "volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha2"
kblister "volcano.sh/volcano/pkg/client/listers/scheduling/v1alpha2"
)

// Controller the Podgroup Controller type.
// It watches for Pods scheduled by volcano that carry no podgroup
// annotation and creates a default PodGroup for each of them.
type Controller struct {
kubeClients kubernetes.Interface
kbClients kbver.Interface

// podInformer/pgInformer feed the local caches below.
podInformer coreinformers.PodInformer
pgInformer kbinfo.PodGroupInformer

// A store of pods
podLister corelisters.PodLister
podSynced func() bool

// A store of podgroups
pgLister kblister.PodGroupLister
pgSynced func() bool

// queue holds podRequest items produced by the pod event handler
// and consumed by worker goroutines.
queue workqueue.RateLimitingInterface
}

// NewPodgroupController create new Podgroup Controller.
//
// It wires up pod and podgroup informers and registers an add handler
// that enqueues only pods which (a) are to be scheduled by
// schedulerName and (b) do not yet carry a podgroup annotation.
// Callers must still start the informers and workers via Run.
func NewPodgroupController(
	kubeClient kubernetes.Interface,
	kbClient kbver.Interface,
	schedulerName string,
) *Controller {
	cc := &Controller{
		kubeClients: kubeClient,
		kbClients:   kbClient,

		queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
	}

	factory := informers.NewSharedInformerFactory(cc.kubeClients, 0)
	cc.podInformer = factory.Core().V1().Pods()
	cc.podLister = cc.podInformer.Lister()
	cc.podSynced = cc.podInformer.Informer().HasSynced
	cc.podInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				// Only pods owned by this scheduler and still missing a
				// podgroup annotation are interesting.
				switch pod := obj.(type) {
				case *v1.Pod:
					return pod.Spec.SchedulerName == schedulerName &&
						(pod.Annotations == nil || pod.Annotations[scheduling.GroupNameAnnotationKey] == "")
				default:
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc: cc.addPod,
			},
		})

	cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.kbClients, 0).Scheduling().V1alpha2().PodGroups()
	cc.pgLister = cc.pgInformer.Lister()
	cc.pgSynced = cc.pgInformer.Informer().HasSynced

	return cc
}

// Run start NewPodgroupController
func (cc *Controller) Run(stopCh <-chan struct{}) {
go cc.podInformer.Informer().Run(stopCh)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if start sharedInformer, it's not necessary to start pod informer again.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thing happens to other controllers

go cc.pgInformer.Informer().Run(stopCh)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We donot need Run informers separately as sharedInformers Start will Run the informers created from informer factory.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same happens to other controllers

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc.sharedInformers.Star will run all its informers.


cache.WaitForCacheSync(stopCh, cc.podSynced, cc.pgSynced)

go wait.Until(cc.worker, 0, stopCh)

glog.Infof("PodgroupController is running ...... ")
}

// worker drains the queue until processNextReq reports shutdown.
func (cc *Controller) worker() {
	for {
		if !cc.processNextReq() {
			return
		}
	}
}

// processNextReq pops one podRequest from the queue and ensures the pod
// has a podgroup. It returns false only when the queue has been shut
// down; transient failures are retried via the rate limiter.
func (cc *Controller) processNextReq() bool {
	obj, shutdown := cc.queue.Get()
	if shutdown {
		// Queue shutdown is the normal termination path, not an error.
		glog.V(3).Infof("Podgroup work queue shut down; stopping worker")
		return false
	}

	req := obj.(podRequest)
	defer cc.queue.Done(req)

	pod, err := cc.podLister.Pods(req.podNamespace).Get(req.podName)
	if err != nil {
		// Pod may have been deleted since it was enqueued; nothing to do.
		glog.Errorf("Failed to get pod by <%v> from cache: %v", req, err)
		return true
	}

	// normal pod use volcano
	if err := cc.createNormalPodPGIfNotExist(pod); err != nil {
		glog.Errorf("Failed to handle Pod <%s/%s>: %v", pod.Namespace, pod.Name, err)
		cc.queue.AddRateLimited(req)
		return true
	}

	// If no error, forget it.
	cc.queue.Forget(req)

	return true
}
136 changes: 136 additions & 0 deletions pkg/controllers/podgroup/pg_controller_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
Copyright 2019 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podgroup

import (
"github.com/golang/glog"

"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
)

// podRequest is the work-queue item identifying a pod that may need a
// podgroup created for it.
type podRequest struct {
podName string
podNamespace string
}

// addPod is the informer add handler: it converts the event object to a
// pod and enqueues a podRequest for it.
func (cc *Controller) addPod(obj interface{}) {
	pod, valid := obj.(*v1.Pod)
	if !valid {
		glog.Errorf("Failed to convert %v to v1.Pod", obj)
		return
	}

	cc.queue.Add(podRequest{
		podName:      pod.Name,
		podNamespace: pod.Namespace,
	})
}

func (cc *Controller) updatePodAnnotations(pod *v1.Pod, pgName string) error {
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
if pod.Annotations[scheduling.GroupNameAnnotationKey] == "" {
pod.Annotations[scheduling.GroupNameAnnotationKey] = pgName
} else {
if pod.Annotations[scheduling.GroupNameAnnotationKey] != pgName {
glog.Errorf("normal pod %s/%s annotations %s value is not %s, but %s", pod.Namespace, pod.Name,
scheduling.GroupNameAnnotationKey, pgName, pod.Annotations[scheduling.GroupNameAnnotationKey])
}
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log an error message if pod.Annotations[scheduling.GroupNameAnnotationKey] != pgName.

}

if _, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Update(pod); err != nil {
glog.Errorf("Failed to update pod <%s/%s>: %v", pod.Namespace, pod.Name, err)
return err
}

return nil
}

// createNormalPodPGIfNotExist ensures a default PodGroup (MinMember 1)
// exists for a normal pod, creating it when absent, then writes the
// podgroup name into the pod's annotations.
func (cc *Controller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
	pgName := generatePodgroupName(pod)

	_, err := cc.pgLister.PodGroups(pod.Namespace).Get(pgName)
	switch {
	case apierrors.IsNotFound(err):
		pg := &scheduling.PodGroup{
			ObjectMeta: metav1.ObjectMeta{
				Namespace:       pod.Namespace,
				Name:            pgName,
				OwnerReferences: newPGOwnerReferences(pod),
			},
			Spec: scheduling.PodGroupSpec{
				MinMember: 1,
			},
		}

		if _, err := cc.kbClients.SchedulingV1alpha2().PodGroups(pod.Namespace).Create(pg); err != nil {
			glog.Errorf("Failed to create normal PodGroup for Pod <%s/%s>: %v",
				pod.Namespace, pod.Name, err)
			return err
		}
	case err != nil:
		glog.Errorf("Failed to get normal PodGroup for Pod <%s/%s>: %v",
			pod.Namespace, pod.Name, err)
		return err
	}

	return cc.updatePodAnnotations(pod, pgName)
}

// generatePodgroupName derives a deterministic podgroup name for a pod:
// the podgroup prefix plus the UID of the pod's controlling owner, or
// the pod's own UID when it has no controller reference.
func generatePodgroupName(pod *v1.Pod) string {
	// Ranging over a nil/empty slice is a no-op, so no length guard is
	// needed; the first controller reference wins.
	for _, ownerReference := range pod.OwnerReferences {
		if ownerReference.Controller != nil && *ownerReference.Controller {
			return vkbatchv1.PodgroupNamePrefix + string(ownerReference.UID)
		}
	}

	return vkbatchv1.PodgroupNamePrefix + string(pod.UID)
}

// newPGOwnerReferences returns the owner references to place on a pod's
// podgroup: the pod's own references when it already has a controlling
// owner, otherwise a single controller reference to the pod itself.
func newPGOwnerReferences(pod *v1.Pod) []metav1.OwnerReference {
	for _, ownerReference := range pod.OwnerReferences {
		if ownerReference.Controller != nil && *ownerReference.Controller {
			return pod.OwnerReferences
		}
	}

	// NOTE(review): this reference sets no Kind or Name — API-server
	// validation generally requires them on ownerReferences; confirm
	// garbage collection works with this shape.
	isController := true
	return []metav1.OwnerReference{{
		APIVersion: helpers.JobKind.GroupVersion().String(),
		Controller: &isController,
		UID:        pod.UID,
	}}
}
Loading