Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lint fix for apis, cache, job and state package of controller #248

Merged
merged 1 commit into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions hack/.golint_failures
Original file line number Diff line number Diff line change
@@ -1,6 +1,2 @@
volcano.sh/volcano/pkg/admission
volcano.sh/volcano/pkg/controllers/apis
volcano.sh/volcano/pkg/controllers/cache
volcano.sh/volcano/pkg/controllers/job
volcano.sh/volcano/pkg/controllers/job/state
volcano.sh/volcano/test/e2e
9 changes: 9 additions & 0 deletions pkg/controllers/apis/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)

//JobInfo struct
type JobInfo struct {
Namespace string
Name string
Expand All @@ -32,6 +33,7 @@ type JobInfo struct {
Pods map[string]map[string]*v1.Pod
}

//Clone function clones the k8s pod values to the JobInfo struct
func (ji *JobInfo) Clone() *JobInfo {
job := &JobInfo{
Namespace: ji.Namespace,
Expand All @@ -51,12 +53,15 @@ func (ji *JobInfo) Clone() *JobInfo {
return job
}

//SetJob sets the volcano jobs values to the JobInfo struct
func (ji *JobInfo) SetJob(job *v1alpha1.Job) {
ji.Name = job.Name
ji.Namespace = job.Namespace
ji.Job = job
}

//AddPod adds the k8s pod object values to the Pods field
//of JobStruct if it doesn't exist. Otherwise it throws error
func (ji *JobInfo) AddPod(pod *v1.Pod) error {
taskName, found := pod.Annotations[v1alpha1.TaskSpecKey]
if !found {
Expand All @@ -81,6 +86,7 @@ func (ji *JobInfo) AddPod(pod *v1.Pod) error {
return nil
}

//UpdatePod updates the k8s pod object values to the existing pod
func (ji *JobInfo) UpdatePod(pod *v1.Pod) error {
taskName, found := pod.Annotations[v1alpha1.TaskSpecKey]
if !found {
Expand All @@ -105,6 +111,7 @@ func (ji *JobInfo) UpdatePod(pod *v1.Pod) error {
return nil
}

//DeletePod deletes the given k8s pod from the JobInfo struct
func (ji *JobInfo) DeletePod(pod *v1.Pod) error {
taskName, found := pod.Annotations[v1alpha1.TaskSpecKey]
if !found {
Expand All @@ -127,6 +134,7 @@ func (ji *JobInfo) DeletePod(pod *v1.Pod) error {
return nil
}

//Request struct
type Request struct {
Namespace string
JobName string
Expand All @@ -138,6 +146,7 @@ type Request struct {
JobVersion int32
}

//String function returns the request in string format
func (r Request) String() string {
return fmt.Sprintf(
"Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d",
Expand Down
20 changes: 12 additions & 8 deletions pkg/controllers/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@ func keyFn(ns, name string) string {
return fmt.Sprintf("%s/%s", ns, name)
}

//JobKeyByName gets the key for the job name
func JobKeyByName(namespace string, name string) string {
return keyFn(namespace, name)
}

//JobKeyByReq gets the key for the job request
func JobKeyByReq(req *apis.Request) string {
return keyFn(req.Namespace, req.JobName)
}

//JobKey gets the "ns"/"name" format of the given job
func JobKey(job *v1alpha1.Job) string {
return keyFn(job.Namespace, job.Name)
}
Expand All @@ -66,6 +69,7 @@ func jobKeyOfPod(pod *v1.Pod) (string, error) {
return keyFn(pod.Namespace, jobName), nil
}

//New gets the job Cache
func New() Cache {
return &jobCache{
jobs: map[string]*apis.JobInfo{},
Expand Down Expand Up @@ -133,11 +137,11 @@ func (jc *jobCache) Update(obj *v1alpha1.Job) error {
defer jc.Unlock()

key := JobKey(obj)
if job, found := jc.jobs[key]; !found {
job, found := jc.jobs[key]
if !found {
return fmt.Errorf("failed to find job <%v>", key)
} else {
job.Job = obj
}
job.Job = obj

return nil
}
Expand All @@ -147,12 +151,12 @@ func (jc *jobCache) Delete(obj *v1alpha1.Job) error {
defer jc.Unlock()

key := JobKey(obj)
if jobInfo, found := jc.jobs[key]; !found {
jobInfo, found := jc.jobs[key]
if !found {
return fmt.Errorf("failed to find job <%v>", key)
} else {
jobInfo.Job = nil
jc.deleteJob(jobInfo)
}
jobInfo.Job = nil
jc.deleteJob(jobInfo)

return nil
}
Expand Down Expand Up @@ -261,7 +265,7 @@ func (jc *jobCache) TaskCompleted(jobKey, taskName string) bool {

for _, pod := range taskPods {
if pod.Status.Phase == v1.PodSucceeded {
completed += 1
completed++
}
}
return completed >= taskReplicas
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"volcano.sh/volcano/pkg/controllers/apis"
)

//Cache Interface
type Cache interface {
Run(stopCh <-chan struct{})

Expand Down
60 changes: 29 additions & 31 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,16 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
}

// Update Job status
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
} else {
if e := cc.cache.Update(job); e != nil {
glog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}
}
if e := cc.cache.Update(job); e != nil {
glog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}

// Delete PodGroup
Expand Down Expand Up @@ -165,7 +165,7 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.Update
return err
}

err, job := cc.createJobIOIfNotExist(job)
job, err := cc.createJobIOIfNotExist(job)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PVCError),
fmt.Sprintf("Failed to create PVC, err: %v", err))
Expand All @@ -178,16 +178,16 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.Update
}
}

if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
job, err = cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
} else {
if err := cc.cache.Update(job); err != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, err)
return err
}
}
if err = cc.cache.Update(job); err != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, err)
return err
}

return nil
Expand Down Expand Up @@ -330,23 +330,22 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
job.Status.State.LastTransitionTime = metav1.Now()
}
}

if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
} else {
if e := cc.cache.Update(job); e != nil {
glog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}
}
if e := cc.cache.Update(job); e != nil {
glog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
}

return nil
}

func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) {
func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (*vkv1.Job, error) {
// If PVC does not exist, create them for Job.
var needUpdate, nameExist bool
volumes := job.Spec.Volumes
Expand All @@ -359,7 +358,7 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) {
vcName = vkjobhelpers.MakeVolumeClaimName(job.Name)
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return err, nil
return nil, err
}
if exist {
continue
Expand All @@ -371,7 +370,7 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) {
} else {
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return err, nil
return nil, err
}
nameExist = exist
}
Expand All @@ -382,7 +381,7 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) {
}
if volume.VolumeClaim != nil {
if err := cc.createPVC(job, vcName, volume.VolumeClaim); err != nil {
return err, nil
return nil, err
}
job.Status.ControlledResources["volume-pvc-"+vcName] = vcName
} else {
Expand All @@ -395,12 +394,11 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) {
if err != nil {
glog.Errorf("Failed to update Job %v/%v for volume claim name: %v ",
job.Namespace, job.Name, err)
return err, nil
} else {
return nil, newJob
return nil, err
}
return newJob, err
}
return nil, job
return job, nil
}

