Skip to content

Commit

Permalink
Add QPS & burst options & flags for ClusterCacheTracker
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Büringer buringerst@vmware.com
  • Loading branch information
sbueringer committed Jul 16, 2024
1 parent 8aa545a commit 24e89e8
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 90 deletions.
52 changes: 31 additions & 21 deletions bootstrap/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,27 @@ var (
controllerName = "cluster-api-kubeadm-bootstrap-manager"

// flags.
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
clusterCacheTrackerClientQPS float32
clusterCacheTrackerClientBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
// CABPK specific flags.
clusterConcurrency int
clusterCacheTrackerConcurrency int
Expand Down Expand Up @@ -139,10 +141,16 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server of workload clusters.")

fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server of workload clusters.")

fs.DurationVar(&tokenTTL, "bootstrap-token-ttl", kubeadmbootstrapcontrollers.DefaultTokenTTL,
"The amount of time the bootstrap token will be valid")
Expand Down Expand Up @@ -314,6 +322,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
SecretCachingClient: secretCachingClient,
ControllerName: controllerName,
Log: &ctrl.Log,
ClientQPS: clusterCacheTrackerClientQPS,
ClientBurst: clusterCacheTrackerClientBurst,
},
)
if err != nil {
Expand Down
29 changes: 27 additions & 2 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ var ErrClusterLocked = errors.New("cluster is locked already")

// ClusterCacheTracker manages client caches for workload clusters.
type ClusterCacheTracker struct {
log logr.Logger
log logr.Logger

clientUncachedObjects []client.Object
clientQPS float32
clientBurst int

client client.Client

Expand Down Expand Up @@ -116,7 +119,18 @@ type ClusterCacheTrackerOptions struct {
// it'll instead query the API server directly.
// Defaults to never caching ConfigMap and Secret if not set.
ClientUncachedObjects []client.Object
Indexes []Index

// ClientQPS is the maximum queries per second from the controller client
// to the Kubernetes API server of workload clusters.
// Defaults to 20.
ClientQPS float32

// ClientBurst is the maximum number of queries that should be allowed in
// one burst from the controller client to the Kubernetes API server of workload clusters.
// Default 30.
ClientBurst int

Indexes []Index

// ControllerName is the name of the controller.
// This is used to calculate the user agent string.
Expand All @@ -139,6 +153,13 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
&corev1.Secret{},
}
}

if opts.ClientQPS == 0 {
opts.ClientQPS = 20
}
if opts.ClientBurst == 0 {
opts.ClientBurst = 30
}
}

// NewClusterCacheTracker creates a new ClusterCacheTracker.
Expand Down Expand Up @@ -170,6 +191,8 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
controllerPodMetadata: controllerPodMetadata,
log: *options.Log,
clientUncachedObjects: options.ClientUncachedObjects,
clientQPS: options.ClientQPS,
clientBurst: options.ClientBurst,
client: manager.GetClient(),
secretCachingClient: options.SecretCachingClient,
scheme: manager.GetScheme(),
Expand Down Expand Up @@ -303,6 +326,8 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
if err != nil {
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
}
config.QPS = t.clientQPS
config.Burst = t.clientBurst

// Create a http client and a mapper for the cluster.
httpClient, mapper, restClient, err := t.createHTTPClientAndMapper(ctx, config, cluster)
Expand Down
52 changes: 31 additions & 21 deletions controlplane/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,27 @@ var (
controllerName = "cluster-api-kubeadm-control-plane-manager"

// flags.
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
clusterCacheTrackerClientQPS float32
clusterCacheTrackerClientBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
// KCP specific flags.
kubeadmControlPlaneConcurrency int
clusterCacheTrackerConcurrency int
Expand Down Expand Up @@ -142,10 +144,16 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server of workload clusters.")

fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server of workload clusters.")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")
Expand Down Expand Up @@ -332,6 +340,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
&appsv1.Deployment{},
&appsv1.DaemonSet{},
},
ClientQPS: clusterCacheTrackerClientQPS,
ClientBurst: clusterCacheTrackerClientBurst,
})
if err != nil {
setupLog.Error(err, "unable to create cluster cache tracker")
Expand Down
2 changes: 2 additions & 0 deletions docs/book/src/developer/providers/migrations/v1.7-to-v1.8.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ maintainers of providers and consumers of our Go API.
- It's highly recommended to move to a new setup-envtest version that uses envtest binaries from controller-tools releases
instead of the deprecated GCS bucket. More details can be found in [#10569](https://github.com/kubernetes-sigs/cluster-api/pull/10569)
and [kubernetes-sigs/controller-runtime#2811](https://github.com/kubernetes-sigs/controller-runtime/pull/2811).
- `remote.NewClusterCacheTracker` now has options to configure QPS & Burst. It's highly recommended to implement corresponding flags
the same way as core Cluster API (see PR: https://github.com/kubernetes-sigs/cluster-api/pull/10880).
52 changes: 31 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,27 @@ var (
controllerName = "cluster-api-controller-manager"

// flags.
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
clusterCacheTrackerClientQPS float32
clusterCacheTrackerClientBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
// core Cluster API specific flags.
clusterTopologyConcurrency int
clusterCacheTrackerConcurrency int
Expand Down Expand Up @@ -208,10 +210,16 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server of workload clusters.")

fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server of workload clusters.")

fs.DurationVar(&nodeDrainClientTimeout, "node-drain-client-timeout-duration", time.Second*10,
"The timeout of the client used for draining nodes. Defaults to 10s")
Expand Down Expand Up @@ -411,6 +419,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager, watchNamespaces map
ControllerName: controllerName,
Log: &ctrl.Log,
Indexes: []remote.Index{remote.NodeProviderIDIndex},
ClientQPS: clusterCacheTrackerClientQPS,
ClientBurst: clusterCacheTrackerClientBurst,
},
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions test/extension/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")
Expand Down
52 changes: 31 additions & 21 deletions test/infrastructure/docker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,25 +69,27 @@ var (
controllerName = "cluster-api-docker-controller-manager"

// flags.
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
clusterCacheTrackerClientQPS float32
clusterCacheTrackerClientBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
// CAPD specific flags.
concurrency int
clusterCacheTrackerConcurrency int
Expand Down Expand Up @@ -143,10 +145,16 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server of workload clusters.")

fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server of workload clusters.")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")
Expand Down Expand Up @@ -325,6 +333,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
SecretCachingClient: secretCachingClient,
ControllerName: controllerName,
Log: &ctrl.Log,
ClientQPS: clusterCacheTrackerClientQPS,
ClientBurst: clusterCacheTrackerClientBurst,
},
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions test/infrastructure/inmemory/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")
Expand Down

0 comments on commit 24e89e8

Please sign in to comment.