Skip to content

Commit

Permalink
fix: avoid compaction queue stats flutter. (#22195)
Browse files Browse the repository at this point in the history
When the compaction planner runs, if it cannot acquire
a lock on the files it plans to compact, it returns a
nil list of compaction groups. This, in turn, sets the
engine statistics for compactions queues to zero,
which is incorrect. Instead, use the length of pending
files which would have been returned.

closes #22138

(cherry picked from commit 7d3efe1)

closes #22141
  • Loading branch information
davidby-influx committed Aug 17, 2021
1 parent 2237d02 commit 1b5de55
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 72 deletions.
44 changes: 22 additions & 22 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ type CompactionGroup []string
// CompactionPlanner determines what TSM files and WAL segments to include in a
// given compaction run.
type CompactionPlanner interface {
Plan(lastWrite time.Time) []CompactionGroup
PlanLevel(level int) []CompactionGroup
PlanOptimize() []CompactionGroup
Plan(lastWrite time.Time) ([]CompactionGroup, int64)
PlanLevel(level int) ([]CompactionGroup, int64)
PlanOptimize() ([]CompactionGroup, int64)
Release(group []CompactionGroup)
FullyCompacted() (bool, string)

Expand Down Expand Up @@ -234,13 +234,13 @@ func (c *DefaultPlanner) ForceFull() {
}

// PlanLevel returns a set of TSM files to rewrite for a specific level.
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
func (c *DefaultPlanner) PlanLevel(level int) ([]CompactionGroup, int64) {
// If a full plan has been requested, don't plan any levels which will prevent
// the full plan from acquiring them.
c.mu.RLock()
if c.forceFull {
c.mu.RUnlock()
return nil
return nil, 0
}
c.mu.RUnlock()

Expand All @@ -252,7 +252,7 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
// If there is only one generation and no tombstones, then there's nothing to
// do.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil
return nil, 0
}

// Group each generation by level such that two adjacent generations in the same
Expand Down Expand Up @@ -321,22 +321,22 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
}

if !c.acquire(cGroups) {
return nil
return nil, int64(len(cGroups))
}

return cGroups
return cGroups, int64(len(cGroups))
}

// PlanOptimize returns all TSM files if they are in different generations in order
// to optimize the index across TSM files. Each returned compaction group can be
// compacted concurrently.
func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
// If a full plan has been requested, don't plan any levels which will prevent
// the full plan from acquiring them.
c.mu.RLock()
if c.forceFull {
c.mu.RUnlock()
return nil
return nil, 0
}
c.mu.RUnlock()

Expand All @@ -348,7 +348,7 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
// If there is only one generation and no tombstones, then there's nothing to
// do.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil
return nil, 0
}

// Group each generation by level such that two adjacent generations in the same
Expand Down Expand Up @@ -413,15 +413,15 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
}

if !c.acquire(cGroups) {
return nil
return nil, int64(len(cGroups))
}

return cGroups
return cGroups, int64(len(cGroups))
}

// Plan returns a set of TSM files to rewrite for level 4 or higher. The planning returns
// multiple groups if possible to allow compactions to run concurrently.
func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) {
generations := c.findGenerations(true)

c.mu.RLock()
Expand Down Expand Up @@ -471,27 +471,27 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {

// Make sure we have more than 1 file and more than 1 generation
if len(tsmFiles) <= 1 || genCount <= 1 {
return nil
return nil, 0
}

group := []CompactionGroup{tsmFiles}
if !c.acquire(group) {
return nil
return nil, int64(len(group))
}
return group
return group, int64(len(group))
}

// don't plan if nothing has changed in the filestore
if c.lastPlanCheck.After(c.FileStore.LastModified()) && !generations.hasTombstones() {
return nil
return nil, 0
}

c.lastPlanCheck = time.Now()

// If there is only one generation, return early to avoid re-compacting the same file
// over and over again.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil
return nil, 0
}

// Need to find the ending point for level 4 files. They will be the oldest files. We scan
Expand Down Expand Up @@ -584,7 +584,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
}

if len(groups) == 0 {
return nil
return nil, 0
}

// With the groups, we need to evaluate whether the group as a whole can be compacted
Expand Down Expand Up @@ -612,9 +612,9 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
}

if !c.acquire(tsmFiles) {
return nil
return nil, int64(len(tsmFiles))
}
return tsmFiles
return tsmFiles, int64(len(tsmFiles))
}

// findGenerations groups all the TSM files by generation based
Expand Down
Loading

0 comments on commit 1b5de55

Please sign in to comment.