Skip to content

Commit

Permalink
add admitPod and PGController
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyuqing (C) authored and wangyuqing4 committed Jul 15, 2019
1 parent afb0007 commit a524104
Show file tree
Hide file tree
Showing 36 changed files with 1,155 additions and 648 deletions.
32 changes: 20 additions & 12 deletions cmd/admission/app/configure/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,20 @@ import (

// Config admission-controller server config.
type Config struct {
Master string
Kubeconfig string
CertFile string
KeyFile string
CaCertFile string
Port int
MutateWebhookConfigName string
MutateWebhookName string
ValidateWebhookConfigName string
ValidateWebhookName string
PrintVersion bool
Master string
Kubeconfig string
CertFile string
KeyFile string
CaCertFile string
Port int
MutateWebhookConfigName string
MutateWebhookName string
ValidateWebhookConfigName string
ValidateWebhookName string
ValidateWebhookPodConfigName string
ValidateWebhookPodName string
PrintVersion bool
SchedulerName string
}

// NewConfig create new config
Expand All @@ -65,10 +68,15 @@ func (c *Config) AddFlags() {
flag.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "mutatejob.volcano.sh",
"Name of the webhook entry in the webhook config.")
flag.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "volcano-validate-job",
"Name of the mutatingwebhookconfiguration resource in Kubernetes.")
"Name of the validatingwebhookconfiguration resource in Kubernetes.")
flag.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "validatejob.volcano.sh",
"Name of the webhook entry in the webhook config.")
flag.StringVar(&c.ValidateWebhookPodConfigName, "validate-webhook-pod-config-name", "volcano-validate-pod",
"Name of the pod validatingwebhookconfiguration resource in Kubernetes.")
flag.StringVar(&c.ValidateWebhookPodName, "validate-webhook-pod-name", "validatepod.volcano.sh",
"Name of the pod webhook entry in the webhook config.")
flag.BoolVar(&c.PrintVersion, "version", false, "Show version and quit")
flag.StringVar(&c.SchedulerName, "scheduler-name", "volcano", "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name")
}

// CheckPortOrDie check valid port range
Expand Down
68 changes: 2 additions & 66 deletions cmd/admission/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,13 @@ package app

import (
"crypto/tls"
"encoding/json"
"io/ioutil"
"net/http"

"github.com/golang/glog"
"volcano.sh/volcano/pkg/client/clientset/versioned"

"k8s.io/api/admission/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"

appConf "volcano.sh/volcano/cmd/admission/app/configure"
admissioncontroller "volcano.sh/volcano/pkg/admission"
)

const (
//CONTENTTYPE http content-type
CONTENTTYPE = "Content-Type"

//APPLICATIONJSON json content
APPLICATIONJSON = "application/json"
"volcano.sh/volcano/pkg/client/clientset/versioned"
)

// GetClient Get a clientset with restConfig.
Expand All @@ -51,7 +36,7 @@ func GetClient(restConfig *restclient.Config) *kubernetes.Clientset {
return clientset
}

