From a524104d092eba556a0c46611434c3632b8803e2 Mon Sep 17 00:00:00 2001 From: "wangyuqing (C)" Date: Thu, 9 May 2019 15:30:58 +0800 Subject: [PATCH] add admitPod and PGController --- cmd/admission/app/configure/configure.go | 32 +- cmd/admission/app/server.go | 68 +--- cmd/admission/main.go | 24 +- cmd/controllers/app/options/options.go | 13 +- cmd/controllers/app/options/options_test.go | 1 + cmd/controllers/app/server.go | 8 +- hack/e2e-admission-config.yaml | 22 ++ hack/run-e2e-kind.sh | 4 +- .../volcano/templates/admission-config.yaml | 26 ++ .../chart/volcano/templates/admission.yaml | 4 + .../chart/volcano/templates/controllers.yaml | 1 + installer/helm/chart/volcano/values.yaml | 1 + pkg/admission/admission_controller.go | 13 + pkg/admission/admit_pod.go | 144 +++++++++ pkg/admission/admit_pod_test.go | 185 +++++++++++ pkg/admission/server.go | 77 +++++ pkg/apis/batch/v1alpha1/job.go | 2 - pkg/apis/batch/v1alpha1/labels.go | 2 + pkg/controllers/job/helpers/helpers.go | 4 +- pkg/controllers/job/job_controller.go | 1 - pkg/controllers/job/job_controller_actions.go | 61 ++-- .../job/job_controller_actions_test.go | 115 +------ pkg/controllers/job/job_controller_handler.go | 4 +- pkg/controllers/job/job_state_test.go | 304 ------------------ pkg/controllers/job/state/factory.go | 4 - pkg/controllers/job/state/inqueue.go | 61 ---- pkg/controllers/job/state/pending.go | 11 +- pkg/controllers/podgroup/pg_controller.go | 147 +++++++++ .../podgroup/pg_controller_handler.go | 95 ++++++ .../podgroup/pg_controller_test.go | 78 +++++ test/e2e/admission.go | 73 +++++ test/e2e/command.go | 2 +- test/e2e/job_error_handling.go | 24 +- test/e2e/job_scheduling.go | 2 +- test/e2e/pg_controller.go | 136 ++++++++ test/e2e/util.go | 54 +++- 36 files changed, 1155 insertions(+), 648 deletions(-) create mode 100644 pkg/admission/admit_pod.go create mode 100644 pkg/admission/admit_pod_test.go create mode 100644 pkg/admission/server.go delete mode 100644 
pkg/controllers/job/state/inqueue.go create mode 100644 pkg/controllers/podgroup/pg_controller.go create mode 100644 pkg/controllers/podgroup/pg_controller_handler.go create mode 100644 pkg/controllers/podgroup/pg_controller_test.go create mode 100644 test/e2e/pg_controller.go diff --git a/cmd/admission/app/configure/configure.go b/cmd/admission/app/configure/configure.go index 981af32231a..fc73fb81f91 100644 --- a/cmd/admission/app/configure/configure.go +++ b/cmd/admission/app/configure/configure.go @@ -31,17 +31,20 @@ import ( // Config admission-controller server config. type Config struct { - Master string - Kubeconfig string - CertFile string - KeyFile string - CaCertFile string - Port int - MutateWebhookConfigName string - MutateWebhookName string - ValidateWebhookConfigName string - ValidateWebhookName string - PrintVersion bool + Master string + Kubeconfig string + CertFile string + KeyFile string + CaCertFile string + Port int + MutateWebhookConfigName string + MutateWebhookName string + ValidateWebhookConfigName string + ValidateWebhookName string + ValidateWebhookPodConfigName string + ValidateWebhookPodName string + PrintVersion bool + SchedulerName string } // NewConfig create new config @@ -65,10 +68,15 @@ func (c *Config) AddFlags() { flag.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "mutatejob.volcano.sh", "Name of the webhook entry in the webhook config.") flag.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "volcano-validate-job", - "Name of the mutatingwebhookconfiguration resource in Kubernetes.") + "Name of the validatingwebhookconfiguration resource in Kubernetes.") flag.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "validatejob.volcano.sh", "Name of the webhook entry in the webhook config.") + flag.StringVar(&c.ValidateWebhookPodConfigName, "validate-webhook-pod-config-name", "volcano-validate-pod", + "Name of the pod validatingwebhookconfiguration resource in Kubernetes.") + 
flag.StringVar(&c.ValidateWebhookPodName, "validate-webhook-pod-name", "validatepod.volcano.sh", + "Name of the pod webhook entry in the webhook config.") flag.BoolVar(&c.PrintVersion, "version", false, "Show version and quit") + flag.StringVar(&c.SchedulerName, "scheduler-name", "volcano", "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name") } // CheckPortOrDie check valid port range diff --git a/cmd/admission/app/server.go b/cmd/admission/app/server.go index dcd2ae9a987..3f01d64080b 100644 --- a/cmd/admission/app/server.go +++ b/cmd/admission/app/server.go @@ -18,28 +18,13 @@ package app import ( "crypto/tls" - "encoding/json" - "io/ioutil" - "net/http" "github.com/golang/glog" - "volcano.sh/volcano/pkg/client/clientset/versioned" - "k8s.io/api/admission/v1beta1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" - appConf "volcano.sh/volcano/cmd/admission/app/configure" - admissioncontroller "volcano.sh/volcano/pkg/admission" -) - -const ( - //CONTENTTYPE http content-type - CONTENTTYPE = "Content-Type" - - //APPLICATIONJSON json content - APPLICATIONJSON = "application/json" + "volcano.sh/volcano/pkg/client/clientset/versioned" ) // GetClient Get a clientset with restConfig. 
@@ -51,7 +36,7 @@ func GetClient(restConfig *restclient.Config) *kubernetes.Clientset { return clientset } -//GetKubeBatchClient get a clientset for kubebatch +// GetKubeBatchClient get a clientset for kubebatch func GetKubeBatchClient(restConfig *restclient.Config) *versioned.Clientset { clientset, err := versioned.NewForConfig(restConfig) if err != nil { @@ -89,52 +74,3 @@ func ConfigTLS(config *appConf.Config, restConfig *restclient.Config) *tls.Confi glog.Fatal("tls: failed to find any tls config data") return &tls.Config{} } - -//Serve the http request -func Serve(w http.ResponseWriter, r *http.Request, admit admissioncontroller.AdmitFunc) { - var body []byte - if r.Body != nil { - if data, err := ioutil.ReadAll(r.Body); err == nil { - body = data - } - } - - // verify the content type is accurate - contentType := r.Header.Get(CONTENTTYPE) - if contentType != APPLICATIONJSON { - glog.Errorf("contentType=%s, expect application/json", contentType) - return - } - - var reviewResponse *v1beta1.AdmissionResponse - ar := v1beta1.AdmissionReview{} - deserializer := admissioncontroller.Codecs.UniversalDeserializer() - if _, _, err := deserializer.Decode(body, nil, &ar); err != nil { - reviewResponse = admissioncontroller.ToAdmissionResponse(err) - } else { - reviewResponse = admit(ar) - } - glog.V(3).Infof("sending response: %v", reviewResponse) - - response := createResponse(reviewResponse, &ar) - resp, err := json.Marshal(response) - if err != nil { - glog.Error(err) - } - if _, err := w.Write(resp); err != nil { - glog.Error(err) - } -} - -func createResponse(reviewResponse *v1beta1.AdmissionResponse, ar *v1beta1.AdmissionReview) v1beta1.AdmissionReview { - response := v1beta1.AdmissionReview{} - if reviewResponse != nil { - response.Response = reviewResponse - response.Response.UID = ar.Request.UID - } - // reset the Object and OldObject, they are not needed in a response. 
- ar.Request.Object = runtime.RawExtension{} - ar.Request.OldObject = runtime.RawExtension{} - - return response -} diff --git a/cmd/admission/main.go b/cmd/admission/main.go index 82ab5dcd685..366c20dcc46 100644 --- a/cmd/admission/main.go +++ b/cmd/admission/main.go @@ -23,20 +23,20 @@ import ( "os" "strconv" + "k8s.io/client-go/tools/clientcmd" + "volcano.sh/volcano/cmd/admission/app" appConf "volcano.sh/volcano/cmd/admission/app/configure" admissioncontroller "volcano.sh/volcano/pkg/admission" "volcano.sh/volcano/pkg/version" - - "k8s.io/client-go/tools/clientcmd" ) func serveJobs(w http.ResponseWriter, r *http.Request) { - app.Serve(w, r, admissioncontroller.AdmitJobs) + admissioncontroller.Serve(w, r, admissioncontroller.AdmitJobs) } func serveMutateJobs(w http.ResponseWriter, r *http.Request) { - app.Serve(w, r, admissioncontroller.MutateJobs) + admissioncontroller.Serve(w, r, admissioncontroller.MutateJobs) } func main() { @@ -67,6 +67,8 @@ func main() { admissioncontroller.KubeBatchClientSet = app.GetKubeBatchClient(restConfig) + servePods(config) + caCertPem, err := ioutil.ReadFile(config.CaCertFile) if err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) @@ -80,6 +82,10 @@ func main() { config.ValidateWebhookConfigName, config.ValidateWebhookName, caCertPem); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) } + if err = appConf.PatchValidateWebhookConfig(clientset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(), + config.ValidateWebhookPodConfigName, config.ValidateWebhookPodName, caCertPem); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + } } server := &http.Server{ @@ -88,3 +94,13 @@ func main() { } server.ListenAndServeTLS("", "") } + +func servePods(config *appConf.Config) { + admController := &admissioncontroller.Controller{ + KbClients: admissioncontroller.KubeBatchClientSet, + SchedulerName: config.SchedulerName, + } + http.HandleFunc(admissioncontroller.AdmitPodPath, admController.ServerPods) + + return +} diff --git 
a/cmd/controllers/app/options/options.go b/cmd/controllers/app/options/options.go index b402cefcb59..a161d44ae91 100644 --- a/cmd/controllers/app/options/options.go +++ b/cmd/controllers/app/options/options.go @@ -23,9 +23,10 @@ import ( ) const ( - defaultQPS = 50.0 - defaultBurst = 100 - defaultWorkers = 3 + defaultQPS = 50.0 + defaultBurst = 100 + defaultWorkers = 3 + defaultSchedulerName = "volcano" ) // ServerOption is the main context object for the controller manager. @@ -39,7 +40,9 @@ type ServerOption struct { PrintVersion bool // WorkerThreads is the number of threads syncing job operations // concurrently. Larger number = faster job updating,but more CPU load. - WorkerThreads uint32 + WorkerThreads uint32 + EnablePodgroupController bool + SchedulerName string } // NewServerOption creates a new CMServer with a default config. @@ -60,6 +63,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit") fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. 
"+ "Larger number = faster job updating, but more CPU load") + fs.BoolVar(&s.EnablePodgroupController, "enable-podgroup-controller", false, "Normal job use volcano scheduler will enable pg controller") + fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name") } // CheckOptionOrDie checks the LockObjectNamespace diff --git a/cmd/controllers/app/options/options_test.go b/cmd/controllers/app/options/options_test.go index 8d46d10933e..7ce731f04a9 100644 --- a/cmd/controllers/app/options/options_test.go +++ b/cmd/controllers/app/options/options_test.go @@ -40,6 +40,7 @@ func TestAddFlags(t *testing.T) { KubeAPIBurst: 200, PrintVersion: false, WorkerThreads: defaultWorkers, + SchedulerName: defaultSchedulerName, } if !reflect.DeepEqual(expected, s) { diff --git a/cmd/controllers/app/server.go b/cmd/controllers/app/server.go index 9ac28618332..08732bd7da6 100644 --- a/cmd/controllers/app/server.go +++ b/cmd/controllers/app/server.go @@ -38,12 +38,12 @@ import ( "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" - kbver "volcano.sh/volcano/pkg/client/clientset/versioned" - "volcano.sh/volcano/cmd/controllers/app/options" + kbver "volcano.sh/volcano/pkg/client/clientset/versioned" vkclient "volcano.sh/volcano/pkg/client/clientset/versioned" "volcano.sh/volcano/pkg/controllers/garbagecollector" "volcano.sh/volcano/pkg/controllers/job" + "volcano.sh/volcano/pkg/controllers/podgroup" "volcano.sh/volcano/pkg/controllers/queue" ) @@ -88,11 +88,15 @@ func Run(opt *options.ServerOption) error { jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads) queueController := queue.NewQueueController(kubeClient, kbClient) garbageCollector := garbagecollector.New(vkClient) + pgController := podgroup.NewPodgroupController(kubeClient, kbClient, opt.SchedulerName) run := func(ctx context.Context) { go jobController.Run(ctx.Done()) go 
queueController.Run(ctx.Done()) go garbageCollector.Run(ctx.Done()) + if opt.EnablePodgroupController { + go pgController.Run(ctx.Done()) + } <-ctx.Done() } diff --git a/hack/e2e-admission-config.yaml b/hack/e2e-admission-config.yaml index 663bf1acc1d..fde3701568e 100644 --- a/hack/e2e-admission-config.yaml +++ b/hack/e2e-admission-config.yaml @@ -42,3 +42,25 @@ webhooks: - CREATE resources: - jobs +--- +apiVersion: admissionregistration.k8s.io/v1beta1 +kind: ValidatingWebhookConfiguration +metadata: + name: validate-volcano-pod +webhooks: + - clientConfig: + caBundle: {{CA_BUNDLE}} + + # the url should agree with webhook service + url: https://{{host}}:{{hostPort}}/pods + failurePolicy: Ignore + name: validatepod.volcano.sh + rules: + - apiGroups: + - "" + apiVersions: + - "v1" + operations: + - CREATE + resources: + - pods diff --git a/hack/run-e2e-kind.sh b/hack/run-e2e-kind.sh index 8073e69d4a8..44fdc9d53ca 100755 --- a/hack/run-e2e-kind.sh +++ b/hack/run-e2e-kind.sh @@ -64,7 +64,9 @@ function install-volcano { kind load docker-image ${MPI_EXAMPLE_IMAGE} ${CLUSTER_CONTEXT} echo "Install volcano chart" - helm install installer/helm/chart/volcano --namespace kube-system --name ${CLUSTER_NAME} --kubeconfig ${KUBECONFIG} --set basic.image_tag_version=${TAG} --set basic.scheduler_config_file=kube-batch-ci.conf --wait + helm install installer/helm/chart/volcano --namespace kube-system --name ${CLUSTER_NAME} --kubeconfig ${KUBECONFIG} \ + --set basic.image_tag_version=${TAG} --set basic.scheduler_config_file=kube-batch-ci.conf --set basic.enable_podgroup_controller=true \ + --wait } function uninstall-volcano { diff --git a/installer/helm/chart/volcano/templates/admission-config.yaml b/installer/helm/chart/volcano/templates/admission-config.yaml index 3ff79d115e2..6cb34ee19ff 100644 --- a/installer/helm/chart/volcano/templates/admission-config.yaml +++ b/installer/helm/chart/volcano/templates/admission-config.yaml @@ -50,3 +50,29 @@ webhooks: - CREATE resources: - 
jobs +--- +apiVersion: admissionregistration.k8s.io/v1beta1 +kind: ValidatingWebhookConfiguration +metadata: + name: {{ .Release.Name }}-validate-pod + annotations: + "helm.sh/hook": pre-install + "helm.sh/hook-delete-policy": before-hook-creation +webhooks: + - clientConfig: + service: + name: {{ .Release.Name }}-admission-service + namespace: {{ .Release.Namespace }} + path: /pods + failurePolicy: Ignore + name: validatepod.volcano.sh + namespaceSelector: {} + rules: + - apiGroups: + - "" + apiVersions: + - "v1" + operations: + - CREATE + resources: + - pods diff --git a/installer/helm/chart/volcano/templates/admission.yaml b/installer/helm/chart/volcano/templates/admission.yaml index 436b6b34d18..06dbe6d3c89 100644 --- a/installer/helm/chart/volcano/templates/admission.yaml +++ b/installer/helm/chart/volcano/templates/admission.yaml @@ -32,6 +32,9 @@ rules: - apiGroups: ["scheduling.incubator.k8s.io"] resources: ["queues"] verbs: ["get", "list"] + - apiGroups: ["scheduling.incubator.k8s.io"] + resources: ["podgroups"] + verbs: ["get", "list", "watch"] --- kind: ClusterRoleBinding @@ -78,6 +81,7 @@ spec: - --ca-cert-file=/admission.local.config/certificates/ca.crt - --mutate-webhook-config-name={{ .Release.Name }}-mutate-job - --validate-webhook-config-name={{ .Release.Name }}-validate-job + - --validate-webhook-pod-config-name={{ .Release.Name }}-validate-pod - --alsologtostderr - --port=443 - -v=4 diff --git a/installer/helm/chart/volcano/templates/controllers.yaml b/installer/helm/chart/volcano/templates/controllers.yaml index 6ce0743edcf..480d9bea19f 100644 --- a/installer/helm/chart/volcano/templates/controllers.yaml +++ b/installer/helm/chart/volcano/templates/controllers.yaml @@ -91,6 +91,7 @@ spec: image: {{.Values.basic.controller_image_name}}:{{.Values.basic.image_tag_version}} args: - --alsologtostderr + - --enable-podgroup-controller={{.Values.basic.enable_podgroup_controller}} - -v=4 - 2>&1 imagePullPolicy: "IfNotPresent" diff --git 
a/installer/helm/chart/volcano/values.yaml b/installer/helm/chart/volcano/values.yaml index 481158d5691..175d48089ac 100644 --- a/installer/helm/chart/volcano/values.yaml +++ b/installer/helm/chart/volcano/values.yaml @@ -6,3 +6,4 @@ basic: admission_secret_name: "volcano-admission-secret" scheduler_config_file: "kube-batch.conf" image_pull_secret: "" + enable_podgroup_controller: "false" diff --git a/pkg/admission/admission_controller.go b/pkg/admission/admission_controller.go index bafca922774..f9e9cccf913 100644 --- a/pkg/admission/admission_controller.go +++ b/pkg/admission/admission_controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + kbver "volcano.sh/volcano/pkg/client/clientset/versioned" ) const ( @@ -38,11 +39,23 @@ const ( AdmitJobPath = "/jobs" //MutateJobPath is the pattern for the mutating jobs MutateJobPath = "/mutating-jobs" + //AdmitPodPath is the pattern for the pods admission + AdmitPodPath = "/pods" + //CONTENTTYPE http content-type + CONTENTTYPE = "Content-Type" + //APPLICATIONJSON json content + APPLICATIONJSON = "application/json" ) //The AdmitFunc returns response type AdmitFunc func(v1beta1.AdmissionReview) *v1beta1.AdmissionResponse +// Controller the Admission Controller type +type Controller struct { + KbClients kbver.Interface + SchedulerName string +} + var scheme = runtime.NewScheme() //Codecs is for retrieving serializers for the supported wire formats diff --git a/pkg/admission/admit_pod.go b/pkg/admission/admit_pod.go new file mode 100644 index 00000000000..5fa24c5faea --- /dev/null +++ b/pkg/admission/admit_pod.go @@ -0,0 +1,144 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admission + +import ( + "fmt" + "net/http" + "strings" + + "github.com/golang/glog" + + "k8s.io/api/admission/v1beta1" + "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + kbtype "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" +) + +// ServerPods is to serve pods +func (c *Controller) ServerPods(w http.ResponseWriter, r *http.Request) { + Serve(w, r, c.AdmitPods) +} + +// AdmitPods is to admit pods and return response +func (c *Controller) AdmitPods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { + + glog.V(3).Infof("admitting pods -- %s", ar.Request.Operation) + + pod, err := DecodePod(ar.Request.Object, ar.Request.Resource) + if err != nil { + return ToAdmissionResponse(err) + } + + var msg string + reviewResponse := v1beta1.AdmissionResponse{} + reviewResponse.Allowed = true + + switch ar.Request.Operation { + case v1beta1.Create: + msg = c.validatePod(pod, &reviewResponse) + break + default: + err := fmt.Errorf("expect operation to be 'CREATE'") + return ToAdmissionResponse(err) + } + + if !reviewResponse.Allowed { + reviewResponse.Result = &metav1.Status{Message: strings.TrimSpace(msg)} + } + return &reviewResponse +} + +// DecodePod decodes the pod using deserializer from the raw object +func DecodePod(object runtime.RawExtension, resource metav1.GroupVersionResource) (v1.Pod, error) { + podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} + raw
:= object.Raw + pod := v1.Pod{} + + if resource != podResource { + err := fmt.Errorf("expect resource to be %s", podResource) + return pod, err + } + + deserializer := Codecs.UniversalDeserializer() + if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil { + return pod, err + } + glog.V(3).Infof("the pod struct is %+v", pod) + + return pod, nil +} + +// allow pods to create when +// 1. pod.spec.schedulerName != volcano +// 2. Podgroup phase isn't Pending +// 3. normal pod, no podgroup +func (c *Controller) validatePod(pod v1.Pod, reviewResponse *v1beta1.AdmissionResponse) string { + if pod.Spec.SchedulerName != c.SchedulerName { + return "" + } + + pgName := "" + msg := "" + + // vk-job, SN == volcano + if pod.Annotations != nil { + pgName = pod.Annotations[kbtype.GroupNameAnnotationKey] + } + if pgName != "" { + if msg = c.checkPGPhase(pod, pgName, true); msg != "" { + reviewResponse.Allowed = false + } + return msg + } + + // normal pod, SN == volcano + pgName = getNormalPodPGName(pod) + if pgName != "" { + if msg = c.checkPGPhase(pod, pgName, false); msg != "" { + reviewResponse.Allowed = false + } + return msg + } + + return msg +} + +func (c *Controller) checkPGPhase(pod v1.Pod, pgName string, isVkJob bool) string { + if pg, err := c.KbClients.SchedulingV1alpha1().PodGroups(pod.Namespace).Get(pgName, metav1.GetOptions{}); err != nil { + if isVkJob || (!isVkJob && !apierrors.IsNotFound(err)) { + return fmt.Sprintf("Failed to get PodGroup for pod <%s/%s>: %v", pod.Namespace, pod.Name, err) + } + return "" + } else if pg.Status.Phase != kbtype.PodGroupPending { + return "" + } + return fmt.Sprintf("Failed to create pod for pod <%s/%s>, because the podgroup phase is Pending", + pod.Namespace, pod.Name) +} + +func getNormalPodPGName(pod v1.Pod) string { + if len(pod.OwnerReferences) == 0 { + return "" + } + + return vkbatchv1.PodgroupNamePrefix + string(pod.OwnerReferences[0].UID) +} diff --git a/pkg/admission/admit_pod_test.go 
b/pkg/admission/admit_pod_test.go new file mode 100644 index 00000000000..fb9c4b9ca11 --- /dev/null +++ b/pkg/admission/admit_pod_test.go @@ -0,0 +1,185 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admission + +import ( + "strings" + "testing" + + "k8s.io/api/admission/v1beta1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + kbv1aplha1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" + kubebatchclient "volcano.sh/volcano/pkg/client/clientset/versioned/fake" +) + +func TestValidatePod(t *testing.T) { + + namespace := "test" + pgName := "podgroup-p1" + + testCases := []struct { + Name string + Pod v1.Pod + ExpectErr bool + reviewResponse v1beta1.AdmissionResponse + ret string + disabledPG bool + }{ + // validate normal pod with default-scheduler + { + Name: "validate default normal pod", + Pod: v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "normal-pod-1", + }, + Spec: v1.PodSpec{ + SchedulerName: "default-scheduler", + }, + }, + + reviewResponse: v1beta1.AdmissionResponse{Allowed: true}, + ret: "", + ExpectErr: false, + }, + // validate normal pod with volcano scheduler + { + Name: "validate volcano-scheduler normal pod", + Pod: v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "normal-pod-2", + OwnerReferences: 
[]metav1.OwnerReference{ + {UID: "p1"}, + }, + }, + Spec: v1.PodSpec{ + SchedulerName: "volcano", + }, + }, + + reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, + ret: "Failed to create pod for pod , because the podgroup phase is Pending", + ExpectErr: true, + }, + // validate volcano pod with volcano scheduler + { + Name: "validate volcano-scheduler volcano pod", + Pod: v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "volcano-pod-1", + Annotations: map[string]string{kbv1aplha1.GroupNameAnnotationKey: pgName}, + }, + Spec: v1.PodSpec{ + SchedulerName: "volcano", + }, + }, + + reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, + ret: "Failed to create pod for pod , because the podgroup phase is Pending", + ExpectErr: true, + }, + // validate volcano pod with volcano scheduler when get pg failed + { + Name: "validate volcano volcano pod when get pg failed", + Pod: v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "volcano-pod-2", + Annotations: map[string]string{kbv1aplha1.GroupNameAnnotationKey: pgName}, + }, + Spec: v1.PodSpec{ + SchedulerName: "volcano", + }, + }, + + reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, + ret: `Failed to get PodGroup for pod : podgroups.scheduling "podgroup-p1" not found`, + ExpectErr: true, + disabledPG: true, + }, + } + + for _, testCase := range testCases { + + pg := &kbv1aplha1.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "podgroup-p1", + }, + Spec: kbv1aplha1.PodGroupSpec{ + MinMember: 1, + }, + Status: kbv1aplha1.PodGroupStatus{ + Phase: kbv1aplha1.PodGroupPending, + }, + } + + // create fake kube-batch clientset + KubeBatchClientSet = kubebatchclient.NewSimpleClientset() + + if !testCase.disabledPG { + _, err := KubeBatchClientSet.SchedulingV1alpha1().PodGroups(namespace).Create(pg) + if 
err != nil { + t.Error("PG Creation Failed") + } + } + + c := Controller{ + KbClients: KubeBatchClientSet, + SchedulerName: "volcano", + } + + ret := c.validatePod(testCase.Pod, &testCase.reviewResponse) + + if testCase.ExpectErr == true && ret == "" { + t.Errorf("%s: test case Expect error msg :%s, but got nil.", testCase.Name, testCase.ret) + } + if testCase.ExpectErr == true && testCase.reviewResponse.Allowed != false { + t.Errorf("%s: test case Expect Allowed as false but got true.", testCase.Name) + } + if testCase.ExpectErr == true && !strings.Contains(ret, testCase.ret) { + t.Errorf("%s: test case Expect error msg :%s, but got diff error %v", testCase.Name, testCase.ret, ret) + } + + if testCase.ExpectErr == false && ret != "" { + t.Errorf("%s: test case Expect no error, but got error %v", testCase.Name, ret) + } + if testCase.ExpectErr == false && testCase.reviewResponse.Allowed != true { + t.Errorf("%s: test case Expect Allowed as true but got false. %v", testCase.Name, testCase.reviewResponse) + } + } +} diff --git a/pkg/admission/server.go b/pkg/admission/server.go new file mode 100644 index 00000000000..4c3fa15e88c --- /dev/null +++ b/pkg/admission/server.go @@ -0,0 +1,77 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package admission + +import ( + "encoding/json" + "io/ioutil" + "net/http" + + "github.com/golang/glog" + + "k8s.io/api/admission/v1beta1" + "k8s.io/apimachinery/pkg/runtime" +) + +// Serve the http request +func Serve(w http.ResponseWriter, r *http.Request, admit AdmitFunc) { + var body []byte + if r.Body != nil { + if data, err := ioutil.ReadAll(r.Body); err == nil { + body = data + } + } + + // verify the content type is accurate + contentType := r.Header.Get(CONTENTTYPE) + if contentType != APPLICATIONJSON { + glog.Errorf("contentType=%s, expect application/json", contentType) + return + } + + var reviewResponse *v1beta1.AdmissionResponse + ar := v1beta1.AdmissionReview{} + deserializer := Codecs.UniversalDeserializer() + if _, _, err := deserializer.Decode(body, nil, &ar); err != nil { + reviewResponse = ToAdmissionResponse(err) + } else { + reviewResponse = admit(ar) + } + glog.V(3).Infof("sending response: %v", reviewResponse) + + response := createResponse(reviewResponse, &ar) + resp, err := json.Marshal(response) + if err != nil { + glog.Error(err) + } + if _, err := w.Write(resp); err != nil { + glog.Error(err) + } +} + +func createResponse(reviewResponse *v1beta1.AdmissionResponse, ar *v1beta1.AdmissionReview) v1beta1.AdmissionReview { + response := v1beta1.AdmissionReview{} + if reviewResponse != nil { + response.Response = reviewResponse + response.Response.UID = ar.Request.UID + } + // reset the Object and OldObject, they are not needed in a response. + ar.Request.Object = runtime.RawExtension{} + ar.Request.OldObject = runtime.RawExtension{} + + return response +} diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index 5895b1830c5..6e06459ce02 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -238,8 +238,6 @@ const ( Terminated JobPhase = "Terminated" // Failed is the phase that the job is restarted failed reached the maximum number of retries. 
Failed JobPhase = "Failed" - // Inqueue is the phase that cluster have idle resource to schedule the job - Inqueue JobPhase = "Inqueue" ) // JobState contains details for the current state of the job. diff --git a/pkg/apis/batch/v1alpha1/labels.go b/pkg/apis/batch/v1alpha1/labels.go index 78e01d8a7a6..788e7fb4bd1 100644 --- a/pkg/apis/batch/v1alpha1/labels.go +++ b/pkg/apis/batch/v1alpha1/labels.go @@ -29,4 +29,6 @@ const ( JobVersion = "volcano.sh/job-version" // JobTypeKey job type key used in labels JobTypeKey = "volcano.sh/job-type" + // PodgroupNamePrefix podgroup name prefix + PodgroupNamePrefix = "podgroup-" ) diff --git a/pkg/controllers/job/helpers/helpers.go b/pkg/controllers/job/helpers/helpers.go index 905376e31c8..369c66153cc 100644 --- a/pkg/controllers/job/helpers/helpers.go +++ b/pkg/controllers/job/helpers/helpers.go @@ -18,10 +18,12 @@ package helpers import ( "fmt" - "k8s.io/api/core/v1" "math/rand" "strings" "time" + + "k8s.io/api/core/v1" + "volcano.sh/volcano/pkg/controllers/apis" ) diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index 794e7896b0b..c4432a4573d 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -195,7 +195,6 @@ func NewJobController( // Register actions state.SyncJob = cc.syncJob state.KillJob = cc.killJob - state.CreateJob = cc.createJob return cc } diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 8d4b1569582..0729a4d0a77 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -142,58 +142,35 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM return nil } -func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error { - glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name) - defer glog.V(3).Infof("Finished Job 
<%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name) - - job := jobInfo.Job.DeepCopy() - glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) - - job, err := cc.initJobStatus(job) - if err != nil { +func (cc *Controller) createJob(job *vkv1.Job) (bool, error) { + if update, err := cc.initJobStatus(job); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.JobStatusError), fmt.Sprintf("Failed to initialize job status, err: %v", err)) - return err + return false, err + } else if update { + return true, nil } if err := cc.pluginOnJobAdd(job); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError), fmt.Sprintf("Execute plugin when job add failed, err: %v", err)) - return err + return false, err } if err := cc.createPodGroupIfNotExist(job); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PodGroupError), fmt.Sprintf("Failed to create PodGroup, err: %v", err)) - return err + return false, err } - newJob, err := cc.createJobIOIfNotExist(job) + job, err := cc.createJobIOIfNotExist(job) if err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PVCError), fmt.Sprintf("Failed to create PVC, err: %v", err)) - return err - } - - if updateStatus != nil { - if updateStatus(&newJob.Status) { - newJob.Status.State.LastTransitionTime = metav1.Now() - } - } - - newJob2, err := cc.vkClients.BatchV1alpha1().Jobs(newJob.Namespace).UpdateStatus(newJob) - if err != nil { - glog.Errorf("Failed to update status of Job %v/%v: %v", - job.Namespace, job.Name, err) - return err - } - if err = cc.cache.Update(newJob2); err != nil { - glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v", - newJob2.Namespace, newJob2.Name, err) - return err + return false, err } - return nil + return false, nil } func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error { @@ -211,6 +188,12 @@ func (cc *Controller) syncJob(jobInfo 
*apis.JobInfo, updateStatus state.UpdateSt var running, pending, terminating, succeeded, failed, unknown int32 + if update, err := cc.createJob(job); err != nil { + return err + } else if update { + return nil + } + var podToCreate []*v1.Pod var podToDelete []*v1.Pod var creationErrs []error @@ -521,9 +504,9 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList { return &minAvailableTasksRes } -func (cc *Controller) initJobStatus(job *vkv1.Job) (*vkv1.Job, error) { +func (cc *Controller) initJobStatus(job *vkv1.Job) (bool, error) { if job.Status.State.Phase != "" { - return job, nil + return false, nil } job.Status.State.Phase = vkv1.Pending @@ -532,15 +515,15 @@ func (cc *Controller) initJobStatus(job *vkv1.Job) (*vkv1.Job, error) { if err != nil { glog.Errorf("Failed to update status of Job %v/%v: %v", job.Namespace, job.Name, err) - return nil, err + return false, err } if err := cc.cache.Update(newJob); err != nil { glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v", - newJob.Namespace, newJob.Name, err) - return nil, err + job.Namespace, job.Name, err) + return false, err } - return newJob, nil + return true, nil } func classifyAndAddUpPodBaseOnPhase(pod *v1.Pod, pending, running, succeeded, failed, unknown *int32) { diff --git a/pkg/controllers/job/job_controller_actions_test.go b/pkg/controllers/job/job_controller_actions_test.go index 3372296becc..2353c73840d 100644 --- a/pkg/controllers/job/job_controller_actions_test.go +++ b/pkg/controllers/job/job_controller_actions_test.go @@ -160,116 +160,6 @@ func TestKillJobFunc(t *testing.T) { } } -func TestCreateJobFunc(t *testing.T) { - namespace := "test" - - testcases := []struct { - Name string - Job *v1alpha1.Job - PodGroup *kbv1aplha1.PodGroup - UpdateStatus state.UpdateStatusFn - JobInfo *apis.JobInfo - Plugins []string - ExpextVal error - }{ - { - Name: "CreateJob success Case", - Job: &v1alpha1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "job1", - Namespace: 
namespace, - }, - Status: v1alpha1.JobStatus{ - State: v1alpha1.JobState{ - Phase: v1alpha1.Pending, - }, - }, - }, - PodGroup: &kbv1aplha1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Name: "job1", - Namespace: namespace, - }, - }, - UpdateStatus: nil, - JobInfo: &apis.JobInfo{ - Namespace: namespace, - Name: "jobinfo1", - }, - Plugins: []string{"svc", "ssh", "env"}, - ExpextVal: nil, - }, - } - - for i, testcase := range testcases { - - fakeController := newFakeController() - jobPlugins := make(map[string][]string) - - for _, plugin := range testcase.Plugins { - jobPlugins[plugin] = make([]string, 0) - } - testcase.JobInfo.Job = testcase.Job - testcase.JobInfo.Job.Spec.Plugins = jobPlugins - - _, err := fakeController.vkClients.BatchV1alpha1().Jobs(namespace).Create(testcase.Job) - if err != nil { - t.Errorf("Case %d (%s): expected: No Error, but got error %s", i, testcase.Name, err.Error()) - } - - err = fakeController.cache.Add(testcase.Job) - if err != nil { - t.Error("Error While Adding Job in cache") - } - - err = fakeController.createJob(testcase.JobInfo, testcase.UpdateStatus) - if err != nil { - t.Errorf("Case %d (%s): expected: No Error, but got error %s", i, testcase.Name, err.Error()) - } - - job, err := fakeController.vkClients.BatchV1alpha1().Jobs(namespace).Get(testcase.Job.Name, metav1.GetOptions{}) - if err != nil { - t.Errorf("Case %d (%s): expected: No Error, but got error %s", i, testcase.Name, err.Error()) - } - for _, plugin := range testcase.Plugins { - - if plugin == "svc" { - _, err = fakeController.kubeClients.CoreV1().Services(namespace).Get(testcase.Job.Name, metav1.GetOptions{}) - if err != nil { - t.Errorf("Case %d (%s): expected: Service to be created, but not created because of error %s", i, testcase.Name, err.Error()) - } - - _, err = fakeController.kubeClients.CoreV1().ConfigMaps(namespace).Get(fmt.Sprint(testcase.Job.Name, "-svc"), metav1.GetOptions{}) - if err != nil { - t.Errorf("Case %d (%s): expected: Service to be created, 
but not created because of error %s", i, testcase.Name, err.Error()) - } - - exist := job.Status.ControlledResources["plugin-svc"] - if exist == "" { - t.Errorf("Case %d (%s): expected: ControlledResources should be added, but not got added", i, testcase.Name) - } - } - - if plugin == "ssh" { - _, err := fakeController.kubeClients.CoreV1().ConfigMaps(namespace).Get(fmt.Sprint(testcase.Job.Name, "-ssh"), metav1.GetOptions{}) - if err != nil { - t.Errorf("Case %d (%s): expected: ConfigMap to be created, but not created because of error %s", i, testcase.Name, err.Error()) - } - exist := job.Status.ControlledResources["plugin-ssh"] - if exist == "" { - t.Errorf("Case %d (%s): expected: ControlledResources should be added, but not got added", i, testcase.Name) - } - } - if plugin == "env" { - exist := job.Status.ControlledResources["plugin-env"] - if exist == "" { - t.Errorf("Case %d (%s): expected: ControlledResources should be added, but not got added", i, testcase.Name) - } - } - } - } -} - func TestSyncJobFunc(t *testing.T) { namespace := "test" @@ -313,6 +203,11 @@ func TestSyncJobFunc(t *testing.T) { }, }, }, + Status: v1alpha1.JobStatus{ + State: v1alpha1.JobState{ + Phase: v1alpha1.Pending, + }, + }, }, PodGroup: &kbv1aplha1.PodGroup{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index f3f2d548d15..506fb326bda 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -383,7 +383,7 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) { } _, err := cc.cache.Get(vkcache.JobKeyByName(newPG.Namespace, newPG.Name)) - if err != nil { + if err != nil && newPG.Annotations != nil { glog.Warningf( "Failed to find job in cache by PodGroup, this may not be a PodGroup for volcano job.") } @@ -396,8 +396,6 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) { switch newPG.Status.Phase { case 
kbtype.PodGroupUnknown: req.Event = vkbatchv1.JobUnknownEvent - case kbtype.PodGroupInqueue: - req.Action = vkbatchv1.EnqueueAction } key := vkjobhelpers.GetJobKeyByReq(&req) queue := cc.getWorkerQueue(key) diff --git a/pkg/controllers/job/job_state_test.go b/pkg/controllers/job/job_state_test.go index bb47f2fb28a..f4dc2b6a4f4 100644 --- a/pkg/controllers/job/job_state_test.go +++ b/pkg/controllers/job/job_state_test.go @@ -391,306 +391,6 @@ func TestFinishedState_Execute(t *testing.T) { } } -func TestInqueueState_Execute(t *testing.T) { - namespace := "test" - - testcases := []struct { - Name string - JobInfo *apis.JobInfo - Action v1alpha1.Action - ExpectedVal error - }{ - { - Name: "InqueueState- RestartJobAction case With terminating pod count equal to zero", - JobInfo: &apis.JobInfo{ - Namespace: namespace, - Name: "jobinfo1", - Job: &v1alpha1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "Job1", - Namespace: namespace, - }, - Status: v1alpha1.JobStatus{ - State: v1alpha1.JobState{ - Phase: v1alpha1.Inqueue, - }, - }, - }, - }, - Action: v1alpha1.RestartJobAction, - ExpectedVal: nil, - }, - { - Name: "InqueueState- RestartJobAction case With terminating pod count not equal to zero", - JobInfo: &apis.JobInfo{ - Namespace: namespace, - Name: "jobinfo1", - Job: &v1alpha1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "Job1", - Namespace: namespace, - }, - Status: v1alpha1.JobStatus{ - Terminating: 2, - State: v1alpha1.JobState{ - Phase: v1alpha1.Inqueue, - }, - }, - }, - Pods: map[string]map[string]*v1.Pod{ - "task1": { - "pod1": buildPod(namespace, "pod1", v1.PodRunning, nil), - "pod2": buildPod(namespace, "pod2", v1.PodRunning, nil), - }, - }, - }, - Action: v1alpha1.RestartJobAction, - ExpectedVal: nil, - }, - { - Name: "InqueueState- AbortJobAction case With terminating pod count equal to zero", - JobInfo: &apis.JobInfo{ - Namespace: namespace, - Name: "jobinfo1", - Job: &v1alpha1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "Job1", - Namespace: namespace, - 
}, - Status: v1alpha1.JobStatus{ - State: v1alpha1.JobState{ - Phase: v1alpha1.Inqueue, - }, - }, - }, - }, - Action: v1alpha1.AbortJobAction, - ExpectedVal: nil, - }, - { - Name: "InqueueState- AbortJobAction case With terminating pod count not equal to zero", - JobInfo: &apis.JobInfo{ - Namespace: namespace, - Name: "jobinfo1", - Job: &v1alpha1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "Job1", - Namespace: namespace, - }, - Status: v1alpha1.JobStatus{ - Terminating: 2, - State: v1alpha1.JobState{ - Phase: v1alpha1.Inqueue, - }, - }, - }, - Pods: map[string]map[string]*v1.Pod{ - "task1": { - "pod1": buildPod(namespace, "pod1", v1.PodRunning, nil), - "pod2": buildPod(namespace, "pod2", v1.PodRunning, nil), - }, - }, - }, - Action: v1alpha1.AbortJobAction, - ExpectedVal: nil, - }, - { - Name: "InqueueState- TerminateJobAction case With terminating pod count not equal to zero", - JobInfo: &apis.JobInfo{ - Namespace: namespace, - Name: "jobinfo1", - Job: &v1alpha1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "Job1", - Namespace: namespace, - }, - Status: v1alpha1.JobStatus{ - Terminating: 2, - State: v1alpha1.JobState{ - Phase: v1alpha1.Inqueue, - }, - }, - }, - Pods: map[string]map[string]*v1.Pod{ - "task1": { - "pod1": buildPod(namespace, "pod1", v1.PodRunning, nil), - "pod2": buildPod(namespace, "pod2", v1.PodRunning, nil), - }, - }, - }, - Action: v1alpha1.TerminateJobAction, - ExpectedVal: nil, - }, - { - Name: "InqueueState- CompleteJobAction case With terminating pod count equal to zero", - JobInfo: &apis.JobInfo{ - Namespace: namespace, - Name: "jobinfo1", - Job: &v1alpha1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "Job1", - Namespace: namespace, - }, - Status: v1alpha1.JobStatus{ - State: v1alpha1.JobState{ - Phase: v1alpha1.Inqueue, - }, - }, - }, - }, - Action: v1alpha1.CompleteJobAction, - ExpectedVal: nil, - }, - { - Name: "InqueueState- CompleteJobAction case With terminating pod count not equal to zero", - JobInfo: &apis.JobInfo{ - Namespace: 
namespace, - Name: "jobinfo1", - Job: &v1alpha1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "Job1", - Namespace: namespace, - }, - Status: v1alpha1.JobStatus{ - Terminating: 2, - State: v1alpha1.JobState{ - Phase: v1alpha1.Inqueue, - }, - }, - }, - Pods: map[string]map[string]*v1.Pod{ - "task1": { - "pod1": buildPod(namespace, "pod1", v1.PodRunning, nil), - "pod2": buildPod(namespace, "pod2", v1.PodRunning, nil), - }, - }, - }, - Action: v1alpha1.CompleteJobAction, - ExpectedVal: nil, - }, - { - Name: "InqueueState- Default case With Min Available equal to running pods", - JobInfo: &apis.JobInfo{ - Namespace: namespace, - Name: "jobinfo1", - Job: &v1alpha1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "Job1", - Namespace: namespace, - }, - Spec: v1alpha1.JobSpec{ - MinAvailable: 3, - }, - Status: v1alpha1.JobStatus{ - Running: 3, - State: v1alpha1.JobState{ - Phase: v1alpha1.Inqueue, - }, - }, - }, - Pods: map[string]map[string]*v1.Pod{ - "task1": { - "pod1": buildPod(namespace, "pod1", v1.PodRunning, nil), - "pod2": buildPod(namespace, "pod2", v1.PodRunning, nil), - "pod3": buildPod(namespace, "pod3", v1.PodRunning, nil), - }, - }, - }, - Action: v1alpha1.ResumeJobAction, - ExpectedVal: nil, - }, - { - Name: "InqueueState- Default case With Min Available not equal to running pods", - JobInfo: &apis.JobInfo{ - Namespace: namespace, - Name: "jobinfo1", - Job: &v1alpha1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "Job1", - Namespace: namespace, - }, - Spec: v1alpha1.JobSpec{ - MinAvailable: 3, - }, - Status: v1alpha1.JobStatus{ - Running: 2, - State: v1alpha1.JobState{ - Phase: v1alpha1.Inqueue, - }, - }, - }, - Pods: map[string]map[string]*v1.Pod{ - "task1": { - "pod1": buildPod(namespace, "pod1", v1.PodRunning, nil), - "pod2": buildPod(namespace, "pod2", v1.PodRunning, nil), - }, - }, - }, - Action: v1alpha1.ResumeJobAction, - ExpectedVal: nil, - }, - } - - for i, testcase := range testcases { - testState := state.NewState(testcase.JobInfo) - - fakecontroller 
:= newFakeController() - state.KillJob = fakecontroller.killJob - - _, err := fakecontroller.vkClients.BatchV1alpha1().Jobs(namespace).Create(testcase.JobInfo.Job) - if err != nil { - t.Error("Error while creating Job") - } - - err = fakecontroller.cache.Add(testcase.JobInfo.Job) - if err != nil { - t.Error("Error while adding Job in cache") - } - - err = testState.Execute(testcase.Action) - if err != nil { - t.Errorf("Expected Error not to occur but got: %s", err) - } - - jobInfo, err := fakecontroller.cache.Get(fmt.Sprintf("%s/%s", testcase.JobInfo.Job.Namespace, testcase.JobInfo.Job.Name)) - if err != nil { - t.Error("Error while retrieving value from Cache") - } - - if testcase.Action == v1alpha1.RestartJobAction { - // always jump to restarting firstly - if jobInfo.Job.Status.State.Phase != v1alpha1.Restarting { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Restarting, jobInfo.Job.Status.State.Phase, i) - } - } else if testcase.Action == v1alpha1.AbortJobAction { - // always jump to aborting firstly - if jobInfo.Job.Status.State.Phase != v1alpha1.Aborting { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Aborting, jobInfo.Job.Status.State.Phase, i) - } - } else if testcase.Action == v1alpha1.TerminateJobAction { - // always jump to terminating firstly - if jobInfo.Job.Status.State.Phase != v1alpha1.Terminating { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Terminating, jobInfo.Job.Status.State.Phase, i) - } - } else if testcase.Action == v1alpha1.CompleteJobAction { - // always jump to completing firstly - if jobInfo.Job.Status.State.Phase != v1alpha1.Completing { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Completing, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Spec.MinAvailable <= jobInfo.Job.Status.Running+jobInfo.Job.Status.Succeeded+jobInfo.Job.Status.Failed { - if jobInfo.Job.Status.State.Phase != v1alpha1.Running { - 
t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Running, jobInfo.Job.Status.State.Phase, i) - } - } else { - if jobInfo.Job.Status.State.Phase != testcase.JobInfo.Job.Status.State.Phase { - t.Errorf("Expected Job phase to %s, but got %s in case %d", testcase.JobInfo.Job.Status.State.Phase, jobInfo.Job.Status.State.Phase, i) - } - } - } - } -} - func TestPendingState_Execute(t *testing.T) { namespace := "test" @@ -1012,10 +712,6 @@ func TestPendingState_Execute(t *testing.T) { if jobInfo.Job.Status.State.Phase != v1alpha1.Running { t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Running, jobInfo.Job.Status.State.Phase, i) } - } else { - if jobInfo.Job.Status.State.Phase != v1alpha1.Inqueue { - t.Errorf("Expected Job phase to %s, but got %s in case %d", v1alpha1.Inqueue, jobInfo.Job.Status.State.Phase, i) - } } } else { if jobInfo.Job.Status.State.Phase != v1alpha1.Pending { diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index aaffbaff812..f249f125253 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -49,8 +49,6 @@ var ( SyncJob ActionFn // KillJob kill all Pods of Job with phase not in podRetainPhase. KillJob KillActionFn - // CreateJob will prepare to create Job. - CreateJob ActionFn ) //State interface @@ -79,8 +77,6 @@ func NewState(jobInfo *apis.JobInfo) State { return &abortedState{job: jobInfo} case vkv1.Completing: return &completingState{job: jobInfo} - case vkv1.Inqueue: - return &inqueueState{job: jobInfo} } // It's pending by default. diff --git a/pkg/controllers/job/state/inqueue.go b/pkg/controllers/job/state/inqueue.go deleted file mode 100644 index b8d6d5805cc..00000000000 --- a/pkg/controllers/job/state/inqueue.go +++ /dev/null @@ -1,61 +0,0 @@ -/* -Copyright 2019 The Volcano Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package state - -import ( - vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" - "volcano.sh/volcano/pkg/controllers/apis" -) - -type inqueueState struct { - job *apis.JobInfo -} - -func (ps *inqueueState) Execute(action vkv1.Action) error { - switch action { - case vkv1.RestartJobAction: - return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool { - status.State.Phase = vkv1.Restarting - status.RetryCount++ - return true - }) - - case vkv1.AbortJobAction: - return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - status.State.Phase = vkv1.Aborting - return true - }) - case vkv1.CompleteJobAction: - return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - status.State.Phase = vkv1.Completing - return true - }) - case vkv1.TerminateJobAction: - return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { - status.State.Phase = vkv1.Terminating - return true - }) - default: - return SyncJob(ps.job, func(status *vkv1.JobStatus) bool { - if ps.job.Job.Spec.MinAvailable <= status.Running+status.Succeeded+status.Failed { - status.State.Phase = vkv1.Running - return true - } - return false - }) - } -} diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index b3c63396fd9..4ccd4965427 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -49,21 +49,14 @@ func (ps *pendingState) Execute(action vkv1.Action) error { status.State.Phase = vkv1.Terminating return true }) - case vkv1.EnqueueAction: + default: return 
SyncJob(ps.job, func(status *vkv1.JobStatus) bool { - phase := vkv1.Inqueue - + phase := vkv1.Pending if ps.job.Job.Spec.MinAvailable <= status.Running+status.Succeeded+status.Failed { phase = vkv1.Running } - status.State.Phase = phase return true }) - default: - return CreateJob(ps.job, func(status *vkv1.JobStatus) bool { - status.State.Phase = vkv1.Pending - return true - }) } } diff --git a/pkg/controllers/podgroup/pg_controller.go b/pkg/controllers/podgroup/pg_controller.go new file mode 100644 index 00000000000..35d4bf10fb9 --- /dev/null +++ b/pkg/controllers/podgroup/pg_controller.go @@ -0,0 +1,147 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package podgroup + +import ( + "github.com/golang/glog" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" + kbver "volcano.sh/volcano/pkg/client/clientset/versioned" + kbinfoext "volcano.sh/volcano/pkg/client/informers/externalversions" + kbinfo "volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha1" + kblister "volcano.sh/volcano/pkg/client/listers/scheduling/v1alpha1" +) + +// Controller the Podgroup Controller type +type Controller struct { + kubeClients kubernetes.Interface + kbClients kbver.Interface + + podInformer coreinformers.PodInformer + pgInformer kbinfo.PodGroupInformer + sharedInformers informers.SharedInformerFactory + + // A store of pods + podLister corelisters.PodLister + podSynced func() bool + + // A store of podgroups + pgLister kblister.PodGroupLister + pgSynced func() bool + + queue workqueue.RateLimitingInterface +} + +// NewPodgroupController create new Podgroup Controller +func NewPodgroupController( + kubeClient kubernetes.Interface, + kbClient kbver.Interface, + schedulerName string, +) *Controller { + cc := &Controller{ + kubeClients: kubeClient, + kbClients: kbClient, + + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + } + + cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0) + cc.podInformer = cc.sharedInformers.Core().V1().Pods() + cc.podLister = cc.podInformer.Lister() + cc.podSynced = cc.podInformer.Informer().HasSynced + cc.podInformer.Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch obj.(type) { + case *v1.Pod: + pod := obj.(*v1.Pod) + if pod.Spec.SchedulerName == 
schedulerName && len(pod.OwnerReferences) != 0 && + (pod.Annotations == nil || pod.Annotations[kbv1.GroupNameAnnotationKey] == "") { + return true + } + return false + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: cc.addPod, + }, + }) + + cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.kbClients, 0).Scheduling().V1alpha1().PodGroups() + cc.pgLister = cc.pgInformer.Lister() + cc.pgSynced = cc.pgInformer.Informer().HasSynced + + return cc +} + +// Run start NewPodgroupController +func (cc *Controller) Run(stopCh <-chan struct{}) { + go cc.sharedInformers.Start(stopCh) + go cc.podInformer.Informer().Run(stopCh) + go cc.pgInformer.Informer().Run(stopCh) + + cache.WaitForCacheSync(stopCh, cc.podSynced, cc.pgSynced) + + go wait.Until(cc.worker, 0, stopCh) + + glog.Infof("PodgroupController is running ...... ") +} + +func (cc *Controller) worker() { + for cc.processNextReq() { + } +} + +func (cc *Controller) processNextReq() bool { + obj, shutdown := cc.queue.Get() + if shutdown { + glog.Errorf("Fail to pop item from queue") + return false + } + + req := obj.(podRequest) + defer cc.queue.Done(req) + + pod, err := cc.podLister.Pods(req.pod.Namespace).Get(req.pod.Name) + if err != nil { + glog.Errorf("Failed to get pod by <%v> from cache: %v", req, err) + return true + } + + // normal pod use kube-batch + if err := cc.createNormalPodPGIfNotExist(pod); err != nil { + glog.Errorf("Failed to handle Pod <%s/%s>: %v", pod.Namespace, pod.Name, err) + cc.queue.AddRateLimited(req) + return true + } + + // If no error, forget it. + cc.queue.Forget(req) + + return true +} diff --git a/pkg/controllers/podgroup/pg_controller_handler.go b/pkg/controllers/podgroup/pg_controller_handler.go new file mode 100644 index 00000000000..2028ba52c17 --- /dev/null +++ b/pkg/controllers/podgroup/pg_controller_handler.go @@ -0,0 +1,95 @@ +/* +Copyright 2019 The Volcano Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podgroup + +import ( + "github.com/golang/glog" + + "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" +) + +type podRequest struct { + pod *v1.Pod +} + +func (cc *Controller) addPod(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + glog.Errorf("Failed to convert %v to v1.Pod", obj) + return + } + + req := podRequest{ + pod: pod, + } + + cc.queue.Add(req) +} + +func (cc *Controller) updatePodAnnotations(pod *v1.Pod, pgName string) error { + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + if pod.Annotations[kbv1.GroupNameAnnotationKey] == "" { + pod.Annotations[kbv1.GroupNameAnnotationKey] = pgName + } else { + return nil + } + + if _, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Update(pod); err != nil { + glog.Errorf("Failed to update pod <%s/%s>: %v", pod.Namespace, pod.Name, err) + return err + } + + return nil +} + +func (cc *Controller) createNormalPodPGIfNotExist(pod *v1.Pod) error { + pgName := vkbatchv1.PodgroupNamePrefix + string(pod.OwnerReferences[0].UID) + + if _, err := cc.pgLister.PodGroups(pod.Namespace).Get(pgName); err != nil { + if !apierrors.IsNotFound(err) { + glog.Errorf("Failed to get normal PodGroup for Pod <%s/%s>: %v", + pod.Namespace, pod.Name, err) + return err + } + + pg := 
&kbv1.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: pod.Namespace, + Name: pgName, + OwnerReferences: pod.OwnerReferences, + }, + Spec: kbv1.PodGroupSpec{ + MinMember: 1, + }, + } + + if _, err := cc.kbClients.SchedulingV1alpha1().PodGroups(pod.Namespace).Create(pg); err != nil { + glog.Errorf("Failed to create normal PodGroup for Pod <%s/%s>: %v", + pod.Namespace, pod.Name, err) + return err + } + } + + return cc.updatePodAnnotations(pod, pgName) +} diff --git a/pkg/controllers/podgroup/pg_controller_test.go b/pkg/controllers/podgroup/pg_controller_test.go new file mode 100644 index 00000000000..b7da1d8a05a --- /dev/null +++ b/pkg/controllers/podgroup/pg_controller_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package podgroup + +import ( + "testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeclient "k8s.io/client-go/kubernetes/fake" + + kbv1alpha1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" + kubebatchclient "volcano.sh/volcano/pkg/client/clientset/versioned/fake" +) + +func newFakeController() *Controller { + KubeClientSet := kubeclient.NewSimpleClientset() + KubeBatchClientSet := kubebatchclient.NewSimpleClientset() + + controller := NewPodgroupController(KubeClientSet, KubeBatchClientSet, "volcano") + return controller +} + +func TestAddPodgroup(t *testing.T) { + namespace := "test" + + testCases := []struct { + Name string + pod *v1.Pod + ExpectValue string + }{ + { + Name: "AddPodgroup", + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ + {UID: "p1"}, + }, + }, + }, + ExpectValue: "podgroup-p1", + }, + } + + for i, testcase := range testCases { + c := newFakeController() + + pod, _ := c.kubeClients.CoreV1().Pods(testcase.pod.Namespace).Create(testcase.pod) + + c.addPod(pod) + c.createNormalPodPGIfNotExist(pod) + + podAnno := pod.Annotations[kbv1alpha1.GroupNameAnnotationKey] + if testcase.ExpectValue != podAnno { + t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, podAnno) + } + } +} diff --git a/test/e2e/admission.go b/test/e2e/admission.go index 968240ac82e..8f60a695a1b 100644 --- a/test/e2e/admission.go +++ b/test/e2e/admission.go @@ -19,7 +19,10 @@ package e2e import ( . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1" + + kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" ) var _ = Describe("Job E2E Test: Test Admission service", func() { @@ -51,4 +54,74 @@ var _ = Describe("Job E2E Test: Test Admission service", func() { "Job queue attribute would default to 'default' ") }) + It("Create default-scheduler pod", func() { + podName := "pod-default-scheduler" + namespace := "test" + context := initTestContext() + defer cleanupTestContext(context) + + pod := &corev1.Pod{ + TypeMeta: v1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + Name: podName, + }, + Spec: corev1.PodSpec{ + Containers: createContainers(defaultNginxImage, "", "", oneCPU, oneCPU, 0), + }, + } + + _, err := context.kubeclient.CoreV1().Pods(namespace).Create(pod) + Expect(err).NotTo(HaveOccurred()) + + err = waitPodPhase(context, pod, []corev1.PodPhase{corev1.PodRunning}) + Expect(err).NotTo(HaveOccurred()) + }) + + It("Can't create volcano pod when podgroup is Pending", func() { + podName := "pod-volcano" + pgName := "pending-pg" + namespace := "test" + context := initTestContext() + defer cleanupTestContext(context) + + pg := &kbv1.PodGroup{ + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + Name: pgName, + }, + Spec: kbv1.PodGroupSpec{ + MinMember: 1, + MinResources: &thirtyCPU, + }, + Status: kbv1.PodGroupStatus{ + Phase: kbv1.PodGroupPending, + }, + } + + pod := &corev1.Pod{ + TypeMeta: v1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + Name: podName, + Annotations: map[string]string{kbv1.GroupNameAnnotationKey: pgName}, + }, + Spec: corev1.PodSpec{ + SchedulerName: "volcano", + Containers: createContainers(defaultNginxImage, "", "", oneCPU, oneCPU, 0), + }, + } + + _, err := context.kbclient.SchedulingV1alpha1().PodGroups(namespace).Create(pg) + Expect(err).NotTo(HaveOccurred()) + + _, err = 
context.kubeclient.CoreV1().Pods(namespace).Create(pod) + Expect(err.Error()).Should(ContainSubstring(`Failed to create pod for pod , because the podgroup phase is Pending`)) + }) }) diff --git a/test/e2e/command.go b/test/e2e/command.go index cd23d4cd2a2..52ab7fca802 100644 --- a/test/e2e/command.go +++ b/test/e2e/command.go @@ -138,7 +138,7 @@ var _ = Describe("Job E2E Test: Test Job Command", func() { //Job is pending err := waitJobPending(context, job) Expect(err).NotTo(HaveOccurred()) - err = waitJobStateInqueue(context, job) + err = waitJobStatePending(context, job) Expect(err).NotTo(HaveOccurred()) //Suspend job and wait status change diff --git a/test/e2e/job_error_handling.go b/test/e2e/job_error_handling.go index f52b6d86484..f926c58c4fc 100644 --- a/test/e2e/job_error_handling.go +++ b/test/e2e/job_error_handling.go @@ -61,7 +61,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> restarting - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Restarting}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Restarting}) Expect(err).NotTo(HaveOccurred()) }) @@ -98,7 +98,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> Terminating -> Terminated - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Terminating, vkv1.Terminated}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Terminating, vkv1.Terminated}) Expect(err).NotTo(HaveOccurred()) }) @@ -135,7 +135,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> Aborting -> Aborted - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Aborting, vkv1.Aborted}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Aborting, vkv1.Aborted}) 
Expect(err).NotTo(HaveOccurred()) }) @@ -170,7 +170,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -179,7 +179,7 @@ var _ = Describe("Job Error Handling", func() { Expect(err).NotTo(HaveOccurred()) // job phase: Restarting -> Running - err = waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Restarting, vkv1.Pending, vkv1.Inqueue, vkv1.Running}) + err = waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Restarting, vkv1.Pending, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) }) @@ -214,7 +214,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -258,7 +258,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -302,7 +302,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -311,7 +311,7 @@ var _ = Describe("Job Error Handling", func() { Expect(err).NotTo(HaveOccurred()) // job phase: Restarting -> Running - err = waitJobPhases(context, job, 
[]vkv1.JobPhase{vkv1.Restarting, vkv1.Pending, vkv1.Inqueue, vkv1.Running}) + err = waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Restarting, vkv1.Pending, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) }) @@ -464,7 +464,7 @@ var _ = Describe("Job Error Handling", func() { By("job scheduled, then task 'completed_task' finished and job finally complete") // job phase: pending -> running -> completing -> completed err := waitJobPhases(context, job, []vkv1.JobPhase{ - vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Completing, vkv1.Completed}) + vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed}) Expect(err).NotTo(HaveOccurred()) }) @@ -503,7 +503,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> restarting - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Restarting}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Restarting}) Expect(err).NotTo(HaveOccurred()) }) @@ -541,7 +541,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") diff --git a/test/e2e/job_scheduling.go b/test/e2e/job_scheduling.go index 81fabdcbbaa..5baa9d59651 100644 --- a/test/e2e/job_scheduling.go +++ b/test/e2e/job_scheduling.go @@ -103,7 +103,7 @@ var _ = Describe("Job E2E Test", func() { } job := createJob(context, jobSpec) - err = waitJobStateInqueue(context, job) + err = waitJobPending(context, job) Expect(err).NotTo(HaveOccurred()) err = waitJobUnschedulable(context, job) diff --git a/test/e2e/pg_controller.go b/test/e2e/pg_controller.go new file mode 100644 index 00000000000..8f3490230ae --- /dev/null +++ b/test/e2e/pg_controller.go @@ -0,0 +1,136 @@ +/* +Copyright 2019 The 
Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("PG E2E Test: Test PG controller", func() { + It("Create volcano rc, pg controller process", func() { + rcName := "rc-volcano" + podName := "pod-volcano" + namespace := "test" + label := map[string]string{"schedulerName": "volcano"} + context := initTestContext() + defer cleanupTestContext(context) + + rc := &corev1.ReplicationController{ + TypeMeta: v1.TypeMeta{ + APIVersion: "v1", + Kind: "ReplicationController", + }, + ObjectMeta: v1.ObjectMeta{ + Name: rcName, + Namespace: namespace, + }, + Spec: corev1.ReplicationControllerSpec{ + Selector: label, + Template: &corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Name: podName, + Labels: label, + }, + Spec: corev1.PodSpec{ + SchedulerName: "volcano", + Containers: []corev1.Container{ + { + Name: podName, + Image: defaultNginxImage, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + }, + }, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + }, + } + + _, err := context.kubeclient.CoreV1().ReplicationControllers(namespace).Create(rc) + Expect(err).NotTo(HaveOccurred()) + + err = waitPodPhase(context, pod, []corev1.PodPhase{corev1.PodRunning}) + Expect(err).NotTo(HaveOccurred()) + + ready, err := pgIsReady(context, namespace) + 
Expect(ready).Should(Equal(true)) + Expect(err).NotTo(HaveOccurred()) + }) + + It("Create default-scheduler rc, pg controller don't process", func() { + rcName := "rc-default-scheduler" + podName := "pod-default-scheduler" + namespace := "test" + label := map[string]string{"a": "b"} + context := initTestContext() + defer cleanupTestContext(context) + + rc := &corev1.ReplicationController{ + TypeMeta: v1.TypeMeta{ + APIVersion: "v1", + Kind: "ReplicationController", + }, + ObjectMeta: v1.ObjectMeta{ + Name: rcName, + Namespace: namespace, + }, + Spec: corev1.ReplicationControllerSpec{ + Selector: label, + Template: &corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Name: podName, + Labels: label, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: podName, + Image: defaultNginxImage, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + }, + }, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + }, + } + + _, err := context.kubeclient.CoreV1().ReplicationControllers(namespace).Create(rc) + Expect(err).NotTo(HaveOccurred()) + + err = waitPodPhase(context, pod, []corev1.PodPhase{corev1.PodRunning}) + Expect(err).NotTo(HaveOccurred()) + + ready, err := pgIsReady(context, namespace) + Expect(ready).Should(Equal(false)) + Expect(err.Error()).Should(Equal("podgroup is not found")) + }) +}) diff --git a/test/e2e/util.go b/test/e2e/util.go index 73fc0320e39..26b7b91dfea 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -52,6 +52,7 @@ var ( oneMinute = 1 * time.Minute twoMinute = 2 * time.Minute oneCPU = v1.ResourceList{"cpu": resource.MustParse("1000m")} + thirtyCPU = v1.ResourceList{"cpu": resource.MustParse("30000m")} ) const ( @@ -512,8 +513,6 @@ func waitJobPhases(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error { newJob.Status.Terminating == 0 case vkv1.Running: flag = newJob.Status.Running >= newJob.Spec.MinAvailable - case vkv1.Inqueue: - flag = newJob.Status.Pending > 0 default: return 
fmt.Errorf("unknown phase %s", phase) } @@ -565,9 +564,7 @@ func waitJobPhase(ctx *context, job *vkv1.Job, phase vkv1.JobPhase) error { var flag = false switch phase { case vkv1.Pending: - flag = (newJob.Status.Pending+newJob.Status.Succeeded+ - newJob.Status.Failed+newJob.Status.Running) == 0 || - (total-newJob.Status.Terminating >= newJob.Status.MinAvailable) + flag = newJob.Status.Pending > 0 case vkv1.Terminating, vkv1.Aborting, vkv1.Restarting: flag = newJob.Status.Terminating > 0 case vkv1.Terminated, vkv1.Aborted: @@ -578,8 +575,6 @@ func waitJobPhase(ctx *context, job *vkv1.Job, phase vkv1.JobPhase) error { flag = newJob.Status.Succeeded == state.TotalTasks(newJob) case vkv1.Running: flag = newJob.Status.Running >= newJob.Spec.MinAvailable - case vkv1.Inqueue: - flag = newJob.Status.Pending > 0 default: return false, fmt.Errorf("unknown phase %s", phase) } @@ -629,10 +624,6 @@ func waitJobStatePending(ctx *context, job *vkv1.Job) error { return waitJobPhaseExpect(ctx, job, vkv1.Pending) } -func waitJobStateInqueue(ctx *context, job *vkv1.Job) error { - return waitJobPhaseExpect(ctx, job, vkv1.Inqueue) -} - func waitJobStateAborted(ctx *context, job *vkv1.Job) error { return waitJobPhaseExpect(ctx, job, vkv1.Aborted) } @@ -1087,3 +1078,44 @@ func waitPodGone(ctx *context, podName, namespace string) error { } return err } + +func waitPodPhase(ctx *context, pod *v1.Pod, phase []v1.PodPhase) error { + var additionalError error + err := wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) { + pods, err := ctx.kubeclient.CoreV1().Pods(pod.Namespace).List(metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + + for _, p := range phase { + for _, pod := range pods.Items { + if pod.Status.Phase == p { + return true, nil + } + } + } + + additionalError = fmt.Errorf("expected pod '%s' to %v, actual got %s", pod.Name, phase, pod.Status.Phase) + return false, nil + }) + if err != nil && strings.Contains(err.Error(), timeOutMessage) { + return 
fmt.Errorf("[Wait time out]: %s", additionalError) + } + return err +} + +func pgIsReady(ctx *context, namespace string) (bool, error) { + pgs, err := ctx.kbclient.SchedulingV1alpha1().PodGroups(namespace).List(metav1.ListOptions{}) + if err != nil { + return false, err + } + if pgs != nil && len(pgs.Items) == 0 { + return false, fmt.Errorf("podgroup is not found") + } + + for _, pg := range pgs.Items { + if pg.Status.Phase != kbv1.PodGroupPending { + return true, nil + } + } + + return false, fmt.Errorf("podgroup phase is Pending") +}