From 753d42335b1420468b20305632799adad9b92ba5 Mon Sep 17 00:00:00 2001 From: Zhongcheng Lao Date: Tue, 1 Aug 2023 15:11:37 +0800 Subject: [PATCH] Using ClusterCacheTracker instead of remote.NewClusterClient --- controllers/controllers_suite_test.go | 36 +++++++++++-- controllers/serviceaccount_controller.go | 14 +++-- controllers/servicediscovery_controller.go | 14 +++-- controllers/vspherevm_controller.go | 31 +++++++---- controllers/vspherevm_controller_test.go | 37 ++++++++++++- main.go | 62 +++++++++++++++++++--- 6 files changed, 162 insertions(+), 32 deletions(-) diff --git a/controllers/controllers_suite_test.go b/controllers/controllers_suite_test.go index 3e0e66f48e..5b54b3d2b9 100644 --- a/controllers/controllers_suite_test.go +++ b/controllers/controllers_suite_test.go @@ -28,7 +28,9 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes/scheme" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/controllers/remote" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" infrav1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/v1beta1" @@ -63,15 +65,43 @@ func setup() { testEnv = helpers.NewTestEnvironment() + secretCachingClient, err := client.New(testEnv.Manager.GetConfig(), client.Options{ + HTTPClient: testEnv.Manager.GetHTTPClient(), + Cache: &client.CacheOptions{ + Reader: testEnv.Manager.GetCache(), + }, + }) + if err != nil { + panic("unable to create secret caching client") + } + + tracker, err := remote.NewClusterCacheTracker( + testEnv.Manager, + remote.ClusterCacheTrackerOptions{ + SecretCachingClient: secretCachingClient, + ControllerName: "testenv-manager", + }, + ) + if err != nil { + panic(fmt.Sprintf("unable to setup ClusterCacheTracker: %v", err)) + } + controllerOpts := controller.Options{MaxConcurrentReconciles: 10} + if err := (&remote.ClusterCacheReconciler{ + Client: testEnv.Manager.GetClient(), + Tracker: tracker, + }).SetupWithManager(ctx, testEnv.Manager, controllerOpts); err != nil { + panic(fmt.Sprintf("unable to create ClusterCacheReconciler controller: %v", err)) + } + if err := AddClusterControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereCluster{}, controllerOpts); err != nil { panic(fmt.Sprintf("unable to setup VsphereCluster controller: %v", err)) } if err := AddMachineControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereMachine{}, controllerOpts); err != nil { panic(fmt.Sprintf("unable to setup VsphereMachine controller: %v", err)) } - if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil { + if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker, controllerOpts); err != nil { panic(fmt.Sprintf("unable to setup VsphereVM controller: %v", err)) } if err := AddVsphereClusterIdentityControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil { @@ -80,10 +110,10 @@ func setup() { if err := AddVSphereDeploymentZoneControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil { panic(fmt.Sprintf("unable to setup VSphereDeploymentZone controller: %v", err)) } - if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil { + if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker, controllerOpts); err != nil { panic(fmt.Sprintf("unable to setup ServiceAccount controller: %v", err)) } - if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil { + if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker, controllerOpts); err != nil { panic(fmt.Sprintf("unable to setup SvcDiscovery controller: %v", err)) } diff --git a/controllers/serviceaccount_controller.go b/controllers/serviceaccount_controller.go index ef1a8e862e..5284058b69 100644 --- a/controllers/serviceaccount_controller.go +++ b/controllers/serviceaccount_controller.go @@ -66,7 +66,7 @@ const ( ) // AddServiceAccountProviderControllerToManager adds this controller to the provided manager. -func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error { +func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker, options controller.Options) error { var ( controlledType = &vmwarev1.ProviderServiceAccount{} controlledTypeName = reflect.TypeOf(controlledType).Elem().Name() @@ -82,8 +82,8 @@ func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManager Logger: ctx.Logger.WithName(controllerNameShort), } r := ServiceAccountReconciler{ - ControllerContext: controllerContext, - remoteClientGetter: remote.NewClusterClient, + ControllerContext: controllerContext, + remoteClusterCacheTracker: tracker, } clusterToInfraFn := clusterToSupervisorInfrastructureMapFunc(ctx) @@ -134,7 +134,7 @@ func clusterToSupervisorInfrastructureMapFunc(managerContext *context.Controller type ServiceAccountReconciler struct { *context.ControllerContext - remoteClientGetter remote.ClusterClientGetter + remoteClusterCacheTracker *remote.ClusterCacheTracker } func (r ServiceAccountReconciler) Reconcile(_ goctx.Context, req reconcile.Request) (_ reconcile.Result, reterr error) { @@ -202,8 +202,12 @@ func (r ServiceAccountReconciler) Reconcile(_ goctx.Context, req reconcile.Reque // then just return a no-op and wait for the next sync. This will occur when // the Cluster's status is updated with a reference to the secret that has // the Kubeconfig data used to access the target cluster. - guestClient, err := r.remoteClientGetter(clusterContext, ProviderServiceAccountControllerName, clusterContext.Client, client.ObjectKeyFromObject(cluster)) + guestClient, err := r.remoteClusterCacheTracker.GetClient(clusterContext, client.ObjectKeyFromObject(cluster)) if err != nil { + if errors.Is(err, remote.ErrClusterLocked) { + r.Logger.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker") + return ctrl.Result{Requeue: true}, nil + } clusterContext.Logger.Info("The control plane is not ready yet", "err", err) return reconcile.Result{RequeueAfter: clusterNotReadyRequeueTime}, nil } diff --git a/controllers/servicediscovery_controller.go b/controllers/servicediscovery_controller.go index 972e0e9ee2..6dd88e0e56 100644 --- a/controllers/servicediscovery_controller.go +++ b/controllers/servicediscovery_controller.go @@ -72,7 +72,7 @@ const ( // +kubebuilder:rbac:groups="",resources=configmaps/status,verbs=get // AddServiceDiscoveryControllerToManager adds the ServiceDiscovery controller to the provided manager. -func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error { +func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker, options controller.Options) error { var ( controllerNameShort = ServiceDiscoveryControllerName controllerNameLong = fmt.Sprintf("%s/%s/%s", ctx.Namespace, ctx.Name, ServiceDiscoveryControllerName) @@ -84,8 +84,8 @@ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContex Logger: ctx.Logger.WithName(controllerNameShort), } r := serviceDiscoveryReconciler{ - ControllerContext: controllerContext, - remoteClientGetter: remote.NewClusterClient, + ControllerContext: controllerContext, + remoteClusterCacheTracker: tracker, } configMapCache, err := cache.New(mgr.GetConfig(), cache.Options{ @@ -128,7 +128,7 @@ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContex type serviceDiscoveryReconciler struct { *context.ControllerContext - remoteClientGetter remote.ClusterClientGetter + remoteClusterCacheTracker *remote.ClusterCacheTracker } func (r serviceDiscoveryReconciler) Reconcile(_ goctx.Context, req reconcile.Request) (_ reconcile.Result, reterr error) { @@ -189,8 +189,12 @@ func (r serviceDiscoveryReconciler) Reconcile(_ goctx.Context, req reconcile.Req // We cannot proceed until we are able to access the target cluster. Until // then just return a no-op and wait for the next sync. - guestClient, err := r.remoteClientGetter(clusterContext, ServiceDiscoveryControllerName, clusterContext.Client, client.ObjectKeyFromObject(cluster)) + guestClient, err := r.remoteClusterCacheTracker.GetClient(clusterContext, client.ObjectKeyFromObject(cluster)) if err != nil { + if errors.Is(err, remote.ErrClusterLocked) { + r.Logger.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker") + return ctrl.Result{Requeue: true}, nil + } logger.Info("The control plane is not ready yet", "err", err) return reconcile.Result{RequeueAfter: clusterNotReadyRequeueTime}, nil } diff --git a/controllers/vspherevm_controller.go b/controllers/vspherevm_controller.go index 30f0b474a9..2f187c41c0 100644 --- a/controllers/vspherevm_controller.go +++ b/controllers/vspherevm_controller.go @@ -70,7 +70,7 @@ import ( // AddVMControllerToManager adds the VM controller to the provided manager. // //nolint:forcetypeassert -func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error { +func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker, options controller.Options) error { var ( controlledType = &infrav1.VSphereVM{} controlledTypeName = reflect.TypeOf(controlledType).Elem().Name() @@ -88,8 +88,9 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager Logger: ctx.Logger.WithName(controllerNameShort), } r := vmReconciler{ - ControllerContext: controllerContext, - VMService: &govmomi.VMService{}, + ControllerContext: controllerContext, + VMService: &govmomi.VMService{}, + remoteClusterCacheTracker: tracker, } controller, err := ctrl.NewControllerManagedBy(mgr). // Watch the controlled, infrastructure resource. @@ -158,7 +159,8 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager type vmReconciler struct { *context.ControllerContext - VMService services.VirtualMachineService + VMService services.VirtualMachineService + remoteClusterCacheTracker *remote.ClusterCacheTracker } // Reconcile ensures the back-end state reflects the Kubernetes resource state intent. @@ -344,9 +346,14 @@ func (r vmReconciler) reconcileDelete(ctx *context.VMContext) (reconcile.Result, } // Attempt to delete the node corresponding to the vsphere VM - if err := r.deleteNode(ctx, vm.Name); err != nil { + result, err = r.deleteNode(ctx, vm.Name) + if err != nil { r.Logger.V(6).Info("unable to delete node", "err", err) } + if !result.IsZero() { + // a non-zero value means we need to requeue the request before proceed. + return result, nil + } if err := r.deleteIPAddressClaims(ctx); err != nil { return reconcile.Result{}, err @@ -362,15 +369,19 @@ func (r vmReconciler) reconcileDelete(ctx *context.VMContext) (reconcile.Result, // This is necessary since CAPI does not the nodeRef field on the owner Machine object // until the node moves to Ready state. Hence, on Machine deletion it is unable to delete // the kubernetes node corresponding to the VM. -func (r vmReconciler) deleteNode(ctx *context.VMContext, name string) error { +func (r vmReconciler) deleteNode(ctx *context.VMContext, name string) (reconcile.Result, error) { // Fetching the cluster object from the VSphereVM object to create a remote client to the cluster cluster, err := clusterutilv1.GetClusterFromMetadata(r.ControllerContext, r.Client, ctx.VSphereVM.ObjectMeta) if err != nil { - return err + return ctrl.Result{}, err } - clusterClient, err := remote.NewClusterClient(ctx, r.ControllerContext.Name, r.Client, ctrlclient.ObjectKeyFromObject(cluster)) + clusterClient, err := r.remoteClusterCacheTracker.GetClient(ctx, ctrlclient.ObjectKeyFromObject(cluster)) if err != nil { - return err + if errors.Is(err, remote.ErrClusterLocked) { + r.Logger.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker") + return ctrl.Result{Requeue: true}, nil + } + return ctrl.Result{}, err } // Attempt to delete the corresponding node @@ -379,7 +390,7 @@ func (r vmReconciler) deleteNode(ctx *context.VMContext, name string) error { Name: name, }, } - return clusterClient.Delete(ctx, node) + return ctrl.Result{}, clusterClient.Delete(ctx, node) } func (r vmReconciler) reconcileNormal(ctx *context.VMContext) (reconcile.Result, error) { diff --git a/controllers/vspherevm_controller_test.go b/controllers/vspherevm_controller_test.go index 01bebbfa5f..44d6ffe777 100644 --- a/controllers/vspherevm_controller_test.go +++ b/controllers/vspherevm_controller_test.go @@ -30,11 +30,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apirecord "k8s.io/client-go/tools/record" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/controllers/remote" ipamv1 "sigs.k8s.io/cluster-api/exp/ipam/api/v1alpha1" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/conditions" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -73,6 +75,36 @@ func TestReconcileNormal_WaitingForIPAddrAllocation(t *testing.T) { } defer simr.Destroy() + secretCachingClient, err := client.New(testEnv.Manager.GetConfig(), client.Options{ + HTTPClient: testEnv.Manager.GetHTTPClient(), + Cache: &client.CacheOptions{ + Reader: testEnv.Manager.GetCache(), + }, + }) + if err != nil { + panic("unable to create secret caching client") + } + + tracker, err := remote.NewClusterCacheTracker( + testEnv.Manager, + remote.ClusterCacheTrackerOptions{ + SecretCachingClient: secretCachingClient, + ControllerName: "testvspherevm-manager", + }, + ) + if err != nil { + t.Fatalf("unable to setup ClusterCacheTracker: %v", err) + } + + controllerOpts := controller.Options{MaxConcurrentReconciles: 10} + + if err := (&remote.ClusterCacheReconciler{ + Client: testEnv.Manager.GetClient(), + Tracker: tracker, + }).SetupWithManager(ctx, testEnv.Manager, controllerOpts); err != nil { + panic(fmt.Sprintf("unable to create ClusterCacheReconciler controller: %v", err)) + } + create := func(netSpec infrav1.NetworkSpec) func() { return func() { vsphereCluster = &infrav1.VSphereCluster{ @@ -177,8 +209,9 @@ func TestReconcileNormal_WaitingForIPAddrAllocation(t *testing.T) { Logger: log.Log, } return vmReconciler{ - ControllerContext: controllerContext, - VMService: vmService, + ControllerContext: controllerContext, + VMService: vmService, + remoteClusterCacheTracker: tracker, } } diff --git a/main.go b/main.go index fcbab4c091..5f342d2429 100644 --- a/main.go +++ b/main.go @@ -25,6 +25,7 @@ import ( goruntime "runtime" "time" + perrors "github.com/pkg/errors" "github.com/spf13/pflag" "gopkg.in/fsnotify.v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -39,6 +40,7 @@ import ( "sigs.k8s.io/cluster-api/controllers/remote" "sigs.k8s.io/cluster-api/util/flags" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" ctrlsig "sigs.k8s.io/controller-runtime/pkg/manager/signals" @@ -79,6 +81,7 @@ var ( vSphereVMConcurrency int vSphereClusterIdentityConcurrency int vSphereDeploymentZoneConcurrency int + clusterCacheTrackerConcurrency int tlsOptions = flags.TLSOptions{} @@ -122,6 +125,9 @@ func InitFlags(fs *pflag.FlagSet) { fs.IntVar(&vSphereDeploymentZoneConcurrency, "vspheredeploymentzone-concurrency", 10, "Number of vSphere deployment zones to process simultaneously") + fs.IntVar(&clusterCacheTrackerConcurrency, "clustercachetracker-concurrency", 10, + "Number of clusters to process simultaneously") + fs.StringVar( &managerOpts.PodName, "pod-name", @@ -250,6 +256,11 @@ func main() { // Create a function that adds all the controllers and webhooks to the manager. addToManager := func(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error { + tracker, err := setupRemoteClusterCacheTracker(ctx, mgr) + if err != nil { + return perrors.Wrapf(err, "unable to create remote cluster tracker tracker") + } + // Check for non-supervisor VSphereCluster and start controller if found gvr := v1beta1.GroupVersion.WithResource(reflect.TypeOf(&v1beta1.VSphereCluster{}).Elem().Name()) isLoaded, err := isCRDDeployed(mgr, gvr) @@ -257,7 +268,7 @@ func main() { return err } if isLoaded { - if err := setupVAPIControllers(ctx, mgr); err != nil { + if err := setupVAPIControllers(ctx, mgr, tracker); err != nil { return fmt.Errorf("setupVAPIControllers: %w", err) } } else { @@ -271,7 +282,7 @@ func main() { return err } if isLoaded { - if err := setupSupervisorControllers(ctx, mgr); err != nil { + if err := setupSupervisorControllers(ctx, mgr, tracker); err != nil { return fmt.Errorf("setupSupervisorControllers: %w", err) } } else { @@ -318,7 +329,7 @@ func main() { defer session.Clear() } -func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error { +func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager, tracker *remote.ClusterCacheTracker) error { if err := (&v1beta1.VSphereClusterTemplate{}).SetupWebhookWithManager(mgr); err != nil { return err } @@ -349,7 +360,7 @@ func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Man if err := controllers.AddMachineControllerToManager(ctx, mgr, &v1beta1.VSphereMachine{}, concurrency(vSphereMachineConcurrency)); err != nil { return err } - if err := controllers.AddVMControllerToManager(ctx, mgr, concurrency(vSphereVMConcurrency)); err != nil { + if err := controllers.AddVMControllerToManager(ctx, mgr, tracker, concurrency(vSphereVMConcurrency)); err != nil { return err } if err := controllers.AddVsphereClusterIdentityControllerToManager(ctx, mgr, concurrency(vSphereClusterIdentityConcurrency)); err != nil { @@ -359,7 +370,7 @@ func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Man return controllers.AddVSphereDeploymentZoneControllerToManager(ctx, mgr, concurrency(vSphereDeploymentZoneConcurrency)) } -func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error { +func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager, tracker *remote.ClusterCacheTracker) error { if err := controllers.AddClusterControllerToManager(ctx, mgr, &vmwarev1b1.VSphereCluster{}, concurrency(vSphereClusterConcurrency)); err != nil { return err } @@ -368,11 +379,11 @@ func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlm return err } - if err := controllers.AddServiceAccountProviderControllerToManager(ctx, mgr, concurrency(providerServiceAccountConcurrency)); err != nil { + if err := controllers.AddServiceAccountProviderControllerToManager(ctx, mgr, tracker, concurrency(providerServiceAccountConcurrency)); err != nil { return err } - return controllers.AddServiceDiscoveryControllerToManager(ctx, mgr, concurrency(serviceDiscoveryConcurrency)) + return controllers.AddServiceDiscoveryControllerToManager(ctx, mgr, tracker, concurrency(serviceDiscoveryConcurrency)) } func setupChecks(mgr ctrlmgr.Manager) { @@ -409,3 +420,40 @@ func isCRDDeployed(mgr ctrlmgr.Manager, gvr schema.GroupVersionResource) (bool, func concurrency(c int) controller.Options { return controller.Options{MaxConcurrentReconciles: c} } + +func setupRemoteClusterCacheTracker(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) (*remote.ClusterCacheTracker, error) { + secretCachingClient, err := client.New(mgr.GetConfig(), client.Options{ + HTTPClient: mgr.GetHTTPClient(), + Cache: &client.CacheOptions{ + Reader: mgr.GetCache(), + }, + }) + if err != nil { + return nil, perrors.Wrapf(err, "unable to create secret caching client") + } + + // Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers + // requiring a connection to a remote cluster + log := ctrl.Log.WithName("remote").WithName("ClusterCacheTracker") + tracker, err := remote.NewClusterCacheTracker( + mgr, + remote.ClusterCacheTrackerOptions{ + SecretCachingClient: secretCachingClient, + ControllerName: controllerName, + Log: &log, + }, + ) + if err != nil { + return nil, perrors.Wrapf(err, "unable to create cluster cache tracker") + } + + if err := (&remote.ClusterCacheReconciler{ + Client: mgr.GetClient(), + Tracker: tracker, + WatchFilterValue: managerOpts.WatchFilterValue, + }).SetupWithManager(ctx, mgr, concurrency(clusterCacheTrackerConcurrency)); err != nil { + return nil, perrors.Wrapf(err, "unable to create ClusterCacheReconciler controller") + } + + return tracker, nil +}