From 5a8dcc6239766a0aaa72288bc410b7859c127d47 Mon Sep 17 00:00:00 2001 From: "wangyuqing (C)" Date: Wed, 31 Jul 2019 10:51:04 +0800 Subject: [PATCH] add podgroup controller --- cmd/controllers/app/options/options.go | 11 +- cmd/controllers/app/options/options_test.go | 4 +- cmd/controllers/app/server.go | 3 + pkg/apis/batch/v1alpha1/labels.go | 2 + pkg/controllers/podgroup/pg_controller.go | 147 ++++++++++++++++++ .../podgroup/pg_controller_handler.go | 106 +++++++++++++ .../podgroup/pg_controller_test.go | 78 ++++++++++ test/e2e/pg_controller.go | 136 ++++++++++++++++ test/e2e/util.go | 41 +++++ 9 files changed, 523 insertions(+), 5 deletions(-) create mode 100644 pkg/controllers/podgroup/pg_controller.go create mode 100644 pkg/controllers/podgroup/pg_controller_handler.go create mode 100644 pkg/controllers/podgroup/pg_controller_test.go create mode 100644 test/e2e/pg_controller.go diff --git a/cmd/controllers/app/options/options.go b/cmd/controllers/app/options/options.go index b402cefcb59..b37ef94dd2f 100644 --- a/cmd/controllers/app/options/options.go +++ b/cmd/controllers/app/options/options.go @@ -23,9 +23,10 @@ import ( ) const ( - defaultQPS = 50.0 - defaultBurst = 100 - defaultWorkers = 3 + defaultQPS = 50.0 + defaultBurst = 100 + defaultWorkers = 3 + defaultSchedulerName = "volcano" ) // ServerOption is the main context object for the controller manager. @@ -38,8 +39,9 @@ type ServerOption struct { KubeAPIQPS float32 PrintVersion bool // WorkerThreads is the number of threads syncing job operations - // concurrently. Larger number = faster job updating,but more CPU load. + // concurrently. Larger number = faster job updating, but more CPU load. WorkerThreads uint32 + SchedulerName string } // NewServerOption creates a new CMServer with a default config. @@ -60,6 +62,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit") fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+ "Larger number = faster job updating, but more CPU load") + fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name") } // CheckOptionOrDie checks the LockObjectNamespace diff --git a/cmd/controllers/app/options/options_test.go b/cmd/controllers/app/options/options_test.go index 8d46d10933e..1726969523f 100644 --- a/cmd/controllers/app/options/options_test.go +++ b/cmd/controllers/app/options/options_test.go @@ -17,9 +17,10 @@ limitations under the License. package options import ( - "github.com/spf13/pflag" "reflect" "testing" + + "github.com/spf13/pflag" ) func TestAddFlags(t *testing.T) { @@ -40,6 +41,7 @@ func TestAddFlags(t *testing.T) { KubeAPIBurst: 200, PrintVersion: false, WorkerThreads: defaultWorkers, + SchedulerName: defaultSchedulerName, } if !reflect.DeepEqual(expected, s) { diff --git a/cmd/controllers/app/server.go b/cmd/controllers/app/server.go index 9ac28618332..ceec83bdd43 100644 --- a/cmd/controllers/app/server.go +++ b/cmd/controllers/app/server.go @@ -44,6 +44,7 @@ import ( vkclient "volcano.sh/volcano/pkg/client/clientset/versioned" "volcano.sh/volcano/pkg/controllers/garbagecollector" "volcano.sh/volcano/pkg/controllers/job" + "volcano.sh/volcano/pkg/controllers/podgroup" "volcano.sh/volcano/pkg/controllers/queue" ) @@ -88,11 +89,13 @@ func Run(opt *options.ServerOption) error { jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads) queueController := queue.NewQueueController(kubeClient, kbClient) garbageCollector := garbagecollector.New(vkClient) + pgController := podgroup.NewPodgroupController(kubeClient, kbClient, opt.SchedulerName) run := func(ctx context.Context) { go jobController.Run(ctx.Done()) go queueController.Run(ctx.Done()) go garbageCollector.Run(ctx.Done()) + go pgController.Run(ctx.Done()) <-ctx.Done() } diff --git a/pkg/apis/batch/v1alpha1/labels.go b/pkg/apis/batch/v1alpha1/labels.go index 78e01d8a7a6..788e7fb4bd1 100644 --- a/pkg/apis/batch/v1alpha1/labels.go +++ b/pkg/apis/batch/v1alpha1/labels.go @@ -29,4 +29,6 @@ const ( JobVersion = "volcano.sh/job-version" // JobTypeKey job type key used in labels JobTypeKey = "volcano.sh/job-type" + // PodgroupNamePrefix podgroup name prefix + PodgroupNamePrefix = "podgroup-" ) diff --git a/pkg/controllers/podgroup/pg_controller.go b/pkg/controllers/podgroup/pg_controller.go new file mode 100644 index 00000000000..f6abe0f79c4 --- /dev/null +++ b/pkg/controllers/podgroup/pg_controller.go @@ -0,0 +1,147 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podgroup + +import ( + "github.com/golang/glog" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" + kbver "volcano.sh/volcano/pkg/client/clientset/versioned" + kbinfoext "volcano.sh/volcano/pkg/client/informers/externalversions" + kbinfo "volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha2" + kblister "volcano.sh/volcano/pkg/client/listers/scheduling/v1alpha2" +) + +// Controller the Podgroup Controller type +type Controller struct { + kubeClients kubernetes.Interface + kbClients kbver.Interface + + podInformer coreinformers.PodInformer + pgInformer kbinfo.PodGroupInformer + sharedInformers informers.SharedInformerFactory + + // A store of pods + podLister corelisters.PodLister + podSynced func() bool + + // A store of podgroups + pgLister kblister.PodGroupLister + pgSynced func() bool + + queue workqueue.RateLimitingInterface +} + +// NewPodgroupController create new Podgroup Controller +func NewPodgroupController( + kubeClient kubernetes.Interface, + kbClient kbver.Interface, + schedulerName string, +) *Controller { + cc := &Controller{ + kubeClients: kubeClient, + kbClients: kbClient, + + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + } + + cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0) + cc.podInformer = cc.sharedInformers.Core().V1().Pods() + cc.podLister = cc.podInformer.Lister() + cc.podSynced = cc.podInformer.Informer().HasSynced + cc.podInformer.Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch obj.(type) { + case *v1.Pod: + pod := obj.(*v1.Pod) + if pod.Spec.SchedulerName == schedulerName && + (pod.Annotations == nil || pod.Annotations[scheduling.GroupNameAnnotationKey] == "") { + return true + } + return false + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: cc.addPod, + }, + }) + + cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.kbClients, 0).Scheduling().V1alpha2().PodGroups() + cc.pgLister = cc.pgInformer.Lister() + cc.pgSynced = cc.pgInformer.Informer().HasSynced + + return cc +} + +// Run start NewPodgroupController +func (cc *Controller) Run(stopCh <-chan struct{}) { + go cc.sharedInformers.Start(stopCh) + go cc.podInformer.Informer().Run(stopCh) + go cc.pgInformer.Informer().Run(stopCh) + + cache.WaitForCacheSync(stopCh, cc.podSynced, cc.pgSynced) + + go wait.Until(cc.worker, 0, stopCh) + + glog.Infof("PodgroupController is running ...... ") +} + +func (cc *Controller) worker() { + for cc.processNextReq() { + } +} + +func (cc *Controller) processNextReq() bool { + obj, shutdown := cc.queue.Get() + if shutdown { + glog.Errorf("Fail to pop item from queue") + return false + } + + req := obj.(podRequest) + defer cc.queue.Done(req) + + pod, err := cc.podLister.Pods(req.pod.Namespace).Get(req.pod.Name) + if err != nil { + glog.Errorf("Failed to get pod by <%v> from cache: %v", req, err) + return true + } + + // normal pod use volcano + if err := cc.createNormalPodPGIfNotExist(pod); err != nil { + glog.Errorf("Failed to handle Pod <%s/%s>: %v", pod.Namespace, pod.Name, err) + cc.queue.AddRateLimited(req) + return true + } + + // If no error, forget it. + cc.queue.Forget(req) + + return true +} diff --git a/pkg/controllers/podgroup/pg_controller_handler.go b/pkg/controllers/podgroup/pg_controller_handler.go new file mode 100644 index 00000000000..c3c01243d3e --- /dev/null +++ b/pkg/controllers/podgroup/pg_controller_handler.go @@ -0,0 +1,106 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podgroup + +import ( + "github.com/golang/glog" + + "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" +) + +type podRequest struct { + pod *v1.Pod +} + +func (cc *Controller) addPod(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + glog.Errorf("Failed to convert %v to v1.Pod", obj) + return + } + + req := podRequest{ + pod: pod, + } + + cc.queue.Add(req) +} + +func (cc *Controller) updatePodAnnotations(pod *v1.Pod, pgName string) error { + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + if pod.Annotations[scheduling.GroupNameAnnotationKey] == "" { + pod.Annotations[scheduling.GroupNameAnnotationKey] = pgName + } else { + return nil + } + + if _, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Update(pod); err != nil { + glog.Errorf("Failed to update pod <%s/%s>: %v", pod.Namespace, pod.Name, err) + return err + } + + return nil +} + +func (cc *Controller) createNormalPodPGIfNotExist(pod *v1.Pod) error { + pgName := generatePodgroupName(pod) + + if _, err := cc.pgLister.PodGroups(pod.Namespace).Get(pgName); err != nil { + if !apierrors.IsNotFound(err) { + glog.Errorf("Failed to get normal PodGroup for Pod <%s/%s>: %v", + pod.Namespace, pod.Name, err) + return err + } + + pg := &scheduling.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: pod.Namespace, + Name: pgName, + OwnerReferences: pod.OwnerReferences, + }, + Spec: scheduling.PodGroupSpec{ + MinMember: 1, + }, + } + + if _, err := cc.kbClients.SchedulingV1alpha2().PodGroups(pod.Namespace).Create(pg); err != nil { + glog.Errorf("Failed to create normal PodGroup for Pod <%s/%s>: %v", + pod.Namespace, pod.Name, err) + return err + } + } + + return cc.updatePodAnnotations(pod, pgName) +} + +func generatePodgroupName(pod *v1.Pod) string { + pgName := vkbatchv1.PodgroupNamePrefix + if len(pod.OwnerReferences) != 0 { + pgName += string(pod.OwnerReferences[0].UID) + } else { + pgName += string(pod.UID) + } + + return pgName +} diff --git a/pkg/controllers/podgroup/pg_controller_test.go b/pkg/controllers/podgroup/pg_controller_test.go new file mode 100644 index 00000000000..f3cc0f75824 --- /dev/null +++ b/pkg/controllers/podgroup/pg_controller_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podgroup + +import ( + "testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeclient "k8s.io/client-go/kubernetes/fake" + + scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" + kubebatchclient "volcano.sh/volcano/pkg/client/clientset/versioned/fake" +) + +func newFakeController() *Controller { + KubeClientSet := kubeclient.NewSimpleClientset() + KubeBatchClientSet := kubebatchclient.NewSimpleClientset() + + controller := NewPodgroupController(KubeClientSet, KubeBatchClientSet, "volcano") + return controller +} + +func TestAddPodgroup(t *testing.T) { + namespace := "test" + + testCases := []struct { + Name string + pod *v1.Pod + ExpectValue string + }{ + { + Name: "AddPodgroup", + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ + {UID: "p1"}, + }, + }, + }, + ExpectValue: "podgroup-p1", + }, + } + + for i, testcase := range testCases { + c := newFakeController() + + pod, _ := c.kubeClients.CoreV1().Pods(testcase.pod.Namespace).Create(testcase.pod) + + c.addPod(pod) + c.createNormalPodPGIfNotExist(pod) + + podAnno := pod.Annotations[scheduling.GroupNameAnnotationKey] + if testcase.ExpectValue != podAnno { + t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, podAnno) + } + } +} diff --git a/test/e2e/pg_controller.go b/test/e2e/pg_controller.go new file mode 100644 index 00000000000..8f3490230ae --- /dev/null +++ b/test/e2e/pg_controller.go @@ -0,0 +1,136 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("PG E2E Test: Test PG controller", func() { + It("Create volcano rc, pg controller process", func() { + rcName := "rc-volcano" + podName := "pod-volcano" + namespace := "test" + label := map[string]string{"schedulerName": "volcano"} + context := initTestContext() + defer cleanupTestContext(context) + + rc := &corev1.ReplicationController{ + TypeMeta: v1.TypeMeta{ + APIVersion: "v1", + Kind: "ReplicationController", + }, + ObjectMeta: v1.ObjectMeta{ + Name: rcName, + Namespace: namespace, + }, + Spec: corev1.ReplicationControllerSpec{ + Selector: label, + Template: &corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Name: podName, + Labels: label, + }, + Spec: corev1.PodSpec{ + SchedulerName: "volcano", + Containers: []corev1.Container{ + { + Name: podName, + Image: defaultNginxImage, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + }, + }, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + }, + } + + _, err := context.kubeclient.CoreV1().ReplicationControllers(namespace).Create(rc) + Expect(err).NotTo(HaveOccurred()) + + err = waitPodPhase(context, pod, []corev1.PodPhase{corev1.PodRunning}) + Expect(err).NotTo(HaveOccurred()) + + ready, err := pgIsReady(context, namespace) + Expect(ready).Should(Equal(true)) + Expect(err).NotTo(HaveOccurred()) + }) + + It("Create default-scheduler rc, pg controller don't process", func() { + rcName := "rc-default-scheduler" + podName := "pod-default-scheduler" + namespace := "test" + label := map[string]string{"a": "b"} + context := initTestContext() + defer cleanupTestContext(context) + + rc := &corev1.ReplicationController{ + TypeMeta: v1.TypeMeta{ + APIVersion: "v1", + Kind: "ReplicationController", + }, + ObjectMeta: v1.ObjectMeta{ + Name: rcName, + Namespace: namespace, + }, + Spec: corev1.ReplicationControllerSpec{ + Selector: label, + Template: &corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Name: podName, + Labels: label, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: podName, + Image: defaultNginxImage, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + }, + }, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + }, + } + + _, err := context.kubeclient.CoreV1().ReplicationControllers(namespace).Create(rc) + Expect(err).NotTo(HaveOccurred()) + + err = waitPodPhase(context, pod, []corev1.PodPhase{corev1.PodRunning}) + Expect(err).NotTo(HaveOccurred()) + + ready, err := pgIsReady(context, namespace) + Expect(ready).Should(Equal(false)) + Expect(err.Error()).Should(Equal("podgroup is not found")) + }) +}) diff --git a/test/e2e/util.go b/test/e2e/util.go index b3a4f3afe96..3b771c53c6a 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -1175,3 +1175,44 @@ func jobTerminateAction(ctx *context, pg *batchv1alpha1.Job, time time.Time) wai return false, nil } } + +func waitPodPhase(ctx *context, pod *v1.Pod, phase []v1.PodPhase) error { + var additionalError error + err := wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) { + pods, err := ctx.kubeclient.CoreV1().Pods(pod.Namespace).List(metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + + for _, p := range phase { + for _, pod := range pods.Items { + if pod.Status.Phase == p { + return true, nil + } + } + } + + additionalError = fmt.Errorf("expected pod '%s' to %v, actual got %s", pod.Name, phase, pod.Status.Phase) + return false, nil + }) + if err != nil && strings.Contains(err.Error(), timeOutMessage) { + return fmt.Errorf("[Wait time out]: %s", additionalError) + } + return err +} + +func pgIsReady(ctx *context, namespace string) (bool, error) { + pgs, err := ctx.vcclient.SchedulingV1alpha2().PodGroups(namespace).List(metav1.ListOptions{}) + if err != nil { + return false, err + } + if pgs != nil && len(pgs.Items) == 0 { + return false, fmt.Errorf("podgroup is not found") + } + + for _, pg := range pgs.Items { + if pg.Status.Phase != schedulingv1alpha2.PodGroupPending { + return true, nil + } + } + + return false, fmt.Errorf("podgroup phase is Pending") +}