Skip to content

Commit

Permalink
Add compactor start duration in seconds metric (#5683)
Browse files Browse the repository at this point in the history
* Add additional metrics to record compactor start and stop duration in seconds

Signed-off-by: Alex Le <leqiyue@amazon.com>

* update CHANGELOG

Signed-off-by: Alex Le <leqiyue@amazon.com>

* removed stop duration metric replaced it by log

Signed-off-by: Alex Le <leqiyue@amazon.com>

* fix test

Signed-off-by: Alex Le <leqiyue@amazon.com>

---------

Signed-off-by: Alex Le <leqiyue@amazon.com>
  • Loading branch information
alexqyle authored Nov 30, 2023
1 parent 9013059 commit ba730c9
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684

## 1.16.0 2023-11-20
Expand Down
16 changes: 16 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ type Compactor struct {
ringSubservicesWatcher *services.FailureWatcher

// Metrics.
CompactorStartDurationSeconds prometheus.Gauge
compactionRunsStarted prometheus.Counter
compactionRunsInterrupted prometheus.Counter
compactionRunsCompleted prometheus.Counter
Expand Down Expand Up @@ -403,6 +404,10 @@ func newCompactor(
blocksCompactorFactory: blocksCompactorFactory,
allowedTenants: util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),

CompactorStartDurationSeconds: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Name: "cortex_compactor_start_duration_seconds",
Help: "Time in seconds spent by compactor running start function",
}),
compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_runs_started_total",
Help: "Total number of compaction runs started.",
Expand Down Expand Up @@ -485,6 +490,12 @@ func newCompactor(

// Start the compactor.
func (c *Compactor) starting(ctx context.Context) error {
begin := time.Now()
defer func() {
c.CompactorStartDurationSeconds.Set(time.Since(begin).Seconds())
level.Info(c.logger).Log("msg", "compactor started", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}()

var err error

// Create bucket client.
Expand Down Expand Up @@ -581,6 +592,11 @@ func (c *Compactor) starting(ctx context.Context) error {
}

func (c *Compactor) stopping(_ error) error {
begin := time.Now()
defer func() {
level.Info(c.logger).Log("msg", "compactor stopped", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}()

ctx := context.Background()

services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck
Expand Down
19 changes: 17 additions & 2 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,11 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
assert.Equal(t, []string{
`level=info component=cleaner msg="started blocks cleanup and maintenance"`,
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=info component=compactor msg="discovered users from bucket" users=0`,
}, strings.Split(strings.TrimSpace(logs.String()), "\n"))
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))

assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
# TYPE cortex_compactor_runs_started_total counter
Expand Down Expand Up @@ -365,9 +367,11 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket
assert.Equal(t, []string{
`level=info component=cleaner msg="started blocks cleanup and maintenance"`,
`level=error component=cleaner msg="failed to run blocks cleanup and maintenance" err="failed to discover users from bucket: failed to iterate the bucket"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=error component=compactor msg="failed to discover users from bucket" err="failed to iterate the bucket"`,
}, strings.Split(strings.TrimSpace(logs.String()), "\n"))
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))

assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
# TYPE cortex_compactor_runs_started_total counter
Expand Down Expand Up @@ -661,6 +665,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
`level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`,
`level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`,
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=info component=compactor msg="discovered users from bucket" users=2`,
`level=info component=compactor msg="starting compaction of user blocks" user=user-1`,
Expand All @@ -675,6 +680,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
`level=info component=compactor org_id=user-2 msg="start of compactions"`,
`level=info component=compactor org_id=user-2 msg="compaction iterations done"`,
`level=info component=compactor msg="successfully compacted user blocks" user=user-2`,
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))

// Instead of testing for shipper metrics, we only check our metrics here.
Expand Down Expand Up @@ -794,6 +800,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
`level=info component=cleaner org_id=user-1 msg="deleted block marked for deletion" block=01DTW0ZCPDDNV4BV83Q2SV4QAZ`,
`level=info component=cleaner org_id=user-1 msg="completed blocks cleanup and maintenance"`,
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=info component=compactor msg="discovered users from bucket" users=1`,
`level=info component=compactor msg="starting compaction of user blocks" user=user-1`,
Expand All @@ -802,6 +809,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
`level=info component=compactor org_id=user-1 msg="start of compactions"`,
`level=info component=compactor org_id=user-1 msg="compaction iterations done"`,
`level=info component=compactor msg="successfully compacted user blocks" user=user-1`,
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))

// Instead of testing for shipper metrics, we only check our metrics here.
Expand Down Expand Up @@ -986,9 +994,11 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
`level=info component=cleaner org_id=user-1 msg="deleted blocks for tenant marked for deletion" deletedBlocks=1`,
`level=info component=cleaner org_id=user-1 msg="updating finished time in tenant deletion mark"`,
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=info component=compactor msg="discovered users from bucket" users=1`,
`level=debug component=compactor msg="skipping user because it is marked for deletion" user=user-1`,
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))

// Instead of testing for shipper metrics, we only check our metrics here.
Expand Down Expand Up @@ -1178,6 +1188,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
`level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`,
`level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`,
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=info component=compactor msg="discovered users from bucket" users=2`,
`level=info component=compactor msg="starting compaction of user blocks" user=user-1`,
Expand All @@ -1192,6 +1203,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
`level=info component=compactor org_id=user-2 msg="start of compactions"`,
`level=info component=compactor org_id=user-2 msg="compaction iterations done"`,
`level=info component=compactor msg="successfully compacted user blocks" user=user-2`,
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
}

Expand Down Expand Up @@ -1599,6 +1611,7 @@ func removeIgnoredLogs(input []string) []string {

out := make([]string, 0, len(input))
durationRe := regexp.MustCompile(`\s?duration=\S+`)
durationMsRe := regexp.MustCompile(`\s?duration_ms=\S+`)

for i := 0; i < len(input); i++ {
log := input[i]
Expand All @@ -1612,6 +1625,7 @@ func removeIgnoredLogs(input []string) []string {

// Remove any duration from logs.
log = durationRe.ReplaceAllString(log, "")
log = durationMsRe.ReplaceAllString(log, "")

out = append(out, log)
}
Expand Down Expand Up @@ -1941,6 +1955,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
assert.Equal(t, context.DeadlineExceeded, err)

assert.ElementsMatch(t, []string{
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
Expand Down

0 comments on commit ba730c9

Please sign in to comment.