Skip to content

Commit

Permalink
Add monitoring to object cleaner (#3608)
Browse files Browse the repository at this point in the history
Set the object cleaner status and record it as a metric.

Add cleaner metrics for removing old objects and corresponding test.

Fix object cleaner not starting.

Rename `reterr` to `retErr` for consistency.
  • Loading branch information
opudrovs committed Nov 17, 2023
1 parent fd0e650 commit 8e1331c
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 23 deletions.
1 change: 1 addition & 0 deletions charts/mccp/templates/clusters-service/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ data:
MONITORING_METRICS_ENABLED: {{ .Values.monitoring.metrics.enabled | quote }}
MONITORING_PROFILING_ENABLED: {{ .Values.monitoring.profiling.enabled | quote }}
EXPLORER_ENABLED_FOR: {{ .Values.explorer.enabledFor | join "," | quote }}
EXPLORER_CLEANER_DISABLED: {{ .Values.explorer.cleaner.disabled | quote }}
8 changes: 4 additions & 4 deletions cmd/clusters-service/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Options struct {
PipelineControllerAddress string
CollectorServiceAccount collector.ImpersonateServiceAccount
MonitoringOptions monitoring.Options
EnableObjectCleaner bool
ExplorerCleanerDisabled bool
ExplorerEnabledFor []string
}

Expand Down Expand Up @@ -285,10 +285,10 @@ func WithMonitoring(enabled bool, address string, metricsEnabled bool, profiling
}
}

