diff --git a/controllers/remote/cluster_cache_tracker.go b/controllers/remote/cluster_cache_tracker.go index be21cff54665..3c1c67248d60 100644 --- a/controllers/remote/cluster_cache_tracker.go +++ b/controllers/remote/cluster_cache_tracker.go @@ -29,6 +29,7 @@ import ( "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -298,8 +299,14 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String()) } - // Create a client and a cache for the cluster. - c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes) + // Create a http client and a mapper for the cluster. + httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster) + if err != nil { + return nil, errors.Wrapf(err, "error creating http client and mapper for remote cluster %q", cluster.String()) + } + + // Create an uncached client for the cluster. + uncachedClient, err := t.createUncachedClient(config, cluster, httpClient, mapper) if err != nil { return nil, err } @@ -324,16 +331,23 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl config.CAFile = inClusterConfig.CAFile config.Host = inClusterConfig.Host - // Create a new client and overwrite the previously created client. - c, _, cache, err = t.createClient(ctx, config, cluster, indexes) + // Update the http client and the mapper to use in-cluster config. + httpClient, mapper, err = t.createHTTPClientAndMapper(config, cluster) if err != nil { - return nil, errors.Wrap(err, "error creating client for self-hosted cluster") + return nil, errors.Wrapf(err, "error creating http client and mapper (using in-cluster config) for remote cluster %q", cluster.String()) } + log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host)) } else { log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host)) } + // Create a client and a cache for the cluster. + cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, mapper, indexes) + if err != nil { + return nil, err + } + // Generating a new private key to be used for generating temporary certificates to connect to // etcd on the target cluster. // NOTE: Generating a private key is an expensive operation, so we store it in the cluster accessor. @@ -343,9 +357,9 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl } return &clusterAccessor{ - cache: cache, + cache: cachedClient.Cache, config: config, - client: c, + client: cachedClient.Client, watches: sets.Set[string]{}, etcdClientCertificateKey: etcdKey, }, nil @@ -377,18 +391,18 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl return t.controllerPodMetadata.UID == pod.UID, nil } -// createClient creates a cached client, and uncached client and a mapper based on a rest.Config. -func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, client.Client, *stoppableCache, error) { +// createHTTPClientAndMapper creates a http client and a dynamic rest mapper for the given cluster, based on the rest.Config. +func (t *ClusterCacheTracker) createHTTPClientAndMapper(config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) { // Create a http client for the cluster. httpClient, err := rest.HTTPClientFor(config) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String()) + return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String()) } // Create a mapper for it mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String()) + return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String()) } // Verify if we can get a rest mapping from the workload cluster apiserver. @@ -396,9 +410,34 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con // to avoid further effort creating a cache and a client and to produce a clearer error message. _, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String()) + return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String()) + } + + return httpClient, mapper, nil +} + +// createUncachedClient creates an uncached client for the given cluster, based on the rest.Config. +func (t *ClusterCacheTracker) createUncachedClient(config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, error) { + // Create the uncached client for the remote cluster + uncachedClient, err := client.New(config, client.Options{ + Scheme: t.scheme, + Mapper: mapper, + HTTPClient: httpClient, + }) + if err != nil { + return nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String()) } + return uncachedClient, nil +} + +type cachedClientOutput struct { + Client client.Client + Cache *stoppableCache +} + +// createCachedClient creates a cached client for the given cluster, based on a rest.Config. +func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper, indexes []Index) (*cachedClientOutput, error) { // Create the cache for the remote cluster cacheOptions := cache.Options{ HTTPClient: httpClient, @@ -407,7 +446,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con } remoteCache, err := cache.New(config, cacheOptions) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String()) + return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error creating cache", cluster.String()) } cacheCtx, cacheCtxCancel := context.WithCancel(ctx) @@ -420,7 +459,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con for _, index := range indexes { if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil { - return nil, nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String()) + return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error adding index for field %q to cache", cluster.String(), index.Field) } } @@ -436,19 +475,9 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con }, }) if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String()) + return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q", cluster.String()) } - // Create an uncached client. This is used in `runningOnWorkloadCluster` to ensure we don't continuously cache - // pods in the client. - uncachedClient, err := client.New(config, client.Options{ - Scheme: t.scheme, - Mapper: mapper, - HTTPClient: httpClient, - }) - if err != nil { - return nil, nil, nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String()) - } // Start the cache!!! go cache.Start(cacheCtx) //nolint:errcheck @@ -457,7 +486,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con defer cacheSyncCtxCancel() if !cache.WaitForCacheSync(cacheSyncCtx) { cache.Stop() - return nil, nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err()) + return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err()) } // Wrap the cached client with a client that sets timeouts on all Get and List calls @@ -474,7 +503,10 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con httpClient: httpClient, }) - return cachedClient, uncachedClient, cache, nil + return &cachedClientOutput{ + Client: cachedClient, + Cache: cache, + }, nil } // deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.