Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine input & output volumes #124

Merged
merged 1 commit into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pkg/admission/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
}
}

//TODO(tommylikehu): Fix me and enable it.
//if validateInfo, ok := ValidateIO(job.Spec.Volumes); ok {
// msg = msg + validateInfo
//}
if validateInfo, ok := ValidateIO(job.Spec.Volumes); ok {
msg = msg + validateInfo
}

if msg != "" {
reviewResponse.Allowed = false
Expand Down
35 changes: 2 additions & 33 deletions pkg/admission/mutate_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ package admission
import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"time"

"github.com/golang/glog"
"strconv"

"k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)

type patchOperation struct {
Expand Down Expand Up @@ -74,7 +71,6 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
func createPatch(job v1alpha1.Job) ([]byte, error) {
var patch []patchOperation
patch = append(patch, mutateSpec(job.Spec.Tasks, "/spec/tasks")...)
patch = append(patch, mutateMetadata(job.ObjectMeta, "/metadata")...)

return json.Marshal(patch)
}
Expand All @@ -95,30 +91,3 @@ func mutateSpec(tasks []v1alpha1.TaskSpec, basePath string) (patch []patchOperat

return patch
}

func mutateMetadata(metadata metav1.ObjectMeta, basePath string) (patch []patchOperation) {
if len(metadata.Annotations) == 0 {
metadata.Annotations = make(map[string]string)
}
randomStr := genRandomStr(5)
metadata.Annotations[PVCInputName] = fmt.Sprintf("%s-input-%s", metadata.Name, randomStr)
metadata.Annotations[PVCOutputName] = fmt.Sprintf("%s-output-%s", metadata.Name, randomStr)
patch = append(patch, patchOperation{
Op: "replace",
Path: basePath,
Value: metadata,
})

return patch
}

func genRandomStr(l int) string {
str := "0123456789abcdefghijklmnopqrstuvwxyz"
bytes := []byte(str)
result := []byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < l; i++ {
result = append(result, bytes[r.Intn(len(bytes))])
}
return string(result)
}
22 changes: 11 additions & 11 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,29 @@ type JobSpec struct {
// +optional
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,2,opt,name=minAvailable"`

// The volume mount for input of Job
Input *VolumeSpec `json:"input,omitempty" protobuf:"bytes,3,opt,name=input"`

// The volume mount for output of Job
Output *VolumeSpec `json:"output,omitempty" protobuf:"bytes,4,opt,name=output"`
// The volumes mount on Job
Volumes []VolumeSpec `json:"volumes,omitempty" protobuf:"bytes,3,opt,name=volumes"`

// Tasks specifies the task specification of Job
// +optional
Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,5,opt,name=tasks"`
Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,4,opt,name=tasks"`

// Specifies the default lifecycle of tasks
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,6,opt,name=policies"`
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,5,opt,name=policies"`

// Specifies the plugin of job
// Key is plugin name, value is the arguments of the plugin
// +optional
Plugins map[string][]string `json:"plugins,omitempty" protobuf:"bytes,7,opt,name=plugins"`
Plugins map[string][]string `json:"plugins,omitempty" protobuf:"bytes,6,opt,name=plugins"`

//Specifies the queue that will be used in the scheduler, "default" queue is used this leaves empty.
Queue string `json:"queue,omitempty" protobuf:"bytes,8,opt,name=queue"`
Queue string `json:"queue,omitempty" protobuf:"bytes,7,opt,name=queue"`

// Specifies the maximum number of retries before marking this Job failed.
// Defaults to 3.
// +optional
MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,9,opt,name=maxRetry"`
MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,8,opt,name=maxRetry"`
}

// VolumeSpec defines the specification of Volume, e.g. PVC
Expand All @@ -81,8 +78,11 @@ type VolumeSpec struct {
// not contain ':'.
MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"`

// defined the PVC name
VolumeClaimName string `json:"volumeClaimName,omitempty" protobuf:"bytes,2,opt,name=volumeClaimName"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would u add this? Would prefer auto gen it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, there would be a few codes which can be optimized, would like to do it when the initial sync patch get merged.


// VolumeClaim defines the PVC used by the VolumeMount.
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,1,opt,name=volumeClaim"`
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,3,opt,name=volumeClaim"`
}

type JobEvent string
Expand Down
15 changes: 6 additions & 9 deletions pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion pkg/controllers/job/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package helpers

import (
"fmt"
"math/rand"
"strings"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

const (
Expand All @@ -39,3 +41,14 @@ func GetTaskIndex(pod *v1.Pod) string {
func MakePodName(jobName string, taskName string, index int) string {
return fmt.Sprintf(PodNameFmt, jobName, taskName, index)
}

func GenRandomStr(l int) string {
str := "0123456789abcdefghijklmnopqrstuvwxyz"
bytes := []byte(str)
result := []byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < l; i++ {
result = append(result, bytes[r.Intn(len(bytes))])
}
return string(result)
}
146 changes: 89 additions & 57 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (

kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
kbapi "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"

admissioncontroller "volcano.sh/volcano/pkg/admission"
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
"volcano.sh/volcano/pkg/controllers/apis"
Expand Down Expand Up @@ -143,12 +141,17 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
job := jobInfo.Job
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

if err := cc.createPodGroupIfNotExist(job); err != nil {
newJob, err := cc.needUpdateForVolumeClaim(job)
if err != nil {
return err
}

if err := cc.createJobIOIfNotExist(job); err != nil {
return err
if newJob != nil {
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
glog.Errorf("Failed to update Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
return nil
}

if err := cc.pluginOnJobAdd(job); err != nil {
Expand All @@ -157,6 +160,14 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
return err
}

if err := cc.createPodGroupIfNotExist(job); err != nil {
return err
}

if err := cc.createJobIOIfNotExist(job); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -320,68 +331,89 @@ func (cc *Controller) calculateVersion(current int32, bumpVersion bool) int32 {
}

func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
// If input/output PVC does not exist, create them for Job.
inputPVC := job.Annotations[admissioncontroller.PVCInputName]
outputPVC := job.Annotations[admissioncontroller.PVCOutputName]
if job.Spec.Input != nil {
if job.Spec.Input.VolumeClaim != nil {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
// If PVC does not exist, create them for Job.
volumes := job.Spec.Volumes
for _, volume := range volumes {
vcName := volume.VolumeClaimName
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return err
}
if !exist {
if job.Status.ControlledResources == nil {
job.Status.ControlledResources = make(map[string]string)
}
if volume.VolumeClaim != nil {
if err := cc.createPVC(job, vcName, volume.VolumeClaim); err != nil {
return err
}
job.Status.ControlledResources["volume-pvc-"+vcName] = vcName
} else {
job.Status.ControlledResources["volume-emptyDir-"+vcName] = vcName
}
}
}
return nil
}

pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: inputPVC,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: *job.Spec.Input.VolumeClaim,
func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) {
// If VolumeClaimName does not exist, generate them for Job.
var newJob *vkv1.Job
volumes := job.Spec.Volumes
for index, volume := range volumes {
vcName := volume.VolumeClaimName
if len(vcName) == 0 {
for {
randomStr := vkjobhelpers.GenRandomStr(12)
vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr)
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return nil, err
}

glog.V(3).Infof("Try to create input PVC: %v", pvc)

if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
if exist {
continue
}
if newJob == nil {
newJob = job.DeepCopy()
}
newJob.Spec.Volumes[index].VolumeClaimName = vcName
break
}
}
}
if job.Spec.Output != nil {
if job.Spec.Output.VolumeClaim != nil {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
//return err
}
return newJob, nil
}

pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: outputPVC,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: *job.Spec.Output.VolumeClaim,
}
func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(vcName); err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
glog.V(3).Infof("Failed to get PVC for job <%s/%s>: %v",
job.Namespace, job.Name, err)
return false, err
}
return true, nil
}

glog.V(3).Infof("Try to create output PVC: %v", pvc)
func (cc *Controller) createPVC(job *vkv1.Job, vcName string, volumeClaim *v1.PersistentVolumeClaimSpec) error {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: vcName,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: *volumeClaim,
}

if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
}
}
}
glog.V(3).Infof("Try to create PVC: %v", pvc)

if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
glog.V(3).Infof("Failed to create PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, e)
return e
}
return nil
}
Expand Down
Loading