Skip to content

Commit

Permalink
Prevent concurrent compactions from stepping on each other
Browse files Browse the repository at this point in the history
Normally, compactions do not conflict on the files they are compacting.
If the full cold threshold is set very low, it can cause conflicts where
two compactions compact the same files.  The full compaction was the
only place this could happen as it's planning is greedy.

To make this safer for concurrent execution, the compaction tracks which
files are current being compacted and prevents any new compactions from
starting if the file set overlaps.

Fixes #6595
  • Loading branch information
jwilder committed Jul 26, 2016
1 parent ded6e40 commit cab84ae
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#6946](https://github.com/influxdata/influxdb/issues/6946): Duplicate data for the same timestamp
- [#7043](https://github.com/influxdata/influxdb/pull/7043): Remove limiter from walkShards
- [#5501](https://github.com/influxdata/influxdb/issues/5501): Queries against files that have just been compacted need to point to new files
- [#6595](https://github.com/influxdata/influxdb/issues/6595): Fix full compactions conflicting with level compactions

## v0.13.0 [2016-05-12]

Expand Down
45 changes: 42 additions & 3 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ type Compactor struct {
mu sync.RWMutex
opened bool
closing chan struct{}
files map[string]struct{}
}

func (c *Compactor) Open() {
Expand All @@ -503,6 +504,7 @@ func (c *Compactor) Open() {

c.closing = make(chan struct{})
c.opened = true
c.files = make(map[string]struct{})
}

func (c *Compactor) Close() {
Expand Down Expand Up @@ -604,6 +606,11 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
return nil, errCompactionsDisabled
}

if !c.add(tsmFiles) {
return nil, errCompactionInProgress
}
defer c.remove(tsmFiles)

files, err := c.compact(false, tsmFiles)

// See if we were closed while writing a snapshot
Expand All @@ -628,6 +635,11 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
return nil, errCompactionsDisabled
}

if !c.add(tsmFiles) {
return nil, errCompactionInProgress
}
defer c.remove(tsmFiles)

files, err := c.compact(true, tsmFiles)

// See if we were closed while writing a snapshot
Expand Down Expand Up @@ -673,9 +685,6 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([

// We hit an error but didn't finish the compaction. Remove the temp file and abort.
if err != nil {
if err := os.Remove(fileName); err != nil {
return nil, err
}
return nil, err
}

Expand Down Expand Up @@ -750,6 +759,36 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {
return nil
}

func (c *Compactor) add(files []string) bool {
c.mu.Lock()
defer c.mu.Unlock()

var inuse bool
for _, f := range files {
if _, ok := c.files[f]; ok {
inuse = true
break
}
}

if inuse {
return false
}

for _, f := range files {
c.files[f] = struct{}{}
}
return true
}

func (c *Compactor) remove(files []string) {
c.mu.Lock()
defer c.mu.Unlock()
for _, f := range files {
delete(c.files, f)
}
}

// KeyIterator allows iteration over set of keys and values in sorted order.
type KeyIterator interface {
Next() bool
Expand Down
40 changes: 36 additions & 4 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,15 +858,31 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {

if fast {
files, err = e.Compactor.CompactFast(group)
if err != nil && err != errCompactionsDisabled {
if err == errCompactionsDisabled || err == errCompactionInProgress {
e.logger.Printf("aborted level %d group (%d). %v",
level, groupNum, err)

if err == errCompactionInProgress {
time.Sleep(time.Second)
}
return
} else if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
return
}
} else {
files, err = e.Compactor.CompactFull(group)
if err != nil && err != errCompactionsDisabled {
if err == errCompactionsDisabled || err == errCompactionInProgress {
e.logger.Printf("aborted level %d compaction group (%d). %v",
level, groupNum, err)

if err == errCompactionInProgress {
time.Sleep(time.Second)
}
return
} else if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
Expand Down Expand Up @@ -941,7 +957,15 @@ func (e *Engine) compactTSMFull() {
)
if optimize {
files, err = e.Compactor.CompactFast(group)
if err != nil && err != errCompactionsDisabled {
if err == errCompactionsDisabled || err == errCompactionInProgress {
e.logger.Printf("aborted %s compaction group (%d). %v",
logDesc, groupNum, err)

if err == errCompactionInProgress {
time.Sleep(time.Second)
}
return
} else if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMOptimizeCompactionErrors, 1)

Expand All @@ -950,7 +974,15 @@ func (e *Engine) compactTSMFull() {
}
} else {
files, err = e.Compactor.CompactFull(group)
if err != nil && err != errCompactionsDisabled {
if err == errCompactionsDisabled || err == errCompactionInProgress {
e.logger.Printf("aborted %s compaction group (%d). %v",
logDesc, groupNum, err)

if err == errCompactionInProgress {
time.Sleep(time.Second)
}
return
} else if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMFullCompactionErrors, 1)

Expand Down

0 comments on commit cab84ae

Please sign in to comment.