Add monitoring to objects cleaner #3608

Merged · 1 commit · Nov 17, 2023
1 change: 1 addition & 0 deletions charts/mccp/templates/clusters-service/configmap.yaml
@@ -59,3 +59,4 @@ data:
MONITORING_METRICS_ENABLED: {{ .Values.monitoring.metrics.enabled | quote }}
MONITORING_PROFILING_ENABLED: {{ .Values.monitoring.profiling.enabled | quote }}
EXPLORER_ENABLED_FOR: {{ .Values.explorer.enabledFor | join "," | quote }}
EXPLORER_CLEANER_DISABLED: {{ .Values.explorer.cleaner.disabled | quote }}
8 changes: 4 additions & 4 deletions cmd/clusters-service/app/options.go
@@ -59,7 +59,7 @@ type Options struct {
PipelineControllerAddress string
CollectorServiceAccount collector.ImpersonateServiceAccount
MonitoringOptions monitoring.Options
- EnableObjectCleaner bool
+ ExplorerCleanerDisabled bool
ExplorerEnabledFor []string
}

@@ -285,10 +285,10 @@ func WithMonitoring(enabled bool, address string, metricsEnabled bool, profiling
}
}

- // WithObjectCleaner enables the object cleaner
- func WithObjectCleaner(enabled bool) Option {
+ // WithExplorerCleanerDisabled configures the object cleaner
+ func WithExplorerCleanerDisabled(disabled bool) Option {
return func(o *Options) {
- o.EnableObjectCleaner = enabled
+ o.ExplorerCleanerDisabled = disabled
}
}

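For readers unfamiliar with the pattern, the With* functions are functional options: each returns a closure that mutates Options when applied. A self-contained sketch of how the renamed option composes; the trimmed Options struct, the Option type, and main below are illustrative stand-ins rather than the real definitions from this repository.

package main

import "fmt"

// Trimmed stand-ins for the real types: only the field touched by this diff is shown.
type Options struct {
	ExplorerCleanerDisabled bool
}

type Option func(*Options)

func WithExplorerCleanerDisabled(disabled bool) Option {
	return func(o *Options) {
		o.ExplorerCleanerDisabled = disabled
	}
}

func main() {
	o := &Options{}
	for _, apply := range []Option{WithExplorerCleanerDisabled(true)} {
		apply(o)
	}
	fmt.Println(o.ExplorerCleanerDisabled) // true
}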
6 changes: 3 additions & 3 deletions cmd/clusters-service/app/server.go
@@ -162,7 +162,7 @@ type Params struct {
MonitoringBindAddress string `mapstructure:"monitoring-bind-address"`
MetricsEnabled bool `mapstructure:"monitoring-metrics-enabled"`
ProfilingEnabled bool `mapstructure:"monitoring-profiling-enabled"`
- EnableObjectCleaner bool `mapstructure:"enable-object-cleaner"`
+ ExplorerCleanerDisabled bool `mapstructure:"explorer-cleaner-disabled"`
NoAuthUser string `mapstructure:"insecure-no-authentication-user"`
ExplorerEnabledFor []string `mapstructure:"explorer-enabled-for"`
}
@@ -586,7 +586,7 @@ func StartServer(ctx context.Context, p Params, logOptions flux_logger.Options)
WithPipelineControllerAddress(p.PipelineControllerAddress),
WithCollectorServiceAccount(p.CollectorServiceAccountName, p.CollectorServiceAccountNamespace),
WithMonitoring(p.MonitoringEnabled, p.MonitoringBindAddress, p.MetricsEnabled, p.ProfilingEnabled, log),
- WithObjectCleaner(p.EnableObjectCleaner),
+ WithExplorerCleanerDisabled(p.ExplorerCleanerDisabled),
WithExplorerEnabledFor(p.ExplorerEnabledFor),
)
}
@@ -707,7 +707,7 @@ func RunInProcessGateway(ctx context.Context, addr string, setters ...Option) er
SkipCollection: false,
ObjectKinds: configuration.SupportedObjectKinds,
ServiceAccount: args.CollectorServiceAccount,
- EnableObjectCleaner: args.EnableObjectCleaner,
+ EnableObjectCleaner: !args.ExplorerCleanerDisabled,
EnabledFor: args.ExplorerEnabledFor,
})
if err != nil {
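Note the inversion: the chart and CLI expose a disable switch, while the collector still takes a positive EnableObjectCleaner flag, so RunInProcessGateway negates the value. A tiny illustrative sketch of that flow; reading the env var directly is an assumption here, since the real server populates Params through its existing flag/env loading.

package main

import (
	"fmt"
	"os"
	"strconv"
)

func main() {
	// EXPLORER_CLEANER_DISABLED is set by the chart's configmap entry above.
	disabled, err := strconv.ParseBool(os.Getenv("EXPLORER_CLEANER_DISABLED"))
	if err != nil {
		disabled = false // unset or unparsable: keep the cleaner enabled
	}
	enableObjectCleaner := !disabled
	fmt.Println("object cleaner enabled:", enableObjectCleaner)
}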
63 changes: 52 additions & 11 deletions pkg/query/cleaner/cleaner.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/cleaner/metrics"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/configuration"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/internal/models"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/store"
@@ -17,12 +18,14 @@ type ObjectCleaner interface {
}

type objectCleaner struct {
- log    logr.Logger
- ticker *time.Ticker
- config []configuration.ObjectKind
- idx    store.IndexWriter
- store  store.Store
- stop   chan bool
+ log              logr.Logger
+ ticker           *time.Ticker
+ config           []configuration.ObjectKind
+ idx              store.IndexWriter
+ store            store.Store
+ stop             chan bool
+ status           string
+ lastStatusChange time.Time
}

type CleanerOpts struct {
@@ -33,6 +36,12 @@ type CleanerOpts struct {
Index store.IndexWriter
}

const (
CleanerStarting = "starting"
CleanerStarted = "started"
CleanerStopped = "stopped"
)

func NewObjectCleaner(opts CleanerOpts) (ObjectCleaner, error) {
return &objectCleaner{
log: opts.Log,
@@ -44,10 +53,13 @@ func NewObjectCleaner(opts CleanerOpts) (ObjectCleaner, error) {
}

func (oc *objectCleaner) Start() error {
oc.setStatus(CleanerStarting)
stop := make(chan bool, 1)
oc.stop = stop

go func() {
oc.setStatus(CleanerStarted)

for {
select {
case <-oc.ticker.C:
@@ -66,11 +78,41 @@

func (oc *objectCleaner) Stop() error {
oc.stop <- true
oc.setStatus(CleanerStopped)

return nil
}

- func (oc *objectCleaner) removeOldObjects(ctx context.Context) error {
// setStatus sets cleaner status and records it as a metric.
func (oc *objectCleaner) setStatus(s string) {
if oc.status != "" {
metrics.CleanerWatcherDecrease(oc.status)
}

oc.lastStatusChange = time.Now()
oc.status = s

if oc.status != CleanerStopped {
metrics.CleanerWatcherIncrease(oc.status)
}
}

func recordCleanerMetrics(start time.Time, err error) {
metrics.CleanerAddInflightRequests(-1)

label := metrics.SuccessLabel
if err != nil {
label = metrics.FailedLabel
}

metrics.CleanerSetLatency(label, time.Since(start))
}

+ func (oc *objectCleaner) removeOldObjects(ctx context.Context) (retErr error) {
metrics.CleanerAddInflightRequests(1)
// Record latency and final status on exit. The closure reads the named return
// retErr at return time; a plain `defer recordCleanerMetrics(time.Now(), retErr)`
// would evaluate retErr immediately (still nil) and never report failures.
start := time.Now()
defer func() { recordCleanerMetrics(start, retErr) }()

iter, err := oc.store.GetAllObjects(ctx)

if err != nil {
@@ -80,12 +122,11 @@ func (oc *objectCleaner) removeOldObjects(ctx context.Context) error {
all, err := iter.All()

if err != nil {
return fmt.Errorf("could not get all objects: %w", err)
return fmt.Errorf("could not iterate over objects: %w", err)
}

for _, obj := range all {
for i, k := range oc.config {

kind := fmt.Sprintf("%s/%s", k.Gvk.GroupVersion().String(), k.Gvk.Kind)
gvk := obj.GroupVersionKind()
if kind == gvk {
@@ -95,11 +136,11 @@
remove := []models.Object{obj}

if err := oc.store.DeleteObjects(ctx, remove); err != nil {
oc.log.Error(err, "could not delete object")
oc.log.Error(err, "could not delete object with ID: %s", obj.ID)
}

if err := oc.idx.Remove(ctx, remove); err != nil {
oc.log.Error(err, "could not delete object from index")
oc.log.Error(err, "could not delete object with ID: %s from index", obj.ID)
}
}
}
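The instrumentation added to removeOldObjects depends on Go's named return value: the deferred closure reads retErr only when the function returns, which is what lets recordCleanerMetrics label the latency sample as success or error. A minimal, self-contained sketch of that pattern, with illustrative names rather than anything from this package:

package main

import (
	"errors"
	"fmt"
	"time"
)

// record is a stand-in for recordCleanerMetrics: it receives the start time and
// the final error once doWork has returned.
func record(start time.Time, err error) {
	status := "success"
	if err != nil {
		status = "error"
	}
	fmt.Printf("status=%s latency=%s\n", status, time.Since(start))
}

// doWork shows the named-return pattern: the deferred closure reads retErr at
// return time, so the error assigned below is visible to record.
func doWork() (retErr error) {
	start := time.Now()
	defer func() { record(start, retErr) }()

	retErr = errors.New("boom")
	return retErr
}

func main() {
	_ = doWork() // prints: status=error latency=...
}

Running it prints status=error with the measured latency, confirming the closure sees the final value of retErr.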
76 changes: 76 additions & 0 deletions pkg/query/cleaner/cleaner_test.go
@@ -2,11 +2,16 @@ package cleaner

import (
"context"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/go-logr/logr"
. "github.com/onsi/gomega"
"github.com/weaveworks/weave-gitops-enterprise/pkg/monitoring/metrics"
cleanermetrics "github.com/weaveworks/weave-gitops-enterprise/pkg/query/cleaner/metrics"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/configuration"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/internal/models"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/store/storefakes"
@@ -70,5 +75,76 @@ func TestObjectCleaner(t *testing.T) {
// The `iter` mock will return only the second object, which is not old enough to be deleted.
g.Expect(oc.removeOldObjects(context.Background())).To(Succeed())
g.Expect(s.DeleteObjectsCallCount()).To(Equal(1))
}

func TestObjectCleanerMetrics(t *testing.T) {
g := NewWithT(t)
s := storefakes.FakeStore{}

cfg := configuration.BucketObjectKind
cfg.RetentionPolicy = configuration.RetentionPolicy(60 * time.Second)

cleanermetrics.CleanerLatencyHistogram.Reset()
cleanermetrics.CleanerInflightRequests.Reset()

_, h := metrics.NewDefaultPrometheusHandler()
ts := httptest.NewServer(h)
defer ts.Close()

objs := []models.Object{
{
Cluster: "cluster1",
Kind: cfg.Gvk.Kind,
Name: "name1",
APIGroup: cfg.Gvk.Group,
APIVersion: cfg.Gvk.Version,
// Deleted 1 hour ago, our retention policy is 60s, so this should be deleted.
KubernetesDeletedAt: time.Now().Add(-time.Hour),
},
{
Cluster: "cluster1",
Kind: cfg.Gvk.Kind,
Name: "name2",
APIGroup: cfg.Gvk.Group,
APIVersion: cfg.Gvk.Version,
// Deleted 10s ago, our retention policy is 60s, so this should not be deleted.
KubernetesDeletedAt: time.Now().Add(10 * -time.Second),
},
}

iter := storefakes.FakeIterator{}
iter.AllReturnsOnCall(0, objs, nil)
// Pretend the first object is deleted on the second call
iter.AllReturnsOnCall(1, objs[1:2], nil)
s.GetAllObjectsReturns(&iter, nil)

index := storefakes.FakeIndexWriter{}

oc := objectCleaner{
log: logr.Discard(),
store: &s,
idx: &index,
config: []configuration.ObjectKind{cfg},
}

// Call removeOldObjects directly rather than starting the cleaner, to avoid async timing flakiness in the test.
g.Expect(oc.removeOldObjects(context.Background())).To(Succeed())

wantMetrics := []string{
`objects_cleaner_inflight_requests{action="RemoveObjects"} 0`,
`objects_cleaner_latency_seconds_count{action="RemoveObjects",status="success"} 1`,
}
assertMetrics(g, ts, wantMetrics)
}

func assertMetrics(g *WithT, ts *httptest.Server, expMetrics []string) {
resp, err := http.Get(ts.URL)
g.Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
g.Expect(err).NotTo(HaveOccurred())
body := string(b)

for _, expMetric := range expMetrics {
g.Expect(body).To(ContainSubstring(expMetric))
}
}
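assertMetrics works by scraping a test HTTP server that exposes the process-wide Prometheus registry and checking the text exposition format for the expected series. The test above uses the repo's metrics.NewDefaultPrometheusHandler helper; a rough equivalent using promhttp directly looks like the sketch below (the wiring is an assumption, shown only to make the mechanism concrete).

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// promhttp.Handler() serves the default registry, which is where the
	// recorder.go below registers its collectors via prometheus.MustRegister.
	ts := httptest.NewServer(promhttp.Handler())
	defer ts.Close()

	resp, err := http.Get(ts.URL)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// The response is the Prometheus text format: one "name{labels} value" line per series.
	fmt.Println(string(body))
}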
60 changes: 60 additions & 0 deletions pkg/query/cleaner/metrics/recorder.go
@@ -0,0 +1,60 @@
package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

const (
cleanerSubsystem = "objects_cleaner"

// cleaner actions
removeObjectsAction = "RemoveObjects"

FailedLabel = "error"
SuccessLabel = "success"
)

var cleanerWatcher = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: cleanerSubsystem,
Name: "status",
Help: "cleaner status",
}, []string{"status"})

var CleanerLatencyHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: cleanerSubsystem,
Name: "latency_seconds",
Help: "cleaner latency",
Buckets: prometheus.LinearBuckets(0.01, 0.01, 10),
}, []string{"action", "status"})

var CleanerInflightRequests = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: cleanerSubsystem,
Name: "inflight_requests",
Help: "number of cleaner in-flight requests.",
}, []string{"action"})

func init() {
prometheus.MustRegister(cleanerWatcher)
prometheus.MustRegister(CleanerLatencyHistogram)
prometheus.MustRegister(CleanerInflightRequests)
}

// CleanerWatcherDecrease decrements the objects_cleaner_status gauge for the given status.
func CleanerWatcherDecrease(status string) {
cleanerWatcher.WithLabelValues(status).Dec()
}

// CleanerWatcherIncrease increments the objects_cleaner_status gauge for the given status.
func CleanerWatcherIncrease(status string) {
cleanerWatcher.WithLabelValues(status).Inc()
}

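// CleanerSetLatency records the duration of a RemoveObjects run in the
// objects_cleaner_latency_seconds histogram, labelled with the given status.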
func CleanerSetLatency(status string, duration time.Duration) {
CleanerLatencyHistogram.WithLabelValues(removeObjectsAction, status).Observe(duration.Seconds())
}

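// CleanerAddInflightRequests adds number (typically +1 on entry and -1 on exit)
// to the objects_cleaner_inflight_requests gauge for the RemoveObjects action.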
func CleanerAddInflightRequests(number float64) {
CleanerInflightRequests.WithLabelValues(removeObjectsAction).Add(number)
}
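Taken together, these helpers are meant to bracket a single cleaner run: bump the in-flight gauge on entry, then record latency with a success or error label on the way out, which is how cleaner.go uses them above. A sketch of that call pattern follows; instrumentedCleanup is hypothetical and not part of the package API.

package metrics

import "time"

// instrumentedCleanup is a hypothetical example, not part of the package API; it
// shows how the helpers above are intended to bracket one cleaner run.
func instrumentedCleanup(cleanup func() error) (retErr error) {
	CleanerAddInflightRequests(1)
	start := time.Now()
	defer func() {
		CleanerAddInflightRequests(-1)
		label := SuccessLabel
		if retErr != nil {
			label = FailedLabel
		}
		CleanerSetLatency(label, time.Since(start))
	}()
	return cleanup()
}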
5 changes: 2 additions & 3 deletions pkg/query/collector/watching.go
@@ -98,7 +98,6 @@ type child struct {
}

// setStatus sets watcher status and records it as a metric.
- // It does not record metrics for stopped state non-active states = notStarted and Stopped
func (c *child) setStatus(s string) {
if c.status != "" {
metrics.ClusterWatcherDecrease(c.collector, c.status)
@@ -145,7 +144,7 @@ func newWatchingCollector(opts CollectorOpts) (*watchingCollector, error) {
}, nil
}

- func (w *watchingCollector) watch(cluster cluster.Cluster) (reterr error) {
+ func (w *watchingCollector) watch(cluster cluster.Cluster) (retErr error) {
clusterName := cluster.GetName()
if clusterName == "" {
return fmt.Errorf("cluster name is empty")
@@ -159,7 +158,7 @@ func (w *watchingCollector) watch(cluster cluster.Cluster) (reterr error) {
childctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
defer func() {
- if reterr != nil {
+ if retErr != nil {
w.clusterWatchersMu.Lock()
c.setStatus(ClusterWatchingFailed)
cancel := c.cancel
4 changes: 2 additions & 2 deletions pkg/query/server/server.go
@@ -198,7 +198,7 @@ func (so *ServerOpts) Validate() error {
return nil
}

- func NewServer(opts ServerOpts) (_ pb.QueryServer, _ func() error, reterr error) {
+ func NewServer(opts ServerOpts) (_ pb.QueryServer, _ func() error, retErr error) {
if err := opts.Validate(); err != nil {
return nil, nil, fmt.Errorf("invalid query server options: %w", err)
}
@@ -256,7 +256,7 @@ func NewServer(opts ServerOpts) (_ pb.QueryServer, _ func() error, reterr error)

ctx, cancel := context.WithCancel(context.Background())
defer func() {
- if reterr != nil {
+ if retErr != nil {
cancel()
}
}()