diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index bd764ab701..7e1f162909 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -203,10 +203,6 @@ func (r *CRDiscoverer) PollForCacheUpdates( ) { // The interval at which we will check the cache for updates. t := time.NewTicker(Interval) - // Track previous context to allow refreshing cache. - olderContext, olderCancel := context.WithCancel(ctx) - // Prevent context leak (kill the last metric handler instance). - defer olderCancel() generateMetrics := func() { // Get families for discovered factories. customFactories, err := factoryGenerator() @@ -239,21 +235,8 @@ func (r *CRDiscoverer) PollForCacheUpdates( r.SafeWrite(func() { r.WasUpdated = false }) - // Run the metrics handler with updated configs. - olderContext, olderCancel = context.WithCancel(ctx) - go func() { - // Blocks indefinitely until the unbuffered context is cancelled to serve metrics for that duration. - err = m.Run(olderContext) - if err != nil { - // Check if context was cancelled. - select { - case <-olderContext.Done(): - // Context cancelled, don't really need to log this though. - default: - klog.ErrorS(err, "failed to run metrics handler") - } - } - }() + // Update metric handler with the new configs. 
+ m.BuildWriters(ctx) } go func() { for range t.C { @@ -269,7 +252,6 @@ func (r *CRDiscoverer) PollForCacheUpdates( shouldGenerateMetrics = r.WasUpdated }) if shouldGenerateMetrics { - olderCancel() generateMetrics() klog.InfoS("discovery finished, cache updated") } diff --git a/pkg/app/server.go b/pkg/app/server.go index 56afc6090a..8719797cb6 100644 --- a/pkg/app/server.go +++ b/pkg/app/server.go @@ -286,14 +286,12 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { opts.EnableGZIPEncoding, ) // Run MetricsHandler - if config == nil { - ctxMetricsHandler, cancel := context.WithCancel(ctx) - g.Add(func() error { - return m.Run(ctxMetricsHandler) - }, func(error) { - cancel() - }) - } + ctxMetricsHandler, cancel := context.WithCancel(ctx) + g.Add(func() error { + return m.Run(ctxMetricsHandler) + }, func(error) { + cancel() + }) tlsConfig := opts.TLSConfig diff --git a/pkg/metricshandler/metrics_handler.go b/pkg/metricshandler/metrics_handler.go index 9d02dd0b95..0a68914736 100644 --- a/pkg/metricshandler/metrics_handler.go +++ b/pkg/metricshandler/metrics_handler.go @@ -69,24 +69,35 @@ func New(opts *options.Options, kubeClient kubernetes.Interface, storeBuilder ks } } -// ConfigureSharding (re-)configures sharding. Re-configuration can be done -// concurrently. -func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) { +// BuildWriters builds the metrics writers, cancelling any previous context and passing a new one on every build. +// BuildWriters can be called multiple times and concurrently. 
+func (m *MetricsHandler) BuildWriters(ctx context.Context) { m.mtx.Lock() defer m.mtx.Unlock() if m.cancel != nil { m.cancel() } - if totalShards != 1 { - klog.InfoS("Configuring sharding of this instance to be shard index (zero-indexed) out of total shards", "shard", shard, "totalShards", totalShards) - } ctx, m.cancel = context.WithCancel(ctx) - m.storeBuilder.WithSharding(shard, totalShards) m.storeBuilder.WithContext(ctx) m.metricsWriters = m.storeBuilder.Build() +} + +// ConfigureSharding configures sharding. Configuration can be used multiple times and +// concurrently. +func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) { + m.mtx.Lock() + + if totalShards != 1 { + klog.InfoS("Configuring sharding of this instance to be shard index (zero-indexed) out of total shards", "shard", shard, "totalShards", totalShards) + } m.curShard = shard m.curTotalShards = totalShards + m.storeBuilder.WithSharding(shard, totalShards) + + // unlock because BuildWriters will hold a lock again + m.mtx.Unlock() + m.BuildWriters(ctx) } // Run configures the MetricsHandler's sharding and if autosharding is enabled diff --git a/tests/e2e/discovery_test.go b/tests/e2e/discovery_test.go index d34c932daf..91a1e2a8f7 100644 --- a/tests/e2e/discovery_test.go +++ b/tests/e2e/discovery_test.go @@ -36,48 +36,122 @@ import ( // PopulateTimeout is the timeout on populating the cache for the first time. const PopulateTimeout = 10 * time.Second -func TestVariableVKsDiscoveryAndResolution(t *testing.T) { - - // Initialise options. - opts := options.NewOptions() - cmd := options.InitCommand - opts.AddFlags(cmd) - klog.InfoS("options", "options", opts) +type resourceManager struct { + crConfigFile *os.File + initCrdFile *os.File + initCrFile *os.File + newCrdFile *os.File + newCrFile *os.File +} - // Create testdata. 
+func (rm *resourceManager) createConfigAndResourceFiles(t *testing.T) { crConfigFile, err := os.CreateTemp("", "cr-config.yaml") if err != nil { t.Fatal(err) } - crdFile, err := os.CreateTemp("", "crd.yaml") + rm.crConfigFile = crConfigFile + + initCrdFile, err := os.CreateTemp("", "crd.yaml") + if err != nil { + t.Fatal(err) + } + rm.initCrdFile = initCrdFile + + initCrFile, err := os.CreateTemp("", "cr.yaml") if err != nil { t.Fatal(err) } - crFile, err := os.CreateTemp("", "cr.yaml") + rm.initCrFile = initCrFile + + newCrdFile, err := os.CreateTemp("", "new-crd.yaml") if err != nil { t.Fatal(err) } - klog.InfoS("testdata", "crConfigFile", crConfigFile.Name(), "crdFile", crdFile.Name(), "crFile", crFile.Name()) + rm.newCrdFile = newCrdFile + + newCrFile, err := os.CreateTemp("", "new-cr.yaml") + if err != nil { + t.Fatal(err) + } + rm.newCrFile = newCrFile + klog.InfoS("testdata", "crConfigFile", crConfigFile.Name(), "initCrdFile", initCrdFile.Name(), "initCrFile", initCrFile.Name(), "newCrdFile", newCrdFile.Name(), "newCrFile", newCrFile.Name()) +} + +func (rm *resourceManager) removeResourceFiles(t *testing.T) { + err := os.Remove(rm.crConfigFile.Name()) + if err != nil { + t.Fatalf("failed to remove CR config: %v", err) + } + err = os.Remove(rm.initCrdFile.Name()) + if err != nil { + t.Fatalf("failed to remove initial CRD manifest: %v", err) + } + err = os.Remove(rm.initCrFile.Name()) + if err != nil { + t.Fatalf("failed to remove initial CR manifest: %v", err) + } + err = os.Remove(rm.newCrdFile.Name()) + if err != nil { + t.Fatalf("failed to remove new CRD manifest: %v", err) + } + err = os.Remove(rm.newCrFile.Name()) + if err != nil { + t.Fatalf("failed to remove new CR manifest: %v", err) + } + klog.InfoS("deleted artefacts", "crConfigFile", rm.crConfigFile.Name(), "initCrdFile", rm.initCrdFile.Name(), "initCrFile", rm.initCrFile.Name(), "newCrdFile", rm.newCrdFile.Name(), "newCrFile", rm.newCrFile.Name()) +} + +func (rm *resourceManager) 
writeConfigFile(t *testing.T) { + crConfig := getCRConfig() + configFile := rm.crConfigFile.Name() + err := os.WriteFile(configFile, []byte(crConfig), 0600 /* rw------- */) + if err != nil { + t.Fatalf("cannot write to config file: %v", err) + } + klog.InfoS("populated cr config file", "crConfigFile", configFile) +} + +func (rm *resourceManager) writeResourceFiles(t *testing.T) { + initCr := getCR() + initCrd := getCRD() + + newCr := getNewCR() + newCrd := getNewCRD() + err := os.WriteFile(rm.initCrdFile.Name(), []byte(initCrd), 0600 /* rw------- */) + if err != nil { + t.Fatalf("cannot write to initial crd file: %v", err) + } + err = os.WriteFile(rm.initCrFile.Name(), []byte(initCr), 0600 /* rw------- */) + if err != nil { + t.Fatalf("cannot write to initial cr file: %v", err) + } + err = os.WriteFile(rm.newCrdFile.Name(), []byte(newCrd), 0600 /* rw------- */) + if err != nil { + t.Fatalf("cannot write to new crd file: %v", err) + } + err = os.WriteFile(rm.newCrFile.Name(), []byte(newCr), 0600 /* rw------- */) + if err != nil { + t.Fatalf("cannot write to new cr file: %v", err) + } + klog.InfoS("created initial and new CR and CRD manifests") +} + +func TestVariableVKsDiscoveryAndResolution(t *testing.T) { + rm := &resourceManager{} + // Create testdata. + rm.createConfigAndResourceFiles(t) + + // Initialise options. + opts := options.NewOptions() + cmd := options.InitCommand + opts.AddFlags(cmd) + klog.InfoS("options", "options", opts) // Delete artefacts. 
- defer func() { - err := os.Remove(crConfigFile.Name()) - if err != nil { - t.Fatalf("failed to remove CR config: %v", err) - } - err = os.Remove(crdFile.Name()) - if err != nil { - t.Fatalf("failed to remove CRD manifest: %v", err) - } - err = os.Remove(crFile.Name()) - if err != nil { - t.Fatalf("failed to remove CR manifest: %v", err) - } - klog.InfoS("deleted artefacts", "crConfigFile", crConfigFile.Name(), "crdFile", crdFile.Name(), "crFile", crFile.Name()) - }() + defer rm.removeResourceFiles(t) // Populate options, and parse them. - opts.CustomResourceConfigFile = crConfigFile.Name() + opts.CustomResourceConfigFile = rm.crConfigFile.Name() opts.Kubeconfig = os.Getenv("HOME") + "/.kube/config" if err := opts.Parse(); err != nil { t.Fatalf("failed to parse options: %v", err) @@ -85,19 +159,14 @@ func TestVariableVKsDiscoveryAndResolution(t *testing.T) { klog.InfoS("parsed options", "options", opts) // Write to the config file. - crConfig := getCRConfig() - err = os.WriteFile(opts.CustomResourceConfigFile, []byte(crConfig), 0600 /* rw------- */) - if err != nil { - t.Fatalf("cannot write to config file: %v", err) - } - klog.InfoS("populated cr config file", "crConfigFile", opts.CustomResourceConfigFile) + rm.writeConfigFile(t) // Make the process asynchronous. go internal.RunKubeStateMetricsWrapper(opts) klog.InfoS("started KSM") // Wait for port 8080 to come up. - err = wait.PollUntilContextTimeout(context.TODO(), 1*time.Second, 20*time.Second, true, func(_ context.Context) (bool, error) { + err := wait.PollUntilContextTimeout(context.TODO(), 1*time.Second, 20*time.Second, true, func(_ context.Context) (bool, error) { conn, err := net.Dial("tcp", "localhost:8080") if err != nil { return false, nil @@ -114,32 +183,71 @@ func TestVariableVKsDiscoveryAndResolution(t *testing.T) { klog.InfoS("port 8080 up") // Create CRD and CR files. 
- crd := getCRD() - cr := getCR() - err = os.WriteFile(crdFile.Name(), []byte(crd), 0600 /* rw------- */) + rm.writeResourceFiles(t) + + // Apply initial CRD and CR to the cluster. + err = exec.Command("kubectl", "apply", "-f", rm.initCrdFile.Name()).Run() //nolint:gosec if err != nil { - t.Fatalf("cannot write to crd file: %v", err) + t.Fatalf("failed to apply initial crd: %v", err) } - err = os.WriteFile(crFile.Name(), []byte(cr), 0600 /* rw------- */) + err = exec.Command("kubectl", "apply", "-f", rm.initCrFile.Name()).Run() //nolint:gosec if err != nil { - t.Fatalf("cannot write to cr file: %v", err) + t.Fatalf("failed to apply initial cr: %v", err) } - klog.InfoS("created CR and CRD manifests") + klog.InfoS("applied initial CR and CRD manifests") - // Apply CRD and CR to the cluster. - err = exec.Command("kubectl", "apply", "-f", crdFile.Name()).Run() //nolint:gosec + // Wait for the metric to be available. + ch := make(chan bool, 1) + klog.InfoS("waiting for first metrics to become available") + testMetric := `kube_customresource_test_metric{customresource_group="contoso.com",customresource_kind="MyPlatform",customresource_version="v1alpha1",name="test-dotnet-app"}` + err = wait.PollUntilContextTimeout(context.TODO(), discovery.Interval, PopulateTimeout, true, func(_ context.Context) (bool, error) { + out, err := exec.Command("curl", "localhost:8080/metrics").Output() + if err != nil { + return false, err + } + if string(out) == "" { + return false, nil + } + // Note: we use count to make sure that only one metrics handler is running + if strings.Count(string(out), testMetric) == 1 { + // klog.InfoS("metrics available", "metric", string(out)) + // Signal the process to exit, since we know the metrics are being generated as expected. 
+ ch <- true + return true, nil + } + return false, nil + }) if err != nil { - t.Fatalf("failed to apply crd: %v", err) + t.Fatalf("failed while waiting for initial metrics to be available: %v", err) } - err = exec.Command("kubectl", "apply", "-f", crFile.Name()).Run() //nolint:gosec + + // Wait for process to exit. + select { + case <-ch: + t.Log("initial metrics are available") + case <-time.After(PopulateTimeout * 2): + t.Fatal("timed out waiting for test to pass, check the logs for more info") + } + + // Apply new CRD and CR to the cluster. + err = exec.Command("kubectl", "apply", "-f", rm.newCrdFile.Name()).Run() //nolint:gosec if err != nil { - t.Fatalf("failed to apply cr: %v", err) + t.Fatalf("failed to apply new crd: %v", err) } - klog.InfoS("applied CR and CRD manifests") + err = exec.Command("kubectl", "apply", "-f", rm.newCrFile.Name()).Run() //nolint:gosec + if err != nil { + t.Fatalf("failed to apply new cr: %v", err) + } + err = exec.Command("kubectl", "delete", "myplatform", "test-dotnet-app").Run() //nolint:gosec + if err != nil { + t.Fatalf("failed to delete myplatform resource: %v", err) + } + klog.InfoS("applied new CR and CRD manifests") - // Wait for the metric to be available. - ch := make(chan bool, 1) - klog.InfoS("waiting for metrics to become available") + // Wait for the new metric to be available. + ch = make(chan bool, 1) + klog.InfoS("waiting for new metrics to become available") + testUpdateCRDMetric := `kube_customresource_test_update_crd_metric{customresource_group="contoso.com",customresource_kind="Update",customresource_version="v1",name="test-dotnet-app-update"}` err = wait.PollUntilContextTimeout(context.TODO(), discovery.Interval, PopulateTimeout, true, func(_ context.Context) (bool, error) { out, err := exec.Command("curl", "localhost:8080/metrics").Output() if err != nil { @@ -148,17 +256,20 @@ func TestVariableVKsDiscoveryAndResolution(t *testing.T) { if string(out) == "" { return false, nil } - // Note the "{" below. 
This is to ensure that the metric is not in a comment. - if strings.Contains(string(out), "kube_customresource_test_metric{") { + // Note: to make sure that only one metrics handler is running, we validate that the new metric is + // available and the old one was removed; otherwise, the response could come from the previous + // handler before its context was cancelled, or because that handler failed to be cancelled. + if strings.Contains(string(out), testUpdateCRDMetric) && !strings.Contains(string(out), testMetric) { klog.InfoS("metrics available", "metric", string(out)) // Signal the process to exit, since we know the metrics are being generated as expected. ch <- true return true, nil } + klog.InfoS("metrics available", "metric", string(out)) return false, nil }) if err != nil { - t.Fatalf("failed while waiting for metrics to be available: %v", err) + t.Fatalf("failed while waiting for new metrics to be available: %v", err) } // Wait for process to exit. @@ -252,8 +363,8 @@ spec: resources: - groupVersionKind: group: "contoso.com" - version: "*" - kind: "*" + version: "v1alpha1" + kind: "MyPlatform" metrics: - name: "test_metric" help: "foo baz" @@ -263,5 +374,61 @@ spec: path: [metadata] labelsFromPath: name: [name] + - groupVersionKind: + group: "contoso.com" + version: "v1" + kind: "Update" + metrics: + - name: "test_update_crd_metric" + help: "foo baz" + each: + type: Info + info: + path: [metadata] + labelsFromPath: + name: [name] +` +} + +func getNewCR() string { + return ` +apiVersion: contoso.com/v1 +kind: Update +metadata: + name: test-dotnet-app-update +spec: + new: just-added +` +} + +func getNewCRD() string { + return ` +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: updates.contoso.com +spec: + group: contoso.com + names: + plural: updates + singular: update + kind: Update + shortNames: + - updt + scope: Namespaced + versions: + - name: v1 + served: true + storage: true + schema: + 
openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + new: + type: string + required: ["spec"] ` }