func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func TestCreateJobIOIfNotExistFunc(t *testing.T) {
for i, testcase := range testcases {
fakeController := newFakeController()

err, job := fakeController.createJobIOIfNotExist(testcase.Job)
job, err := fakeController.createJobIOIfNotExist(testcase.Job)
if err != testcase.ExpextVal {
t.Errorf("Expected Return value to be : %s, but got: %s in testcase %d", testcase.ExpextVal, err, i)
}
Expand Down
45 changes: 24 additions & 21 deletions pkg/controllers/job/job_controller_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@ import (
func (cc *Controller) pluginOnPodCreate(job *vkv1.Job, pod *v1.Pod) error {
client := vkinterface.PluginClientset{KubeClients: cc.kubeClients}
for name, args := range job.Spec.Plugins {
if pb, found := vkplugin.GetPluginBuilder(name); !found {
pb, found := vkplugin.GetPluginBuilder(name)
if !found {
err := fmt.Errorf("failed to get plugin %s", name)
glog.Error(err)
return err
} else {
glog.Infof("Starting to execute plugin at <pluginOnPodCreate>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnPodCreate(pod, job); err != nil {
glog.Errorf("Failed to process on pod create plugin %s, err %v.", name, err)
return err
}
}
glog.Infof("Starting to execute plugin at <pluginOnPodCreate>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnPodCreate(pod, job); err != nil {
glog.Errorf("Failed to process on pod create plugin %s, err %v.", name, err)
return err
}

}
return nil
}
Expand All @@ -52,17 +53,18 @@ func (cc *Controller) pluginOnJobAdd(job *vkv1.Job) error {
job.Status.ControlledResources = make(map[string]string)
}
for name, args := range job.Spec.Plugins {
if pb, found := vkplugin.GetPluginBuilder(name); !found {
pb, found := vkplugin.GetPluginBuilder(name)
if !found {
err := fmt.Errorf("failed to get plugin %s", name)
glog.Error(err)
return err
} else {
glog.Infof("Starting to execute plugin at <pluginOnJobAdd>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobAdd(job); err != nil {
glog.Errorf("Failed to process on job add plugin %s, err %v.", name, err)
return err
}
}
glog.Infof("Starting to execute plugin at <pluginOnJobAdd>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobAdd(job); err != nil {
glog.Errorf("Failed to process on job add plugin %s, err %v.", name, err)
return err
}

}

return nil
Expand All @@ -71,17 +73,18 @@ func (cc *Controller) pluginOnJobAdd(job *vkv1.Job) error {
func (cc *Controller) pluginOnJobDelete(job *vkv1.Job) error {
client := vkinterface.PluginClientset{KubeClients: cc.kubeClients}
for name, args := range job.Spec.Plugins {
if pb, found := vkplugin.GetPluginBuilder(name); !found {
pb, found := vkplugin.GetPluginBuilder(name)
if !found {
err := fmt.Errorf("failed to get plugin %s", name)
glog.Error(err)
return err
} else {
glog.Infof("Starting to execute plugin at <pluginOnJobDelete>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobDelete(job); err != nil {
glog.Errorf("failed to process on job delete plugin %s, err %v.", name, err)
return err
}
}
glog.Infof("Starting to execute plugin at <pluginOnJobDelete>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobDelete(job); err != nil {
glog.Errorf("failed to process on job delete plugin %s, err %v.", name, err)
return err
}

}

return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
vkjobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
)

//MakePodName append podname,jobname,taskName and index and returns the string
func MakePodName(jobName string, taskName string, index int) string {
return fmt.Sprintf(vkjobhelpers.PodNameFmt, jobName, taskName, index)
}
Expand Down Expand Up @@ -186,12 +187,14 @@ func addResourceList(list, new v1.ResourceList) {
}
}

//TaskPriority structure
type TaskPriority struct {
priority int32

vkv1.TaskSpec
}

//TasksPriority is a slice of TaskPriority
type TasksPriority []TaskPriority

func (p TasksPriority) Len() int { return len(p) }
Expand Down
Loading