From 6f29134f1e3de90029e1e020dfab01e5df6d49cf Mon Sep 17 00:00:00 2001 From: chaunceyjiang Date: Mon, 8 Apr 2024 14:38:32 +0800 Subject: [PATCH] Add QPS related parameters to control the request rate of metrics-adapter to member clusters. Signed-off-by: chaunceyjiang --- cmd/agent/app/options/options.go | 8 +++---- .../app/options/options.go | 4 ++-- cmd/controller-manager/app/options/options.go | 8 +++---- cmd/descheduler/app/options/options.go | 4 ++-- cmd/karmada-search/app/options/options.go | 4 ++-- cmd/metrics-adapter/app/options/options.go | 20 ++++++++++++---- .../app/options/options.go | 4 ++-- cmd/scheduler/app/options/options.go | 4 ++-- cmd/webhook/app/options/options.go | 4 ++-- pkg/metricsadapter/controller.go | 24 +++++++++---------- pkg/metricsadapter/multiclient/client.go | 18 ++++++++------ 11 files changed, 59 insertions(+), 43 deletions(-) diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index 35c6a81b171b..783900db58ab 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -194,10 +194,10 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string) { "Specifies the cluster lease renew interval fraction.") fs.DurationVar(&o.ClusterSuccessThreshold.Duration, "cluster-success-threshold", 30*time.Second, "The duration of successes for the cluster to be considered healthy after recovery.") fs.DurationVar(&o.ClusterFailureThreshold.Duration, "cluster-failure-threshold", 30*time.Second, "The duration of failure for the cluster to be considered unhealthy.") - fs.Float32Var(&o.ClusterAPIQPS, "cluster-api-qps", 40.0, "QPS to use while talking with cluster kube-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - fs.IntVar(&o.ClusterAPIBurst, "cluster-api-burst", 60, "Burst to use while talking with cluster kube-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + fs.Float32Var(&o.ClusterAPIQPS, "cluster-api-qps", 40.0, "QPS to use while talking with cluster kube-apiserver.") + fs.IntVar(&o.ClusterAPIBurst, "cluster-api-burst", 60, "Burst to use while talking with cluster kube-apiserver.") + fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.") + fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.") fs.DurationVar(&o.ClusterCacheSyncTimeout.Duration, "cluster-cache-sync-timeout", util.CacheSyncTimeout, "Timeout period waiting for cluster cache to sync.") fs.StringVar(&o.ClusterAPIEndpoint, "cluster-api-endpoint", o.ClusterAPIEndpoint, "APIEndpoint of the cluster.") fs.StringVar(&o.ProxyServerAddress, "proxy-server-address", o.ProxyServerAddress, "Address of the proxy server that is used to proxy to the cluster.") diff --git a/cmd/aggregated-apiserver/app/options/options.go b/cmd/aggregated-apiserver/app/options/options.go index 5b316e22dfdb..f1c462d48fdb 100644 --- a/cmd/aggregated-apiserver/app/options/options.go +++ b/cmd/aggregated-apiserver/app/options/options.go @@ -81,8 +81,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { o.RecommendedOptions.AddFlags(flags) flags.Lookup("kubeconfig").Usage = "Path to karmada control plane kubeconfig file." - flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.") + flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.") _ = utilfeature.DefaultMutableFeatureGate.Add(pkgfeatures.DefaultFeatureGates) utilfeature.DefaultMutableFeatureGate.AddFlag(flags) o.ProfileOpts.AddFlags(flags) diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index 2b90dd1e887e..dbe53e9185b3 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -212,10 +212,10 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers, disabledByDefau "Note: 'karmada-system', 'karmada-cluster' and 'karmada-es-.*' are Karmada reserved namespaces that will always be skipped.") flags.StringVar(&o.ClusterAPIContext, "cluster-api-context", "", "Name of the cluster context in cluster-api management cluster kubeconfig file.") flags.StringVar(&o.ClusterAPIKubeconfig, "cluster-api-kubeconfig", "", "Path to the cluster-api management cluster kubeconfig file.") - flags.Float32Var(&o.ClusterAPIQPS, "cluster-api-qps", 40.0, "QPS to use while talking with cluster kube-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - flags.IntVar(&o.ClusterAPIBurst, "cluster-api-burst", 60, "Burst to use while talking with cluster kube-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + flags.Float32Var(&o.ClusterAPIQPS, "cluster-api-qps", 40.0, "QPS to use while talking with cluster kube-apiserver.") + flags.IntVar(&o.ClusterAPIBurst, "cluster-api-burst", 60, "Burst to use while talking with cluster kube-apiserver.") + flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.") + flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.") flags.DurationVar(&o.ClusterCacheSyncTimeout.Duration, "cluster-cache-sync-timeout", util.CacheSyncTimeout, "Timeout period waiting for cluster cache to sync.") flags.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.") flags.StringVar(&o.MetricsBindAddress, "metrics-bind-address", ":8080", "The TCP address that the controller should bind to for serving prometheus metrics(e.g. 127.0.0.1:8080, :8080). It can be set to \"0\" to disable the metrics serving.") diff --git a/cmd/descheduler/app/options/options.go b/cmd/descheduler/app/options/options.go index b0f1697411eb..0dd0d0b816cb 100644 --- a/cmd/descheduler/app/options/options.go +++ b/cmd/descheduler/app/options/options.go @@ -95,8 +95,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server. Overrides any value in KubeConfig. Only required if out-of-cluster.") fs.StringVar(&o.BindAddress, "bind-address", defaultBindAddress, "The IP address on which to listen for the --secure-port port.") fs.IntVar(&o.SecurePort, "secure-port", defaultPort, "The secure port on which to serve HTTPS.") - fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.") + fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.") fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.") fs.IntVar(&o.SchedulerEstimatorPort, "scheduler-estimator-port", defaultEstimatorPort, "The secure port on which to connect the accurate scheduler estimator.") fs.StringVar(&o.SchedulerEstimatorServicePrefix, "scheduler-estimator-service-prefix", "karmada-scheduler-estimator", "The prefix of scheduler estimator service name") diff --git a/cmd/karmada-search/app/options/options.go b/cmd/karmada-search/app/options/options.go index 07b277005a06..0c03fc26e5c4 100644 --- a/cmd/karmada-search/app/options/options.go +++ b/cmd/karmada-search/app/options/options.go @@ -64,8 +64,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { o.RecommendedOptions.AddFlags(flags) flags.Lookup("kubeconfig").Usage = "Path to karmada control plane kubeconfig file." - flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.") + flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.") flags.BoolVar(&o.DisableSearch, "disable-search", false, "Disable search feature that would save memory usage significantly.") flags.BoolVar(&o.DisableProxy, "disable-proxy", false, "Disable proxy feature that would save memory usage significantly.") diff --git a/cmd/metrics-adapter/app/options/options.go b/cmd/metrics-adapter/app/options/options.go index d5504fd17833..1f070b1de826 100755 --- a/cmd/metrics-adapter/app/options/options.go +++ b/cmd/metrics-adapter/app/options/options.go @@ -34,6 +34,7 @@ import ( generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi" "github.com/karmada-io/karmada/pkg/metricsadapter" "github.com/karmada-io/karmada/pkg/sharedcli/profileflag" + "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/version" ) @@ -42,8 +43,15 @@ type Options struct { CustomMetricsAdapterServerOptions *options.CustomMetricsAdapterServerOptions KubeConfig string - - ProfileOpts profileflag.Options + // ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver. + ClusterAPIQPS float32 + // ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver. + ClusterAPIBurst int + // KubeAPIQPS is the QPS to use while talking with karmada-apiserver. + KubeAPIQPS float32 + // KubeAPIBurst is the burst to allow while talking with karmada-apiserver. + KubeAPIBurst int + ProfileOpts profileflag.Options } // NewOptions builds a default metrics-adapter options. @@ -64,7 +72,10 @@ func (o *Options) Complete() error { func (o *Options) AddFlags(fs *pflag.FlagSet) { o.CustomMetricsAdapterServerOptions.AddFlags(fs) o.ProfileOpts.AddFlags(fs) - + fs.Float32Var(&o.ClusterAPIQPS, "cluster-api-qps", 40.0, "QPS to use while talking with cluster kube-apiserver.") + fs.IntVar(&o.ClusterAPIBurst, "cluster-api-burst", 60, "Burst to use while talking with cluster kube-apiserver.") + fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.") + fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.") fs.StringVar(&o.KubeConfig, "kubeconfig", o.KubeConfig, "Path to karmada control plane kubeconfig file.") } @@ -75,12 +86,13 @@ func (o *Options) Config() (*metricsadapter.MetricsServer, error) { klog.Errorf("Unable to build restConfig: %v", err) return nil, err } + restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig) factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0) kubeClient := kubernetes.NewForConfigOrDie(restConfig) kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0) - metricsController := metricsadapter.NewMetricsController(restConfig, factory, kubeFactory) + metricsController := metricsadapter.NewMetricsController(restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst}) metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions) metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme)) metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme)) diff --git a/cmd/scheduler-estimator/app/options/options.go b/cmd/scheduler-estimator/app/options/options.go index 3835f0a8391a..f10bf3ca9cb2 100644 --- a/cmd/scheduler-estimator/app/options/options.go +++ b/cmd/scheduler-estimator/app/options/options.go @@ -65,8 +65,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.BindAddress, "bind-address", defaultBindAddress, "The IP address on which to listen for the --secure-port port.") fs.IntVar(&o.ServerPort, "server-port", defaultServerPort, "The secure port on which to serve gRPC.") fs.IntVar(&o.SecurePort, "secure-port", defaultHealthzPort, "The secure port on which to serve HTTPS.") - fs.Float32Var(&o.ClusterAPIQPS, "kube-api-qps", 20.0, "QPS to use while talking with apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - fs.IntVar(&o.ClusterAPIBurst, "kube-api-burst", 30, "Burst to use while talking with apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + fs.Float32Var(&o.ClusterAPIQPS, "kube-api-qps", 20.0, "QPS to use while talking with apiserver.") + fs.IntVar(&o.ClusterAPIBurst, "kube-api-burst", 30, "Burst to use while talking with apiserver.") fs.IntVar(&o.Parallelism, "parallelism", o.Parallelism, "Parallelism defines the amount of parallelism in algorithms for estimating. Must be greater than 0. Defaults to 16.") features.FeatureGate.AddFlag(fs) diff --git a/cmd/scheduler/app/options/options.go b/cmd/scheduler/app/options/options.go index c7ac81b49297..bcb6b2e59634 100644 --- a/cmd/scheduler/app/options/options.go +++ b/cmd/scheduler/app/options/options.go @@ -131,8 +131,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server. Overrides any value in KubeConfig. Only required if out-of-cluster.") fs.StringVar(&o.BindAddress, "bind-address", defaultBindAddress, "The IP address on which to listen for the --secure-port port.") fs.IntVar(&o.SecurePort, "secure-port", defaultPort, "The secure port on which to serve HTTPS.") - fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.") + fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.") fs.BoolVar(&o.EnableSchedulerEstimator, "enable-scheduler-estimator", false, "Enable calling cluster scheduler estimator for adjusting replicas.") fs.BoolVar(&o.DisableSchedulerEstimatorInPullMode, "disable-scheduler-estimator-in-pull-mode", false, "Disable the scheduler estimator for clusters in pull mode, which takes effect only when enable-scheduler-estimator is true.") fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.") diff --git a/cmd/webhook/app/options/options.go b/cmd/webhook/app/options/options.go index 8cfde8cb3c15..6a4b0b8fc7e9 100644 --- a/cmd/webhook/app/options/options.go +++ b/cmd/webhook/app/options/options.go @@ -87,8 +87,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.StringVar(&o.CertName, "tls-cert-file-name", "tls.crt", "The name of server certificate.") flags.StringVar(&o.KeyName, "tls-private-key-file-name", "tls.key", "The name of server key.") flags.StringVar(&o.TLSMinVersion, "tls-min-version", defaultTLSMinVersion, "Minimum TLS version supported. Possible values: 1.0, 1.1, 1.2, 1.3.") - flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.") + flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.") flags.StringVar(&o.MetricsBindAddress, "metrics-bind-address", ":8080", "The TCP address that the controller should bind to for serving prometheus metrics(e.g. 127.0.0.1:8080, :8080). It can be set to \"0\" to disable the metrics serving.") flags.StringVar(&o.HealthProbeBindAddress, "health-probe-bind-address", ":8000", "The TCP address that the controller should bind to for serving health probes(e.g. 127.0.0.1:8000, :8000)") diff --git a/pkg/metricsadapter/controller.go b/pkg/metricsadapter/controller.go index b4f520d92467..e1e392faba4a 100755 --- a/pkg/metricsadapter/controller.go +++ b/pkg/metricsadapter/controller.go @@ -61,12 +61,12 @@ type MetricsController struct { } // NewMetricsController creates a new metrics controller -func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, kubeFactory informers.SharedInformerFactory) *MetricsController { +func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, kubeFactory informers.SharedInformerFactory, clusterClientOption *util.ClientOption) *MetricsController { clusterLister := factory.Cluster().V1alpha1().Clusters().Lister() controller := &MetricsController{ InformerFactory: factory, ClusterLister: clusterLister, - MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory), + MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory, clusterClientOption), InformerManager: genericmanager.GetInstance(), TypedInformerManager: newInstance(), restConfig: restConfig, @@ -175,7 +175,7 @@ func (m *MetricsController) updateCluster(oldObj, curObj interface{}) { if util.ClusterAccessCredentialChanged(curCluster.Spec, oldCluster.Spec) || util.IsClusterReady(&curCluster.Status) != util.IsClusterReady(&oldCluster.Status) { // Cluster.Spec or Cluster health state is changed, rebuild informer. - m.InformerManager.Stop(curCluster.GetName()) + m.stopInformerManager(curCluster.GetName()) m.queue.Add(curCluster.GetName()) } } @@ -213,9 +213,7 @@ func (m *MetricsController) handleClusters() bool { if err != nil { if apierrors.IsNotFound(err) { klog.Infof("try to stop cluster informer %s", clusterName) - m.TypedInformerManager.Stop(clusterName) - m.InformerManager.Stop(clusterName) - m.MultiClusterDiscovery.Remove(clusterName) + m.stopInformerManager(clusterName) return true } return false @@ -223,17 +221,13 @@ func (m *MetricsController) handleClusters() bool { if !cls.DeletionTimestamp.IsZero() { klog.Infof("try to stop cluster informer %s", clusterName) - m.TypedInformerManager.Stop(clusterName) - m.InformerManager.Stop(clusterName) - m.MultiClusterDiscovery.Remove(clusterName) + m.stopInformerManager(clusterName) return true } if !util.IsClusterReady(&cls.Status) { klog.Warningf("cluster %s is notReady try to stop this cluster informer", clusterName) - m.TypedInformerManager.Stop(clusterName) - m.InformerManager.Stop(clusterName) - m.MultiClusterDiscovery.Remove(clusterName) + m.stopInformerManager(clusterName) return false } @@ -276,3 +270,9 @@ func (m *MetricsController) handleClusters() bool { return true } + +func (m *MetricsController) stopInformerManager(clusterName string) { + m.TypedInformerManager.Stop(clusterName) + m.InformerManager.Stop(clusterName) + m.MultiClusterDiscovery.Remove(clusterName) +} diff --git a/pkg/metricsadapter/multiclient/client.go b/pkg/metricsadapter/multiclient/client.go index 5a78ca52bdaf..abf30fb39e18 100644 --- a/pkg/metricsadapter/multiclient/client.go +++ b/pkg/metricsadapter/multiclient/client.go @@ -39,17 +39,19 @@ type MultiClusterDiscoveryInterface interface { // MultiClusterDiscovery provides DiscoveryClient for multiple clusters. type MultiClusterDiscovery struct { sync.RWMutex - clients map[string]*discovery.DiscoveryClient - secretLister listcorev1.SecretLister - clusterLister clusterlister.ClusterLister + clients map[string]*discovery.DiscoveryClient + clusterClientOption *util.ClientOption + secretLister listcorev1.SecretLister + clusterLister clusterlister.ClusterLister } // NewMultiClusterDiscoveryClient returns a new MultiClusterDiscovery -func NewMultiClusterDiscoveryClient(clusterLister clusterlister.ClusterLister, KubeFactory informers.SharedInformerFactory) MultiClusterDiscoveryInterface { +func NewMultiClusterDiscoveryClient(clusterLister clusterlister.ClusterLister, KubeFactory informers.SharedInformerFactory, clusterClientOption *util.ClientOption) MultiClusterDiscoveryInterface { return &MultiClusterDiscovery{ - clusterLister: clusterLister, - secretLister: KubeFactory.Core().V1().Secrets().Lister(), - clients: map[string]*discovery.DiscoveryClient{}, + clusterLister: clusterLister, + secretLister: KubeFactory.Core().V1().Secrets().Lister(), + clients: map[string]*discovery.DiscoveryClient{}, + clusterClientOption: clusterClientOption, } } @@ -72,6 +74,8 @@ func (m *MultiClusterDiscovery) Set(clusterName string) error { if err != nil { return err } + clusterConfig.QPS = m.clusterClientOption.QPS + clusterConfig.Burst = m.clusterClientOption.Burst m.Lock() defer m.Unlock() m.clients[clusterName] = discovery.NewDiscoveryClientForConfigOrDie(clusterConfig)