// WithObjectCleaner enables the object cleaner
func WithObjectCleaner(enabled bool) Option {
// WithExplorerCleanerDisabled configures the object cleaner
func WithExplorerCleanerDisabled(disabled bool) Option {
return func(o *Options) {
o.EnableObjectCleaner = enabled
o.ExplorerCleanerDisabled = disabled
}
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/clusters-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ type Params struct {
MonitoringBindAddress string `mapstructure:"monitoring-bind-address"`
MetricsEnabled bool `mapstructure:"monitoring-metrics-enabled"`
ProfilingEnabled bool `mapstructure:"monitoring-profiling-enabled"`
EnableObjectCleaner bool `mapstructure:"enable-object-cleaner"`
ExplorerCleanerDisabled bool `mapstructure:"explorer-cleaner-disabled"`
NoAuthUser string `mapstructure:"insecure-no-authentication-user"`
ExplorerEnabledFor []string `mapstructure:"explorer-enabled-for"`
}
Expand Down Expand Up @@ -586,7 +586,7 @@ func StartServer(ctx context.Context, p Params, logOptions flux_logger.Options)
WithPipelineControllerAddress(p.PipelineControllerAddress),
WithCollectorServiceAccount(p.CollectorServiceAccountName, p.CollectorServiceAccountNamespace),
WithMonitoring(p.MonitoringEnabled, p.MonitoringBindAddress, p.MetricsEnabled, p.ProfilingEnabled, log),
WithObjectCleaner(p.EnableObjectCleaner),
WithExplorerCleanerDisabled(p.ExplorerCleanerDisabled),
WithExplorerEnabledFor(p.ExplorerEnabledFor),
)
}
Expand Down Expand Up @@ -707,7 +707,7 @@ func RunInProcessGateway(ctx context.Context, addr string, setters ...Option) er
SkipCollection: false,
ObjectKinds: configuration.SupportedObjectKinds,
ServiceAccount: args.CollectorServiceAccount,
EnableObjectCleaner: args.EnableObjectCleaner,
EnableObjectCleaner: !args.ExplorerCleanerDisabled,
EnabledFor: args.ExplorerEnabledFor,
})
if err != nil {
Expand Down
64 changes: 53 additions & 11 deletions pkg/query/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/cleaner/metrics"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/configuration"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/internal/models"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/store"
Expand All @@ -17,12 +18,14 @@ type ObjectCleaner interface {
}

type objectCleaner struct {
log logr.Logger
ticker *time.Ticker
config []configuration.ObjectKind
idx store.IndexWriter
store store.Store
stop chan bool
log logr.Logger
ticker *time.Ticker
config []configuration.ObjectKind
idx store.IndexWriter
store store.Store
stop chan bool
status string
lastStatusChange time.Time
}

type CleanerOpts struct {
Expand All @@ -33,6 +36,12 @@ type CleanerOpts struct {
Index store.IndexWriter
}

// Status values passed to setStatus to describe the cleaner lifecycle.
const (
	// CleanerStarting is set at the top of Start, before the cleaning goroutine is launched.
	CleanerStarting = "starting"
	// CleanerStarted is set from inside the cleaning goroutine, before it enters its loop.
	CleanerStarted = "started"
	// CleanerStopped is set by Stop after the stop signal has been sent.
	CleanerStopped = "stopped"
)

func NewObjectCleaner(opts CleanerOpts) (ObjectCleaner, error) {
return &objectCleaner{
log: opts.Log,
Expand All @@ -44,10 +53,13 @@ func NewObjectCleaner(opts CleanerOpts) (ObjectCleaner, error) {
}

func (oc *objectCleaner) Start() error {
oc.setStatus(CleanerStarting)
stop := make(chan bool, 1)
oc.stop = stop

go func() {
oc.setStatus(CleanerStarted)

for {
select {
case <-oc.ticker.C:
Expand All @@ -66,11 +78,42 @@ func (oc *objectCleaner) Start() error {

// Stop signals the cleaning goroutine to exit and marks the cleaner as stopped.
//
// It is a no-op when Start has not been called: oc.stop is still nil at that
// point, and a send on a nil channel would block forever. The send itself is
// non-blocking, so repeated calls cannot hang when the single buffered slot
// already holds a pending stop signal that nobody is going to receive.
//
// It always returns nil.
func (oc *objectCleaner) Stop() error {
	if oc.stop == nil {
		return nil
	}

	select {
	case oc.stop <- true:
	default:
		// A stop signal is already pending; there is nothing more to send.
	}

	oc.setStatus(CleanerStopped)

	return nil
}

func (oc *objectCleaner) removeOldObjects(ctx context.Context) error {
// setStatus sets the cleaner status and records it as a metric.
//
// Only active states are counted in the status gauge: the initial empty status
// (not started) and CleanerStopped are never incremented. The previous status
// is therefore decremented only if it was one that had been incremented;
// otherwise leaving the stopped state (a Start after a Stop, or a second Stop)
// would push the "stopped" series below zero.
func (oc *objectCleaner) setStatus(s string) {
	if oc.status != "" && oc.status != CleanerStopped {
		metrics.CleanerWatcherDecrease(oc.status)
	}

	oc.lastStatusChange = time.Now()
	oc.status = s

	if oc.status != CleanerStopped {
		metrics.CleanerWatcherIncrease(oc.status)
	}
}

// recordCleanerMetrics closes out the metrics for one cleaner run: it takes one
// off the in-flight gauge and observes the time elapsed since start, labelled
// as success when err is nil and as failure otherwise.
//
// err must be the final outcome of the run. Arguments of a deferred call are
// evaluated when the defer statement executes, so a caller that defers this
// function directly passes whatever err holds at that moment; wrap the call in
// a closure to read the error at return time instead.
func recordCleanerMetrics(start time.Time, err error) {
	metrics.CleanerAddInflightRequests(-1)

	outcome := metrics.FailedLabel
	if err == nil {
		outcome = metrics.SuccessLabel
	}

	elapsed := time.Since(start)
	metrics.CleanerSetLatency(outcome, elapsed)
}

func (oc *objectCleaner) removeOldObjects(ctx context.Context) (retErr error) {
// metrics
metrics.CleanerAddInflightRequests(1)
defer recordCleanerMetrics(time.Now(), retErr)

iter, err := oc.store.GetAllObjects(ctx)

if err != nil {
Expand All @@ -80,12 +123,11 @@ func (oc *objectCleaner) removeOldObjects(ctx context.Context) error {
all, err := iter.All()

if err != nil {
return fmt.Errorf("could not get all objects: %w", err)
return fmt.Errorf("could not iterate over objects: %w", err)
}

for _, obj := range all {
for i, k := range oc.config {

kind := fmt.Sprintf("%s/%s", k.Gvk.GroupVersion().String(), k.Gvk.Kind)
gvk := obj.GroupVersionKind()
if kind == gvk {
Expand All @@ -95,11 +137,11 @@ func (oc *objectCleaner) removeOldObjects(ctx context.Context) error {
remove := []models.Object{obj}

if err := oc.store.DeleteObjects(ctx, remove); err != nil {
oc.log.Error(err, "could not delete object")
oc.log.Error(err, "could not delete object with ID: %s", obj.ID)
}

if err := oc.idx.Remove(ctx, remove); err != nil {
oc.log.Error(err, "could not delete object from index")
oc.log.Error(err, "could not delete object with ID: %s from index", obj.ID)
}
}
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/query/cleaner/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package cleaner

import (
"context"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/go-logr/logr"
. "github.com/onsi/gomega"
"github.com/weaveworks/weave-gitops-enterprise/pkg/monitoring/metrics"
cleanermetrics "github.com/weaveworks/weave-gitops-enterprise/pkg/query/cleaner/metrics"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/configuration"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/internal/models"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/store/storefakes"
Expand Down Expand Up @@ -70,5 +75,76 @@ func TestObjectCleaner(t *testing.T) {
// The `iter` mock will return only the second object, which is not old enough to be deleted.
g.Expect(oc.removeOldObjects(context.Background())).To(Succeed())
g.Expect(s.DeleteObjectsCallCount()).To(Equal(1))
}

// TestObjectCleanerMetrics runs a single removeOldObjects pass against fake
// stores and then checks, through the Prometheus HTTP handler, that the
// in-flight gauge is back at zero and that exactly one successful latency
// observation was recorded.
func TestObjectCleanerMetrics(t *testing.T) {
	g := NewWithT(t)

	// Reset the collectors so the expected values below start from zero.
	cleanermetrics.CleanerLatencyHistogram.Reset()
	cleanermetrics.CleanerInflightRequests.Reset()

	_, handler := metrics.NewDefaultPrometheusHandler()
	ts := httptest.NewServer(handler)
	defer ts.Close()

	kind := configuration.BucketObjectKind
	kind.RetentionPolicy = configuration.RetentionPolicy(60 * time.Second)

	// Deleted 1 hour ago, our retention policy is 60s, so this should be deleted.
	expired := models.Object{
		Cluster:             "cluster1",
		Kind:                kind.Gvk.Kind,
		Name:                "name1",
		APIGroup:            kind.Gvk.Group,
		APIVersion:          kind.Gvk.Version,
		KubernetesDeletedAt: time.Now().Add(-time.Hour),
	}
	// Deleted 10s ago, our retention policy is 60s, so this should not be deleted.
	recent := models.Object{
		Cluster:             "cluster1",
		Kind:                kind.Gvk.Kind,
		Name:                "name2",
		APIGroup:            kind.Gvk.Group,
		APIVersion:          kind.Gvk.Version,
		KubernetesDeletedAt: time.Now().Add(10 * -time.Second),
	}
	objects := []models.Object{expired, recent}

	fakeIter := storefakes.FakeIterator{}
	fakeIter.AllReturnsOnCall(0, objects, nil)
	// Pretend the first object is deleted on the second call
	fakeIter.AllReturnsOnCall(1, objects[1:2], nil)

	fakeStore := storefakes.FakeStore{}
	fakeStore.GetAllObjectsReturns(&fakeIter, nil)

	fakeIndex := storefakes.FakeIndexWriter{}

	cleaner := objectCleaner{
		log:    logr.Discard(),
		store:  &fakeStore,
		idx:    &fakeIndex,
		config: []configuration.ObjectKind{kind},
	}

	// Skipping starting the cleaner here to avoid dealing with async and time stuff.
	ctx := context.Background()
	g.Expect(cleaner.removeOldObjects(ctx)).To(Succeed())

	assertMetrics(g, ts, []string{
		`objects_cleaner_inflight_requests{action="RemoveObjects"} 0`,
		`objects_cleaner_latency_seconds_count{action="RemoveObjects",status="success"} 1`,
	})
}

// assertMetrics fetches the page served by ts and expects every entry of
// expMetrics to appear in the response body as a substring.
func assertMetrics(g *WithT, ts *httptest.Server, expMetrics []string) {
	resp, err := http.Get(ts.URL)
	g.Expect(err).NotTo(HaveOccurred())
	// Close the body so the test does not leak the underlying connection.
	defer resp.Body.Close()

	b, err := io.ReadAll(resp.Body)
	g.Expect(err).NotTo(HaveOccurred())
	// Named body rather than metrics so the imported metrics package is not shadowed.
	body := string(b)

	for _, expMetric := range expMetrics {
		g.Expect(body).To(ContainSubstring(expMetric))
	}
}
60 changes: 60 additions & 0 deletions pkg/query/cleaner/metrics/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

const (
	// cleanerSubsystem is the Prometheus subsystem used by every cleaner metric in this file.
	cleanerSubsystem = "objects_cleaner"

	// cleaner actions
	// removeObjectsAction is the "action" label value used by the latency and in-flight metrics.
	removeObjectsAction = "RemoveObjects"

	// FailedLabel is the "status" label value for a cleaner run that ended with an error.
	FailedLabel = "error"
	// SuccessLabel is the "status" label value for a cleaner run that ended without an error.
	SuccessLabel = "success"
)

// Prometheus collectors describing the object cleaner. All of them use the
// objects_cleaner subsystem.
var (
	// cleanerWatcher is a gauge with one series per cleaner status value.
	cleanerWatcher = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name:      "status",
			Help:      "cleaner status",
			Subsystem: cleanerSubsystem,
		},
		[]string{"status"},
	)

	// CleanerLatencyHistogram records how long cleaner actions take, in
	// seconds, labelled by action and outcome status. It uses ten linear
	// buckets of width 0.01 starting at 0.01.
	CleanerLatencyHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:      "latency_seconds",
			Help:      "cleaner latency",
			Subsystem: cleanerSubsystem,
			Buckets:   prometheus.LinearBuckets(0.01, 0.01, 10),
		},
		[]string{"action", "status"},
	)

	// CleanerInflightRequests is a gauge of cleaner actions currently in
	// progress, labelled by action.
	CleanerInflightRequests = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name:      "inflight_requests",
			Help:      "number of cleaner in-flight requests.",
			Subsystem: cleanerSubsystem,
		},
		[]string{"action"},
	)
)

// init registers the cleaner collectors with the default Prometheus registry.
// MustRegister panics if any of them cannot be registered.
func init() {
	prometheus.MustRegister(
		cleanerWatcher,
		CleanerLatencyHistogram,
		CleanerInflightRequests,
	)
}

// CleanerWatcherDecrease decrements by one the objects_cleaner_status gauge
// series for the given status.
func CleanerWatcherDecrease(status string) {
	series := cleanerWatcher.WithLabelValues(status)
	series.Dec()
}

// CleanerWatcherIncrease increments by one the objects_cleaner_status gauge
// series for the given status.
func CleanerWatcherIncrease(status string) {
	series := cleanerWatcher.WithLabelValues(status)
	series.Inc()
}

// CleanerSetLatency observes duration, converted to seconds, in the cleaner
// latency histogram for the RemoveObjects action with the given status label.
func CleanerSetLatency(status string, duration time.Duration) {
	seconds := duration.Seconds()
	observer := CleanerLatencyHistogram.WithLabelValues(removeObjectsAction, status)
	observer.Observe(seconds)
}

// CleanerAddInflightRequests adds number to the in-flight gauge for the
// RemoveObjects action; pass a negative value to decrease it.
func CleanerAddInflightRequests(number float64) {
	inflight := CleanerInflightRequests.WithLabelValues(removeObjectsAction)
	inflight.Add(number)
}
6 changes: 3 additions & 3 deletions pkg/query/collector/watching.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type child struct {
}

// setStatus sets watcher status and records it as a metric.
// It does not record metrics for stopped state non-active states = notStarted and Stopped
// It does not record metrics for the non-active state = notStarted
func (c *child) setStatus(s string) {
if c.status != "" {
metrics.ClusterWatcherDecrease(c.collector, c.status)
Expand Down Expand Up @@ -145,7 +145,7 @@ func newWatchingCollector(opts CollectorOpts) (*watchingCollector, error) {
}, nil
}

func (w *watchingCollector) watch(cluster cluster.Cluster) (reterr error) {
func (w *watchingCollector) watch(cluster cluster.Cluster) (retErr error) {
clusterName := cluster.GetName()
if clusterName == "" {
return fmt.Errorf("cluster name is empty")
Expand All @@ -159,7 +159,7 @@ func (w *watchingCollector) watch(cluster cluster.Cluster) (reterr error) {
childctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
defer func() {
if reterr != nil {
if retErr != nil {
w.clusterWatchersMu.Lock()
c.setStatus(ClusterWatchingFailed)
cancel := c.cancel
Expand Down
4 changes: 2 additions & 2 deletions pkg/query/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (so *ServerOpts) Validate() error {
return nil
}

func NewServer(opts ServerOpts) (_ pb.QueryServer, _ func() error, reterr error) {
func NewServer(opts ServerOpts) (_ pb.QueryServer, _ func() error, retErr error) {
if err := opts.Validate(); err != nil {
return nil, nil, fmt.Errorf("invalid query server options: %w", err)
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func NewServer(opts ServerOpts) (_ pb.QueryServer, _ func() error, reterr error)

ctx, cancel := context.WithCancel(context.Background())
defer func() {
if reterr != nil {
if retErr != nil {
cancel()
}
}()
Expand Down

0 comments on commit 8e1331c

Please sign in to comment.