//GetKubeBatchClient get a clientset for kubebatch
// GetKubeBatchClient get a clientset for kubebatch
func GetKubeBatchClient(restConfig *restclient.Config) *versioned.Clientset {
clientset, err := versioned.NewForConfig(restConfig)
if err != nil {
Expand Down Expand Up @@ -89,52 +74,3 @@ func ConfigTLS(config *appConf.Config, restConfig *restclient.Config) *tls.Confi
glog.Fatal("tls: failed to find any tls config data")
return &tls.Config{}
}

//Serve the http request
func Serve(w http.ResponseWriter, r *http.Request, admit admissioncontroller.AdmitFunc) {
var body []byte
if r.Body != nil {
if data, err := ioutil.ReadAll(r.Body); err == nil {
body = data
}
}

// verify the content type is accurate
contentType := r.Header.Get(CONTENTTYPE)
if contentType != APPLICATIONJSON {
glog.Errorf("contentType=%s, expect application/json", contentType)
return
}

var reviewResponse *v1beta1.AdmissionResponse
ar := v1beta1.AdmissionReview{}
deserializer := admissioncontroller.Codecs.UniversalDeserializer()
if _, _, err := deserializer.Decode(body, nil, &ar); err != nil {
reviewResponse = admissioncontroller.ToAdmissionResponse(err)
} else {
reviewResponse = admit(ar)
}
glog.V(3).Infof("sending response: %v", reviewResponse)

response := createResponse(reviewResponse, &ar)
resp, err := json.Marshal(response)
if err != nil {
glog.Error(err)
}
if _, err := w.Write(resp); err != nil {
glog.Error(err)
}
}

func createResponse(reviewResponse *v1beta1.AdmissionResponse, ar *v1beta1.AdmissionReview) v1beta1.AdmissionReview {
response := v1beta1.AdmissionReview{}
if reviewResponse != nil {
response.Response = reviewResponse
response.Response.UID = ar.Request.UID
}
// reset the Object and OldObject, they are not needed in a response.
ar.Request.Object = runtime.RawExtension{}
ar.Request.OldObject = runtime.RawExtension{}

return response
}
24 changes: 20 additions & 4 deletions cmd/admission/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ import (
"os"
"strconv"

"k8s.io/client-go/tools/clientcmd"

"volcano.sh/volcano/cmd/admission/app"
appConf "volcano.sh/volcano/cmd/admission/app/configure"
admissioncontroller "volcano.sh/volcano/pkg/admission"
"volcano.sh/volcano/pkg/version"

"k8s.io/client-go/tools/clientcmd"
)

func serveJobs(w http.ResponseWriter, r *http.Request) {
app.Serve(w, r, admissioncontroller.AdmitJobs)
admissioncontroller.Serve(w, r, admissioncontroller.AdmitJobs)
}

func serveMutateJobs(w http.ResponseWriter, r *http.Request) {
app.Serve(w, r, admissioncontroller.MutateJobs)
admissioncontroller.Serve(w, r, admissioncontroller.MutateJobs)
}

func main() {
Expand Down Expand Up @@ -67,6 +67,8 @@ func main() {

admissioncontroller.KubeBatchClientSet = app.GetKubeBatchClient(restConfig)

servePods(config)

caCertPem, err := ioutil.ReadFile(config.CaCertFile)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
Expand All @@ -80,6 +82,10 @@ func main() {
config.ValidateWebhookConfigName, config.ValidateWebhookName, caCertPem); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
if err = appConf.PatchValidateWebhookConfig(clientset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(),
config.ValidateWebhookPodConfigName, config.ValidateWebhookPodName, caCertPem); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
}

server := &http.Server{
Expand All @@ -88,3 +94,13 @@ func main() {
}
server.ListenAndServeTLS("", "")
}

func servePods(config *appConf.Config) {
admController := &admissioncontroller.Controller{
KbClients: admissioncontroller.KubeBatchClientSet,
SchedulerName: config.SchedulerName,
}
http.HandleFunc(admissioncontroller.AdmitPodPath, admController.ServerPods)

return
}
13 changes: 9 additions & 4 deletions cmd/controllers/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
)

const (
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
defaultSchedulerName = "volcano"
)

// ServerOption is the main context object for the controller manager.
Expand All @@ -39,7 +40,9 @@ type ServerOption struct {
PrintVersion bool
// WorkerThreads is the number of threads syncing job operations
// concurrently. Larger number = faster job updating,but more CPU load.
WorkerThreads uint32
WorkerThreads uint32
EnablePodgroupController bool
SchedulerName string
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -60,6 +63,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
"Larger number = faster job updating, but more CPU load")
fs.BoolVar(&s.EnablePodgroupController, "enable-podgroup-controller", false, "Normal job use volcano scheduler will enable pg controller")
fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name")
}

// CheckOptionOrDie checks the LockObjectNamespace
Expand Down
1 change: 1 addition & 0 deletions cmd/controllers/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestAddFlags(t *testing.T) {
KubeAPIBurst: 200,
PrintVersion: false,
WorkerThreads: defaultWorkers,
SchedulerName: defaultSchedulerName,
}

if !reflect.DeepEqual(expected, s) {
Expand Down
8 changes: 6 additions & 2 deletions cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

kbver "volcano.sh/volcano/pkg/client/clientset/versioned"

"volcano.sh/volcano/cmd/controllers/app/options"
kbver "volcano.sh/volcano/pkg/client/clientset/versioned"
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/garbagecollector"
"volcano.sh/volcano/pkg/controllers/job"
"volcano.sh/volcano/pkg/controllers/podgroup"
"volcano.sh/volcano/pkg/controllers/queue"
)

