Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compactor start duration in seconds metric #5683

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661
* [FEATURE] Ingester: Add new per-tenant metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
* [ENHANCEMENT] Compactor: Add new metric `cortex_compactor_start_duration_seconds` for the time in seconds spent by the compactor running its start function. #5683
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684

## 1.16.0 2023-11-20
Expand Down
16 changes: 16 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ type Compactor struct {
ringSubservicesWatcher *services.FailureWatcher

// Metrics.
CompactorStartDurationSeconds prometheus.Gauge
compactionRunsStarted prometheus.Counter
compactionRunsInterrupted prometheus.Counter
compactionRunsCompleted prometheus.Counter
Expand Down Expand Up @@ -403,6 +404,10 @@ func newCompactor(
blocksCompactorFactory: blocksCompactorFactory,
allowedTenants: util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),

CompactorStartDurationSeconds: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Name: "cortex_compactor_start_duration_seconds",
Help: "Time in seconds spent by compactor running start function",
}),
compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_runs_started_total",
Help: "Total number of compaction runs started.",
Expand Down Expand Up @@ -485,6 +490,12 @@ func newCompactor(

// Start the compactor.
func (c *Compactor) starting(ctx context.Context) error {
begin := time.Now()
defer func() {
c.CompactorStartDurationSeconds.Set(time.Since(begin).Seconds())
level.Info(c.logger).Log("msg", "compactor started", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}()

var err error

// Create bucket client.
Expand Down Expand Up @@ -581,6 +592,11 @@ func (c *Compactor) starting(ctx context.Context) error {
}

func (c *Compactor) stopping(_ error) error {
begin := time.Now()
defer func() {
level.Info(c.logger).Log("msg", "compactor stopped", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}()

ctx := context.Background()

services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck
Expand Down
19 changes: 17 additions & 2 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,11 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
assert.Equal(t, []string{
`level=info component=cleaner msg="started blocks cleanup and maintenance"`,
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=info component=compactor msg="discovered users from bucket" users=0`,
}, strings.Split(strings.TrimSpace(logs.String()), "\n"))
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))

assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
# TYPE cortex_compactor_runs_started_total counter
Expand Down Expand Up @@ -365,9 +367,11 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket
assert.Equal(t, []string{
`level=info component=cleaner msg="started blocks cleanup and maintenance"`,
`level=error component=cleaner msg="failed to run blocks cleanup and maintenance" err="failed to discover users from bucket: failed to iterate the bucket"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=error component=compactor msg="failed to discover users from bucket" err="failed to iterate the bucket"`,
}, strings.Split(strings.TrimSpace(logs.String()), "\n"))
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))

assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
# TYPE cortex_compactor_runs_started_total counter
Expand Down Expand Up @@ -661,6 +665,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
`level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`,
`level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`,
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=info component=compactor msg="discovered users from bucket" users=2`,
`level=info component=compactor msg="starting compaction of user blocks" user=user-1`,
Expand All @@ -675,6 +680,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
`level=info component=compactor org_id=user-2 msg="start of compactions"`,
`level=info component=compactor org_id=user-2 msg="compaction iterations done"`,
`level=info component=compactor msg="successfully compacted user blocks" user=user-2`,
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))

// Instead of testing for shipper metrics, we only check our metrics here.
Expand Down Expand Up @@ -794,6 +800,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
`level=info component=cleaner org_id=user-1 msg="deleted block marked for deletion" block=01DTW0ZCPDDNV4BV83Q2SV4QAZ`,
`level=info component=cleaner org_id=user-1 msg="completed blocks cleanup and maintenance"`,
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=info component=compactor msg="discovered users from bucket" users=1`,
`level=info component=compactor msg="starting compaction of user blocks" user=user-1`,
Expand All @@ -802,6 +809,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
`level=info component=compactor org_id=user-1 msg="start of compactions"`,
`level=info component=compactor org_id=user-1 msg="compaction iterations done"`,
`level=info component=compactor msg="successfully compacted user blocks" user=user-1`,
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))

// Instead of testing for shipper metrics, we only check our metrics here.
Expand Down Expand Up @@ -986,9 +994,11 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
`level=info component=cleaner org_id=user-1 msg="deleted blocks for tenant marked for deletion" deletedBlocks=1`,
`level=info component=cleaner org_id=user-1 msg="updating finished time in tenant deletion mark"`,
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=info component=compactor msg="discovered users from bucket" users=1`,
`level=debug component=compactor msg="skipping user because it is marked for deletion" user=user-1`,
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))

// Instead of testing for shipper metrics, we only check our metrics here.
Expand Down Expand Up @@ -1178,6 +1188,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
`level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`,
`level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`,
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=info component=compactor msg="discovered users from bucket" users=2`,
`level=info component=compactor msg="starting compaction of user blocks" user=user-1`,
Expand All @@ -1192,6 +1203,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
`level=info component=compactor org_id=user-2 msg="start of compactions"`,
`level=info component=compactor org_id=user-2 msg="compaction iterations done"`,
`level=info component=compactor msg="successfully compacted user blocks" user=user-2`,
`level=info component=compactor msg="compactor stopped"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
}

Expand Down Expand Up @@ -1599,6 +1611,7 @@ func removeIgnoredLogs(input []string) []string {

out := make([]string, 0, len(input))
durationRe := regexp.MustCompile(`\s?duration=\S+`)
durationMsRe := regexp.MustCompile(`\s?duration_ms=\S+`)

for i := 0; i < len(input); i++ {
log := input[i]
Expand All @@ -1612,6 +1625,7 @@ func removeIgnoredLogs(input []string) []string {

// Remove any duration from logs.
log = durationRe.ReplaceAllString(log, "")
log = durationMsRe.ReplaceAllString(log, "")

out = append(out, log)
}
Expand Down Expand Up @@ -1941,6 +1955,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
assert.Equal(t, context.DeadlineExceeded, err)

assert.ElementsMatch(t, []string{
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
Expand Down
Loading