diff --git a/CHANGELOG.md b/CHANGELOG.md
index 369c477ce01..a3c2c9fa701 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+## v1.3.2 [unreleased]
+
+### Bugfixes
+
+- [#8629](https://github.com/influxdata/influxdb/pull/8629): Interrupt in-progress TSM compactions
+- [#8630](https://github.com/influxdata/influxdb/pull/8630): Prevent excessive memory usage when dropping series
+
 ## v1.3.1 [2017-07-20]
 
 ### Bugfixes
diff --git a/circle-test.sh b/circle-test.sh
index 105b44185c4..7528e8f50c3 100755
--- a/circle-test.sh
+++ b/circle-test.sh
@@ -9,17 +9,17 @@ set -e
 DIR=$(cd $(dirname "${BASH_SOURCE[0]}") && pwd)
 cd $DIR
 
-export OUTPUT_DIR="$CIRCLE_ARTIFACTS"
+export OUTPUT_DIR="$CIRCLE_ARTIFACTS"
 # Don't delete the container since CircleCI doesn't have permission to do so.
 export DOCKER_RM="false"
 
 # Get number of test environments.
 count=$(./test.sh count)
 
 # Check that we aren't wasting CircleCI nodes.
-if [ $CIRCLE_NODE_TOTAL -gt $count ]
+if [ $CIRCLE_NODE_INDEX -gt $((count - 1)) ]
 then
 	echo "More CircleCI nodes allocated than tests environments to run!"
-	exit 1
+	exit 0
 fi
 
 # Map CircleCI nodes to test environments.
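The tsm1/compact.go changes below thread an interrupt channel through both compaction key iterators so that disabling compactions aborts work already in flight. The mechanism is Go's close-as-broadcast idiom; here is a minimal, self-contained sketch of it (`worker` and `errAborted` are illustrative names, not part of the patch):

```go
package main

import (
	"errors"
	"fmt"
)

// errAborted stands in for tsm1's errCompactionAborted.
var errAborted = errors.New("aborted")

// worker polls the interrupt channel between units of work. Closing the
// channel is observed by every receiver at once, so close(ch) acts as a
// one-shot broadcast to all in-flight workers.
func worker(interrupt chan struct{}) error {
	for i := 0; i < 10; i++ {
		select {
		case <-interrupt:
			// A receive on a closed channel completes immediately.
			return errAborted
		default:
		}
		// ... one unit of compaction work would go here ...
	}
	return nil
}

func main() {
	interrupt := make(chan struct{})
	close(interrupt) // what DisableCompactions does to abort workers
	fmt.Println(worker(interrupt)) // prints "aborted"
}
```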
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index f3c04d80f77..9c9efd3b5f3 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -591,6 +591,11 @@ type Compactor struct {
 	snapshotsEnabled   bool
 	compactionsEnabled bool
 
+	// The channel to signal that any in-progress snapshots should be aborted.
+	snapshotsInterrupt chan struct{}
+	// The channel to signal that any in-progress level compactions should be aborted.
+	compactionsInterrupt chan struct{}
+
 	files map[string]struct{}
 }
 
@@ -604,6 +609,9 @@ func (c *Compactor) Open() {
 	c.snapshotsEnabled = true
 	c.compactionsEnabled = true
 
+	c.snapshotsInterrupt = make(chan struct{})
+	c.compactionsInterrupt = make(chan struct{})
+
 	c.files = make(map[string]struct{})
 }
 
@@ -616,12 +624,22 @@ func (c *Compactor) Close() {
 	}
 	c.snapshotsEnabled = false
 	c.compactionsEnabled = false
+	if c.compactionsInterrupt != nil {
+		close(c.compactionsInterrupt)
+	}
+	if c.snapshotsInterrupt != nil {
+		close(c.snapshotsInterrupt)
+	}
 }
 
 // DisableSnapshots disables the compactor from performing snapshots.
 func (c *Compactor) DisableSnapshots() {
 	c.mu.Lock()
 	c.snapshotsEnabled = false
+	if c.snapshotsInterrupt != nil {
+		close(c.snapshotsInterrupt)
+		c.snapshotsInterrupt = nil
+	}
 	c.mu.Unlock()
 }
 
@@ -629,6 +647,9 @@ func (c *Compactor) DisableSnapshots() {
 func (c *Compactor) EnableSnapshots() {
 	c.mu.Lock()
 	c.snapshotsEnabled = true
+	if c.snapshotsInterrupt == nil {
+		c.snapshotsInterrupt = make(chan struct{})
+	}
 	c.mu.Unlock()
 }
 
@@ -636,6 +657,10 @@ func (c *Compactor) EnableSnapshots() {
 func (c *Compactor) DisableCompactions() {
 	c.mu.Lock()
 	c.compactionsEnabled = false
+	if c.compactionsInterrupt != nil {
+		close(c.compactionsInterrupt)
+		c.compactionsInterrupt = nil
+	}
 	c.mu.Unlock()
 }
 
@@ -643,6 +668,9 @@ func (c *Compactor) DisableCompactions() {
 func (c *Compactor) EnableCompactions() {
 	c.mu.Lock()
 	c.compactionsEnabled = true
+	if c.compactionsInterrupt == nil {
+		c.compactionsInterrupt = make(chan struct{})
+	}
 	c.mu.Unlock()
 }
 
@@ -650,13 +678,14 @@ func (c *Compactor) EnableCompactions() {
 func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 	c.mu.RLock()
 	enabled := c.snapshotsEnabled
+	intC := c.snapshotsInterrupt
 	c.mu.RUnlock()
 
 	if !enabled {
 		return nil, errSnapshotsDisabled
 	}
 
-	iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock)
+	iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock, intC)
 	files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
 
 	// See if we were disabled while writing a snapshot
@@ -717,7 +746,11 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
 		return nil, nil
 	}
 
-	tsm, err := NewTSMKeyIterator(size, fast, trs...)
+	c.mu.RLock()
+	intC := c.compactionsInterrupt
+	c.mu.RUnlock()
+
+	tsm, err := NewTSMKeyIterator(size, fast, intC, trs...)
 	if err != nil {
 		return nil, err
 	}
@@ -993,7 +1026,8 @@ type tsmKeyIterator struct {
 
 	// merged are encoded blocks that have been combined or used as is
 	// without decode
-	merged blocks
+	merged    blocks
+	interrupt chan struct{}
 }
 
 type block struct {
@@ -1045,7 +1079,7 @@ func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
 
 // NewTSMKeyIterator returns a new TSM key iterator from readers.
 // size indicates the maximum number of values to encode in a single block.
-func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, error) {
+func NewTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) {
 	var iter []*BlockIterator
 	for _, r := range readers {
 		iter = append(iter, r.BlockIterator())
 	}
@@ -1059,6 +1093,7 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator,
 		iterators: iter,
 		fast:      fast,
 		buf:       make([]blocks, len(iter)),
+		interrupt: interrupt,
 	}, nil
 }
 
@@ -1199,6 +1234,13 @@ func (k *tsmKeyIterator) merge() {
 }
 
 func (k *tsmKeyIterator) Read() (string, int64, int64, []byte, error) {
+	// See if compactions were disabled while we were running.
+	select {
+	case <-k.interrupt:
+		return "", 0, 0, nil, errCompactionAborted
+	default:
+	}
+
 	if len(k.merged) == 0 {
 		return "", 0, 0, nil, k.err
 	}
@@ -1224,9 +1266,10 @@ type cacheKeyIterator struct {
 	size  int
 	order []string
 
-	i      int
-	blocks [][]cacheBlock
-	ready  []chan struct{}
+	i         int
+	blocks    [][]cacheBlock
+	ready     []chan struct{}
+	interrupt chan struct{}
 }
 
 type cacheBlock struct {
@@ -1237,7 +1280,7 @@ type cacheBlock struct {
 }
 
 // NewCacheKeyIterator returns a new KeyIterator from a Cache.
-func NewCacheKeyIterator(cache *Cache, size int) KeyIterator {
+func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIterator {
 	keys := cache.Keys()
 
 	chans := make([]chan struct{}, len(keys))
@@ -1246,12 +1289,13 @@ func NewCacheKeyIterator(cache *Cache, size int) KeyIterator {
 	}
 
 	cki := &cacheKeyIterator{
-		i:      -1,
-		size:   size,
-		cache:  cache,
-		order:  keys,
-		ready:  chans,
-		blocks: make([][]cacheBlock, len(keys)),
+		i:         -1,
+		size:      size,
+		cache:     cache,
+		order:     keys,
+		ready:     chans,
+		blocks:    make([][]cacheBlock, len(keys)),
+		interrupt: interrupt,
 	}
 	go cki.encode()
 	return cki
@@ -1330,6 +1374,13 @@ func (c *cacheKeyIterator) Next() bool {
 }
 
 func (c *cacheKeyIterator) Read() (string, int64, int64, []byte, error) {
+	// See if snapshot compactions were disabled while we were running.
+	select {
+	case <-c.interrupt:
+		return "", 0, 0, nil, errCompactionAborted
+	default:
+	}
+
 	blk := c.blocks[c.i][0]
 	return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err
 }
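One subtlety in the hunks above: the Disable* methods close the channel and nil the field, while the Enable* methods allocate a fresh one, because a closed channel cannot be reused and closing it twice panics. A reduced sketch of that lifecycle under a mutex (the `gate` type is hypothetical, not the patch's code):

```go
package main

import (
	"fmt"
	"sync"
)

// gate mimics the Compactor's enable/disable bookkeeping: a closed
// channel means "abort", and re-enabling must allocate a new channel
// since closed channels cannot be reopened.
type gate struct {
	mu        sync.Mutex
	enabled   bool
	interrupt chan struct{}
}

func (g *gate) disable() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.enabled = false
	if g.interrupt != nil {
		close(g.interrupt) // wake anyone selecting on it
		g.interrupt = nil  // guard against a double close
	}
}

func (g *gate) enable() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.enabled = true
	if g.interrupt == nil {
		g.interrupt = make(chan struct{}) // fresh channel for new work
	}
}

func main() {
	g := &gate{enabled: true, interrupt: make(chan struct{})}
	g.disable()
	g.enable() // safe to cycle repeatedly
	fmt.Println(g.enabled)
}
```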
diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go
index ec903fd4a7a..5422f6cd9d3 100644
--- a/tsdb/engine/tsm1/compact_test.go
+++ b/tsdb/engine/tsm1/compact_test.go
@@ -830,7 +830,7 @@ func TestTSMKeyIterator_Single(t *testing.T) {
 
 	r := MustTSMReader(dir, 1, writes)
 
-	iter, err := tsm1.NewTSMKeyIterator(1, false, r)
+	iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r)
 	if err != nil {
 		t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
 	}
@@ -890,7 +890,7 @@ func TestTSMKeyIterator_Duplicate(t *testing.T) {
 
 	r2 := MustTSMReader(dir, 2, writes2)
 
-	iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
+	iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
 	if err != nil {
 		t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
 	}
@@ -951,7 +951,7 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
 	r2 := MustTSMReader(dir, 2, points2)
 	r2.Delete([]string{"cpu,host=A#!~#count"})
 
-	iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
+	iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
 	if err != nil {
 		t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
 	}
@@ -993,6 +993,41 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
 	}
 }
 
+// Tests that the TSMKeyIterator will abort if the interrupt channel is closed.
+func TestTSMKeyIterator_Abort(t *testing.T) {
+	dir := MustTempDir()
+	defer os.RemoveAll(dir)
+
+	v1 := tsm1.NewValue(1, 1.1)
+	writes := map[string][]tsm1.Value{
+		"cpu,host=A#!~#value": []tsm1.Value{v1},
+	}
+
+	r := MustTSMReader(dir, 1, writes)
+
+	intC := make(chan struct{})
+	iter, err := tsm1.NewTSMKeyIterator(1, false, intC, r)
+	if err != nil {
+		t.Fatalf("unexpected error creating TSMKeyIterator: %v", err)
+	}
+
+	var aborted bool
+	for iter.Next() {
+		// Abort
+		close(intC)
+
+		_, _, _, _, err := iter.Read()
+		if err == nil {
+			t.Fatalf("expected abort error, got nil")
+		}
+		aborted = err != nil
+	}
+
+	if !aborted {
+		t.Fatalf("iteration not aborted")
+	}
+}
+
 func TestCacheKeyIterator_Single(t *testing.T) {
 	v0 := tsm1.NewValue(1, 1.0)
 
@@ -1008,7 +1043,7 @@ func TestCacheKeyIterator_Single(t *testing.T) {
 		}
 	}
 
-	iter := tsm1.NewCacheKeyIterator(c, 1)
+	iter := tsm1.NewCacheKeyIterator(c, 1, nil)
 
 	var readValues bool
 	for iter.Next() {
 		key, _, _, block, err := iter.Read()
@@ -1056,7 +1091,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
 		}
 	}
 
-	iter := tsm1.NewCacheKeyIterator(c, 1)
+	iter := tsm1.NewCacheKeyIterator(c, 1, nil)
 
 	var readValues bool
 	var chunk int
 	for iter.Next() {
@@ -1090,6 +1125,43 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
 	}
 }
 
+// Tests that the CacheKeyIterator will abort if the interrupt channel is closed.
+func TestCacheKeyIterator_Abort(t *testing.T) {
+	v0 := tsm1.NewValue(1, 1.0)
+
+	writes := map[string][]tsm1.Value{
+		"cpu,host=A#!~#value": []tsm1.Value{v0},
+	}
+
+	c := tsm1.NewCache(0, "")
+
+	for k, v := range writes {
+		if err := c.Write(k, v); err != nil {
+			t.Fatalf("failed to write key %q to cache: %s", k, err.Error())
+		}
+	}
+
+	intC := make(chan struct{})
+
+	iter := tsm1.NewCacheKeyIterator(c, 1, intC)
+
+	var aborted bool
+	for iter.Next() {
+		// Abort
+		close(intC)
+
+		_, _, _, _, err := iter.Read()
+		if err == nil {
+			t.Fatalf("expected abort error, got nil")
+		}
+		aborted = err != nil
+	}
+
+	if !aborted {
+		t.Fatalf("iteration not aborted")
+	}
+}
+
 func TestDefaultPlanner_Plan_Min(t *testing.T) {
 	cp := tsm1.NewDefaultPlanner(
 		&fakeFileStore{
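The non-abort tests above pass nil as the interrupt channel. That is safe because a receive from a nil channel blocks forever, so in a select with a default branch the nil case is simply never chosen. A small demonstration (`read` is an illustrative stand-in for the iterators' Read methods, not the patch's code):

```go
package main

import "fmt"

// read mimics the abort check added to tsmKeyIterator.Read and
// cacheKeyIterator.Read. With a nil channel the interrupt case can
// never fire, so the default branch always runs — which is why the
// existing tests can pass nil without changes in behavior.
func read(interrupt chan struct{}) error {
	select {
	case <-interrupt:
		return fmt.Errorf("compaction aborted")
	default:
	}
	return nil // normal read path
}

func main() {
	fmt.Println(read(nil)) // <nil>: nil channel never fires

	intC := make(chan struct{})
	close(intC)
	fmt.Println(read(intC)) // compaction aborted
}
```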
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 4d24adb5ddd..535865fd6aa 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -290,6 +290,7 @@ func (e *Engine) enableSnapshotCompactions() {
 		return
 	}
 
+	e.Compactor.EnableSnapshots()
 	quit := make(chan struct{})
 	e.snapDone = quit
 	e.snapWG.Add(1)
@@ -304,6 +305,7 @@ func (e *Engine) disableSnapshotCompactions() {
 	if e.snapDone != nil {
 		close(e.snapDone)
 		e.snapDone = nil
+		e.Compactor.DisableSnapshots()
 	}
 
 	e.mu.Unlock()
diff --git a/tsdb/store.go b/tsdb/store.go
index 2cc51ac2cd0..4a6b414b45e 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -570,7 +570,13 @@ func (s *Store) DeleteMeasurement(database, name string) error {
 	shards := s.filterShards(byDatabase(database))
 	s.mu.RUnlock()
 
+	// Limit to 1 delete for each shard since expanding the measurement into the list
+	// of series keys can be very memory intensive if run concurrently.
+	limit := limiter.NewFixed(1)
 	return s.walkShards(shards, func(sh *Shard) error {
+		limit.Take()
+		defer limit.Release()
+
 		if err := sh.DeleteMeasurement([]byte(name)); err != nil {
 			return err
 		}
@@ -834,6 +840,10 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
 	s.mu.RLock()
 	defer s.mu.RUnlock()
 
+	// Limit to 1 delete for each shard since expanding the measurement into the list
+	// of series keys can be very memory intensive if run concurrently.
+	limit := limiter.NewFixed(1)
+
 	return s.walkShards(shards, func(sh *Shard) error {
 		// Determine list of measurements from sources.
 		// Use all measurements if no FROM clause was provided.
@@ -852,6 +862,9 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, condit
 		}
 		sort.Strings(names)
 
+		limit.Take()
+		defer limit.Release()
+
 		// Find matching series keys for each measurement.
 		var keys [][]byte
 		for _, name := range names {
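The store.go hunks serialize the memory-heavy series-key expansion with limiter.NewFixed(1) while leaving walkShards' concurrent structure intact; with a limit of 1 this behaves like a mutex around the expansion step, but the limit can be raised without restructuring the code. Assuming a Fixed limiter with the usual buffered-channel-semaphore shape (a sketch, not pkg/limiter's actual source), the pattern looks like this:

```go
package main

import (
	"fmt"
	"sync"
)

// Fixed is a counting semaphore built on a buffered channel — the
// shape of limiter that NewFixed appears to provide (illustrative;
// the real pkg/limiter implementation may differ in detail).
type Fixed chan struct{}

func NewFixed(limit int) Fixed { return make(Fixed, limit) }

// Take blocks once `limit` holders exist; Release frees a slot.
func (f Fixed) Take()    { f <- struct{}{} }
func (f Fixed) Release() { <-f }

func main() {
	limit := NewFixed(1) // one shard delete expands series keys at a time

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(shard int) { // stands in for walkShards' per-shard goroutine
			defer wg.Done()
			limit.Take()
			defer limit.Release()
			fmt.Println("deleting series on shard", shard) // serialized section
		}(i)
	}
	wg.Wait()
}
```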