Allow snapshot compactions during deletes
If a delete takes a long time to process while writes to the
shard are occurring, it was possible for the cache to fill up
and writes to be rejected.  This occurred because we disabled
all compactions while writing tombstone files to prevent deleted
data from re-appearing after a compaction completed.

Instead, we only disable the level compactions and allow snapshot
compactions to continue.  Snapshots already handle deleted data
with the cache and WAL.

Fixes #7161
jwilder committed Aug 17, 2016
1 parent 6553667 commit 43425ad
Showing 4 changed files with 149 additions and 75 deletions.
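To illustrate the behavior change described in the commit message, here is a minimal, self-contained Go sketch (simplified types and method names, not the actual tsm1 package): a delete now pauses only level compactions while tombstones are written, leaving snapshot compactions free to keep draining the cache.

```go
package main

import (
	"fmt"
	"sync"
)

type engine struct {
	mu                         sync.RWMutex
	levelCompactionsEnabled    bool
	snapshotCompactionsEnabled bool
}

func (e *engine) disableLevelCompactions() {
	e.mu.Lock()
	e.levelCompactionsEnabled = false
	e.mu.Unlock()
}

func (e *engine) enableLevelCompactions() {
	e.mu.Lock()
	e.levelCompactionsEnabled = true
	e.mu.Unlock()
}

func (e *engine) deleteSeriesRange() {
	// Only level compactions are paused for the duration of the delete;
	// snapshotCompactionsEnabled is never touched, so the cache can still be
	// flushed to TSM files while tombstones are written.
	e.disableLevelCompactions()
	defer e.enableLevelCompactions()

	e.mu.RLock()
	fmt.Println("writing tombstones; snapshots still enabled:", e.snapshotCompactionsEnabled)
	e.mu.RUnlock()
}

func main() {
	e := &engine{levelCompactionsEnabled: true, snapshotCompactionsEnabled: true}
	e.deleteSeriesRange()
}
```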
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@

- [#1834](https://github.com/influxdata/influxdb/issues/1834): Drop time when used as a tag or field key.
- [#7152](https://github.com/influxdata/influxdb/issues/7152): Decrement number of measurements only once when deleting the last series from a measurement.
- [#7161](https://github.com/influxdata/influxdb/issues/7161): Drop measurement causes cache max memory exceeded error.

## v1.0.0 [unreleased]

79 changes: 51 additions & 28 deletions tsdb/engine/tsm1/compact.go
@@ -489,41 +489,66 @@ type Compactor struct {
NextGeneration() int
}

mu sync.RWMutex
opened bool
closing chan struct{}
files map[string]struct{}
mu sync.RWMutex
snapshotsEnabled bool
compactionsEnabled bool

files map[string]struct{}
}

func (c *Compactor) Open() {
c.mu.Lock()
defer c.mu.Unlock()
if c.opened {
if c.snapshotsEnabled || c.compactionsEnabled {
return
}

c.closing = make(chan struct{})
c.opened = true
c.snapshotsEnabled = true
c.compactionsEnabled = true
c.files = make(map[string]struct{})
}

func (c *Compactor) Close() {
c.mu.Lock()
defer c.mu.Unlock()
if !c.opened {
if !(c.snapshotsEnabled || c.compactionsEnabled) {
return
}
c.opened = false
close(c.closing)
c.snapshotsEnabled = false
c.compactionsEnabled = false
}

func (c *Compactor) DisableSnapshots() {
c.mu.Lock()
c.snapshotsEnabled = false
c.mu.Unlock()
}

func (c *Compactor) EnabledSnapshots() {
c.mu.Lock()
c.snapshotsEnabled = true
c.mu.Unlock()
}

func (c *Compactor) DisableCompactions() {
c.mu.Lock()
c.compactionsEnabled = false
c.mu.Unlock()
}

func (c *Compactor) EnabledCompactions() {
c.mu.Lock()
c.compactionsEnabled = true
c.mu.Unlock()
}

// WriteSnapshot will write a Cache snapshot to new TSM files.
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
c.mu.RLock()
opened := c.opened
enabled := c.snapshotsEnabled
c.mu.RUnlock()

if !opened {
if !enabled {
return nil, errSnapshotsDisabled
}

@@ -532,10 +557,10 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {

// See if we were closed while writing a snapshot
c.mu.RLock()
opened = c.opened
enabled = c.snapshotsEnabled
c.mu.RUnlock()

if !opened {
if !enabled {
return nil, errSnapshotsDisabled
}

@@ -599,10 +624,10 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
// Compact will write multiple smaller TSM files into 1 or more larger files
func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
c.mu.RLock()
opened := c.opened
enabled := c.compactionsEnabled
c.mu.RUnlock()

if !opened {
if !enabled {
return nil, errCompactionsDisabled
}

@@ -615,10 +640,10 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {

// See if we were closed while writing a snapshot
c.mu.RLock()
opened = c.opened
enabled = c.compactionsEnabled
c.mu.RUnlock()

if !opened {
if !enabled {
return nil, errCompactionsDisabled
}

@@ -628,10 +653,10 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
// Compact will write multiple smaller TSM files into 1 or more larger files
func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
c.mu.RLock()
opened := c.opened
enabled := c.compactionsEnabled
c.mu.RUnlock()

if !opened {
if !enabled {
return nil, errCompactionsDisabled
}

@@ -644,10 +669,10 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {

// See if we were closed while writing a snapshot
c.mu.RLock()
opened = c.opened
enabled = c.compactionsEnabled
c.mu.RUnlock()

if !opened {
if !enabled {
return nil, errCompactionsDisabled
}

@@ -715,14 +740,12 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {

for iter.Next() {
c.mu.RLock()
select {
case <-c.closing:
c.mu.RUnlock()
return errCompactionAborted
default:
}
enabled := c.snapshotsEnabled || c.compactionsEnabled
c.mu.RUnlock()

if !enabled {
return errCompactionAborted
}
// Each call to read returns the next sorted key (or the prior one if there are
// more values to write). The size of values will be less than or equal to our
// chunk size (1000)
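The Compactor changes above replace the single opened flag and closing channel with per-kind enabled flags that are re-checked after long-running work. Below is a rough sketch of that check/work/re-check pattern with simplified types (not the real Compactor): the flag is read before the work starts and again after it finishes, so a Disable call issued mid-snapshot causes the result to be discarded.

```go
package main

import (
	"errors"
	"sync"
)

var errSnapshotsDisabled = errors.New("snapshots disabled")

type compactor struct {
	mu               sync.RWMutex
	snapshotsEnabled bool
}

// writeSnapshot runs work only while snapshots are enabled, and discards the
// result if they were disabled while the work was in flight.
func (c *compactor) writeSnapshot(work func() []string) ([]string, error) {
	c.mu.RLock()
	enabled := c.snapshotsEnabled
	c.mu.RUnlock()
	if !enabled {
		return nil, errSnapshotsDisabled
	}

	files := work() // long-running: encode cache contents into new TSM files

	// Re-check after the work: a disable issued mid-snapshot aborts it.
	c.mu.RLock()
	enabled = c.snapshotsEnabled
	c.mu.RUnlock()
	if !enabled {
		return nil, errSnapshotsDisabled
	}
	return files, nil
}

func main() {
	c := &compactor{snapshotsEnabled: true}
	files, err := c.writeSnapshot(func() []string { return []string{"000001-01.tsm.tmp"} })
	if err != nil {
		panic(err)
	}
	_ = files
}
```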
133 changes: 90 additions & 43 deletions tsdb/engine/tsm1/engine.go
@@ -57,10 +57,13 @@ const (

// Engine represents a storage engine with compressed blocks.
type Engine struct {
mu sync.RWMutex
done chan struct{}
wg sync.WaitGroup
compactionsEnabled bool
mu sync.RWMutex
done chan struct{}
snapshotterDone chan struct{}
wg sync.WaitGroup
snapshotterWg sync.WaitGroup
levelCompactionsEnabled bool
snapshotCompactionsEnabled bool

path string
logger *log.Logger // Logger to be used for important messages
@@ -150,47 +153,81 @@ func (e *Engine) SetEnabled(enabled bool) {
// all running compactions are aborted and new compactions stop running.
func (e *Engine) SetCompactionsEnabled(enabled bool) {
if enabled {
e.mu.Lock()
if e.compactionsEnabled {
e.mu.Unlock()
return
}
e.compactionsEnabled = true
e.enableSnapshotCompactions()
e.enableLevelCompactions()

e.done = make(chan struct{})
e.Compactor.Open()
} else {
e.disableSnapshotCompactions()
e.disableLevelCompactions()
}
}

func (e *Engine) enableLevelCompactions() {
e.mu.Lock()
if e.levelCompactionsEnabled {
e.mu.Unlock()
return
}
e.levelCompactionsEnabled = true
e.Compactor.EnabledCompactions()
e.done = make(chan struct{})
e.mu.Unlock()

e.wg.Add(5)
go e.compactCache()
go e.compactTSMFull()
go e.compactTSMLevel(true, 1)
go e.compactTSMLevel(true, 2)
go e.compactTSMLevel(false, 3)
} else {
e.mu.Lock()
if !e.compactionsEnabled {
e.mu.Unlock()
return
}
// Prevent new compactions from starting
e.compactionsEnabled = false
e.wg.Add(4)
go e.compactTSMFull()
go e.compactTSMLevel(true, 1)
go e.compactTSMLevel(true, 2)
go e.compactTSMLevel(false, 3)
}

func (e *Engine) disableLevelCompactions() {
e.mu.Lock()
if !e.levelCompactionsEnabled {
e.mu.Unlock()
return
}
// Prevent new compactions from starting
e.levelCompactionsEnabled = false
e.Compactor.DisableCompactions()
e.mu.Unlock()

// Stop all background compaction goroutines
close(e.done)

// Wait for compaction goroutines to exit
e.wg.Wait()

if err := e.cleanup(); err != nil {
e.logger.Printf("error cleaning up temp file: %v", err)
}
}

// Stop all background compaction goroutines
close(e.done)
func (e *Engine) enableSnapshotCompactions() {
e.mu.Lock()
if e.snapshotCompactionsEnabled {
e.mu.Unlock()
return
}

// Abort any running goroutines (this could take a while)
e.Compactor.Close()
e.snapshotCompactionsEnabled = true
e.snapshotterDone = make(chan struct{})
e.Compactor.EnabledSnapshots()
e.mu.Unlock()

// Wait for compaction goroutines to exit
e.wg.Wait()
e.snapshotterWg.Add(1)
go e.compactCache()
}

if err := e.cleanup(); err != nil {
e.logger.Printf("error cleaning up temp file: %v", err)
}
func (e *Engine) disableSnapshotCompactions() {
e.mu.Lock()
if !e.snapshotCompactionsEnabled {
e.mu.Unlock()
return
}
e.snapshotCompactionsEnabled = false
e.Compactor.DisableSnapshots()
e.mu.Unlock()
e.snapshotterWg.Wait()
}

// Path returns the path the engine was opened with.
@@ -293,6 +330,8 @@ func (e *Engine) Open() error {
return err
}

e.Compactor.Open()

if e.enableCompactionsOnOpen {
e.SetCompactionsEnabled(true)
}
@@ -624,12 +663,12 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {

// Disable and abort running compactions so that tombstones added to existing tsm
// files don't get removed. This would cause deleted measurements/series to
// re-appear once the compaction completed.
e.SetCompactionsEnabled(false)
defer e.SetCompactionsEnabled(true)

e.mu.RLock()
defer e.mu.RUnlock()
// re-appear once the compaction completed. We only disable the level compactions
// so that snapshotting does not stop while writing out tombstones. If it is stopped,
// and writing tombstones takes a long time, writes can get rejected due to the cache
// filling up.
e.disableLevelCompactions()
defer e.enableLevelCompactions()

// keyMap is used to see if a given key should be deleted. seriesKey
// are the measurement + tagset (minus separate & field)
@@ -799,13 +838,21 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (

// compactCache continually checks if the WAL cache should be written to disk
func (e *Engine) compactCache() {
defer e.wg.Done()
defer e.snapshotterWg.Done()
for {
select {
case <-e.done:
case <-e.snapshotterDone:
return

default:
e.mu.RLock()
enabled := e.snapshotCompactionsEnabled
e.mu.RUnlock()

if !enabled {
return
}

e.Cache.UpdateAge()
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
start := time.Now()
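The Engine changes above split the background goroutine lifecycle in two: the level-compaction goroutines keep using done/wg, while the cache snapshotter gets its own snapshotterDone/snapshotterWg, so one group can be stopped without touching the other. A rough, self-contained sketch of that split (hypothetical simplified engine, not the real one):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type engine struct {
	mu sync.Mutex

	done chan struct{} // signals the level-compaction goroutines
	wg   sync.WaitGroup

	snapshotterDone chan struct{} // signals the cache-snapshot goroutine
	snapshotterWg   sync.WaitGroup
}

func (e *engine) enableLevelCompactions() {
	e.mu.Lock()
	e.done = make(chan struct{})
	e.mu.Unlock()

	e.wg.Add(1)
	go e.compactTSM()
}

func (e *engine) disableLevelCompactions() {
	e.mu.Lock()
	close(e.done) // stops only the level-compaction goroutines
	e.mu.Unlock()
	e.wg.Wait()
}

func (e *engine) enableSnapshotCompactions() {
	e.mu.Lock()
	e.snapshotterDone = make(chan struct{})
	e.mu.Unlock()

	e.snapshotterWg.Add(1)
	go e.compactCache()
}

func (e *engine) disableSnapshotCompactions() {
	e.mu.Lock()
	close(e.snapshotterDone)
	e.mu.Unlock()
	e.snapshotterWg.Wait()
}

func (e *engine) compactTSM() {
	defer e.wg.Done()
	for {
		select {
		case <-e.done:
			return
		default:
			time.Sleep(10 * time.Millisecond) // stand-in for level compaction work
		}
	}
}

func (e *engine) compactCache() {
	defer e.snapshotterWg.Done()
	for {
		select {
		case <-e.snapshotterDone:
			return
		default:
			fmt.Println("snapshotting cache") // keeps draining the cache during a delete
			time.Sleep(10 * time.Millisecond)
		}
	}
}

func main() {
	e := &engine{}
	e.enableLevelCompactions()
	e.enableSnapshotCompactions()

	// A delete pauses only the level compactions; the snapshotter keeps running.
	e.disableLevelCompactions()
	time.Sleep(30 * time.Millisecond)
	e.disableSnapshotCompactions()
}
```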
11 changes: 7 additions & 4 deletions tsdb/engine/tsm1/file_store.go
@@ -647,13 +647,16 @@ func (f *FileStore) BlockCount(path string, idx int) int {

// walkFiles calls fn for every file in the filestore in parallel
func (f *FileStore) walkFiles(fn func(f TSMFile) error) error {
// Copy the current TSM files to prevent a slow walker from
// blocking other operations.
f.mu.RLock()
defer f.mu.RUnlock()
files := make([]TSMFile, len(f.files))
copy(files, f.files)
f.mu.RUnlock()

// struct to hold the result of opening each reader in a goroutine

errC := make(chan error, len(f.files))
for _, f := range f.files {
errC := make(chan error, len(files))
for _, f := range files {
go func(tsm TSMFile) {
if err := fn(tsm); err != nil {
errC <- fmt.Errorf("file %s: %s", tsm.Path(), err)
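The FileStore change copies the current file list under a briefly-held read lock instead of keeping the lock for the whole walk, so a slow callback no longer blocks other store operations. A small illustrative sketch of the same pattern with simplified types (not the actual FileStore):

```go
package main

import (
	"fmt"
	"sync"
)

type fileStore struct {
	mu    sync.RWMutex
	files []string
}

// walkFiles copies the current file list under a short read lock, then runs
// fn on each entry in parallel and returns the first error encountered.
func (f *fileStore) walkFiles(fn func(path string) error) error {
	f.mu.RLock()
	files := make([]string, len(f.files))
	copy(files, f.files)
	f.mu.RUnlock()

	errC := make(chan error, len(files))
	for _, path := range files {
		go func(p string) {
			if err := fn(p); err != nil {
				errC <- fmt.Errorf("file %s: %s", p, err)
				return
			}
			errC <- nil
		}(path)
	}

	var firstErr error
	for range files {
		if err := <-errC; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	fs := &fileStore{files: []string{"000001-01.tsm", "000002-01.tsm"}}
	err := fs.walkFiles(func(path string) error {
		fmt.Println("visiting", path)
		return nil
	})
	fmt.Println("walk error:", err)
}
```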