Skip to content

Commit

Permalink
Add queue controller
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxuzhonghu committed May 5, 2019
1 parent a69c217 commit d3f2fac
Show file tree
Hide file tree
Showing 10 changed files with 317 additions and 41 deletions.
15 changes: 13 additions & 2 deletions cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"

"volcano.sh/volcano/cmd/controllers/app/options"
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/job"
"volcano.sh/volcano/pkg/controllers/queue"
)

const (
Expand Down Expand Up @@ -72,10 +76,17 @@ func Run(opt *options.ServerOption) error {
return err
}

jobController := job.NewJobController(config)
// TODO: add user agent for different controllers
kubeClient := clientset.NewForConfigOrDie(config)
kbClient := kbver.NewForConfigOrDie(config)
vkClient := vkclient.NewForConfigOrDie(config)

jobController := job.NewJobController(kubeClient, kbClient, vkClient)
queueController := queue.NewQueueController(kubeClient, kbClient)

run := func(ctx context.Context) {
jobController.Run(ctx.Done())
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
<-ctx.Done()
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func ControlledBy(obj interface{}, gvk schema.GroupVersionKind) bool {
return false
}

func CreateConfigMapIfNotExist(job *vkv1.Job, kubeClients *kubernetes.Clientset, data map[string]string, cmName string) error {
func CreateConfigMapIfNotExist(job *vkv1.Job, kubeClients kubernetes.Interface, data map[string]string, cmName string) error {
// If ConfigMap does not exist, create one for Job.
cmOld, err := kubeClients.CoreV1().ConfigMaps(job.Namespace).Get(cmName, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -102,7 +102,7 @@ func CreateConfigMapIfNotExist(job *vkv1.Job, kubeClients *kubernetes.Clientset,
return nil
}

func DeleteConfigmap(job *vkv1.Job, kubeClients *kubernetes.Clientset, cmName string) error {
func DeleteConfigmap(job *vkv1.Job, kubeClients kubernetes.Interface, cmName string) error {
if _, err := kubeClients.CoreV1().ConfigMaps(job.Namespace).Get(cmName, metav1.GetOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get Configmap for Job <%s/%s>: %v",
Expand Down
33 changes: 16 additions & 17 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
pclister "k8s.io/client-go/listers/scheduling/v1beta1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand All @@ -53,10 +52,9 @@ import (

// Controller the Job Controller type
type Controller struct {
config *rest.Config
kubeClients *kubernetes.Clientset
vkClients *vkver.Clientset
kbClients *kbver.Clientset
kubeClient kubernetes.Interface
vkClient vkver.Interface
kbClient kbver.Interface

jobInformer vkbatchinfo.JobInformer
podInformer coreinformers.PodInformer
Expand Down Expand Up @@ -102,29 +100,30 @@ type Controller struct {
}

// NewJobController create new Job Controller
func NewJobController(config *rest.Config) *Controller {

kubeClients := kubernetes.NewForConfigOrDie(config)
func NewJobController(
kubeClient kubernetes.Interface,
kbClient kbver.Interface,
vkClient vkver.Interface,
) *Controller {

//Initialize event client
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClients.CoreV1().Events("")})
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(vkscheme.Scheme, v1.EventSource{Component: "vk-controller"})

cc := &Controller{
config: config,
kubeClients: kubeClients,
vkClients: vkver.NewForConfigOrDie(config),
kbClients: kbver.NewForConfigOrDie(config),
kubeClient: kubeClient,
vkClient: vkClient,
kbClient: kbClient,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
cache: jobcache.New(),
recorder: recorder,
priorityClasses: make(map[string]*v1beta1.PriorityClass),
}

cc.jobInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Batch().V1alpha1().Jobs()
cc.jobInformer = vkinfoext.NewSharedInformerFactory(cc.vkClient, 0).Batch().V1alpha1().Jobs()
cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addJob,
// TODO: enable this until we find an appropriate way.
Expand All @@ -134,14 +133,14 @@ func NewJobController(config *rest.Config) *Controller {
cc.jobLister = cc.jobInformer.Lister()
cc.jobSynced = cc.jobInformer.Informer().HasSynced

cc.cmdInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Bus().V1alpha1().Commands()
cc.cmdInformer = vkinfoext.NewSharedInformerFactory(cc.vkClient, 0).Bus().V1alpha1().Commands()
cc.cmdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addCommand,
})
cc.cmdLister = cc.cmdInformer.Lister()
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced

cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0)
cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClient, 0)
cc.podInformer = cc.sharedInformers.Core().V1().Pods()
cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPod,
Expand All @@ -160,7 +159,7 @@ func NewJobController(config *rest.Config) *Controller {
cc.svcLister = cc.svcInformer.Lister()
cc.svcSynced = cc.svcInformer.Informer().HasSynced

cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.kbClients, 0).Scheduling().V1alpha1().PodGroups()
cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.kbClient, 0).Scheduling().V1alpha1().PodGroups()
cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: cc.updatePodGroup,
})
Expand Down
16 changes: 8 additions & 8 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
}

// Update Job status
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
if job, err := cc.vkClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
Expand All @@ -117,7 +117,7 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
}

// Delete PodGroup
if err := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Delete(job.Name, nil); err != nil {
if err := cc.kbClient.SchedulingV1alpha1().PodGroups(job.Namespace).Delete(job.Name, nil); err != nil {
if !apierrors.IsNotFound(err) {
glog.Errorf("Failed to delete PodGroup of Job %v/%v: %v",
job.Namespace, job.Name, err)
Expand Down Expand Up @@ -146,7 +146,7 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
return err
}
if newJob != nil {
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
if job, err := cc.vkClient.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
glog.Errorf("Failed to update Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
Expand Down Expand Up @@ -240,7 +240,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
for _, pod := range podToCreate {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
_, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod)
_, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil && !apierrors.IsAlreadyExists(err) {
// Failed to create Pod, waitCreationGroup a moment and then create it again
// This is to ensure all podsMap under the same Job created
Expand Down Expand Up @@ -307,7 +307,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
updateStatus(&job.Status)
}

if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
if job, err := cc.vkClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
Expand Down Expand Up @@ -410,7 +410,7 @@ func (cc *Controller) createPVC(job *vkv1.Job, vcName string, volumeClaim *v1.Pe

glog.V(3).Infof("Try to create PVC: %v", pvc)

if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
if _, e := cc.kubeClient.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
glog.V(3).Infof("Failed to create PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, e)
return e
Expand Down Expand Up @@ -441,7 +441,7 @@ func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error {
},
}

if _, e := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil {
if _, e := cc.kbClient.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil {
glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)

Expand All @@ -453,7 +453,7 @@ func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error {
}

func (cc *Controller) deleteJobPod(jobName string, pod *v1.Pod) error {
err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil)
err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil)
if err != nil && !apierrors.IsNotFound(err) {
glog.Errorf("Failed to delete pod %s/%s for Job %s, err %#v",
pod.Namespace, pod.Name, jobName, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (cc *Controller) processNextCommand() bool {
cmd := obj.(*vkbusv1.Command)
defer cc.commandQueue.Done(cmd)

if err := cc.vkClients.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil); err != nil {
if err := cc.vkClient.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil); err != nil {
if !apierrors.IsNotFound(err) {
glog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name)
cc.commandQueue.AddRateLimited(cmd)
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/job/job_controller_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func (cc *Controller) pluginOnPodCreate(job *vkv1.Job, pod *v1.Pod) error {
client := vkinterface.PluginClientset{KubeClients: cc.kubeClients}
client := vkinterface.PluginClientset{KubeClient: cc.kubeClient}
for name, args := range job.Spec.Plugins {
if pb, found := vkplugin.GetPluginBuilder(name); !found {
err := fmt.Errorf("failed to get plugin %s", name)
Expand All @@ -47,7 +47,7 @@ func (cc *Controller) pluginOnPodCreate(job *vkv1.Job, pod *v1.Pod) error {
}

func (cc *Controller) pluginOnJobAdd(job *vkv1.Job) error {
client := vkinterface.PluginClientset{KubeClients: cc.kubeClients}
client := vkinterface.PluginClientset{KubeClient: cc.kubeClient}
if job.Status.ControlledResources == nil {
job.Status.ControlledResources = make(map[string]string)
}
Expand All @@ -69,7 +69,7 @@ func (cc *Controller) pluginOnJobAdd(job *vkv1.Job) error {
}

func (cc *Controller) pluginOnJobDelete(job *vkv1.Job) error {
client := vkinterface.PluginClientset{KubeClients: cc.kubeClients}
client := vkinterface.PluginClientset{KubeClient: cc.kubeClient}
for name, args := range job.Spec.Plugins {
if pb, found := vkplugin.GetPluginBuilder(name); !found {
err := fmt.Errorf("failed to get plugin %s", name)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/plugins/interface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

type PluginClientset struct {
KubeClients *kubernetes.Clientset
KubeClient kubernetes.Interface
}

type PluginInterface interface {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/job/plugins/ssh/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (sp *sshPlugin) OnJobAdd(job *vkv1.Job) error {
return err
}

if err := helpers.CreateConfigMapIfNotExist(job, sp.Clientset.KubeClients, data, sp.cmName(job)); err != nil {
if err := helpers.CreateConfigMapIfNotExist(job, sp.Clientset.KubeClient, data, sp.cmName(job)); err != nil {
return err
}

Expand All @@ -85,7 +85,7 @@ func (sp *sshPlugin) OnJobAdd(job *vkv1.Job) error {
}

func (sp *sshPlugin) OnJobDelete(job *vkv1.Job) error {
if err := helpers.DeleteConfigmap(job, sp.Clientset.KubeClients, sp.cmName(job)); err != nil {
if err := helpers.DeleteConfigmap(job, sp.Clientset.KubeClient, sp.cmName(job)); err != nil {
return err
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/controllers/job/plugins/svc/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (sp *servicePlugin) OnJobAdd(job *vkv1.Job) error {

data := generateHost(job)

if err := helpers.CreateConfigMapIfNotExist(job, sp.Clientset.KubeClients, data, sp.cmName(job)); err != nil {
if err := helpers.CreateConfigMapIfNotExist(job, sp.Clientset.KubeClient, data, sp.cmName(job)); err != nil {
return err
}

Expand All @@ -83,11 +83,11 @@ func (sp *servicePlugin) OnJobAdd(job *vkv1.Job) error {
}

func (sp *servicePlugin) OnJobDelete(job *vkv1.Job) error {
if err := helpers.DeleteConfigmap(job, sp.Clientset.KubeClients, sp.cmName(job)); err != nil {
if err := helpers.DeleteConfigmap(job, sp.Clientset.KubeClient, sp.cmName(job)); err != nil {
return err
}

if err := sp.Clientset.KubeClients.CoreV1().Services(job.Namespace).Delete(job.Name, nil); err != nil {
if err := sp.Clientset.KubeClient.CoreV1().Services(job.Namespace).Delete(job.Name, nil); err != nil {
if !apierrors.IsNotFound(err) {
glog.Errorf("Failed to delete Service of Job %v/%v: %v", job.Namespace, job.Name, err)
return err
Expand Down Expand Up @@ -121,7 +121,7 @@ func (sp *servicePlugin) mountConfigmap(pod *v1.Pod, job *vkv1.Job) {

func (sp *servicePlugin) createServiceIfNotExist(job *vkv1.Job) error {
// If Service does not exist, create one for Job.
if _, err := sp.Clientset.KubeClients.CoreV1().Services(job.Namespace).Get(job.Name, metav1.GetOptions{}); err != nil {
if _, err := sp.Clientset.KubeClient.CoreV1().Services(job.Namespace).Get(job.Name, metav1.GetOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
Expand Down Expand Up @@ -153,7 +153,7 @@ func (sp *servicePlugin) createServiceIfNotExist(job *vkv1.Job) error {
},
}

if _, e := sp.Clientset.KubeClients.CoreV1().Services(job.Namespace).Create(svc); e != nil {
if _, e := sp.Clientset.KubeClient.CoreV1().Services(job.Namespace).Create(svc); e != nil {
glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v", job.Namespace, job.Name, err)
return e
} else {
Expand Down
Loading

0 comments on commit d3f2fac

Please sign in to comment.