From 42b82e719a5407a68b9de699f7375a039e521a8c Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 25 Jul 2017 13:35:01 -0600 Subject: [PATCH 1/3] Interrupt in progress TSM compactions When snapshots and compactions are disabled, the check to see if the compaction should be aborted occurs in between writing to the next TSM file. If a large compaction is running, it might take a while for the file to be finished writing causing long delays. This now interrupts compactions while iterating over the blocks to write which allows them to abort immediately. --- CHANGELOG.md | 6 +++ tsdb/engine/tsm1/compact.go | 79 ++++++++++++++++++++++++------ tsdb/engine/tsm1/compact_test.go | 82 ++++++++++++++++++++++++++++++-- tsdb/engine/tsm1/engine.go | 2 + 4 files changed, 150 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 369c477ce01..463d057d76a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## v1.3.2 [unreleased] + +### Bugfixes + +- [#8629](https://github.com/influxdata/influxdb/pull/8629): Interrupt in progress TSM compactions + ## v1.3.1 [2017-07-20] ### Bugfixes diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index f3c04d80f77..9c9efd3b5f3 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -591,6 +591,11 @@ type Compactor struct { snapshotsEnabled bool compactionsEnabled bool + // The channel to signal that any in progress snapshots should be aborted. + snapshotsInterrupt chan struct{} + // The channel to signal that any in progress level compactions should be aborted. + compactionsInterrupt chan struct{} + files map[string]struct{} } @@ -604,6 +609,9 @@ func (c *Compactor) Open() { c.snapshotsEnabled = true c.compactionsEnabled = true + c.snapshotsInterrupt = make(chan struct{}) + c.compactionsInterrupt = make(chan struct{}) + c.files = make(map[string]struct{}) } @@ -616,12 +624,22 @@ func (c *Compactor) Close() { } c.snapshotsEnabled = false c.compactionsEnabled = false + if c.compactionsInterrupt != nil { + close(c.compactionsInterrupt) + } + if c.snapshotsInterrupt != nil { + close(c.snapshotsInterrupt) + } } // DisableSnapshots disables the compactor from performing snapshots. func (c *Compactor) DisableSnapshots() { c.mu.Lock() c.snapshotsEnabled = false + if c.snapshotsInterrupt != nil { + close(c.snapshotsInterrupt) + c.snapshotsInterrupt = nil + } c.mu.Unlock() } @@ -629,6 +647,9 @@ func (c *Compactor) DisableSnapshots() { func (c *Compactor) EnableSnapshots() { c.mu.Lock() c.snapshotsEnabled = true + if c.snapshotsInterrupt == nil { + c.snapshotsInterrupt = make(chan struct{}) + } c.mu.Unlock() } @@ -636,6 +657,10 @@ func (c *Compactor) EnableSnapshots() { func (c *Compactor) DisableCompactions() { c.mu.Lock() c.compactionsEnabled = false + if c.compactionsInterrupt != nil { + close(c.compactionsInterrupt) + c.compactionsInterrupt = nil + } c.mu.Unlock() } @@ -643,6 +668,9 @@ func (c *Compactor) DisableCompactions() { func (c *Compactor) EnableCompactions() { c.mu.Lock() c.compactionsEnabled = true + if c.compactionsInterrupt == nil { + c.compactionsInterrupt = make(chan struct{}) + } c.mu.Unlock() } @@ -650,13 +678,14 @@ func (c *Compactor) EnableCompactions() { func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { c.mu.RLock() enabled := c.snapshotsEnabled + intC := c.snapshotsInterrupt c.mu.RUnlock() if !enabled { return nil, errSnapshotsDisabled } - iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock) + iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock, intC) files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter) // See if we were disabled while writing a snapshot @@ -717,7 +746,11 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) { return nil, nil } - tsm, err := NewTSMKeyIterator(size, fast, trs...) + c.mu.RLock() + intC := c.compactionsInterrupt + c.mu.RUnlock() + + tsm, err := NewTSMKeyIterator(size, fast, intC, trs...) if err != nil { return nil, err } @@ -993,7 +1026,8 @@ type tsmKeyIterator struct { // merged are encoded blocks that have been combined or used as is // without decode - merged blocks + merged blocks + interrupt chan struct{} } type block struct { @@ -1045,7 +1079,7 @@ func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // NewTSMKeyIterator returns a new TSM key iterator from readers. // size indicates the maximum number of values to encode in a single block. -func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, error) { +func NewTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) { var iter []*BlockIterator for _, r := range readers { iter = append(iter, r.BlockIterator()) @@ -1059,6 +1093,7 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, iterators: iter, fast: fast, buf: make([]blocks, len(iter)), + interrupt: interrupt, }, nil } @@ -1199,6 +1234,13 @@ func (k *tsmKeyIterator) merge() { } func (k *tsmKeyIterator) Read() (string, int64, int64, []byte, error) { + // See if compactions were disabled while we were running. + select { + case <-k.interrupt: + return "", 0, 0, nil, errCompactionAborted + default: + } + if len(k.merged) == 0 { return "", 0, 0, nil, k.err } @@ -1224,9 +1266,10 @@ type cacheKeyIterator struct { size int order []string - i int - blocks [][]cacheBlock - ready []chan struct{} + i int + blocks [][]cacheBlock + ready []chan struct{} + interrupt chan struct{} } type cacheBlock struct { @@ -1237,7 +1280,7 @@ type cacheBlock struct { } // NewCacheKeyIterator returns a new KeyIterator from a Cache. -func NewCacheKeyIterator(cache *Cache, size int) KeyIterator { +func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIterator { keys := cache.Keys() chans := make([]chan struct{}, len(keys)) @@ -1246,12 +1289,13 @@ func NewCacheKeyIterator(cache *Cache, size int) KeyIterator { } cki := &cacheKeyIterator{ - i: -1, - size: size, - cache: cache, - order: keys, - ready: chans, - blocks: make([][]cacheBlock, len(keys)), + i: -1, + size: size, + cache: cache, + order: keys, + ready: chans, + blocks: make([][]cacheBlock, len(keys)), + interrupt: interrupt, } go cki.encode() return cki @@ -1330,6 +1374,13 @@ func (c *cacheKeyIterator) Next() bool { } func (c *cacheKeyIterator) Read() (string, int64, int64, []byte, error) { + // See if snapshot compactions were disabled while we were running. + select { + case <-c.interrupt: + return "", 0, 0, nil, errCompactionAborted + default: + } + blk := c.blocks[c.i][0] return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err } diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index ec903fd4a7a..5422f6cd9d3 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -830,7 +830,7 @@ func TestTSMKeyIterator_Single(t *testing.T) { r := MustTSMReader(dir, 1, writes) - iter, err := tsm1.NewTSMKeyIterator(1, false, r) + iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -890,7 +890,7 @@ func TestTSMKeyIterator_Duplicate(t *testing.T) { r2 := MustTSMReader(dir, 2, writes2) - iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2) + iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -951,7 +951,7 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) { r2 := MustTSMReader(dir, 2, points2) r2.Delete([]string{"cpu,host=A#!~#count"}) - iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2) + iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -993,6 +993,41 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) { } } +// Tests that the TSMKeyIterator will abort if the interrupt channel is closed +func TestTSMKeyIterator_Abort(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(1, 1.1) + writes := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1}, + } + + r := MustTSMReader(dir, 1, writes) + + intC := make(chan struct{}) + iter, err := tsm1.NewTSMKeyIterator(1, false, intC, r) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + var aborted bool + for iter.Next() { + // Abort + close(intC) + + _, _, _, _, err := iter.Read() + if err == nil { + t.Fatalf("unexpected error read: %v", err) + } + aborted = err != nil + } + + if !aborted { + t.Fatalf("iteration not aborted") + } +} + func TestCacheKeyIterator_Single(t *testing.T) { v0 := tsm1.NewValue(1, 1.0) @@ -1008,7 +1043,7 @@ func TestCacheKeyIterator_Single(t *testing.T) { } } - iter := tsm1.NewCacheKeyIterator(c, 1) + iter := tsm1.NewCacheKeyIterator(c, 1, nil) var readValues bool for iter.Next() { key, _, _, block, err := iter.Read() @@ -1056,7 +1091,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) { } } - iter := tsm1.NewCacheKeyIterator(c, 1) + iter := tsm1.NewCacheKeyIterator(c, 1, nil) var readValues bool var chunk int for iter.Next() { @@ -1090,6 +1125,43 @@ func TestCacheKeyIterator_Chunked(t *testing.T) { } } +// Tests that the CacheKeyIterator will abort if the interrupt channel is closed +func TestCacheKeyIterator_Abort(t *testing.T) { + v0 := tsm1.NewValue(1, 1.0) + + writes := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v0}, + } + + c := tsm1.NewCache(0, "") + + for k, v := range writes { + if err := c.Write(k, v); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + } + + intC := make(chan struct{}) + + iter := tsm1.NewCacheKeyIterator(c, 1, intC) + + var aborted bool + for iter.Next() { + //Abort + close(intC) + + _, _, _, _, err := iter.Read() + if err == nil { + t.Fatalf("unexpected error read: %v", err) + } + aborted = err != nil + } + + if !aborted { + t.Fatalf("iteration not aborted") + } +} + func TestDefaultPlanner_Plan_Min(t *testing.T) { cp := tsm1.NewDefaultPlanner( &fakeFileStore{ diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 4d24adb5ddd..535865fd6aa 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -290,6 +290,7 @@ func (e *Engine) enableSnapshotCompactions() { return } + e.Compactor.EnableSnapshots() quit := make(chan struct{}) e.snapDone = quit e.snapWG.Add(1) @@ -304,6 +305,7 @@ func (e *Engine) disableSnapshotCompactions() { if e.snapDone != nil { close(e.snapDone) e.snapDone = nil + e.Compactor.DisableSnapshots() } e.mu.Unlock() From 935379d7515980e3c830551bd58e1390f52ac8f9 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 25 Jul 2017 16:20:52 -0600 Subject: [PATCH 2/3] Limit delete to run one shard at a time There was a change to speed up deleting and dropping measurements that executed the deletes in parallel for all shards at once. #7015 When TSI was merged in #7618, the series keys passed into Shard.DeleteMeasurement were removed and were expanded lower down. This causes memory to blow up when a delete across many shards occurs as we now expand the set of series keys N times instead of just once as before. While running the deletes in parallel would be ideal, there have been a number of optimizations in the delete path that make running deletes serially pretty good. This change just limits the concurrency of the deletes which keeps memory more stable. --- CHANGELOG.md | 1 + tsdb/store.go | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 463d057d76a..a3c2c9fa701 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Bugfixes - [#8629](https://github.com/influxdata/influxdb/pull/8629): Interrupt in progress TSM compactions +- [#8630](https://github.com/influxdata/influxdb/pull/8630): Prevent excessive memory usage when dropping series ## v1.3.1 [2017-07-20] diff --git a/tsdb/store.go b/tsdb/store.go index 2cc51ac2cd0..4a6b414b45e 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -570,7 +570,13 @@ func (s *Store) DeleteMeasurement(database, name string) error { shards := s.filterShards(byDatabase(database)) s.mu.RUnlock() + // Limit to 1 delete for each shard since expanding the measurement into the list + // of series keys can be very memory intensive if run concurrently. + limit := limiter.NewFixed(1) return s.walkShards(shards, func(sh *Shard) error { + limit.Take() + defer limit.Release() + if err := sh.DeleteMeasurement([]byte(name)); err != nil { return err } @@ -834,6 +840,10 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi s.mu.RLock() defer s.mu.RUnlock() + // Limit to 1 delete for each shard since expanding the measurement into the list + // of series keys can be very memory intensive if run concurrently. + limit := limiter.NewFixed(1) + return s.walkShards(shards, func(sh *Shard) error { // Determine list of measurements from sources. // Use all measurements if no FROM clause was provided. @@ -852,6 +862,9 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi } sort.Strings(names) + limit.Take() + defer limit.Release() + // Find matching series keys for each measurement. var keys [][]byte for _, name := range names { From ae4725f54a1f8d1dd4bb0e4e0339f8359c6d2e80 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 27 Jul 2017 23:48:44 -0600 Subject: [PATCH 3/3] Don't fail with test error if more circle envs are available --- circle-test.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/circle-test.sh b/circle-test.sh index 105b44185c4..7528e8f50c3 100755 --- a/circle-test.sh +++ b/circle-test.sh @@ -9,17 +9,17 @@ set -e DIR=$(cd $(dirname "${BASH_SOURCE[0]}") && pwd) cd $DIR -export OUTPUT_DIR="$CIRCLE_ARTIFACTS" +export OUTPUT_DIR="$CIRCLE_ARTIFACTS" # Don't delete the container since CircleCI doesn't have permission to do so. export DOCKER_RM="false" # Get number of test environments. count=$(./test.sh count) # Check that we aren't wasting CircleCI nodes. -if [ $CIRCLE_NODE_TOTAL -gt $count ] +if [ $CIRCLE_NODE_INDEX -gt $((count - 1)) ] then echo "More CircleCI nodes allocated than tests environments to run!" - exit 1 + exit 0 fi # Map CircleCI nodes to test environments.