Skip to content

Commit

Permalink
Fix deadlock in engine/measurement fields
Browse files Browse the repository at this point in the history
The OnReplace func ends up trying to acquire locks on MeasurementFields.  When
it's called via snapshotting, this can deadlock because the snapshotting goroutine
also holds an RLock on the engine.  If a delete measurement call is run at the
right time, it will lock the MeasurementFields and try to acquire a lock on the engine
to disable compactions.  This creates a deadlock.

To fix this, the OnReplace callback is now passed as a function parameter, so that only
Replace calls made as part of a compaction invoke it, rather than both snapshotting and compactions.

Fixes #8713
  • Loading branch information
jwilder committed Aug 16, 2017
1 parent 4c4456c commit 7e24c38
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
7 changes: 4 additions & 3 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,6 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
compactionLimiter: opt.CompactionLimiter,
}

fs.OnReplace = e.onFileStoreReplace

// Attach fieldset to index.
e.index.SetFieldSet(e.fieldset)

Expand Down Expand Up @@ -1320,6 +1318,7 @@ type compactionStrategy struct {
compactor *Compactor
fileStore *FileStore
limiter limiter.Fixed
engine *Engine
}

// Apply concurrently compacts all the groups in a compaction strategy.
Expand Down Expand Up @@ -1395,7 +1394,7 @@ func (s *compactionStrategy) compactGroup(groupNum int) {
return
}

if err := s.fileStore.Replace(group, files); err != nil {
if err := s.fileStore.ReplaceWithCallback(group, files, s.engine.onFileStoreReplace); err != nil {
s.logger.Info(fmt.Sprintf("error replacing new TSM files: %v", err))
atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second)
Expand Down Expand Up @@ -1426,6 +1425,7 @@ func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrate
compactor: e.Compactor,
fast: fast,
limiter: e.compactionLimiter,
engine: e,

description: fmt.Sprintf("level %d", level),
activeStat: &e.stats.TSMCompactionsActive[level-1],
Expand Down Expand Up @@ -1458,6 +1458,7 @@ func (e *Engine) fullCompactionStrategy() *compactionStrategy {
compactor: e.Compactor,
fast: optimize,
limiter: e.compactionLimiter,
engine: e,
}

if optimize {
Expand Down
16 changes: 11 additions & 5 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,6 @@ type FileStore struct {
purger *purger

currentTempDirID int

// Callback of files that are being added to the filestore
OnReplace func(r []TSMFile)
}

// FileStat holds information about a TSM file on disk.
Expand Down Expand Up @@ -507,8 +504,17 @@ func (f *FileStore) Stats() []FileStat {
return f.lastFileStats
}

// ReplaceWithCallback replaces oldFiles with newFiles and calls updatedFn with
// the files to be added to the FileStore.
//
// oldFiles and newFiles are file names; updatedFn receives the corresponding
// TSMFile values for the new files. It is a thin wrapper that forwards all
// three arguments to replace unchanged, and returns whatever error replace
// returns.
//
// NOTE(review): the callback appears to run before replace takes f.mu, so it
// should be safe for updatedFn to take other locks — confirm against the full
// body of replace.
func (f *FileStore) ReplaceWithCallback(oldFiles, newFiles []string, updatedFn func(r []TSMFile)) error {
return f.replace(oldFiles, newFiles, updatedFn)
}

// Replace replaces oldFiles with newFiles.
//
// It delegates to replace with a nil callback, so no function is invoked with
// the files being added. Use ReplaceWithCallback when the caller needs to be
// handed the new files. The returned error is whatever replace returns.
func (f *FileStore) Replace(oldFiles, newFiles []string) error {
return f.replace(oldFiles, newFiles, nil)
}

func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMFile)) error {
if len(oldFiles) == 0 && len(newFiles) == 0 {
return nil
}
Expand Down Expand Up @@ -549,8 +555,8 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
updated = append(updated, tsm)
}

if f.OnReplace != nil {
f.OnReplace(updated)
if updatedFn != nil {
updatedFn(updated)
}

f.mu.Lock()
Expand Down

0 comments on commit 7e24c38

Please sign in to comment.