From 42b82e719a5407a68b9de699f7375a039e521a8c Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 25 Jul 2017 13:35:01 -0600 Subject: [PATCH] Interrupt in progress TSM compactions When snapshots and compactions are disabled, the check to see if the compaction should be aborted occurs in between writing to the next TSM file. If a large compaction is running, it might take a while for the file to be finished writing causing long delays. This now interrupts compactions while iterating over the blocks to write which allows them to abort immediately. --- CHANGELOG.md | 6 +++ tsdb/engine/tsm1/compact.go | 79 ++++++++++++++++++++++++------ tsdb/engine/tsm1/compact_test.go | 82 ++++++++++++++++++++++++++++++-- tsdb/engine/tsm1/engine.go | 2 + 4 files changed, 150 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 369c477ce01..463d057d76a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## v1.3.2 [unreleased] + +### Bugfixes + +- [#8629](https://github.com/influxdata/influxdb/pull/8629): Interrupt in progress TSM compactions + ## v1.3.1 [2017-07-20] ### Bugfixes diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index f3c04d80f77..9c9efd3b5f3 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -591,6 +591,11 @@ type Compactor struct { snapshotsEnabled bool compactionsEnabled bool + // The channel to signal that any in progress snapshots should be aborted. + snapshotsInterrupt chan struct{} + // The channel to signal that any in progress level compactions should be aborted. + compactionsInterrupt chan struct{} + files map[string]struct{} } @@ -604,6 +609,9 @@ func (c *Compactor) Open() { c.snapshotsEnabled = true c.compactionsEnabled = true + c.snapshotsInterrupt = make(chan struct{}) + c.compactionsInterrupt = make(chan struct{}) + c.files = make(map[string]struct{}) } @@ -616,12 +624,22 @@ func (c *Compactor) Close() { } c.snapshotsEnabled = false c.compactionsEnabled = false + if c.compactionsInterrupt != nil { + close(c.compactionsInterrupt) + } + if c.snapshotsInterrupt != nil { + close(c.snapshotsInterrupt) + } } // DisableSnapshots disables the compactor from performing snapshots. func (c *Compactor) DisableSnapshots() { c.mu.Lock() c.snapshotsEnabled = false + if c.snapshotsInterrupt != nil { + close(c.snapshotsInterrupt) + c.snapshotsInterrupt = nil + } c.mu.Unlock() } @@ -629,6 +647,9 @@ func (c *Compactor) DisableSnapshots() { func (c *Compactor) EnableSnapshots() { c.mu.Lock() c.snapshotsEnabled = true + if c.snapshotsInterrupt == nil { + c.snapshotsInterrupt = make(chan struct{}) + } c.mu.Unlock() } @@ -636,6 +657,10 @@ func (c *Compactor) EnableSnapshots() { func (c *Compactor) DisableCompactions() { c.mu.Lock() c.compactionsEnabled = false + if c.compactionsInterrupt != nil { + close(c.compactionsInterrupt) + c.compactionsInterrupt = nil + } c.mu.Unlock() } @@ -643,6 +668,9 @@ func (c *Compactor) DisableCompactions() { func (c *Compactor) EnableCompactions() { c.mu.Lock() c.compactionsEnabled = true + if c.compactionsInterrupt == nil { + c.compactionsInterrupt = make(chan struct{}) + } c.mu.Unlock() } @@ -650,13 +678,14 @@ func (c *Compactor) EnableCompactions() { func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { c.mu.RLock() enabled := c.snapshotsEnabled + intC := c.snapshotsInterrupt c.mu.RUnlock() if !enabled { return nil, errSnapshotsDisabled } - iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock) + iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock, intC) files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter) // See if we were disabled while writing a snapshot @@ -717,7 +746,11 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) { return nil, nil } - tsm, err := NewTSMKeyIterator(size, fast, trs...) + c.mu.RLock() + intC := c.compactionsInterrupt + c.mu.RUnlock() + + tsm, err := NewTSMKeyIterator(size, fast, intC, trs...) if err != nil { return nil, err } @@ -993,7 +1026,8 @@ type tsmKeyIterator struct { // merged are encoded blocks that have been combined or used as is // without decode - merged blocks + merged blocks + interrupt chan struct{} } type block struct { @@ -1045,7 +1079,7 @@ func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // NewTSMKeyIterator returns a new TSM key iterator from readers. // size indicates the maximum number of values to encode in a single block. -func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, error) { +func NewTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) { var iter []*BlockIterator for _, r := range readers { iter = append(iter, r.BlockIterator()) @@ -1059,6 +1093,7 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, iterators: iter, fast: fast, buf: make([]blocks, len(iter)), + interrupt: interrupt, }, nil } @@ -1199,6 +1234,13 @@ func (k *tsmKeyIterator) merge() { } func (k *tsmKeyIterator) Read() (string, int64, int64, []byte, error) { + // See if compactions were disabled while we were running. + select { + case <-k.interrupt: + return "", 0, 0, nil, errCompactionAborted + default: + } + if len(k.merged) == 0 { return "", 0, 0, nil, k.err } @@ -1224,9 +1266,10 @@ type cacheKeyIterator struct { size int order []string - i int - blocks [][]cacheBlock - ready []chan struct{} + i int + blocks [][]cacheBlock + ready []chan struct{} + interrupt chan struct{} } type cacheBlock struct { @@ -1237,7 +1280,7 @@ type cacheBlock struct { } // NewCacheKeyIterator returns a new KeyIterator from a Cache. -func NewCacheKeyIterator(cache *Cache, size int) KeyIterator { +func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIterator { keys := cache.Keys() chans := make([]chan struct{}, len(keys)) @@ -1246,12 +1289,13 @@ func NewCacheKeyIterator(cache *Cache, size int) KeyIterator { } cki := &cacheKeyIterator{ - i: -1, - size: size, - cache: cache, - order: keys, - ready: chans, - blocks: make([][]cacheBlock, len(keys)), + i: -1, + size: size, + cache: cache, + order: keys, + ready: chans, + blocks: make([][]cacheBlock, len(keys)), + interrupt: interrupt, } go cki.encode() return cki @@ -1330,6 +1374,13 @@ func (c *cacheKeyIterator) Next() bool { } func (c *cacheKeyIterator) Read() (string, int64, int64, []byte, error) { + // See if snapshot compactions were disabled while we were running. + select { + case <-c.interrupt: + return "", 0, 0, nil, errCompactionAborted + default: + } + blk := c.blocks[c.i][0] return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err } diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index ec903fd4a7a..5422f6cd9d3 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -830,7 +830,7 @@ func TestTSMKeyIterator_Single(t *testing.T) { r := MustTSMReader(dir, 1, writes) - iter, err := tsm1.NewTSMKeyIterator(1, false, r) + iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -890,7 +890,7 @@ func TestTSMKeyIterator_Duplicate(t *testing.T) { r2 := MustTSMReader(dir, 2, writes2) - iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2) + iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -951,7 +951,7 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) { r2 := MustTSMReader(dir, 2, points2) r2.Delete([]string{"cpu,host=A#!~#count"}) - iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2) + iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -993,6 +993,41 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) { } } +// Tests that the TSMKeyIterator will abort if the interrupt channel is closed +func TestTSMKeyIterator_Abort(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(1, 1.1) + writes := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1}, + } + + r := MustTSMReader(dir, 1, writes) + + intC := make(chan struct{}) + iter, err := tsm1.NewTSMKeyIterator(1, false, intC, r) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + var aborted bool + for iter.Next() { + // Abort + close(intC) + + _, _, _, _, err := iter.Read() + if err == nil { + t.Fatalf("unexpected error read: %v", err) + } + aborted = err != nil + } + + if !aborted { + t.Fatalf("iteration not aborted") + } +} + func TestCacheKeyIterator_Single(t *testing.T) { v0 := tsm1.NewValue(1, 1.0) @@ -1008,7 +1043,7 @@ func TestCacheKeyIterator_Single(t *testing.T) { } } - iter := tsm1.NewCacheKeyIterator(c, 1) + iter := tsm1.NewCacheKeyIterator(c, 1, nil) var readValues bool for iter.Next() { key, _, _, block, err := iter.Read() @@ -1056,7 +1091,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) { } } - iter := tsm1.NewCacheKeyIterator(c, 1) + iter := tsm1.NewCacheKeyIterator(c, 1, nil) var readValues bool var chunk int for iter.Next() { @@ -1090,6 +1125,43 @@ func TestCacheKeyIterator_Chunked(t *testing.T) { } } +// Tests that the CacheKeyIterator will abort if the interrupt channel is closed +func TestCacheKeyIterator_Abort(t *testing.T) { + v0 := tsm1.NewValue(1, 1.0) + + writes := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v0}, + } + + c := tsm1.NewCache(0, "") + + for k, v := range writes { + if err := c.Write(k, v); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + } + + intC := make(chan struct{}) + + iter := tsm1.NewCacheKeyIterator(c, 1, intC) + + var aborted bool + for iter.Next() { + //Abort + close(intC) + + _, _, _, _, err := iter.Read() + if err == nil { + t.Fatalf("unexpected error read: %v", err) + } + aborted = err != nil + } + + if !aborted { + t.Fatalf("iteration not aborted") + } +} + func TestDefaultPlanner_Plan_Min(t *testing.T) { cp := tsm1.NewDefaultPlanner( &fakeFileStore{ diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 4d24adb5ddd..535865fd6aa 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -290,6 +290,7 @@ func (e *Engine) enableSnapshotCompactions() { return } + e.Compactor.EnableSnapshots() quit := make(chan struct{}) e.snapDone = quit e.snapWG.Add(1) @@ -304,6 +305,7 @@ func (e *Engine) disableSnapshotCompactions() { if e.snapDone != nil { close(e.snapDone) e.snapDone = nil + e.Compactor.DisableSnapshots() } e.mu.Unlock()