Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(ae): add more logging #21381

Merged
merged 6 commits into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ v1.8.6 [unreleased]
-------------------

- [#21290](https://github.com/influxdata/influxdb/pull/21290): fix: Anti-Entropy loops endlessly with empty shard
- [#21381](https://github.com/influxdata/influxdb/pull/21381): chore(ae): add more logging

v1.8.5 [2021-04-19]
-------------------
Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type Engine interface {
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
DiskSize() int64
IsIdle() bool
IsIdle(isLogged bool) bool
davidby-influx marked this conversation as resolved.
Show resolved Hide resolved
Free() error

io.WriterTo
Expand Down
54 changes: 44 additions & 10 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,19 +886,53 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
return nil
}

// IsIdle returns true if the cache is empty, there are no running compactions and the
// shard is fully compacted.
func (e *Engine) IsIdle() bool {
func (e *Engine) IsIdle(isLogged bool) bool {
return e.loggedIsIdle(isLogged)
}

const logMsg = "IsIdle false because nonzero"

// loggedIsIdle returns true if the cache is empty, there are no running compactions and the
// shard is fully compacted. If trace logging is enabled, it logs the reasons why an engine
// is busy
func (e *Engine) loggedIsIdle(isLogged bool) bool {
const cacheCompactions = "Cache Compactions: "
const levelZeroCompactions = "Level Zero Compactions: "
const levelOneCompactions = "Level One Compactions: "
const levelTwoCompactions = "Level Two Compactions: "
const fullCompactions = "Full Compactions: "
const optimizeCompactions = "TSM Optimization Compactions: "
davidby-influx marked this conversation as resolved.
Show resolved Hide resolved
var log *zap.Logger

if isLogged {
log = e.traceLogger.With(zap.Uint64("ShardId", e.id), zap.String("path", e.path))
}

cacheEmpty := e.Cache.Size() == 0
if !cacheEmpty && log != nil {
log.Info(logMsg, zap.Uint64("Cache size", e.Cache.Size()))
}

runningCompactions := atomic.LoadInt64(&e.stats.CacheCompactionsActive)
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[2])
runningCompactions += atomic.LoadInt64(&e.stats.TSMFullCompactionsActive)
runningCompactions += atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive)
runningCompactions := logCheckForCompactions(log, &e.stats.CacheCompactionsActive, cacheCompactions)
runningCompactions += logCheckForCompactions(log, &e.stats.TSMCompactionsActive[0], levelZeroCompactions)
runningCompactions += logCheckForCompactions(log, &e.stats.TSMCompactionsActive[1], levelOneCompactions)
runningCompactions += logCheckForCompactions(log, &e.stats.TSMCompactionsActive[2], levelTwoCompactions)
runningCompactions += logCheckForCompactions(log, &e.stats.TSMFullCompactionsActive, fullCompactions)
runningCompactions += logCheckForCompactions(log, &e.stats.TSMOptimizeCompactionsActive, optimizeCompactions)
fullyCompacted := e.CompactionPlan.FullyCompacted()
if !fullyCompacted && log != nil {
log.Info("IsIdle false because", zap.Bool("FullyCompacted", fullyCompacted))
davidby-influx marked this conversation as resolved.
Show resolved Hide resolved
}

return cacheEmpty && runningCompactions == 0 && e.CompactionPlan.FullyCompacted()
return cacheEmpty && runningCompactions == 0 && fullyCompacted
}

func logCheckForCompactions(log *zap.Logger, counter *int64, fieldName string) int64 {
count := atomic.LoadInt64(counter)
if count > 0 && log != nil {
log.Info(logMsg, zap.Int64(fieldName, count))
}
return count
}

// Free releases any resources held by the engine to free up memory or CPU.
Expand Down
15 changes: 12 additions & 3 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,16 @@ func (s *Shard) IsIdle() bool {
if err != nil {
return true
}
return engine.IsIdle()
return engine.IsIdle(false)
}

// IsIdle return true if the shard is not receiving writes and is fully compacted.
davidby-influx marked this conversation as resolved.
Show resolved Hide resolved
func (s *Shard) LoggedIsIdle() bool {
engine, err := s.Engine()
if err != nil {
return true
}
return engine.IsIdle(true)
}

func (s *Shard) Free() error {
Expand Down Expand Up @@ -1200,15 +1209,15 @@ func (s *Shard) TagKeyCardinality(name, key []byte) int {
}

// Digest returns a digest of the shard.
func (s *Shard) Digest() (io.ReadCloser, int64, error) {
func (s *Shard) Digest(isLogged bool) (io.ReadCloser, int64, error) {
engine, err := s.Engine()
if err != nil {
return nil, 0, err
}

// Make sure the shard is idle/cold. (No use creating a digest of a
// hot shard that is rapidly changing.)
if !engine.IsIdle() {
if !engine.IsIdle(isLogged) {
return nil, 0, ErrShardNotIdle
}

Expand Down
2 changes: 1 addition & 1 deletion tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (s *Store) ShardDigest(id uint64) (io.ReadCloser, int64, error) {
return nil, 0, ErrShardNotFound
}

return sh.Digest()
return sh.Digest(false)
}

// CreateShard creates a shard with the given id and retention policy on a database.
Expand Down