-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for variable VKs in CRS config
Add support for variable VKs in CRS config, while maintaining a cache of discovered GVKs in the cluster, and updating it every 30s. Signed-off-by: Pranshu Srivastava <rexagod@gmail.com>
- Loading branch information
Showing
20 changed files
with
1,305 additions
and
116 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,279 @@ | ||
/* | ||
Copyright 2023 The Kubernetes Authors All rights reserved. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
// Package discovery provides discovery and resolution logic for GVKs. | ||
package discovery | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/client-go/dynamic" | ||
"k8s.io/client-go/dynamic/dynamicinformer" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/cache" | ||
"k8s.io/klog/v2" | ||
|
||
"k8s.io/kube-state-metrics/v2/internal/store" | ||
"k8s.io/kube-state-metrics/v2/pkg/customresource" | ||
"k8s.io/kube-state-metrics/v2/pkg/metricshandler" | ||
"k8s.io/kube-state-metrics/v2/pkg/options" | ||
"k8s.io/kube-state-metrics/v2/pkg/util" | ||
) | ||
|
||
// Interval is the period of the ticker used by PollForCacheUpdates, i.e. the time
// between two consecutive checks of whether the discovery cache was updated.
const Interval = 3 * time.Second | ||
|
||
// StartDiscovery starts the discovery process, fetching all the objects that can be listed from the apiserver, every `Interval` seconds. | ||
// resolveGVK needs to be called after StartDiscovery to generate factories. | ||
func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) error { | ||
client := dynamic.NewForConfigOrDie(config) | ||
factory := dynamicinformer.NewFilteredDynamicInformer(client, schema.GroupVersionResource{ | ||
Group: "apiextensions.k8s.io", | ||
Version: "v1", | ||
Resource: "customresourcedefinitions", | ||
}, "", 0, nil, nil) | ||
informer := factory.Informer() | ||
stopper := make(chan struct{}) | ||
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||
AddFunc: func(obj interface{}) { | ||
objSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) | ||
for _, version := range objSpec["versions"].([]interface{}) { | ||
g := objSpec["group"].(string) | ||
v := version.(map[string]interface{})["name"].(string) | ||
k := objSpec["names"].(map[string]interface{})["kind"].(string) | ||
p := objSpec["names"].(map[string]interface{})["plural"].(string) | ||
gotGVKP := groupVersionKindPlural{ | ||
GroupVersionKind: schema.GroupVersionKind{ | ||
Group: g, | ||
Version: v, | ||
Kind: k, | ||
}, | ||
Plural: p, | ||
} | ||
r.AppendToMap(gotGVKP) | ||
r.SafeWrite(func() { | ||
r.WasUpdated = true | ||
}) | ||
} | ||
r.SafeWrite(func() { | ||
r.CRDsAddEventsCounter.Inc() | ||
r.CRDsCacheCountGauge.Inc() | ||
}) | ||
}, | ||
DeleteFunc: func(obj interface{}) { | ||
objSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) | ||
for _, version := range objSpec["versions"].([]interface{}) { | ||
g := objSpec["group"].(string) | ||
v := version.(map[string]interface{})["name"].(string) | ||
k := objSpec["names"].(map[string]interface{})["kind"].(string) | ||
p := objSpec["names"].(map[string]interface{})["plural"].(string) | ||
gotGVKP := groupVersionKindPlural{ | ||
GroupVersionKind: schema.GroupVersionKind{ | ||
Group: g, | ||
Version: v, | ||
Kind: k, | ||
}, | ||
Plural: p, | ||
} | ||
r.RemoveFromMap(gotGVKP) | ||
r.SafeWrite(func() { | ||
r.WasUpdated = true | ||
}) | ||
} | ||
r.SafeWrite(func() { | ||
r.CRDsDeleteEventsCounter.Inc() | ||
r.CRDsCacheCountGauge.Dec() | ||
}) | ||
}, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
// Respect context cancellation. | ||
go func() { | ||
for range ctx.Done() { | ||
klog.InfoS("context cancelled, stopping discovery") | ||
close(stopper) | ||
return | ||
} | ||
}() | ||
go informer.Run(stopper) | ||
return nil | ||
} | ||
|
||
// ResolveGVKToGVKPs resolves the variable VKs to a GVK list, based on the current cache. | ||
func (r *CRDiscoverer) ResolveGVKToGVKPs(gvk schema.GroupVersionKind) (resolvedGVKPs []groupVersionKindPlural, err error) { // nolint:revive | ||
g := gvk.Group | ||
v := gvk.Version | ||
k := gvk.Kind | ||
if g == "" || g == "*" { | ||
return nil, fmt.Errorf("group is required in the defined GVK %v", gvk) | ||
} | ||
hasVersion := v != "" && v != "*" | ||
hasKind := k != "" && k != "*" | ||
// No need to resolve, return. | ||
if hasVersion && hasKind { | ||
var p string | ||
for _, el := range r.Map[g][v] { | ||
if el.Kind == k { | ||
p = el.Plural | ||
break | ||
} | ||
} | ||
return []groupVersionKindPlural{ | ||
{ | ||
GroupVersionKind: schema.GroupVersionKind{ | ||
Group: g, | ||
Version: v, | ||
Kind: k, | ||
}, | ||
Plural: p, | ||
}, | ||
}, nil | ||
} | ||
if hasVersion && !hasKind { | ||
kinds := r.Map[g][v] | ||
for _, el := range kinds { | ||
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{ | ||
GroupVersionKind: schema.GroupVersionKind{ | ||
Group: g, | ||
Version: v, | ||
Kind: el.Kind, | ||
}, | ||
Plural: el.Plural, | ||
}) | ||
} | ||
} | ||
if !hasVersion && hasKind { | ||
versions := r.Map[g] | ||
for version, kinds := range versions { | ||
for _, el := range kinds { | ||
if el.Kind == k { | ||
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{ | ||
GroupVersionKind: schema.GroupVersionKind{ | ||
Group: g, | ||
Version: version, | ||
Kind: k, | ||
}, | ||
Plural: el.Plural, | ||
}) | ||
} | ||
} | ||
} | ||
} | ||
if !hasVersion && !hasKind { | ||
versions := r.Map[g] | ||
for version, kinds := range versions { | ||
for _, el := range kinds { | ||
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{ | ||
GroupVersionKind: schema.GroupVersionKind{ | ||
Group: g, | ||
Version: version, | ||
Kind: el.Kind, | ||
}, | ||
Plural: el.Plural, | ||
}) | ||
} | ||
} | ||
} | ||
return | ||
} | ||
|
||
// PollForCacheUpdates polls the cache for updates and updates the stores accordingly. | ||
func (r *CRDiscoverer) PollForCacheUpdates( | ||
ctx context.Context, | ||
opts *options.Options, | ||
storeBuilder *store.Builder, | ||
m *metricshandler.MetricsHandler, | ||
factoryGenerator func() ([]customresource.RegistryFactory, error), | ||
) { | ||
// The interval at which we will check the cache for updates. | ||
t := time.NewTicker(Interval) | ||
// Track previous context to allow refreshing cache. | ||
olderContext, olderCancel := context.WithCancel(ctx) | ||
// Prevent context leak (kill the last metric handler instance). | ||
defer olderCancel() | ||
generateMetrics := func() { | ||
// Get families for discovered factories. | ||
customFactories, err := factoryGenerator() | ||
if err != nil { | ||
klog.ErrorS(err, "failed to update custom resource stores") | ||
} | ||
// Update the list of enabled custom resources. | ||
var enabledCustomResources []string | ||
for _, factory := range customFactories { | ||
gvrString := util.GVRFromType(factory.Name(), factory.ExpectedType()).String() | ||
enabledCustomResources = append(enabledCustomResources, gvrString) | ||
} | ||
// Create clients for discovered factories. | ||
discoveredCustomResourceClients, err := util.CreateCustomResourceClients(opts.Apiserver, opts.Kubeconfig, customFactories...) | ||
if err != nil { | ||
klog.ErrorS(err, "failed to update custom resource stores") | ||
} | ||
// Update the store builder with the new clients. | ||
storeBuilder.WithCustomResourceClients(discoveredCustomResourceClients) | ||
// Inject families' constructors to the existing set of stores. | ||
storeBuilder.WithCustomResourceStoreFactories(customFactories...) | ||
// Update the store builder with the new custom resources. | ||
if err := storeBuilder.WithEnabledResources(enabledCustomResources); err != nil { | ||
klog.ErrorS(err, "failed to update custom resource stores") | ||
} | ||
// Configure the generation function for the custom resource stores. | ||
storeBuilder.WithGenerateCustomResourceStoresFunc(storeBuilder.DefaultGenerateCustomResourceStoresFunc()) | ||
// Reset the flag, if there were no errors. Else, we'll try again on the next tick. | ||
// Keep retrying if there were errors. | ||
r.SafeWrite(func() { | ||
r.WasUpdated = false | ||
}) | ||
// Run the metrics handler with updated configs. | ||
olderContext, olderCancel = context.WithCancel(ctx) | ||
go func() { | ||
// Blocks indefinitely until the unbuffered context is cancelled to serve metrics for that duration. | ||
err = m.Run(olderContext) | ||
if err != nil { | ||
// Check if context was cancelled. | ||
select { | ||
case <-olderContext.Done(): | ||
// Context cancelled, don't really need to log this though. | ||
default: | ||
klog.ErrorS(err, "failed to run metrics handler") | ||
} | ||
} | ||
}() | ||
} | ||
go func() { | ||
for range t.C { | ||
select { | ||
case <-ctx.Done(): | ||
klog.InfoS("context cancelled") | ||
t.Stop() | ||
return | ||
default: | ||
// Check if cache has been updated. | ||
shouldGenerateMetrics := false | ||
r.SafeRead(func() { | ||
shouldGenerateMetrics = r.WasUpdated | ||
}) | ||
if shouldGenerateMetrics { | ||
olderCancel() | ||
generateMetrics() | ||
klog.InfoS("discovery finished, cache updated") | ||
} | ||
} | ||
} | ||
}() | ||
} |
Oops, something went wrong.