Backport compaction and delete fixes #8647

Merged (3 commits, Jul 28, 2017)
CHANGELOG.md (7 additions, 0 deletions)

@@ -1,3 +1,10 @@
+## v1.3.2 [unreleased]
+
+### Bugfixes
+
+- [#8629](https://github.com/influxdata/influxdb/pull/8629): Interrupt in progress TSM compactions
+- [#8630](https://github.com/influxdata/influxdb/pull/8630): Prevent excessive memory usage when dropping series
+
 ## v1.3.1 [2017-07-20]

 ### Bugfixes
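The #8629 entry describes a cooperative abort: each long-running compaction iterator polls an interrupt channel on every Read and returns an error once the channel is closed. A minimal, self-contained sketch of that pattern, with illustrative names rather than the actual tsm1 API:

package main

import (
	"errors"
	"fmt"
)

var errCompactionAborted = errors.New("compaction aborted")

// iterator mirrors the KeyIterator shape used in the diff below: each Read
// does a non-blocking receive on the interrupt channel and aborts once the
// channel is closed.
type iterator struct {
	blocks    []string
	i         int
	interrupt chan struct{}
}

func (it *iterator) Next() bool { it.i++; return it.i < len(it.blocks) }

func (it *iterator) Read() (string, error) {
	select {
	case <-it.interrupt: // a closed channel is always ready
		return "", errCompactionAborted
	default:
	}
	return it.blocks[it.i], nil
}

func main() {
	intC := make(chan struct{})
	it := &iterator{i: -1, blocks: []string{"a", "b", "c"}, interrupt: intC}

	for it.Next() {
		close(intC) // simulate compactions being disabled mid-run
		if _, err := it.Read(); err != nil {
			fmt.Println(err) // compaction aborted
			return
		}
	}
}

Closing a channel works as the broadcast primitive here because every receiver sees a closed channel as ready, so a single close aborts any number of in-flight readers.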
circle-test.sh (3 additions, 3 deletions)

@@ -9,17 +9,17 @@ set -e
 DIR=$(cd $(dirname "${BASH_SOURCE[0]}") && pwd)
 cd $DIR

-export OUTPUT_DIR="$CIRCLE_ARTIFACTS"
+export OUTPUT_DIR="$CIRCLE_ARTIFACTS"
Review comment (Contributor):
This change appears in neither #8630 nor #8629. Should it be in this backport?
Review comment (Contributor Author):
It's new for this branch. It prevents CircleCI from running tests because 1.3 uses 3 builders, but master is using 5.
Review comment (Contributor Author):
Oh, this line looks like it had trailing whitespace committed for some reason. My editor removed it.
 # Don't delete the container since CircleCI doesn't have permission to do so.
 export DOCKER_RM="false"

 # Get number of test environments.
 count=$(./test.sh count)
 # Check that we aren't wasting CircleCI nodes.
-if [ $CIRCLE_NODE_TOTAL -gt $count ]
+if [ $CIRCLE_NODE_INDEX -gt $((count - 1)) ]
 then
     echo "More CircleCI nodes allocated than tests environments to run!"
-    exit 1
+    exit 0
 fi

 # Map CircleCI nodes to test environments.
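To see what the exit-code change buys: per the comment above, the 1.3 branch defines 3 test environments while master's CircleCI config allocates 5 nodes, so nodes 3 and 4 have nothing to run. A toy walk-through in Go (the real logic is the shell guard in the diff above; the counts are the ones from the comment, not measured):

package main

import "fmt"

func main() {
	count := 3 // number of test environments, as reported by `./test.sh count`
	total := 5 // CircleCI builders allocated
	for node := 0; node < total; node++ { // node plays CIRCLE_NODE_INDEX
		if node > count-1 {
			// The old guard compared CIRCLE_NODE_TOTAL to count and exited 1,
			// failing the build; exiting 0 lets surplus nodes pass as no-ops.
			fmt.Printf("node %d: no environment to run, exit 0\n", node)
			continue
		}
		fmt.Printf("node %d: runs test environment %d\n", node, node)
	}
}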
tsdb/engine/tsm1/compact.go (65 additions, 14 deletions)

@@ -591,6 +591,11 @@ type Compactor struct {
     snapshotsEnabled   bool
     compactionsEnabled bool

+    // The channel to signal that any in progress snapshots should be aborted.
+    snapshotsInterrupt chan struct{}
+    // The channel to signal that any in progress level compactions should be aborted.
+    compactionsInterrupt chan struct{}
+
     files map[string]struct{}
 }
@@ -604,6 +609,9 @@ func (c *Compactor) Open() {

     c.snapshotsEnabled = true
     c.compactionsEnabled = true
+    c.snapshotsInterrupt = make(chan struct{})
+    c.compactionsInterrupt = make(chan struct{})
+
     c.files = make(map[string]struct{})
 }

@@ -616,47 +624,68 @@ func (c *Compactor) Close() {
     }
     c.snapshotsEnabled = false
     c.compactionsEnabled = false
+    if c.compactionsInterrupt != nil {
+        close(c.compactionsInterrupt)
+    }
+    if c.snapshotsInterrupt != nil {
+        close(c.snapshotsInterrupt)
+    }
 }

 // DisableSnapshots disables the compactor from performing snapshots.
 func (c *Compactor) DisableSnapshots() {
     c.mu.Lock()
     c.snapshotsEnabled = false
+    if c.snapshotsInterrupt != nil {
+        close(c.snapshotsInterrupt)
+        c.snapshotsInterrupt = nil
+    }
     c.mu.Unlock()
 }

 // EnableSnapshots allows the compactor to perform snapshots.
 func (c *Compactor) EnableSnapshots() {
     c.mu.Lock()
     c.snapshotsEnabled = true
+    if c.snapshotsInterrupt == nil {
+        c.snapshotsInterrupt = make(chan struct{})
+    }
     c.mu.Unlock()
 }

 // DisableSnapshots disables the compactor from performing compactions.
 func (c *Compactor) DisableCompactions() {
     c.mu.Lock()
     c.compactionsEnabled = false
+    if c.compactionsInterrupt != nil {
+        close(c.compactionsInterrupt)
+        c.compactionsInterrupt = nil
+    }
     c.mu.Unlock()
 }

 // EnableCompactions allows the compactor to perform compactions.
 func (c *Compactor) EnableCompactions() {
     c.mu.Lock()
     c.compactionsEnabled = true
+    if c.compactionsInterrupt == nil {
+        c.compactionsInterrupt = make(chan struct{})
+    }
     c.mu.Unlock()
 }

 // WriteSnapshot writes a Cache snapshot to one or more new TSM files.
 func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
     c.mu.RLock()
     enabled := c.snapshotsEnabled
+    intC := c.snapshotsInterrupt
     c.mu.RUnlock()

     if !enabled {
         return nil, errSnapshotsDisabled
     }

-    iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock)
+    iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock, intC)
     files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)

     // See if we were disabled while writing a snapshot
@@ -717,7 +746,11 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
         return nil, nil
     }

-    tsm, err := NewTSMKeyIterator(size, fast, trs...)
+    c.mu.RLock()
+    intC := c.compactionsInterrupt
+    c.mu.RUnlock()
+
+    tsm, err := NewTSMKeyIterator(size, fast, intC, trs...)
     if err != nil {
         return nil, err
     }
@@ -993,7 +1026,8 @@ type tsmKeyIterator struct {

     // merged are encoded blocks that have been combined or used as is
     // without decode
-    merged blocks
+    merged    blocks
+    interrupt chan struct{}
 }

 type block struct {
@@ -1045,7 +1079,7 @@ func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

 // NewTSMKeyIterator returns a new TSM key iterator from readers.
 // size indicates the maximum number of values to encode in a single block.
-func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, error) {
+func NewTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) {
     var iter []*BlockIterator
     for _, r := range readers {
         iter = append(iter, r.BlockIterator())
@@ -1059,6 +1093,7 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator,
         iterators: iter,
         fast:      fast,
         buf:       make([]blocks, len(iter)),
+        interrupt: interrupt,
     }, nil
 }

@@ -1199,6 +1234,13 @@ func (k *tsmKeyIterator) merge() {
 }

 func (k *tsmKeyIterator) Read() (string, int64, int64, []byte, error) {
+    // See if compactions were disabled while we were running.
+    select {
+    case <-k.interrupt:
+        return "", 0, 0, nil, errCompactionAborted
+    default:
+    }
+
     if len(k.merged) == 0 {
         return "", 0, 0, nil, k.err
     }
@@ -1224,9 +1266,10 @@ type cacheKeyIterator struct {
     size  int
     order []string

-    i      int
-    blocks [][]cacheBlock
-    ready  []chan struct{}
+    i         int
+    blocks    [][]cacheBlock
+    ready     []chan struct{}
+    interrupt chan struct{}
 }

 type cacheBlock struct {
@@ -1237,7 +1280,7 @@ type cacheBlock struct {
 }

 // NewCacheKeyIterator returns a new KeyIterator from a Cache.
-func NewCacheKeyIterator(cache *Cache, size int) KeyIterator {
+func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIterator {
     keys := cache.Keys()

     chans := make([]chan struct{}, len(keys))
@@ -1246,12 +1289,13 @@ func NewCacheKeyIterator(cache *Cache, size int) KeyIterator {
     }

     cki := &cacheKeyIterator{
-        i:      -1,
-        size:   size,
-        cache:  cache,
-        order:  keys,
-        ready:  chans,
-        blocks: make([][]cacheBlock, len(keys)),
+        i:         -1,
+        size:      size,
+        cache:     cache,
+        order:     keys,
+        ready:     chans,
+        blocks:    make([][]cacheBlock, len(keys)),
+        interrupt: interrupt,
     }
     go cki.encode()
     return cki
@@ -1330,6 +1374,13 @@ func (c *cacheKeyIterator) Next() bool {
 }

 func (c *cacheKeyIterator) Read() (string, int64, int64, []byte, error) {
+    // See if snapshot compactions were disabled while we were running.
+    select {
+    case <-c.interrupt:
+        return "", 0, 0, nil, errCompactionAborted
+    default:
+    }
+
     blk := c.blocks[c.i][0]
     return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err
 }
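Taken together, the Compactor changes follow a close-and-recreate discipline: Disable* closes the interrupt channel and nils it, Enable* allocates a fresh one, and long-running work captures the channel under RLock before starting, so a later Enable cannot hide an abort that should still apply. A condensed, self-contained sketch of that discipline (illustrative types, not the actual tsm1 API):

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errCompactionAborted = errors.New("compaction aborted")

type compactor struct {
	mu        sync.RWMutex
	enabled   bool
	interrupt chan struct{}
}

func (c *compactor) Enable() {
	c.mu.Lock()
	c.enabled = true
	if c.interrupt == nil {
		c.interrupt = make(chan struct{}) // fresh channel for the next run
	}
	c.mu.Unlock()
}

func (c *compactor) Disable() {
	c.mu.Lock()
	c.enabled = false
	if c.interrupt != nil {
		close(c.interrupt) // wakes every in-flight Read
		c.interrupt = nil  // Enable will allocate a new one
	}
	c.mu.Unlock()
}

// Compact captures the channel once, up front, so a later Enable cannot
// swap it out from under a run that should still be aborted.
func (c *compactor) Compact(keys []string) error {
	c.mu.RLock()
	enabled, intC := c.enabled, c.interrupt
	c.mu.RUnlock()

	if !enabled {
		return errors.New("compactions disabled")
	}
	for _, k := range keys {
		select {
		case <-intC:
			return errCompactionAborted
		default:
		}
		fmt.Println("compacted", k)
	}
	return nil
}

func main() {
	c := &compactor{}
	c.Enable()
	fmt.Println(c.Compact([]string{"a", "b"})) // compacts both keys, then <nil>
	c.Disable()
	fmt.Println(c.Compact([]string{"c"})) // compactions disabled
}

Recreating the channel in Enable rather than reusing it is what lets disable/enable cycles repeat: each run aborts against the channel it captured, and the next run gets a fresh, open one.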
tsdb/engine/tsm1/compact_test.go (77 additions, 5 deletions)

@@ -830,7 +830,7 @@ func TestTSMKeyIterator_Single(t *testing.T) {

     r := MustTSMReader(dir, 1, writes)

-    iter, err := tsm1.NewTSMKeyIterator(1, false, r)
+    iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r)
     if err != nil {
         t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
     }
@@ -890,7 +890,7 @@ func TestTSMKeyIterator_Duplicate(t *testing.T) {

     r2 := MustTSMReader(dir, 2, writes2)

-    iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
+    iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
     if err != nil {
         t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
     }
@@ -951,7 +951,7 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
     r2 := MustTSMReader(dir, 2, points2)
     r2.Delete([]string{"cpu,host=A#!~#count"})

-    iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
+    iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
     if err != nil {
         t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
     }
@@ -993,6 +993,41 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
     }
 }

+// Tests that the TSMKeyIterator will abort if the interrupt channel is closed
+func TestTSMKeyIterator_Abort(t *testing.T) {
+    dir := MustTempDir()
+    defer os.RemoveAll(dir)
+
+    v1 := tsm1.NewValue(1, 1.1)
+    writes := map[string][]tsm1.Value{
+        "cpu,host=A#!~#value": []tsm1.Value{v1},
+    }
+
+    r := MustTSMReader(dir, 1, writes)
+
+    intC := make(chan struct{})
+    iter, err := tsm1.NewTSMKeyIterator(1, false, intC, r)
+    if err != nil {
+        t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
+    }
+
+    var aborted bool
+    for iter.Next() {
+        // Abort
+        close(intC)
+
+        _, _, _, _, err := iter.Read()
+        if err == nil {
+            t.Fatalf("unexpected error read: %v", err)
+        }
+        aborted = err != nil
+    }
+
+    if !aborted {
+        t.Fatalf("iteration not aborted")
+    }
+}
+
 func TestCacheKeyIterator_Single(t *testing.T) {
     v0 := tsm1.NewValue(1, 1.0)

@@ -1008,7 +1043,7 @@ func TestCacheKeyIterator_Single(t *testing.T) {
         }
     }

-    iter := tsm1.NewCacheKeyIterator(c, 1)
+    iter := tsm1.NewCacheKeyIterator(c, 1, nil)
     var readValues bool
     for iter.Next() {
         key, _, _, block, err := iter.Read()
@@ -1056,7 +1091,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
         }
     }

-    iter := tsm1.NewCacheKeyIterator(c, 1)
+    iter := tsm1.NewCacheKeyIterator(c, 1, nil)
     var readValues bool
     var chunk int
     for iter.Next() {
@@ -1090,6 +1125,43 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
     }
 }

+// Tests that the CacheKeyIterator will abort if the interrupt channel is closed
+func TestCacheKeyIterator_Abort(t *testing.T) {
+    v0 := tsm1.NewValue(1, 1.0)
+
+    writes := map[string][]tsm1.Value{
+        "cpu,host=A#!~#value": []tsm1.Value{v0},
+    }
+
+    c := tsm1.NewCache(0, "")
+
+    for k, v := range writes {
+        if err := c.Write(k, v); err != nil {
+            t.Fatalf("failed to write key foo to cache: %s", err.Error())
+        }
+    }
+
+    intC := make(chan struct{})
+
+    iter := tsm1.NewCacheKeyIterator(c, 1, intC)
+
+    var aborted bool
+    for iter.Next() {
+        //Abort
+        close(intC)
+
+        _, _, _, _, err := iter.Read()
+        if err == nil {
+            t.Fatalf("unexpected error read: %v", err)
+        }
+        aborted = err != nil
+    }
+
+    if !aborted {
+        t.Fatalf("iteration not aborted")
+    }
+}
+
 func TestDefaultPlanner_Plan_Min(t *testing.T) {
     cp := tsm1.NewDefaultPlanner(
         &fakeFileStore{
tsdb/engine/tsm1/engine.go (2 additions, 0 deletions)

@@ -290,6 +290,7 @@ func (e *Engine) enableSnapshotCompactions() {
         return
     }

+    e.Compactor.EnableSnapshots()
     quit := make(chan struct{})
     e.snapDone = quit
     e.snapWG.Add(1)
@@ -304,6 +305,7 @@ func (e *Engine) disableSnapshotCompactions() {
     if e.snapDone != nil {
         close(e.snapDone)
         e.snapDone = nil
+        e.Compactor.DisableSnapshots()
     }

     e.mu.Unlock()
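One detail worth noting about the ordering in engine.go (an editorial reading of the diff, not something stated in the PR discussion): EnableSnapshots runs before the snapshot goroutine is created, so the interrupt channel exists before any snapshot can capture it, and DisableSnapshots runs only after snapDone is closed, so the background loop stops scheduling new snapshots before the in-flight one is told to abort. Reversing either order could let a snapshot start with a nil interrupt channel (one that can never signal an abort) or schedule new snapshots against a channel that is about to be closed.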