From 8e1331c97a6117f09cd88ecb8477cb0043e02fe2 Mon Sep 17 00:00:00 2001 From: opudrovs Date: Fri, 17 Nov 2023 02:12:06 +0100 Subject: [PATCH] Add monitoring to object cleaner (#3608) Add setting objects cleaner status and recording it as a metric. Add cleaner metrics for removing old objects and corresponding test. Fix object cleaner not starting. Rename `reterr` to `retErr` for consistency. --- .../templates/clusters-service/configmap.yaml | 1 + cmd/clusters-service/app/options.go | 8 +- cmd/clusters-service/app/server.go | 6 +- pkg/query/cleaner/cleaner.go | 64 +++++++++++++--- pkg/query/cleaner/cleaner_test.go | 76 +++++++++++++++++++ pkg/query/cleaner/metrics/recorder.go | 60 +++++++++++++++ pkg/query/collector/watching.go | 6 +- pkg/query/server/server.go | 4 +- 8 files changed, 202 insertions(+), 23 deletions(-) create mode 100644 pkg/query/cleaner/metrics/recorder.go diff --git a/charts/mccp/templates/clusters-service/configmap.yaml b/charts/mccp/templates/clusters-service/configmap.yaml index 400ce99538..4b7142ab16 100644 --- a/charts/mccp/templates/clusters-service/configmap.yaml +++ b/charts/mccp/templates/clusters-service/configmap.yaml @@ -59,3 +59,4 @@ data: MONITORING_METRICS_ENABLED: {{ .Values.monitoring.metrics.enabled | quote }} MONITORING_PROFILING_ENABLED: {{ .Values.monitoring.profiling.enabled | quote }} EXPLORER_ENABLED_FOR: {{ .Values.explorer.enabledFor | join "," | quote }} + EXPLORER_CLEANER_DISABLED: {{ .Values.explorer.cleaner.disabled | quote }} diff --git a/cmd/clusters-service/app/options.go b/cmd/clusters-service/app/options.go index 5534e2aaa6..4ee567c182 100644 --- a/cmd/clusters-service/app/options.go +++ b/cmd/clusters-service/app/options.go @@ -59,7 +59,7 @@ type Options struct { PipelineControllerAddress string CollectorServiceAccount collector.ImpersonateServiceAccount MonitoringOptions monitoring.Options - EnableObjectCleaner bool + ExplorerCleanerDisabled bool ExplorerEnabledFor []string } @@ -285,10 +285,10 @@ func WithMonitoring(enabled bool, address string, metricsEnabled bool, profiling } } -// WithObjectCleaner enables the object cleaner -func WithObjectCleaner(enabled bool) Option { +// WithExplorerCleanerDisabled configures the object cleaner +func WithExplorerCleanerDisabled(disabled bool) Option { return func(o *Options) { - o.EnableObjectCleaner = enabled + o.ExplorerCleanerDisabled = disabled } } diff --git a/cmd/clusters-service/app/server.go b/cmd/clusters-service/app/server.go index 02d6d5f378..30c0e0e19e 100644 --- a/cmd/clusters-service/app/server.go +++ b/cmd/clusters-service/app/server.go @@ -162,7 +162,7 @@ type Params struct { MonitoringBindAddress string `mapstructure:"monitoring-bind-address"` MetricsEnabled bool `mapstructure:"monitoring-metrics-enabled"` ProfilingEnabled bool `mapstructure:"monitoring-profiling-enabled"` - EnableObjectCleaner bool `mapstructure:"enable-object-cleaner"` + ExplorerCleanerDisabled bool `mapstructure:"explorer-cleaner-disabled"` NoAuthUser string `mapstructure:"insecure-no-authentication-user"` ExplorerEnabledFor []string `mapstructure:"explorer-enabled-for"` } @@ -586,7 +586,7 @@ func StartServer(ctx context.Context, p Params, logOptions flux_logger.Options) WithPipelineControllerAddress(p.PipelineControllerAddress), WithCollectorServiceAccount(p.CollectorServiceAccountName, p.CollectorServiceAccountNamespace), WithMonitoring(p.MonitoringEnabled, p.MonitoringBindAddress, p.MetricsEnabled, p.ProfilingEnabled, log), - WithObjectCleaner(p.EnableObjectCleaner), + 
WithExplorerCleanerDisabled(p.ExplorerCleanerDisabled), WithExplorerEnabledFor(p.ExplorerEnabledFor), ) } @@ -707,7 +707,7 @@ func RunInProcessGateway(ctx context.Context, addr string, setters ...Option) er SkipCollection: false, ObjectKinds: configuration.SupportedObjectKinds, ServiceAccount: args.CollectorServiceAccount, - EnableObjectCleaner: args.EnableObjectCleaner, + EnableObjectCleaner: !args.ExplorerCleanerDisabled, EnabledFor: args.ExplorerEnabledFor, }) if err != nil { diff --git a/pkg/query/cleaner/cleaner.go b/pkg/query/cleaner/cleaner.go index 4fea42ef91..ef4d0a46b4 100644 --- a/pkg/query/cleaner/cleaner.go +++ b/pkg/query/cleaner/cleaner.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-logr/logr" + "github.com/weaveworks/weave-gitops-enterprise/pkg/query/cleaner/metrics" "github.com/weaveworks/weave-gitops-enterprise/pkg/query/configuration" "github.com/weaveworks/weave-gitops-enterprise/pkg/query/internal/models" "github.com/weaveworks/weave-gitops-enterprise/pkg/query/store" @@ -17,12 +18,14 @@ type ObjectCleaner interface { } type objectCleaner struct { - log logr.Logger - ticker *time.Ticker - config []configuration.ObjectKind - idx store.IndexWriter - store store.Store - stop chan bool + log logr.Logger + ticker *time.Ticker + config []configuration.ObjectKind + idx store.IndexWriter + store store.Store + stop chan bool + status string + lastStatusChange time.Time } type CleanerOpts struct { @@ -33,6 +36,12 @@ type CleanerOpts struct { Index store.IndexWriter } +const ( + CleanerStarting = "starting" + CleanerStarted = "started" + CleanerStopped = "stopped" +) + func NewObjectCleaner(opts CleanerOpts) (ObjectCleaner, error) { return &objectCleaner{ log: opts.Log, @@ -44,10 +53,13 @@ func NewObjectCleaner(opts CleanerOpts) (ObjectCleaner, error) { } func (oc *objectCleaner) Start() error { + oc.setStatus(CleanerStarting) stop := make(chan bool, 1) oc.stop = stop go func() { + oc.setStatus(CleanerStarted) + for { select { case <-oc.ticker.C: @@ -66,11 +78,42 @@ func (oc *objectCleaner) Start() error { func (oc *objectCleaner) Stop() error { oc.stop <- true + oc.setStatus(CleanerStopped) return nil } -func (oc *objectCleaner) removeOldObjects(ctx context.Context) error { +// setStatus sets cleaner status and records it as a metric. 
+// It does not record a metric for the non-active (stopped) state.
+func (oc *objectCleaner) setStatus(s string) {
+	if oc.status != "" {
+		metrics.CleanerWatcherDecrease(oc.status)
+	}
+
+	oc.lastStatusChange = time.Now()
+	oc.status = s
+
+	if oc.status != CleanerStopped {
+		metrics.CleanerWatcherIncrease(oc.status)
+	}
+}
+
+func recordCleanerMetrics(start time.Time, err error) {
+	metrics.CleanerAddInflightRequests(-1)
+
+	label := metrics.SuccessLabel
+	if err != nil {
+		label = metrics.FailedLabel
+	}
+
+	metrics.CleanerSetLatency(label, time.Since(start))
+}
+
+func (oc *objectCleaner) removeOldObjects(ctx context.Context) (retErr error) {
+	start := time.Now()
+	metrics.CleanerAddInflightRequests(1)
+	defer func() { recordCleanerMetrics(start, retErr) }() // closure so the final value of retErr is recorded
+
 	iter, err := oc.store.GetAllObjects(ctx)
 
 	if err != nil {
@@ -80,12 +123,11 @@
 	all, err := iter.All()
 	if err != nil {
-		return fmt.Errorf("could not get all objects: %w", err)
+		return fmt.Errorf("could not iterate over objects: %w", err)
 	}
 
 	for _, obj := range all {
 		for i, k := range oc.config {
-
 			kind := fmt.Sprintf("%s/%s", k.Gvk.GroupVersion().String(), k.Gvk.Kind)
 			gvk := obj.GroupVersionKind()
 			if kind == gvk {
@@ -95,11 +137,11 @@
 
 				remove := []models.Object{obj}
 				if err := oc.store.DeleteObjects(ctx, remove); err != nil {
-					oc.log.Error(err, "could not delete object")
+					oc.log.Error(err, "could not delete object", "id", obj.ID)
 				}
 
 				if err := oc.idx.Remove(ctx, remove); err != nil {
-					oc.log.Error(err, "could not delete object from index")
+					oc.log.Error(err, "could not delete object from index", "id", obj.ID)
 				}
 			}
 		}
diff --git a/pkg/query/cleaner/cleaner_test.go b/pkg/query/cleaner/cleaner_test.go
index 5c4e181bc2..f437efe8d4 100644
--- a/pkg/query/cleaner/cleaner_test.go
+++ b/pkg/query/cleaner/cleaner_test.go
@@ -2,11 +2,16 @@ package cleaner
 
 import (
 	"context"
+	"io"
+	"net/http"
+	"net/http/httptest"
 	"testing"
 	"time"
 
 	"github.com/go-logr/logr"
 	. "github.com/onsi/gomega"
+	"github.com/weaveworks/weave-gitops-enterprise/pkg/monitoring/metrics"
+	cleanermetrics "github.com/weaveworks/weave-gitops-enterprise/pkg/query/cleaner/metrics"
 	"github.com/weaveworks/weave-gitops-enterprise/pkg/query/configuration"
 	"github.com/weaveworks/weave-gitops-enterprise/pkg/query/internal/models"
 	"github.com/weaveworks/weave-gitops-enterprise/pkg/query/store/storefakes"
@@ -70,5 +75,76 @@ func TestObjectCleaner(t *testing.T) {
 	// The `iter` mock will return only the second object, which is not old enough to be deleted.
 	g.Expect(oc.removeOldObjects(context.Background())).To(Succeed())
 	g.Expect(s.DeleteObjectsCallCount()).To(Equal(1))
+}
+
+func TestObjectCleanerMetrics(t *testing.T) {
+	g := NewWithT(t)
+	s := storefakes.FakeStore{}
+
+	cfg := configuration.BucketObjectKind
+	cfg.RetentionPolicy = configuration.RetentionPolicy(60 * time.Second)
+	cleanermetrics.CleanerLatencyHistogram.Reset()
+	cleanermetrics.CleanerInflightRequests.Reset()
+
+	_, h := metrics.NewDefaultPrometheusHandler()
+	ts := httptest.NewServer(h)
+	defer ts.Close()
+
+	objs := []models.Object{
+		{
+			Cluster:    "cluster1",
+			Kind:       cfg.Gvk.Kind,
+			Name:       "name1",
+			APIGroup:   cfg.Gvk.Group,
+			APIVersion: cfg.Gvk.Version,
+			// Deleted 1 hour ago, our retention policy is 60s, so this should be deleted.
+ KubernetesDeletedAt: time.Now().Add(-time.Hour), + }, + { + Cluster: "cluster1", + Kind: cfg.Gvk.Kind, + Name: "name2", + APIGroup: cfg.Gvk.Group, + APIVersion: cfg.Gvk.Version, + // Deleted 10s ago, our retention policy is 60s, so this should not be deleted. + KubernetesDeletedAt: time.Now().Add(10 * -time.Second), + }, + } + + iter := storefakes.FakeIterator{} + iter.AllReturnsOnCall(0, objs, nil) + // Pretend the first object is deleted on the second call + iter.AllReturnsOnCall(1, objs[1:2], nil) + s.GetAllObjectsReturns(&iter, nil) + + index := storefakes.FakeIndexWriter{} + + oc := objectCleaner{ + log: logr.Discard(), + store: &s, + idx: &index, + config: []configuration.ObjectKind{cfg}, + } + + // Skipping starting the cleaner here to avoid dealing with async and time stuff. + g.Expect(oc.removeOldObjects(context.Background())).To(Succeed()) + + wantMetrics := []string{ + `objects_cleaner_inflight_requests{action="RemoveObjects"} 0`, + `objects_cleaner_latency_seconds_count{action="RemoveObjects",status="success"} 1`, + } + assertMetrics(g, ts, wantMetrics) +} + +func assertMetrics(g *WithT, ts *httptest.Server, expMetrics []string) { + resp, err := http.Get(ts.URL) + g.Expect(err).NotTo(HaveOccurred()) + b, err := io.ReadAll(resp.Body) + g.Expect(err).NotTo(HaveOccurred()) + metrics := string(b) + + for _, expMetric := range expMetrics { + g.Expect(metrics).To(ContainSubstring(expMetric)) + } } diff --git a/pkg/query/cleaner/metrics/recorder.go b/pkg/query/cleaner/metrics/recorder.go new file mode 100644 index 0000000000..73c5814f44 --- /dev/null +++ b/pkg/query/cleaner/metrics/recorder.go @@ -0,0 +1,60 @@ +package metrics + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + cleanerSubsystem = "objects_cleaner" + + // cleaner actions + removeObjectsAction = "RemoveObjects" + + FailedLabel = "error" + SuccessLabel = "success" +) + +var cleanerWatcher = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: cleanerSubsystem, + Name: "status", + Help: "cleaner status", +}, []string{"status"}) + +var CleanerLatencyHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: cleanerSubsystem, + Name: "latency_seconds", + Help: "cleaner latency", + Buckets: prometheus.LinearBuckets(0.01, 0.01, 10), +}, []string{"action", "status"}) + +var CleanerInflightRequests = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: cleanerSubsystem, + Name: "inflight_requests", + Help: "number of cleaner in-flight requests.", +}, []string{"action"}) + +func init() { + prometheus.MustRegister(cleanerWatcher) + prometheus.MustRegister(CleanerLatencyHistogram) + prometheus.MustRegister(CleanerInflightRequests) +} + +// CleanerWatcherDecrease decreases cleaner_watcher metric for status +func CleanerWatcherDecrease(status string) { + cleanerWatcher.WithLabelValues(status).Dec() +} + +// CleanerWatcherIncrease increases cleaner_watcher metric for status +func CleanerWatcherIncrease(status string) { + cleanerWatcher.WithLabelValues(status).Inc() +} + +func CleanerSetLatency(status string, duration time.Duration) { + CleanerLatencyHistogram.WithLabelValues(removeObjectsAction, status).Observe(duration.Seconds()) +} + +func CleanerAddInflightRequests(number float64) { + CleanerInflightRequests.WithLabelValues(removeObjectsAction).Add(number) +} diff --git a/pkg/query/collector/watching.go b/pkg/query/collector/watching.go index 6d46714792..f74ce89323 100644 --- a/pkg/query/collector/watching.go +++ b/pkg/query/collector/watching.go @@ -98,7 +98,7 
@@ type child struct {
 }
 
 // setStatus sets watcher status and records it as a metric.
-// It does not record metrics for stopped state non-active states = notStarted and Stopped
+// It does not record metrics for the non-active state (notStarted).
 func (c *child) setStatus(s string) {
 	if c.status != "" {
 		metrics.ClusterWatcherDecrease(c.collector, c.status)
@@ -145,7 +145,7 @@ func newWatchingCollector(opts CollectorOpts) (*watchingCollector, error) {
 	}, nil
 }
 
-func (w *watchingCollector) watch(cluster cluster.Cluster) (reterr error) {
+func (w *watchingCollector) watch(cluster cluster.Cluster) (retErr error) {
 	clusterName := cluster.GetName()
 	if clusterName == "" {
 		return fmt.Errorf("cluster name is empty")
@@ -159,7 +159,7 @@ func (w *watchingCollector) watch(cluster cluster.Cluster) (reterr error) {
 	childctx, cancel := context.WithCancel(context.Background())
 	c.cancel = cancel
 	defer func() {
-		if reterr != nil {
+		if retErr != nil {
 			w.clusterWatchersMu.Lock()
 			c.setStatus(ClusterWatchingFailed)
 			cancel := c.cancel
diff --git a/pkg/query/server/server.go b/pkg/query/server/server.go
index c4017bdf2c..6028251f1d 100644
--- a/pkg/query/server/server.go
+++ b/pkg/query/server/server.go
@@ -198,7 +198,7 @@ func (so *ServerOpts) Validate() error {
 	return nil
 }
 
-func NewServer(opts ServerOpts) (_ pb.QueryServer, _ func() error, reterr error) {
+func NewServer(opts ServerOpts) (_ pb.QueryServer, _ func() error, retErr error) {
 	if err := opts.Validate(); err != nil {
 		return nil, nil, fmt.Errorf("invalid query server options: %w", err)
 	}
@@ -256,7 +256,7 @@ func NewServer(opts ServerOpts) (_ pb.QueryServer, _ func() error, reterr error)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer func() {
-		if reterr != nil {
+		if retErr != nil {
 			cancel()
 		}
 	}()
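
For reference, a minimal standalone sketch of the deferred-metrics pattern that removeOldObjects relies on. The names recordLatency and doCleanup are illustrative only, and plain fmt output stands in for the Prometheus histogram and gauge calls; the point is that the start time is captured before the work begins and the metric call is wrapped in a deferred closure, so the final value of the named return error picks the success/error label.

package main

import (
	"errors"
	"fmt"
	"time"
)

// recordLatency stands in for metrics.CleanerSetLatency: it records how long
// an operation took, labelled by whether it succeeded or failed.
func recordLatency(status string, d time.Duration) {
	fmt.Printf("latency_seconds{status=%q} %.3f\n", status, d.Seconds())
}

// doCleanup mirrors the pattern used by removeOldObjects: a named return
// error plus a deferred closure, so the error value observed by the metric
// is the one actually returned, not the value at the time defer is declared.
func doCleanup(fail bool) (retErr error) {
	start := time.Now()
	defer func() {
		status := "success"
		if retErr != nil {
			status = "error"
		}
		recordLatency(status, time.Since(start))
	}()

	time.Sleep(10 * time.Millisecond) // pretend to do some work
	if fail {
		return errors.New("store unavailable")
	}
	return nil
}

func main() {
	_ = doCleanup(false) // records status="success"
	_ = doCleanup(true)  // records status="error"
}

Passing retErr directly as an argument to a plain `defer recordLatency(...)` call would evaluate it immediately (as nil), so the closure is what makes the error label accurate.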