From f8b2f0737f5ea96582036ae7579ae6ff54e04209 Mon Sep 17 00:00:00 2001 From: MegaByte875 Date: Wed, 13 Dec 2023 18:12:34 +0800 Subject: [PATCH] add selector for nebula objects (#410) --- cmd/controller-manager/app/controller-manager.go | 11 +++++++++++ cmd/controller-manager/app/options/options.go | 4 ++++ .../nebularestore/nebula_restore_controller.go | 2 -- pkg/kube/rbac.go | 15 +++++++++++++++ 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/cmd/controller-manager/app/controller-manager.go b/cmd/controller-manager/app/controller-manager.go index 9ecf108e..7117ba72 100644 --- a/cmd/controller-manager/app/controller-manager.go +++ b/cmd/controller-manager/app/controller-manager.go @@ -23,6 +23,7 @@ import ( kruisev1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -142,6 +143,16 @@ func Run(ctx context.Context, opts *options.Options) error { TLSMinVersion: opts.WebhookOpts.TLSMinVersion, }) } + + if opts.NebulaSelector != "" { + parsedSelector, err := labels.Parse(opts.NebulaSelector) + if err != nil { + klog.Errorf("couldn't convert selector into a corresponding internal selector object: %v", err) + return err + } + ctrlOptions.Cache.DefaultLabelSelector = parsedSelector + } + mgr, err := ctrlruntime.NewManager(cfg, ctrlOptions) if err != nil { klog.Errorf("Failed to build controller manager: %v", err) diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index b7aa2680..4d680aff 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -51,6 +51,9 @@ type Options struct { // Default watches all namespaces Namespaces []string + // NebulaSelector to filter on, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2) + NebulaSelector string + // MetricsBindAddress is the TCP address that the controller should bind to // for serving prometheus metrics. // It can be set to "0" to disable the metrics serving. @@ -111,6 +114,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.DurationVar(&o.SyncPeriod.Duration, "sync-period", 0, "Period at which the controller forces the repopulation of its local object stores.") flags.StringSliceVar(&o.Namespaces, "watch-namespaces", nil, "Namespaces restricts the controller watches for updates to Kubernetes objects. If empty, all namespaces are watched. Multiple namespaces seperated by comma.(e.g. ns1,ns2,ns3).") + flags.StringVar(&o.NebulaSelector, "nebula-object-selector", "", "nebula object selector (label query) to filter on, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2).") flags.StringVar(&o.MetricsBindAddress, "metrics-bind-address", ":8080", "The TCP address that the controller should bind to for serving prometheus metrics(e.g. 127.0.0.1:8080, :8080). It can be set to \"0\" to disable the metrics serving.") flags.StringVar(&o.HealthProbeBindAddress, "health-probe-bind-address", ":8081", "The TCP address that the controller should bind to for serving health probes.(e.g. 127.0.0.1:8081, :8081). It can be set to \"0\" to disable the health probe serving.") flags.IntVar(&o.ConcurrentNebulaClusterSyncs, "concurrent-nebulacluster-syncs", 5, "The number of NebulaCluster objects that are allowed to sync concurrently.") diff --git a/pkg/controller/nebularestore/nebula_restore_controller.go b/pkg/controller/nebularestore/nebula_restore_controller.go index 99fadf4c..035a39fd 100644 --- a/pkg/controller/nebularestore/nebula_restore_controller.go +++ b/pkg/controller/nebularestore/nebula_restore_controller.go @@ -24,7 +24,6 @@ import ( "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" @@ -116,6 +115,5 @@ func (r *Reconciler) syncNebulaRestore(restore *v1alpha1.NebulaRestore) error { func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.NebulaRestore{}). - WithOptions(controller.Options{MaxConcurrentReconciles: 5}). Complete(r) } diff --git a/pkg/kube/rbac.go b/pkg/kube/rbac.go index cc1be97c..bdb4a5bf 100644 --- a/pkg/kube/rbac.go +++ b/pkg/kube/rbac.go @@ -69,9 +69,14 @@ func createClusterRole(ctx context.Context, k8sClient client.Client) error { if err := k8sClient.Get(ctx, client.ObjectKey{Name: v1alpha1.NebulaRoleName}, &rbacv1.ClusterRole{}); err != nil { if apierrors.IsNotFound(err) { if err := k8sClient.Create(ctx, &role); err != nil { + if apierrors.IsAlreadyExists(err) { + return nil + } return fmt.Errorf("failed to create ClusterRole role: %v", err) } + return nil } + return err } return nil } @@ -90,9 +95,14 @@ func createClusterRoleBinding(ctx context.Context, k8sClient client.Client, name if err := k8sClient.Get(ctx, client.ObjectKey{Name: v1alpha1.NebulaRoleBindingName}, binding); err != nil { if apierrors.IsNotFound(err) { if err := k8sClient.Create(ctx, binding); err != nil { + if apierrors.IsAlreadyExists(err) { + return nil + } return fmt.Errorf("failed to create ClusterRoleBinding: %v", err) } + return nil } + return err } if !isApplied(binding.Subjects, namespace) { binding.Subjects = append(binding.Subjects, rbacv1.Subject{ @@ -119,9 +129,14 @@ func createServiceAccount(ctx context.Context, k8sClient client.Client, namespac if err := k8sClient.Get(ctx, client.ObjectKey{Name: v1alpha1.NebulaServiceAccountName, Namespace: namespace}, &corev1.ServiceAccount{}); err != nil { if apierrors.IsNotFound(err) { if err := k8sClient.Create(ctx, &serviceAccount); err != nil { + if apierrors.IsAlreadyExists(err) { + return nil + } return fmt.Errorf("failed to create ServiceAccount: %v", err) } + return nil } + return err } return nil }