From 1b5de55908a2da8f940388fad356423f03fb84ec Mon Sep 17 00:00:00 2001
From: davidby-influx <72418212+davidby-influx@users.noreply.github.com>
Date: Mon, 16 Aug 2021 09:21:07 -0700
Subject: [PATCH] fix: avoid compaction queue stats flutter. (#22195)

When the compaction planner runs, if it cannot acquire
a lock on the files it plans to compact, it returns a
nil list of compaction groups. This, in turn, sets the
engine statistics for compaction queues to zero, which
is incorrect. Instead, use the number of pending
compaction groups which would have been returned.

closes https://github.com/influxdata/influxdb/issues/22138

(cherry picked from commit 7d3efe1e9e9f6efa92a41da1fc225886362d41a2)

closes https://github.com/influxdata/influxdb/issues/22141
---
 tsdb/engine/tsm1/compact.go      |  44 +++++-----
 tsdb/engine/tsm1/compact_test.go | 143 +++++++++++++++++++++++--------
 tsdb/engine/tsm1/engine.go       |  24 +++---
 tsdb/engine/tsm1/engine_test.go  |   8 +-
 4 files changed, 147 insertions(+), 72 deletions(-)

diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index 38caecdb366..a6c872ac54e 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -89,9 +89,9 @@ type CompactionGroup []string
 // CompactionPlanner determines what TSM files and WAL segments to include in a
 // given compaction run.
 type CompactionPlanner interface {
-    Plan(lastWrite time.Time) []CompactionGroup
-    PlanLevel(level int) []CompactionGroup
-    PlanOptimize() []CompactionGroup
+    Plan(lastWrite time.Time) ([]CompactionGroup, int64)
+    PlanLevel(level int) ([]CompactionGroup, int64)
+    PlanOptimize() ([]CompactionGroup, int64)
     Release(group []CompactionGroup)

     FullyCompacted() (bool, string)
@@ -234,13 +234,13 @@ func (c *DefaultPlanner) ForceFull() {
 }

 // PlanLevel returns a set of TSM files to rewrite for a specific level.
-func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
+func (c *DefaultPlanner) PlanLevel(level int) ([]CompactionGroup, int64) {
     // If a full plan has been requested, don't plan any levels which will prevent
     // the full plan from acquiring them.
     c.mu.RLock()
     if c.forceFull {
         c.mu.RUnlock()
-        return nil
+        return nil, 0
     }
     c.mu.RUnlock()

@@ -252,7 +252,7 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
     // If there is only one generation and no tombstones, then there's nothing to
     // do.
     if len(generations) <= 1 && !generations.hasTombstones() {
-        return nil
+        return nil, 0
     }

     // Group each generation by level such that two adjacent generations in the same
@@ -321,22 +321,22 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
     }

     if !c.acquire(cGroups) {
-        return nil
+        return nil, int64(len(cGroups))
     }
-    return cGroups
+    return cGroups, int64(len(cGroups))
 }

 // PlanOptimize returns all TSM files if they are in different generations in order
 // to optimize the index across TSM files. Each returned compaction group can be
 // compacted concurrently.
-func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
+func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
     // If a full plan has been requested, don't plan any levels which will prevent
     // the full plan from acquiring them.
     c.mu.RLock()
     if c.forceFull {
         c.mu.RUnlock()
-        return nil
+        return nil, 0
     }
     c.mu.RUnlock()

@@ -348,7 +348,7 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
     // If there is only one generation and no tombstones, then there's nothing to
     // do.
     if len(generations) <= 1 && !generations.hasTombstones() {
-        return nil
+        return nil, 0
     }

     // Group each generation by level such that two adjacent generations in the same
@@ -413,15 +413,15 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
     }

     if !c.acquire(cGroups) {
-        return nil
+        return nil, int64(len(cGroups))
     }
-    return cGroups
+    return cGroups, int64(len(cGroups))
 }

 // Plan returns a set of TSM files to rewrite for level 4 or higher.  The planning returns
 // multiple groups if possible to allow compactions to run concurrently.
-func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
+func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) {
     generations := c.findGenerations(true)

     c.mu.RLock()
@@ -471,19 +471,19 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {

         // Make sure we have more than 1 file and more than 1 generation
         if len(tsmFiles) <= 1 || genCount <= 1 {
-            return nil
+            return nil, 0
         }

         group := []CompactionGroup{tsmFiles}
         if !c.acquire(group) {
-            return nil
+            return nil, int64(len(group))
         }
-        return group
+        return group, int64(len(group))
     }

     // don't plan if nothing has changed in the filestore
     if c.lastPlanCheck.After(c.FileStore.LastModified()) && !generations.hasTombstones() {
-        return nil
+        return nil, 0
     }

     c.lastPlanCheck = time.Now()
@@ -491,7 +491,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
     // If there is only one generation, return early to avoid re-compacting the same file
     // over and over again.
     if len(generations) <= 1 && !generations.hasTombstones() {
-        return nil
+        return nil, 0
     }

     // Need to find the ending point for level 4 files.  They will be the oldest files. We scan
@@ -584,7 +584,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
     }

     if len(groups) == 0 {
-        return nil
+        return nil, 0
     }

     // With the groups, we need to evaluate whether the group as a whole can be compacted
@@ -612,9 +612,9 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
     }

     if !c.acquire(tsmFiles) {
-        return nil
+        return nil, int64(len(tsmFiles))
     }
-    return tsmFiles
+    return tsmFiles, int64(len(tsmFiles))
 }

 // findGenerations groups all the TSM files by generation based
diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go
index bd02ea86840..feb2402d9b8 100644
--- a/tsdb/engine/tsm1/compact_test.go
+++ b/tsdb/engine/tsm1/compact_test.go
@@ -1535,9 +1535,11 @@ func TestDefaultPlanner_Plan_Min(t *testing.T) {
         },
         tsdb.DefaultCompactFullWriteColdDuration,
     )
-    tsm := cp.Plan(time.Now())
+    tsm, pLen := cp.Plan(time.Now())
     if exp, got := 0, len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
 }

@@ -1584,9 +1586,11 @@ func TestDefaultPlanner_Plan_CombineSequence(t *testing.T) {
     )

     expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
-    tsm := cp.Plan(time.Now())
+    tsm, pLen := cp.Plan(time.Now())
     if exp, got := len(expFiles), len(tsm[0]); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }

     for i, p := range expFiles {
@@ -1645,10 +1649,12 @@ func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) {

     expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}

-    tsm := cp.Plan(time.Now())
+    tsm, pLen := cp.Plan(time.Now())
     if got, exp := len(tsm), 2; got != exp {
         t.Fatalf("compaction group length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }

     if exp, got := len(expFiles[:4]), len(tsm[0]); got != exp {
@@ -1670,7 +1676,6 @@ func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) {
             t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
         }
     }
-
 }

 // Ensure that the planner grabs the smallest compaction step
@@ -1735,9 +1740,11 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) {
     )

     expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11]}
-    tsm := cp.PlanLevel(1)
+    tsm, pLen := cp.PlanLevel(1)
     if exp, got := len(expFiles), len(tsm[0]); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }

     for i, p := range expFiles {
@@ -1788,9 +1795,11 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) {
     )

     expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4]}
-    tsm := cp.PlanLevel(3)
+    tsm, pLen := cp.PlanLevel(3)
     if exp, got := len(expFiles), len(tsm[0]); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }

     for i, p := range expFiles {
@@ -1841,9 +1850,11 @@ func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) {
     )

     expFiles := []tsm1.FileStat{}
-    tsm := cp.PlanLevel(3)
+    tsm, pLen := cp.PlanLevel(3)
     if exp, got := len(expFiles), len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
 }

@@ -1884,9 +1895,11 @@ func TestDefaultPlanner_PlanLevel3_MinFiles(t *testing.T) {
     )

     expFiles := []tsm1.FileStat{}
-    tsm := cp.PlanLevel(3)
+    tsm, pLen := cp.PlanLevel(3)
     if exp, got := len(expFiles), len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
 }

@@ -1916,9 +1929,11 @@ func TestDefaultPlanner_PlanLevel2_MinFiles(t *testing.T) {
     )

     expFiles := []tsm1.FileStat{}
-    tsm := cp.PlanLevel(2)
+    tsm, pLen := cp.PlanLevel(2)
     if exp, got := len(expFiles), len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
 }

@@ -1960,9 +1975,11 @@ func TestDefaultPlanner_PlanLevel_Tombstone(t *testing.T) {
     )

     expFiles := []tsm1.FileStat{data[0], data[1]}
-    tsm := cp.PlanLevel(3)
+    tsm, pLen := cp.PlanLevel(3)
     if exp, got := len(expFiles), len(tsm[0]); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }

     for i, p := range expFiles {
@@ -2018,9 +2035,11 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {

     expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}

-    tsm := cp.PlanLevel(1)
+    tsm, pLen := cp.PlanLevel(1)
     if exp, got := len(expFiles1), len(tsm[0]); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) } for i, p := range expFiles1 { @@ -2109,9 +2128,11 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { expFiles1 := data[0:8] expFiles2 := data[8:16] - tsm := cp.PlanLevel(1) + tsm, pLen := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) + } else if pLen != int64(len(tsm)) { + t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) } for i, p := range expFiles1 { @@ -2132,9 +2153,11 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { cp.Release(tsm[1:]) - tsm = cp.PlanLevel(1) + tsm, pLen = cp.PlanLevel(1) if exp, got := len(expFiles2), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) + } else if pLen != int64(len(tsm)) { + t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) } for i, p := range expFiles2 { @@ -2169,9 +2192,11 @@ func TestDefaultPlanner_PlanOptimize_NoLevel4(t *testing.T) { ) expFiles := []tsm1.FileStat{} - tsm := cp.PlanOptimize() + tsm, pLen := cp.PlanOptimize() if exp, got := len(expFiles), len(tsm); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) + } else if pLen != int64(len(tsm)) { + t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) } } @@ -2216,9 +2241,11 @@ func TestDefaultPlanner_PlanOptimize_Level4(t *testing.T) { ) expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5]} - tsm := cp.PlanOptimize() + tsm, pLen := cp.PlanOptimize() if exp, got := 1, len(tsm); exp != got { t.Fatalf("group length mismatch: got %v, exp %v", got, exp) + } else if pLen != int64(len(tsm)) { + t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) } if exp, got := len(expFiles1), len(tsm[0]); got != exp { @@ -2287,9 +2314,11 @@ func TestDefaultPlanner_PlanOptimize_Multiple(t *testing.T) { expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} expFiles2 := []tsm1.FileStat{data[6], data[7], data[8], data[9]} - tsm := cp.PlanOptimize() + tsm, pLen := cp.PlanOptimize() if exp, got := 2, len(tsm); exp != got { t.Fatalf("group length mismatch: got %v, exp %v", got, exp) + } else if pLen != int64(len(tsm)) { + t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) } if exp, got := len(expFiles1), len(tsm[0]); got != exp { @@ -2338,9 +2367,11 @@ func TestDefaultPlanner_PlanOptimize_Optimized(t *testing.T) { ) expFiles := []tsm1.FileStat{} - tsm := cp.PlanOptimize() + tsm, pLen := cp.PlanOptimize() if exp, got := len(expFiles), len(tsm); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) + } else if pLen != int64(len(tsm)) { + t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) } } @@ -2370,9 +2401,11 @@ func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) { ) expFiles := []tsm1.FileStat{data[0], data[1], data[2]} - tsm := cp.PlanOptimize() + tsm, pLen := cp.PlanOptimize() if exp, got := len(expFiles), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) + } else if pLen != int64(len(tsm)) { + t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) } for i, p := range expFiles { @@ -2422,9 +2455,11 @@ func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) { time.Nanosecond, ) - tsm := cp.Plan(time.Now().Add(-time.Second)) + tsm, pLen := cp.Plan(time.Now().Add(-time.Second)) if exp, got := len(data), len(tsm[0]); got != exp { t.Fatalf("tsm file length 
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }

     for i, p := range data {
@@ -2456,9 +2491,11 @@ func TestDefaultPlanner_Plan_SkipMaxSizeFiles(t *testing.T) {
         },
         tsdb.DefaultCompactFullWriteColdDuration,
     )
-    tsm := cp.Plan(time.Now())
+    tsm, pLen := cp.Plan(time.Now())
     if exp, got := 0, len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
 }

@@ -2492,10 +2529,12 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
     }

     cp := tsm1.NewDefaultPlanner(fs, time.Nanosecond)
-    plan := cp.Plan(time.Now().Add(-time.Second))
+    plan, pLen := cp.Plan(time.Now().Add(-time.Second))
     // first verify that our test set would return files
     if exp, got := 4, len(plan[0]); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(plan)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
     cp.Release(plan)

@@ -2531,16 +2570,20 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
     }

     cp.FileStore = overFs
-    plan = cp.Plan(time.Now().Add(-time.Second))
+    plan, pLen = cp.Plan(time.Now().Add(-time.Second))
     if exp, got := 0, len(plan); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(plan)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
     cp.Release(plan)

-    plan = cp.PlanOptimize()
+    plan, pLen = cp.PlanOptimize()
     // ensure the optimize planner would pick this up
     if exp, got := 1, len(plan); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(plan)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
     cp.Release(plan)

@@ -2548,8 +2591,11 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {

     // ensure that it will plan if last modified has changed
     fs.lastModified = time.Now()
-    if exp, got := 4, len(cp.Plan(time.Now())[0]); got != exp {
+    cGroups, pLen := cp.Plan(time.Now())
+    if exp, got := 4, len(cGroups[0]); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(cGroups)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
 }

@@ -2609,9 +2655,11 @@ func TestDefaultPlanner_Plan_TwoGenLevel3(t *testing.T) {
         },
         time.Hour)

-    tsm := cp.Plan(time.Now().Add(-24 * time.Hour))
+    tsm, pLen := cp.Plan(time.Now().Add(-24 * time.Hour))
     if exp, got := 1, len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
 }

@@ -2649,10 +2697,12 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) {
         time.Nanosecond,
     )

-    plan := cp.Plan(time.Now().Add(-time.Second))
+    plan, pLen := cp.Plan(time.Now().Add(-time.Second))
     // first verify that our test set would return files
     if exp, got := 4, len(plan[0]); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(plan)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
     cp.Release(plan)

@@ -2676,8 +2726,11 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) {
     }

     cp.FileStore = overFs
-    if exp, got := 1, len(cp.Plan(time.Now().Add(-time.Second))); got != exp {
+    cGroups, pLen := cp.Plan(time.Now().Add(-time.Second))
+    if exp, got := 1, len(cGroups); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(cGroups)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
 }

@@ -2716,9 +2769,11 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) {
     )

     expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
-    tsm := cp.Plan(time.Now())
+    tsm, pLen := cp.Plan(time.Now())
     if exp, got := len(expFiles), len(tsm[0]); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }

     for i, p := range expFiles {
@@ -2758,9 +2813,11 @@ func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) {
         },
         tsdb.DefaultCompactFullWriteColdDuration,
     )
-    tsm := cp.Plan(time.Now())
+    tsm, pLen := cp.Plan(time.Now())
     if exp, got := 0, len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
 }

@@ -2826,36 +2883,46 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) {
         },
         tsdb.DefaultCompactFullWriteColdDuration,
     )
-    tsm := cp.PlanLevel(1)
+    tsm, pLen := cp.PlanLevel(1)
     if exp, got := 1, len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
     cp.Release(tsm)

-    tsm = cp.PlanLevel(2)
+    tsm, pLen = cp.PlanLevel(2)
     if exp, got := 1, len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
     cp.Release(tsm)

     cp.ForceFull()

     // Level plans should not return any plans
-    tsm = cp.PlanLevel(1)
+    tsm, pLen = cp.PlanLevel(1)
     if exp, got := 0, len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
     cp.Release(tsm)

-    tsm = cp.PlanLevel(2)
+    tsm, pLen = cp.PlanLevel(2)
     if exp, got := 0, len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
     cp.Release(tsm)

-    tsm = cp.Plan(time.Now())
+    tsm, pLen = cp.Plan(time.Now())
     if exp, got := 1, len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }

     if got, exp := len(tsm[0]), 13; got != exp {
@@ -2864,15 +2931,19 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) {
     }
     cp.Release(tsm)

     // Level plans should return plans now that Plan has been called
-    tsm = cp.PlanLevel(1)
+    tsm, pLen = cp.PlanLevel(1)
     if exp, got := 1, len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }
     cp.Release(tsm)

-    tsm = cp.PlanLevel(2)
+    tsm, pLen = cp.PlanLevel(2)
     if exp, got := 1, len(tsm); got != exp {
         t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+    } else if pLen != int64(len(tsm)) {
+        t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
     }

     cp.Release(tsm)
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index d851e1f2283..7934110da1a 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -2013,24 +2013,28 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
         case <-t.C:

             // Find our compaction plans
-            level1Groups := e.CompactionPlan.PlanLevel(1)
-            level2Groups := e.CompactionPlan.PlanLevel(2)
-            level3Groups := e.CompactionPlan.PlanLevel(3)
-            level4Groups := e.CompactionPlan.Plan(e.LastModified())
-            atomic.StoreInt64(&e.stats.TSMFullCompactionsQueue, int64(len(level4Groups)))
+            level1Groups, len1 := e.CompactionPlan.PlanLevel(1)
+            level2Groups, len2 := e.CompactionPlan.PlanLevel(2)
+            level3Groups, len3 := e.CompactionPlan.PlanLevel(3)
+            level4Groups, len4 := e.CompactionPlan.Plan(e.LastModified())
+            atomic.StoreInt64(&e.stats.TSMFullCompactionsQueue, len4)

             // If no full compactions are need, see if an optimize is needed
             if len(level4Groups) == 0 {
-                level4Groups = e.CompactionPlan.PlanOptimize()
-                atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))
+                level4Groups, len4 = e.CompactionPlan.PlanOptimize()
+                atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, len4)
             }

             // Update the level plan queue stats
-            atomic.StoreInt64(&e.stats.TSMCompactionsQueue[0], int64(len(level1Groups)))
-            atomic.StoreInt64(&e.stats.TSMCompactionsQueue[1], int64(len(level2Groups)))
-            atomic.StoreInt64(&e.stats.TSMCompactionsQueue[2], int64(len(level3Groups)))
+            // For stats, use the length needed, even if the lock was
+            // not acquired
+            atomic.StoreInt64(&e.stats.TSMCompactionsQueue[0], len1)
+            atomic.StoreInt64(&e.stats.TSMCompactionsQueue[1], len2)
+            atomic.StoreInt64(&e.stats.TSMCompactionsQueue[2], len3)

             // Set the queue depths on the scheduler
+            // Use the real queue depth, dependent on acquiring
+            // the file locks.
             e.scheduler.setDepth(1, len(level1Groups))
             e.scheduler.setDepth(2, len(level2Groups))
             e.scheduler.setDepth(3, len(level3Groups))
diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go
index 58b95942518..827c19ab020 100644
--- a/tsdb/engine/tsm1/engine_test.go
+++ b/tsdb/engine/tsm1/engine_test.go
@@ -2731,10 +2731,10 @@ func MustParsePointString(buf string) models.Point { return MustParsePointsStrin

 type mockPlanner struct{}

-func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return nil }
-func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil }
-func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil }
-func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
+func (m *mockPlanner) Plan(lastWrite time.Time) ([]tsm1.CompactionGroup, int64) { return nil, 0 }
+func (m *mockPlanner) PlanLevel(level int) ([]tsm1.CompactionGroup, int64) { return nil, 0 }
+func (m *mockPlanner) PlanOptimize() ([]tsm1.CompactionGroup, int64) { return nil, 0 }
+func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
 func (m *mockPlanner) FullyCompacted() (bool, string) { return false, "not compacted" }
 func (m *mockPlanner) ForceFull() {}
 func (m *mockPlanner) SetFileStore(fs *tsm1.FileStore) {}
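
Note (not part of the patch): the sketch below is a minimal, self-contained illustration of the contract the two-value planner API above is meant to satisfy; when the planner cannot acquire the files, it returns nil groups but still reports the length of the plan it wanted to run, so the queue statistic no longer drops to zero. The toyPlanner type and queueStat variable are hypothetical stand-ins for the DefaultPlanner and the engine stats fields, not code from this patch.

package main

import (
	"fmt"
	"sync/atomic"
)

// CompactionGroup mirrors the tsm1 type: a set of TSM file names compacted together.
type CompactionGroup []string

// toyPlanner is a hypothetical planner that mimics the new behavior: when the
// files are already locked it returns nil groups but still reports how many
// groups it wanted to compact.
type toyPlanner struct {
	locked bool
}

func (p *toyPlanner) PlanLevel(level int) ([]CompactionGroup, int64) {
	groups := []CompactionGroup{{"000001-01.tsm", "000002-01.tsm"}}
	if p.locked {
		// Files could not be acquired: no work to hand out, but the pending
		// plan length is still reported for the queue statistic.
		return nil, int64(len(groups))
	}
	return groups, int64(len(groups))
}

func main() {
	var queueStat int64 // stands in for a queue statistic such as TSMCompactionsQueue[0]

	p := &toyPlanner{locked: true}
	groups, planLen := p.PlanLevel(1)

	// Stats use the plan length, even when the file locks were not acquired...
	atomic.StoreInt64(&queueStat, planLen)

	// ...while the scheduler depth uses only the groups that were actually acquired.
	fmt.Println("queue stat:", atomic.LoadInt64(&queueStat), "schedulable groups:", len(groups))
}

With locked set to true this prints a queue depth of 1 and zero schedulable groups, which is the same split the engine.go change makes between the stats counters and e.scheduler.setDepth.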