Skip to content

Commit

Permalink
Merge pull request volcano-sh#5 from k82cn/add_job_io
Browse files Browse the repository at this point in the history
  • Loading branch information
Klaus Ma authored Jan 17, 2019
2 parents e2300d7 + 4df52f6 commit 59b0a5c
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 3 deletions.
2 changes: 0 additions & 2 deletions config/crds/batch_v1alpha1_job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ spec:
should be mounted. Defaults to "" (volume's root).
type: string
required:
- name
- mountPath
type: object
minAvailable:
Expand Down Expand Up @@ -88,7 +87,6 @@ spec:
should be mounted. Defaults to "" (volume's root).
type: string
required:
- name
- mountPath
type: object
policies:
Expand Down
10 changes: 10 additions & 0 deletions example/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ metadata:
name: test-job
spec:
minAvailable: 3
input:
mountPath: "/myinput"
output:
mountPath: "/myoutput"
claim:
accessModes: [ "ReadWriteOnce" ]
storageClassName: "my-storage-class"
resources:
requests:
storage: 1Gi
taskSpecs:
- replicas: 6
template:
Expand Down
11 changes: 10 additions & 1 deletion pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Controller struct {

jobInformer vkbatchinfo.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer kbinfo.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer vkcoreinfo.CommandInformer
Expand All @@ -70,6 +71,9 @@ type Controller struct {
podLister corelisters.PodLister
podSynced func() bool

pvcLister corelisters.PersistentVolumeClaimLister
pvcSynced func() bool

// A store of podgroups
pgLister kblister.PodGroupLister
pgSynced func() bool
Expand Down Expand Up @@ -156,6 +160,10 @@ func NewJobController(config *rest.Config) *Controller {
cc.podLister = cc.podInformer.Lister()
cc.podSynced = cc.podInformer.Informer().HasSynced

cc.pvcInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().PersistentVolumeClaims()
cc.pvcLister = cc.pvcInformer.Lister()
cc.pvcSynced = cc.pvcInformer.Informer().HasSynced

cc.svcInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().Services()
cc.svcLister = cc.svcInformer.Lister()
cc.svcSynced = cc.svcInformer.Informer().HasSynced
Expand All @@ -182,12 +190,13 @@ func NewJobController(config *rest.Config) *Controller {
func (cc *Controller) Run(stopCh <-chan struct{}) {
go cc.jobInformer.Informer().Run(stopCh)
go cc.podInformer.Informer().Run(stopCh)
go cc.pvcInformer.Informer().Run(stopCh)
go cc.pgInformer.Informer().Run(stopCh)
go cc.svcInformer.Informer().Run(stopCh)
go cc.cmdInformer.Informer().Run(stopCh)

cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
cc.svcSynced, cc.cmdSynced)
cc.svcSynced, cc.cmdSynced, cc.pvcSynced)

go wait.Until(cc.worker, 0, stopCh)

Expand Down
66 changes: 66 additions & 0 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,72 @@ func (cc *Controller) syncJob(req *state.Request) error {
}
}

// If input/output PVC does not exist, create them for Job.
inputPVC := fmt.Sprintf("%s-input", job.Name)
outputPVC := fmt.Sprintf("%s-output", job.Name)
if job.Spec.Input != nil {
if job.Spec.Input.VolumeClaim != nil {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}

pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: inputPVC,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: *job.Spec.Input.VolumeClaim,
}

glog.V(3).Infof("Try to create input PVC: %v", pvc)

if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
}
}
}
}

if job.Spec.Output != nil {
if job.Spec.Output.VolumeClaim != nil {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}

pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: outputPVC,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: *job.Spec.Output.VolumeClaim,
}

glog.V(3).Infof("Try to create output PVC: %v", pvc)

if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
}
}
}
}

// If Service does not exist, create one for Job.
if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v",
Expand Down
52 changes: 52 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package job

import (
"fmt"

"github.com/golang/glog"

"k8s.io/api/core/v1"
Expand Down Expand Up @@ -81,6 +82,56 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod {
Spec: templateCopy.Spec,
}

if job.Spec.Output != nil {
if job.Spec.Output.VolumeClaim == nil {
volume := v1.Volume{
Name: fmt.Sprintf("%s-output", job.Name),
}
volume.EmptyDir = &v1.EmptyDirVolumeSource{}
pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
} else {
volume := v1.Volume{
Name: fmt.Sprintf("%s-output", job.Name),
}
volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{
ClaimName: fmt.Sprintf("%s-output", job.Name),
}

pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
}

for i, c := range pod.Spec.Containers {
vm := job.Spec.Output.VolumeMount
vm.Name = fmt.Sprintf("%s-output", job.Name)
pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm)
}
}

if job.Spec.Input != nil {
if job.Spec.Input.VolumeClaim == nil {
volume := v1.Volume{
Name: fmt.Sprintf("%s-input", job.Name),
}
volume.EmptyDir = &v1.EmptyDirVolumeSource{}
pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
} else {
volume := v1.Volume{
Name: fmt.Sprintf("%s-input", job.Name),
}
volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{
ClaimName: fmt.Sprintf("%s-input", job.Name),
}

pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
}

for i, c := range pod.Spec.Containers {
vm := job.Spec.Input.VolumeMount
vm.Name = fmt.Sprintf("%s-input", job.Name)
pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm)
}
}

if len(pod.Annotations) == 0 {
pod.Annotations = make(map[string]string)
}
Expand Down Expand Up @@ -110,6 +161,7 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod {
if job.Spec.SchedulerName != "" && pod.Spec.SchedulerName == "" {
pod.Spec.SchedulerName = job.Spec.SchedulerName
}

return pod
}

Expand Down

0 comments on commit 59b0a5c

Please sign in to comment.