Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(ae): add more logging #21381

Merged
merged 6 commits into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ v1.8.6 [unreleased]
-------------------

- [#21290](https://github.com/influxdata/influxdb/pull/21290): fix: Anti-Entropy loops endlessly with empty shard
- [#21381](https://github.com/influxdata/influxdb/pull/21381): chore(ae): add more logging

v1.8.5 [2021-04-19]
-------------------
Expand Down
4 changes: 4 additions & 0 deletions flux/control/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package control_test
import (
"context"
"testing"
"time"

"github.com/influxdata/flux"
"github.com/influxdata/flux/memory"
Expand All @@ -23,6 +24,9 @@ func TestController_Query(t *testing.T) {
compiler := &mock.Compiler{
Type: "mock",
CompileFn: func(ctx context.Context) (flux.Program, error) {
// On fast machines, compilation can be faster than clock granularity
// causing the compile duration test below to fail
time.Sleep(time.Second)
danxmoran marked this conversation as resolved.
Show resolved Hide resolved
return &mock.Program{
StartFn: func(ctx context.Context, alloc *memory.Allocator) (*mock.Query, error) {
ch := make(chan flux.Result)
Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type Engine interface {
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
DiskSize() int64
IsIdle() bool
IsIdle() (bool, string)
Free() error

io.WriterTo
Expand Down
38 changes: 27 additions & 11 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,17 +888,33 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {

// IsIdle returns true if the cache is empty, there are no running compactions and the
// shard is fully compacted.
func (e *Engine) IsIdle() bool {
cacheEmpty := e.Cache.Size() == 0

runningCompactions := atomic.LoadInt64(&e.stats.CacheCompactionsActive)
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[2])
runningCompactions += atomic.LoadInt64(&e.stats.TSMFullCompactionsActive)
runningCompactions += atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive)

return cacheEmpty && runningCompactions == 0 && e.CompactionPlan.FullyCompacted()
func (e *Engine) IsIdle() (state bool, reason string) {
c := []struct {
ActiveCompactions *int64
LogMessage string
}{
{&e.stats.CacheCompactionsActive, "not idle because of active Cache compactions"},
{&e.stats.TSMCompactionsActive[0], "not idle because of active Level Zero compactions"},
{&e.stats.TSMCompactionsActive[1], "not idle because of active Level One compactions"},
{&e.stats.TSMCompactionsActive[2], "not idle because of active Level Two compactions"},
{&e.stats.TSMFullCompactionsActive, "not idle because of active Full compactions"},
{&e.stats.TSMOptimizeCompactionsActive, "not idle because of active TSM Optimization compactions"},
}

for _, compactionState := range c {
count := atomic.LoadInt64(compactionState.ActiveCompactions)
if count > 0 {
return false, compactionState.LogMessage
}
}

if cacheSize := e.Cache.Size(); cacheSize > 0 {
return false, "not idle because cache size is nonzero"
} else if !e.CompactionPlan.FullyCompacted() {
return false, "not idle because shard is not fully compacted"
} else {
return true, ""
}
}

// Free releases any resources held by the engine to free up memory or CPU.
Expand Down
15 changes: 8 additions & 7 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,10 @@ func (s *Shard) SeriesFile() (*SeriesFile, error) {
}

// IsIdle return true if the shard is not receiving writes and is fully compacted.
func (s *Shard) IsIdle() bool {
func (s *Shard) IsIdle() (state bool, reason string) {
engine, err := s.Engine()
if err != nil {
return true
return true, ""
}
return engine.IsIdle()
}
Expand Down Expand Up @@ -1200,19 +1200,20 @@ func (s *Shard) TagKeyCardinality(name, key []byte) int {
}

// Digest returns a digest of the shard.
func (s *Shard) Digest() (io.ReadCloser, int64, error) {
func (s *Shard) Digest() (io.ReadCloser, int64, error, string) {
engine, err := s.Engine()
if err != nil {
return nil, 0, err
return nil, 0, err, ""
}

// Make sure the shard is idle/cold. (No use creating a digest of a
// hot shard that is rapidly changing.)
if !engine.IsIdle() {
return nil, 0, ErrShardNotIdle
if isIdle, reason := engine.IsIdle(); !isIdle {
return nil, 0, ErrShardNotIdle, reason
}

return engine.Digest()
readCloser, size, err := engine.Digest()
return readCloser, size, err, ""
}

// engine safely (under an RLock) returns a reference to the shard's Engine, or
Expand Down
9 changes: 5 additions & 4 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (s *Store) loadShards() error {
// Enable all shards
for _, sh := range s.shards {
sh.SetEnabled(true)
if sh.IsIdle() {
if isIdle, _ := sh.IsIdle(); isIdle {
if err := sh.Free(); err != nil {
return err
}
Expand Down Expand Up @@ -593,7 +593,8 @@ func (s *Store) ShardDigest(id uint64) (io.ReadCloser, int64, error) {
return nil, 0, ErrShardNotFound
}

return sh.Digest()
readCloser, size, err, _ := sh.Digest()
return readCloser, size, err
}

// CreateShard creates a shard with the given id and retention policy on a database.
Expand Down Expand Up @@ -1445,7 +1446,7 @@ func (s *Store) WriteToShardWithContext(ctx context.Context, shardID uint64, poi

// Ensure snapshot compactions are enabled since the shard might have been cold
// and disabled by the monitor.
if sh.IsIdle() {
if isIdle, _ := sh.IsIdle(); isIdle {
sh.SetCompactionsEnabled(true)
}

Expand Down Expand Up @@ -1986,7 +1987,7 @@ func (s *Store) monitorShards() {
case <-t.C:
s.mu.RLock()
for _, sh := range s.shards {
if sh.IsIdle() {
if isIdle, _ := sh.IsIdle(); isIdle {
if err := sh.Free(); err != nil {
s.Logger.Warn("Error while freeing cold shard resources",
zap.Error(err),
Expand Down