Expand Down Expand Up @@ -88,11 +88,15 @@ func Run(opt *options.ServerOption) error {
jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads)
queueController := queue.NewQueueController(kubeClient, kbClient)
garbageCollector := garbagecollector.New(vkClient)
pgController := podgroup.NewPodgroupController(kubeClient, kbClient, opt.SchedulerName)

run := func(ctx context.Context) {
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
go garbageCollector.Run(ctx.Done())
if opt.EnablePodgroupController {
go pgController.Run(ctx.Done())
}
<-ctx.Done()
}

Expand Down
22 changes: 22 additions & 0 deletions hack/e2e-admission-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,25 @@ webhooks:
- CREATE
resources:
- jobs
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingWebhookConfiguration
metadata:
name: validate-volcano-pod
webhooks:
- clientConfig:
caBundle: {{CA_BUNDLE}}

# the url should agree with webhook service
url: https://{{host}}:{{hostPort}}/pods
failurePolicy: Ignore
name: validatepod.volcano.sh
rules:
- apiGroups:
- ""
apiVersions:
- "v1"
operations:
- CREATE
resources:
- pods
4 changes: 3 additions & 1 deletion hack/run-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ function install-volcano {
kind load docker-image ${MPI_EXAMPLE_IMAGE} ${CLUSTER_CONTEXT}

echo "Install volcano chart"
helm install installer/helm/chart/volcano --namespace kube-system --name ${CLUSTER_NAME} --kubeconfig ${KUBECONFIG} --set basic.image_tag_version=${TAG} --set basic.scheduler_config_file=kube-batch-ci.conf --wait
helm install installer/helm/chart/volcano --namespace kube-system --name ${CLUSTER_NAME} --kubeconfig ${KUBECONFIG} \
--set basic.image_tag_version=${TAG} --set basic.scheduler_config_file=kube-batch-ci.conf --set basic.enable_podgroup_controller=true \
--wait
}

function uninstall-volcano {
Expand Down
26 changes: 26 additions & 0 deletions installer/helm/chart/volcano/templates/admission-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,29 @@ webhooks:
- CREATE
resources:
- jobs
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingWebhookConfiguration
metadata:
name: {{ .Release.Name }}-validate-pod
annotations:
"helm.sh/hook": pre-install
"helm.sh/hook-delete-policy": before-hook-creation
webhooks:
- clientConfig:
service:
name: {{ .Release.Name }}-admission-service
namespace: {{ .Release.Namespace }}
path: /pods
failurePolicy: Ignore
name: validatepod.volcano.sh
namespaceSelector: {}
rules:
- apiGroups:
- ""
apiVersions:
- "v1"
operations:
- CREATE
resources:
- pods
4 changes: 4 additions & 0 deletions installer/helm/chart/volcano/templates/admission.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ rules:
- apiGroups: ["scheduling.incubator.k8s.io"]
resources: ["queues"]
verbs: ["get", "list"]
- apiGroups: ["scheduling.incubator.k8s.io"]
resources: ["podgroups"]
verbs: ["get", "list", "watch"]

---
kind: ClusterRoleBinding
Expand Down Expand Up @@ -78,6 +81,7 @@ spec:
- --ca-cert-file=/admission.local.config/certificates/ca.crt
- --mutate-webhook-config-name={{ .Release.Name }}-mutate-job
- --validate-webhook-config-name={{ .Release.Name }}-validate-job
- --validate-webhook-pod-config-name={{ .Release.Name }}-validate-pod
- --alsologtostderr
- --port=443
- -v=4
Expand Down
1 change: 1 addition & 0 deletions installer/helm/chart/volcano/templates/controllers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ spec:
image: {{.Values.basic.controller_image_name}}:{{.Values.basic.image_tag_version}}
args:
- --alsologtostderr
- --enable-podgroup-controller={{.Values.basic.enable_podgroup_controller}}
- -v=4
- 2>&1
imagePullPolicy: "IfNotPresent"
1 change: 1 addition & 0 deletions installer/helm/chart/volcano/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ basic:
admission_secret_name: "volcano-admission-secret"
scheduler_config_file: "kube-batch.conf"
image_pull_secret: ""
enable_podgroup_controller: "false"
Loading

0 comments on commit a524104

Please sign in to comment.