From 24e89e8f15d2574ddb8d6d2d0836888d69712e78 Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Tue, 16 Jul 2024 17:30:10 +0200 Subject: [PATCH] Add QPS & burst options & flags for ClusterCacheTracker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stefan Büringer buringerst@vmware.com --- bootstrap/kubeadm/main.go | 52 +++++++++++-------- controllers/remote/cluster_cache_tracker.go | 29 ++++++++++- controlplane/kubeadm/main.go | 52 +++++++++++-------- .../providers/migrations/v1.7-to-v1.8.md | 2 + main.go | 52 +++++++++++-------- test/extension/main.go | 4 +- test/infrastructure/docker/main.go | 52 +++++++++++-------- test/infrastructure/inmemory/main.go | 4 +- 8 files changed, 157 insertions(+), 90 deletions(-) diff --git a/bootstrap/kubeadm/main.go b/bootstrap/kubeadm/main.go index 2306dee40eb2..2caf4823761a 100644 --- a/bootstrap/kubeadm/main.go +++ b/bootstrap/kubeadm/main.go @@ -62,25 +62,27 @@ var ( controllerName = "cluster-api-kubeadm-bootstrap-manager" // flags. - enableLeaderElection bool - leaderElectionLeaseDuration time.Duration - leaderElectionRenewDeadline time.Duration - leaderElectionRetryPeriod time.Duration - watchFilterValue string - watchNamespace string - profilerAddress string - enableContentionProfiling bool - syncPeriod time.Duration - restConfigQPS float32 - restConfigBurst int - webhookPort int - webhookCertDir string - webhookCertName string - webhookKeyName string - healthAddr string - tlsOptions = flags.TLSOptions{} - diagnosticsOptions = flags.DiagnosticsOptions{} - logOptions = logs.NewOptions() + enableLeaderElection bool + leaderElectionLeaseDuration time.Duration + leaderElectionRenewDeadline time.Duration + leaderElectionRetryPeriod time.Duration + watchFilterValue string + watchNamespace string + profilerAddress string + enableContentionProfiling bool + syncPeriod time.Duration + restConfigQPS float32 + restConfigBurst int + clusterCacheTrackerClientQPS float32 + clusterCacheTrackerClientBurst int + webhookPort int + webhookCertDir string + webhookCertName string + webhookKeyName string + healthAddr string + tlsOptions = flags.TLSOptions{} + diagnosticsOptions = flags.DiagnosticsOptions{} + logOptions = logs.NewOptions() // CABPK specific flags. clusterConcurrency int clusterCacheTrackerConcurrency int @@ -139,10 +141,16 @@ func InitFlags(fs *pflag.FlagSet) { "The minimum interval at which watched resources are reconciled (e.g. 15m)") fs.Float32Var(&restConfigQPS, "kube-api-qps", 20, - "Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20") + "Maximum queries per second from the controller client to the Kubernetes API server.") fs.IntVar(&restConfigBurst, "kube-api-burst", 30, - "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30") + "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.") + + fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20, + "Maximum queries per second from the controller client to the Kubernetes API server of workload clusters.") + + fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30, + "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server of workload clusters.") fs.DurationVar(&tokenTTL, "bootstrap-token-ttl", kubeadmbootstrapcontrollers.DefaultTokenTTL, "The amount of time the bootstrap token will be valid") @@ -314,6 +322,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { SecretCachingClient: secretCachingClient, ControllerName: controllerName, Log: &ctrl.Log, + ClientQPS: clusterCacheTrackerClientQPS, + ClientBurst: clusterCacheTrackerClientBurst, }, ) if err != nil { diff --git a/controllers/remote/cluster_cache_tracker.go b/controllers/remote/cluster_cache_tracker.go index 22a46b2b6260..b0e85ebf9a53 100644 --- a/controllers/remote/cluster_cache_tracker.go +++ b/controllers/remote/cluster_cache_tracker.go @@ -67,8 +67,11 @@ var ErrClusterLocked = errors.New("cluster is locked already") // ClusterCacheTracker manages client caches for workload clusters. type ClusterCacheTracker struct { - log logr.Logger + log logr.Logger + clientUncachedObjects []client.Object + clientQPS float32 + clientBurst int client client.Client @@ -116,7 +119,18 @@ type ClusterCacheTrackerOptions struct { // it'll instead query the API server directly. // Defaults to never caching ConfigMap and Secret if not set. ClientUncachedObjects []client.Object - Indexes []Index + + // ClientQPS is the maximum queries per second from the controller client + // to the Kubernetes API server of workload clusters. + // Defaults to 20. + ClientQPS float32 + + // ClientBurst is the maximum number of queries that should be allowed in + // one burst from the controller client to the Kubernetes API server of workload clusters. + // Default 30. + ClientBurst int + + Indexes []Index // ControllerName is the name of the controller. // This is used to calculate the user agent string. @@ -139,6 +153,13 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) { &corev1.Secret{}, } } + + if opts.ClientQPS == 0 { + opts.ClientQPS = 20 + } + if opts.ClientBurst == 0 { + opts.ClientBurst = 30 + } } // NewClusterCacheTracker creates a new ClusterCacheTracker. @@ -170,6 +191,8 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt controllerPodMetadata: controllerPodMetadata, log: *options.Log, clientUncachedObjects: options.ClientUncachedObjects, + clientQPS: options.ClientQPS, + clientBurst: options.ClientBurst, client: manager.GetClient(), secretCachingClient: options.SecretCachingClient, scheme: manager.GetScheme(), @@ -303,6 +326,8 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl if err != nil { return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String()) } + config.QPS = t.clientQPS + config.Burst = t.clientBurst // Create a http client and a mapper for the cluster. httpClient, mapper, restClient, err := t.createHTTPClientAndMapper(ctx, config, cluster) diff --git a/controlplane/kubeadm/main.go b/controlplane/kubeadm/main.go index d24eadeb2ddd..0d2e44c4a9c5 100644 --- a/controlplane/kubeadm/main.go +++ b/controlplane/kubeadm/main.go @@ -66,25 +66,27 @@ var ( controllerName = "cluster-api-kubeadm-control-plane-manager" // flags. - enableLeaderElection bool - leaderElectionLeaseDuration time.Duration - leaderElectionRenewDeadline time.Duration - leaderElectionRetryPeriod time.Duration - watchFilterValue string - watchNamespace string - profilerAddress string - enableContentionProfiling bool - syncPeriod time.Duration - restConfigQPS float32 - restConfigBurst int - webhookPort int - webhookCertDir string - webhookCertName string - webhookKeyName string - healthAddr string - tlsOptions = flags.TLSOptions{} - diagnosticsOptions = flags.DiagnosticsOptions{} - logOptions = logs.NewOptions() + enableLeaderElection bool + leaderElectionLeaseDuration time.Duration + leaderElectionRenewDeadline time.Duration + leaderElectionRetryPeriod time.Duration + watchFilterValue string + watchNamespace string + profilerAddress string + enableContentionProfiling bool + syncPeriod time.Duration + restConfigQPS float32 + restConfigBurst int + clusterCacheTrackerClientQPS float32 + clusterCacheTrackerClientBurst int + webhookPort int + webhookCertDir string + webhookCertName string + webhookKeyName string + healthAddr string + tlsOptions = flags.TLSOptions{} + diagnosticsOptions = flags.DiagnosticsOptions{} + logOptions = logs.NewOptions() // KCP specific flags. kubeadmControlPlaneConcurrency int clusterCacheTrackerConcurrency int @@ -142,10 +144,16 @@ func InitFlags(fs *pflag.FlagSet) { "The minimum interval at which watched resources are reconciled (e.g. 15m)") fs.Float32Var(&restConfigQPS, "kube-api-qps", 20, - "Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20") + "Maximum queries per second from the controller client to the Kubernetes API server.") fs.IntVar(&restConfigBurst, "kube-api-burst", 30, - "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30") + "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.") + + fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20, + "Maximum queries per second from the controller client to the Kubernetes API server of workload clusters.") + + fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30, + "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server of workload clusters.") fs.IntVar(&webhookPort, "webhook-port", 9443, "Webhook Server port") @@ -332,6 +340,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { &appsv1.Deployment{}, &appsv1.DaemonSet{}, }, + ClientQPS: clusterCacheTrackerClientQPS, + ClientBurst: clusterCacheTrackerClientBurst, }) if err != nil { setupLog.Error(err, "unable to create cluster cache tracker") diff --git a/docs/book/src/developer/providers/migrations/v1.7-to-v1.8.md b/docs/book/src/developer/providers/migrations/v1.7-to-v1.8.md index ff4039c0af54..f5ccea55795c 100644 --- a/docs/book/src/developer/providers/migrations/v1.7-to-v1.8.md +++ b/docs/book/src/developer/providers/migrations/v1.7-to-v1.8.md @@ -27,3 +27,5 @@ maintainers of providers and consumers of our Go API. - It's highly recommended to move to a new setup-envtest version that uses envtest binaries from controller-tools releases instead of the deprecated GCS bucket. More details can be found in [#10569](https://github.com/kubernetes-sigs/cluster-api/pull/10569) and [kubernetes-sigs/controller-runtime#2811](https://github.com/kubernetes-sigs/controller-runtime/pull/2811). +- `remote.NewClusterCacheTracker` now has options to configure QPS & Burst. It's highly recommended to implement corresponding flags + the same way as core Cluster API (see PR: https://github.com/kubernetes-sigs/cluster-api/pull/10880). diff --git a/main.go b/main.go index ba03d35be1f9..c9ad7f3636bd 100644 --- a/main.go +++ b/main.go @@ -84,25 +84,27 @@ var ( controllerName = "cluster-api-controller-manager" // flags. - enableLeaderElection bool - leaderElectionLeaseDuration time.Duration - leaderElectionRenewDeadline time.Duration - leaderElectionRetryPeriod time.Duration - watchFilterValue string - watchNamespace string - profilerAddress string - enableContentionProfiling bool - syncPeriod time.Duration - restConfigQPS float32 - restConfigBurst int - webhookPort int - webhookCertDir string - webhookCertName string - webhookKeyName string - healthAddr string - tlsOptions = flags.TLSOptions{} - diagnosticsOptions = flags.DiagnosticsOptions{} - logOptions = logs.NewOptions() + enableLeaderElection bool + leaderElectionLeaseDuration time.Duration + leaderElectionRenewDeadline time.Duration + leaderElectionRetryPeriod time.Duration + watchFilterValue string + watchNamespace string + profilerAddress string + enableContentionProfiling bool + syncPeriod time.Duration + restConfigQPS float32 + restConfigBurst int + clusterCacheTrackerClientQPS float32 + clusterCacheTrackerClientBurst int + webhookPort int + webhookCertDir string + webhookCertName string + webhookKeyName string + healthAddr string + tlsOptions = flags.TLSOptions{} + diagnosticsOptions = flags.DiagnosticsOptions{} + logOptions = logs.NewOptions() // core Cluster API specific flags. clusterTopologyConcurrency int clusterCacheTrackerConcurrency int @@ -208,10 +210,16 @@ func InitFlags(fs *pflag.FlagSet) { "The minimum interval at which watched resources are reconciled (e.g. 15m)") fs.Float32Var(&restConfigQPS, "kube-api-qps", 20, - "Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20") + "Maximum queries per second from the controller client to the Kubernetes API server.") fs.IntVar(&restConfigBurst, "kube-api-burst", 30, - "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30") + "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.") + + fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20, + "Maximum queries per second from the controller client to the Kubernetes API server of workload clusters.") + + fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30, + "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server of workload clusters.") fs.DurationVar(&nodeDrainClientTimeout, "node-drain-client-timeout-duration", time.Second*10, "The timeout of the client used for draining nodes. Defaults to 10s") @@ -411,6 +419,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager, watchNamespaces map ControllerName: controllerName, Log: &ctrl.Log, Indexes: []remote.Index{remote.NodeProviderIDIndex}, + ClientQPS: clusterCacheTrackerClientQPS, + ClientBurst: clusterCacheTrackerClientBurst, }, ) if err != nil { diff --git a/test/extension/main.go b/test/extension/main.go index 17ee5f2d6e28..0cd1f263a607 100644 --- a/test/extension/main.go +++ b/test/extension/main.go @@ -133,10 +133,10 @@ func InitFlags(fs *pflag.FlagSet) { "The minimum interval at which watched resources are reconciled (e.g. 15m)") fs.Float32Var(&restConfigQPS, "kube-api-qps", 20, - "Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20") + "Maximum queries per second from the controller client to the Kubernetes API server.") fs.IntVar(&restConfigBurst, "kube-api-burst", 30, - "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30") + "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.") fs.IntVar(&webhookPort, "webhook-port", 9443, "Webhook Server port") diff --git a/test/infrastructure/docker/main.go b/test/infrastructure/docker/main.go index c7f385997164..7b5a86ced84e 100644 --- a/test/infrastructure/docker/main.go +++ b/test/infrastructure/docker/main.go @@ -69,25 +69,27 @@ var ( controllerName = "cluster-api-docker-controller-manager" // flags. - enableLeaderElection bool - leaderElectionLeaseDuration time.Duration - leaderElectionRenewDeadline time.Duration - leaderElectionRetryPeriod time.Duration - watchFilterValue string - watchNamespace string - profilerAddress string - enableContentionProfiling bool - syncPeriod time.Duration - restConfigQPS float32 - restConfigBurst int - webhookPort int - webhookCertDir string - webhookCertName string - webhookKeyName string - healthAddr string - tlsOptions = flags.TLSOptions{} - diagnosticsOptions = flags.DiagnosticsOptions{} - logOptions = logs.NewOptions() + enableLeaderElection bool + leaderElectionLeaseDuration time.Duration + leaderElectionRenewDeadline time.Duration + leaderElectionRetryPeriod time.Duration + watchFilterValue string + watchNamespace string + profilerAddress string + enableContentionProfiling bool + syncPeriod time.Duration + restConfigQPS float32 + restConfigBurst int + clusterCacheTrackerClientQPS float32 + clusterCacheTrackerClientBurst int + webhookPort int + webhookCertDir string + webhookCertName string + webhookKeyName string + healthAddr string + tlsOptions = flags.TLSOptions{} + diagnosticsOptions = flags.DiagnosticsOptions{} + logOptions = logs.NewOptions() // CAPD specific flags. concurrency int clusterCacheTrackerConcurrency int @@ -143,10 +145,16 @@ func InitFlags(fs *pflag.FlagSet) { "The minimum interval at which watched resources are reconciled (e.g. 15m)") fs.Float32Var(&restConfigQPS, "kube-api-qps", 20, - "Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20") + "Maximum queries per second from the controller client to the Kubernetes API server.") fs.IntVar(&restConfigBurst, "kube-api-burst", 30, - "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30") + "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.") + + fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20, + "Maximum queries per second from the controller client to the Kubernetes API server of workload clusters.") + + fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30, + "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server of workload clusters.") fs.IntVar(&webhookPort, "webhook-port", 9443, "Webhook Server port") @@ -325,6 +333,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { SecretCachingClient: secretCachingClient, ControllerName: controllerName, Log: &ctrl.Log, + ClientQPS: clusterCacheTrackerClientQPS, + ClientBurst: clusterCacheTrackerClientBurst, }, ) if err != nil { diff --git a/test/infrastructure/inmemory/main.go b/test/infrastructure/inmemory/main.go index 9edfa4b07979..a1d08368677c 100644 --- a/test/infrastructure/inmemory/main.go +++ b/test/infrastructure/inmemory/main.go @@ -138,10 +138,10 @@ func InitFlags(fs *pflag.FlagSet) { "The minimum interval at which watched resources are reconciled (e.g. 15m)") fs.Float32Var(&restConfigQPS, "kube-api-qps", 20, - "Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20") + "Maximum queries per second from the controller client to the Kubernetes API server.") fs.IntVar(&restConfigBurst, "kube-api-burst", 30, - "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30") + "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.") fs.IntVar(&webhookPort, "webhook-port", 9443, "Webhook Server port")