Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.6] 🐛 Fix ClusterCacheTracker memory leak #10064

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 59 additions & 27 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -298,8 +299,14 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
}

// Create a client and a cache for the cluster.
c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes)
// Create an HTTP client and a mapper for the cluster.
httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error creating http client and mapper for remote cluster %q", cluster.String())
}

// Create an uncached client for the cluster.
uncachedClient, err := t.createUncachedClient(config, cluster, httpClient, mapper)
if err != nil {
return nil, err
}
Expand All @@ -324,16 +331,23 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
config.CAFile = inClusterConfig.CAFile
config.Host = inClusterConfig.Host

// Create a new client and overwrite the previously created client.
c, _, cache, err = t.createClient(ctx, config, cluster, indexes)
// Update the HTTP client and the mapper to use in-cluster config.
httpClient, mapper, err = t.createHTTPClientAndMapper(config, cluster)
if err != nil {
return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
return nil, errors.Wrapf(err, "error creating http client and mapper (using in-cluster config) for remote cluster %q", cluster.String())
}

log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host))
} else {
log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host))
}

// Create a client and a cache for the cluster.
cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, mapper, indexes)
if err != nil {
return nil, err
}

// Generating a new private key to be used for generating temporary certificates to connect to
// etcd on the target cluster.
// NOTE: Generating a private key is an expensive operation, so we store it in the cluster accessor.
Expand All @@ -343,9 +357,9 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
}

return &clusterAccessor{
cache: cache,
cache: cachedClient.Cache,
config: config,
client: c,
client: cachedClient.Client,
watches: sets.Set[string]{},
etcdClientCertificateKey: etcdKey,
}, nil
Expand Down Expand Up @@ -377,28 +391,53 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
return t.controllerPodMetadata.UID == pod.UID, nil
}

// createClient creates a cached client, an uncached client and a mapper based on a rest.Config.
func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, client.Client, *stoppableCache, error) {
// createHTTPClientAndMapper creates an HTTP client and a dynamic REST mapper for the given cluster, based on the rest.Config.
func (t *ClusterCacheTracker) createHTTPClientAndMapper(config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) {
// Create an HTTP client for the cluster.
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
}

// Create a mapper for it
mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
}

// Verify if we can get a rest mapping from the workload cluster apiserver.
// Note: This also checks if the apiserver is up in general. We do this already here
// to avoid further effort creating a cache and a client and to produce a clearer error message.
_, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
}

return httpClient, mapper, nil
}

// createUncachedClient builds a client for the given cluster from the rest.Config.
//
// The client is constructed via client.New with the tracker's scheme and the
// supplied REST mapper and HTTP client; no cache is configured on it.
// The cluster key is only used to identify the cluster in the returned error.
// On failure a nil client and a wrapped error are returned.
func (t *ClusterCacheTracker) createUncachedClient(config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, error) {
	// Reuse the already-created mapper and HTTP client instead of letting
	// client.New build its own from the config.
	opts := client.Options{
		Scheme:     t.scheme,
		Mapper:     mapper,
		HTTPClient: httpClient,
	}

	c, err := client.New(config, opts)
	if err != nil {
		return nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
	}
	return c, nil
}

// cachedClientOutput bundles the values returned by createCachedClient, so that
// function can return a single result alongside its error.
type cachedClientOutput struct {
// Client is the cached client produced by createCachedClient.
Client client.Client
// Cache is the stoppable cache that createCachedClient creates and starts.
Cache *stoppableCache
}

// createCachedClient creates a cached client for the given cluster, based on a rest.Config.
func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper, indexes []Index) (*cachedClientOutput, error) {
// Create the cache for the remote cluster
cacheOptions := cache.Options{
HTTPClient: httpClient,
Expand All @@ -407,7 +446,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
}
remoteCache, err := cache.New(config, cacheOptions)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String())
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error creating cache", cluster.String())
}

cacheCtx, cacheCtxCancel := context.WithCancel(ctx)
Expand All @@ -420,7 +459,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con

for _, index := range indexes {
if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil {
return nil, nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String())
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error adding index for field %q to cache", cluster.String(), index.Field)
}
}

Expand All @@ -436,19 +475,9 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
},
})
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q", cluster.String())
}

// Create an uncached client. This is used in `runningOnWorkloadCluster` to ensure we don't continuously cache
// pods in the client.
uncachedClient, err := client.New(config, client.Options{
Scheme: t.scheme,
Mapper: mapper,
HTTPClient: httpClient,
})
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
}
// Start the cache!!!
go cache.Start(cacheCtx) //nolint:errcheck

Expand All @@ -457,7 +486,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
defer cacheSyncCtxCancel()
if !cache.WaitForCacheSync(cacheSyncCtx) {
cache.Stop()
return nil, nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
}

// Wrap the cached client with a client that sets timeouts on all Get and List calls
Expand All @@ -474,7 +503,10 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
httpClient: httpClient,
})

return cachedClient, uncachedClient, cache, nil
return &cachedClientOutput{
Client: cachedClient,
Cache: cache,
}, nil
}

// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
Expand Down
Loading