From 6d5882517bf71826d1f031c0ba2b45aed9ee6794 Mon Sep 17 00:00:00 2001 From: "Jose A. Rivera" Date: Thu, 20 Dec 2018 14:03:31 -0600 Subject: [PATCH] Make registration path mandatory, remove kubeconfig parameter Signed-off-by: Jose A. Rivera --- cmd/node-driver-registrar/main.go | 30 +-- cmd/node-driver-registrar/node_register.go | 299 ++------------------- deploy/kubernetes/rbac.yaml | 51 ---- 3 files changed, 35 insertions(+), 345 deletions(-) delete mode 100644 deploy/kubernetes/rbac.yaml diff --git a/cmd/node-driver-registrar/main.go b/cmd/node-driver-registrar/main.go index 7325c1792..d11eba8dd 100644 --- a/cmd/node-driver-registrar/main.go +++ b/cmd/node-driver-registrar/main.go @@ -24,8 +24,6 @@ import ( "time" "github.com/golang/glog" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1" "github.com/kubernetes-csi/node-driver-registrar/pkg/connection" @@ -45,7 +43,6 @@ const ( // Command line flags var ( - kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") connectionTimeout = flag.Duration("connection-timeout", 1*time.Minute, "Timeout for waiting for CSI driver socket.") csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.") kubeletRegistrationPath = flag.String("kubelet-registration-path", "", @@ -104,6 +101,11 @@ func main() { flag.Set("logtostderr", "true") flag.Parse() + if *kubeletRegistrationPath == "" { + glog.Error("kubelet-registration-path is a required parameter") + os.Exit(1) + } + if *showVersion { fmt.Println(os.Args[0], version) return @@ -133,26 +135,6 @@ func main() { } glog.V(2).Infof("CSI driver name: %q", csiDriverName) - // Create the client config. Use kubeconfig if given, otherwise assume - // in-cluster. - glog.V(1).Infof("Loading kubeconfig.") - config, err := buildConfig(*kubeconfig) - if err != nil { - glog.Error(err.Error()) - os.Exit(1) - } - // Run forever - nodeRegister(config, csiConn, csiDriverName) -} - -func buildConfig(kubeconfig string) (*rest.Config, error) { - if kubeconfig != "" { - return clientcmd.BuildConfigFromFlags("", kubeconfig) - } - - // Return config object which uses the service account kubernetes gives to - // pods. It's intended for clients that are running inside a pod running on - // kubernetes. - return rest.InClusterConfig() + nodeRegister(csiConn, csiDriverName) } diff --git a/cmd/node-driver-registrar/node_register.go b/cmd/node-driver-registrar/node_register.go index 2dbe3d6c9..2d772ebb2 100644 --- a/cmd/node-driver-registrar/node_register.go +++ b/cmd/node-driver-registrar/node_register.go @@ -18,29 +18,20 @@ package main import ( "context" - "encoding/json" "fmt" "net" "os" - "os/signal" - "time" "google.golang.org/grpc" "github.com/golang/glog" "golang.org/x/sys/unix" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/util/retry" registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1" "github.com/kubernetes-csi/node-driver-registrar/pkg/connection" ) func nodeRegister( - config *rest.Config, csiConn connection.CSIConnection, csiDriverName string, ) { @@ -65,272 +56,40 @@ func nodeRegister( // When kubeletRegistrationPath is specified then driver-registrar ONLY acts // as gRPC server which replies to registration requests initiated by kubelet's // pluginswatcher infrastructure. Node labeling is done by kubelet's csi code. - if *kubeletRegistrationPath != "" { - registrar := newRegistrationServer(csiDriverName, *kubeletRegistrationPath, supportedVersions) - socketPath := fmt.Sprintf("/registration/%s-reg.sock", csiDriverName) - fi, err := os.Stat(socketPath) - if err == nil && (fi.Mode()&os.ModeSocket) != 0 { - // Remove any socket, stale or not, but fall through for other files - if err := os.Remove(socketPath); err != nil { - glog.Errorf("failed to remove stale socket %s with error: %+v", socketPath, err) - os.Exit(1) - } - } - if err != nil && !os.IsNotExist(err) { - glog.Errorf("failed to stat the socket %s with error: %+v", socketPath, err) - os.Exit(1) - } - // Default to only user accessible socket, caller can open up later if desired - oldmask := unix.Umask(0077) - - glog.Infof("Starting Registration Server at: %s\n", socketPath) - lis, err := net.Listen("unix", socketPath) - if err != nil { - glog.Errorf("failed to listen on socket: %s with error: %+v", socketPath, err) - os.Exit(1) - } - unix.Umask(oldmask) - glog.Infof("Registration Server started at: %s\n", socketPath) - grpcServer := grpc.NewServer() - // Registers kubelet plugin watcher api. - registerapi.RegisterRegistrationServer(grpcServer, registrar) - - // Starts service - if err := grpcServer.Serve(lis); err != nil { - glog.Errorf("Registration Server stopped serving: %v", err) + registrar := newRegistrationServer(csiDriverName, *kubeletRegistrationPath, supportedVersions) + socketPath := fmt.Sprintf("/registration/%s-reg.sock", csiDriverName) + fi, err := os.Stat(socketPath) + if err == nil && (fi.Mode()&os.ModeSocket) != 0 { + // Remove any socket, stale or not, but fall through for other files + if err := os.Remove(socketPath); err != nil { + glog.Errorf("failed to remove stale socket %s with error: %+v", socketPath, err) os.Exit(1) } - // If gRPC server is gracefully shutdown, exit - os.Exit(0) - } else { // only apply Node label update when kubelet plugin not used - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - glog.Error(err.Error()) - os.Exit(1) - } - - glog.V(1).Infof("Attempt to update node annotation if needed") - k8sNodesClient := clientset.CoreV1().Nodes() - - // Set up goroutine to cleanup (aka deregister) on termination. - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - go func() { - <-c - err := getVerifyAndDeleteNodeId( - k8sNodeName, - k8sNodesClient, - csiDriverName) - if err != nil { - glog.Warning(err) - } - os.Exit(1) - }() - - // This program is intended to run as a side-car container inside a - // Kubernetes DaemonSet. Kubernetes DaemonSet only have one RestartPolicy, - // always, meaning as soon as this container terminates, it will be started - // again. Therefore, this program will loop indefientley and periodically - // update the node annotation. - // The CSI driver name and node ID are assumed to be immutable, and are not - // refetched on subsequent loop iterations. - for { - err := getVerifyAndAddNodeId( - k8sNodeName, - k8sNodesClient, - csiDriverName, - csiDriverNodeId) - if err != nil { - glog.Warning(err) - } - time.Sleep(sleepDuration) - } } -} - -// Fetches Kubernetes node API object corresponding to k8sNodeName. -// If the csiDriverName and csiDriverNodeId are not present in the node -// annotation, this method adds it. -func getVerifyAndAddNodeId( - k8sNodeName string, - k8sNodesClient corev1.NodeInterface, - csiDriverName string, - csiDriverNodeId string, -) error { - // Add or update annotation on Node object - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - // Retrieve the latest version of Node before attempting update, so that - // existing changes are not overwritten. RetryOnConflict uses - // exponential backoff to avoid exhausting the apiserver. - result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{}) - if getErr != nil { - glog.Errorf("Failed to get latest version of Node: %v", getErr) - return getErr // do not wrap error - } - - var previousAnnotationValue string - if result.ObjectMeta.Annotations != nil { - previousAnnotationValue = - result.ObjectMeta.Annotations[annotationKey] - glog.V(3).Infof( - "previousAnnotationValue=%q", previousAnnotationValue) - } - - existingDriverMap := map[string]string{} - if previousAnnotationValue != "" { - // Parse previousAnnotationValue as JSON - if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { - return fmt.Errorf( - "Failed to parse node's %q annotation value (%q) err=%v", - annotationKey, - previousAnnotationValue, - err) - } - } - - if val, ok := existingDriverMap[csiDriverName]; ok { - if val == csiDriverNodeId { - // Value already exists in node annotation, nothing more to do - glog.V(1).Infof( - "The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v", - csiDriverName, - csiDriverNodeId, - annotationKey, - previousAnnotationValue) - return nil - } - } - - // Add/update annotation value - existingDriverMap[csiDriverName] = csiDriverNodeId - jsonObj, err := json.Marshal(existingDriverMap) - if err != nil { - return fmt.Errorf( - "Failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v", - csiDriverName, - csiDriverNodeId, - annotationKey, - previousAnnotationValue) - } - - result.ObjectMeta.Annotations = cloneAndAddAnnotation( - result.ObjectMeta.Annotations, - annotationKey, - string(jsonObj)) - _, updateErr := k8sNodesClient.Update(result) - if updateErr == nil { - fmt.Printf( - "Updated node %q successfully for CSI driver %q and CSI node name %q", - k8sNodeName, - csiDriverName, - csiDriverNodeId) - } - return updateErr // do not wrap error - }) - if retryErr != nil { - return fmt.Errorf("Node update failed: %v", retryErr) - } - return nil -} - -// Fetches Kubernetes node API object corresponding to k8sNodeName. -// If the csiDriverName is present in the node annotation, it is removed. -func getVerifyAndDeleteNodeId( - k8sNodeName string, - k8sNodesClient corev1.NodeInterface, - csiDriverName string) error { - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - // Retrieve the latest version of Node before attempting update, so that - // existing changes are not overwritten. RetryOnConflict uses - // exponential backoff to avoid exhausting the apiserver. - result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{}) - if getErr != nil { - glog.Errorf("Failed to get latest version of Node: %v", getErr) - return getErr // do not wrap error - } - - var previousAnnotationValue string - if result.ObjectMeta.Annotations != nil { - previousAnnotationValue = - result.ObjectMeta.Annotations[annotationKey] - glog.V(3).Infof( - "previousAnnotationValue=%q", previousAnnotationValue) - } - - existingDriverMap := map[string]string{} - if previousAnnotationValue == "" { - // Value already exists in node annotation, nothing more to do - glog.V(1).Infof( - "The key %q does not exist in node %q annotation, no need to cleanup.", - csiDriverName, - annotationKey) - return nil - } - - // Parse previousAnnotationValue as JSON - if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil { - return fmt.Errorf( - "Failed to parse node's %q annotation value (%q) err=%v", - annotationKey, - previousAnnotationValue, - err) - } - - if _, ok := existingDriverMap[csiDriverName]; !ok { - // Value already exists in node annotation, nothing more to do - glog.V(1).Infof( - "The key %q does not eixst in node %q annotation, no need to cleanup: %v", - csiDriverName, - annotationKey, - previousAnnotationValue) - return nil - } - - // Add/update annotation value - delete(existingDriverMap, csiDriverName) - jsonObj, err := json.Marshal(existingDriverMap) - if err != nil { - return fmt.Errorf( - "Failed while trying to remove key %q from node %q annotation. Existing data: %v", - csiDriverName, - annotationKey, - previousAnnotationValue) - } - - result.ObjectMeta.Annotations = cloneAndAddAnnotation( - result.ObjectMeta.Annotations, - annotationKey, - string(jsonObj)) - _, updateErr := k8sNodesClient.Update(result) - if updateErr == nil { - fmt.Printf( - "Updated node %q annotation to remove CSI driver %q.", - k8sNodeName, - csiDriverName) - } - return updateErr // do not wrap error - }) - if retryErr != nil { - return fmt.Errorf("Node update failed: %v", retryErr) + if err != nil && !os.IsNotExist(err) { + glog.Errorf("failed to stat the socket %s with error: %+v", socketPath, err) + os.Exit(1) } - return nil -} + // Default to only user accessible socket, caller can open up later if desired + oldmask := unix.Umask(0077) -// Clones the given map and returns a new map with the given key and value added. -// Returns the given map, if annotationKey is empty. -func cloneAndAddAnnotation( - annotations map[string]string, - annotationKey, - annotationValue string) map[string]string { - if annotationKey == "" { - // Don't need to add an annotation. - return annotations + glog.Infof("Starting Registration Server at: %s\n", socketPath) + lis, err := net.Listen("unix", socketPath) + if err != nil { + glog.Errorf("failed to listen on socket: %s with error: %+v", socketPath, err) + os.Exit(1) } - // Clone. - newAnnotations := map[string]string{} - for key, value := range annotations { - newAnnotations[key] = value + unix.Umask(oldmask) + glog.Infof("Registration Server started at: %s\n", socketPath) + grpcServer := grpc.NewServer() + // Registers kubelet plugin watcher api. + registerapi.RegisterRegistrationServer(grpcServer, registrar) + + // Starts service + if err := grpcServer.Serve(lis); err != nil { + glog.Errorf("Registration Server stopped serving: %v", err) + os.Exit(1) } - newAnnotations[annotationKey] = annotationValue - return newAnnotations + // If gRPC server is gracefully shutdown, exit + os.Exit(0) } diff --git a/deploy/kubernetes/rbac.yaml b/deploy/kubernetes/rbac.yaml deleted file mode 100644 index a4f4f1aac..000000000 --- a/deploy/kubernetes/rbac.yaml +++ /dev/null @@ -1,51 +0,0 @@ -# This YAML file contains all RBAC objects that are necessary to run external -# CSI provisioner. -# -# In production, each CSI driver deployment has to be customized: -# - to avoid conflicts, use non-default namespace and different names -# for non-namespaced entities like the ClusterRole -# - decide whether the deployment replicates the external CSI -# provisioner, in which case leadership election must be enabled; -# this influences the RBAC setup, see below - -apiVersion: v1 -kind: ServiceAccount -metadata: - name: csi-driver-registrar - # replace with non-default namespace name - namespace: default - ---- -kind: ClusterRole -apiVersion: rbac.authorization.k8s.io/v1 -metadata: - name: driver-registrar-runner -rules: - - apiGroups: [""] - resources: ["events"] - verbs: ["get", "list", "watch", "create", "update", "patch"] - # The following permissions are only needed when running - # driver-registrar without the --kubelet-registration-path - # parameter, i.e. when using driver-registrar instead of - # kubelet to update the csi.volume.kubernetes.io/nodeid - # annotation. That mode of operation is going to be deprecated - # and should not be used anymore, but is needed on older - # Kubernetes versions. - # - apiGroups: [""] - # resources: ["nodes"] - # verbs: ["get", "update", "patch"] - ---- -kind: ClusterRoleBinding -apiVersion: rbac.authorization.k8s.io/v1 -metadata: - name: csi-driver-registrar-role -subjects: - - kind: ServiceAccount - name: csi-driver-registrar - # replace with non-default namespace name - namespace: default -roleRef: - kind: ClusterRole - name: driver-registrar-runner - apiGroup: rbac.authorization.k8s.io