Skip to content

Commit

Permalink
Interrupt in progress TSM compactions
Browse files Browse the repository at this point in the history
When snapshots and compactions are disabled, the check to see if
the compaction should be aborted occurs in between writing to the
next TSM file.  If a large compaction is running, it might take
a while for the file to be finished writing causing long delays.

This now interrupts compactions while iterating over the blocks to
write which allows them to abort immediately.
  • Loading branch information
jwilder committed Jul 27, 2017
1 parent 5887e92 commit 42b82e7
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 19 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## v1.3.2 [unreleased]

### Bugfixes

- [#8629](https://github.com/influxdata/influxdb/pull/8629): Interrupt in progress TSM compactions

## v1.3.1 [2017-07-20]

### Bugfixes
Expand Down
79 changes: 65 additions & 14 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,11 @@ type Compactor struct {
snapshotsEnabled bool
compactionsEnabled bool

// The channel to signal that any in progress snapshots should be aborted.
snapshotsInterrupt chan struct{}
// The channel to signal that any in progress level compactions should be aborted.
compactionsInterrupt chan struct{}

files map[string]struct{}
}

Expand All @@ -604,6 +609,9 @@ func (c *Compactor) Open() {

c.snapshotsEnabled = true
c.compactionsEnabled = true
c.snapshotsInterrupt = make(chan struct{})
c.compactionsInterrupt = make(chan struct{})

c.files = make(map[string]struct{})
}

Expand All @@ -616,47 +624,68 @@ func (c *Compactor) Close() {
}
c.snapshotsEnabled = false
c.compactionsEnabled = false
if c.compactionsInterrupt != nil {
close(c.compactionsInterrupt)
}
if c.snapshotsInterrupt != nil {
close(c.snapshotsInterrupt)
}
}

// DisableSnapshots disables the compactor from performing snapshots.
func (c *Compactor) DisableSnapshots() {
c.mu.Lock()
c.snapshotsEnabled = false
if c.snapshotsInterrupt != nil {
close(c.snapshotsInterrupt)
c.snapshotsInterrupt = nil
}
c.mu.Unlock()
}

// EnableSnapshots allows the compactor to perform snapshots.
func (c *Compactor) EnableSnapshots() {
c.mu.Lock()
c.snapshotsEnabled = true
if c.snapshotsInterrupt == nil {
c.snapshotsInterrupt = make(chan struct{})
}
c.mu.Unlock()
}

// DisableSnapshots disables the compactor from performing compactions.
func (c *Compactor) DisableCompactions() {
c.mu.Lock()
c.compactionsEnabled = false
if c.compactionsInterrupt != nil {
close(c.compactionsInterrupt)
c.compactionsInterrupt = nil
}
c.mu.Unlock()
}

// EnableCompactions allows the compactor to perform compactions.
func (c *Compactor) EnableCompactions() {
c.mu.Lock()
c.compactionsEnabled = true
if c.compactionsInterrupt == nil {
c.compactionsInterrupt = make(chan struct{})
}
c.mu.Unlock()
}

// WriteSnapshot writes a Cache snapshot to one or more new TSM files.
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
c.mu.RLock()
enabled := c.snapshotsEnabled
intC := c.snapshotsInterrupt
c.mu.RUnlock()

if !enabled {
return nil, errSnapshotsDisabled
}

iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock)
iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock, intC)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)

// See if we were disabled while writing a snapshot
Expand Down Expand Up @@ -717,7 +746,11 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
return nil, nil
}

tsm, err := NewTSMKeyIterator(size, fast, trs...)
c.mu.RLock()
intC := c.compactionsInterrupt
c.mu.RUnlock()

tsm, err := NewTSMKeyIterator(size, fast, intC, trs...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -993,7 +1026,8 @@ type tsmKeyIterator struct {

// merged are encoded blocks that have been combined or used as is
// without decode
merged blocks
merged blocks
interrupt chan struct{}
}

type block struct {
Expand Down Expand Up @@ -1045,7 +1079,7 @@ func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// NewTSMKeyIterator returns a new TSM key iterator from readers.
// size indicates the maximum number of values to encode in a single block.
func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, error) {
func NewTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) {
var iter []*BlockIterator
for _, r := range readers {
iter = append(iter, r.BlockIterator())
Expand All @@ -1059,6 +1093,7 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator,
iterators: iter,
fast: fast,
buf: make([]blocks, len(iter)),
interrupt: interrupt,
}, nil
}

Expand Down Expand Up @@ -1199,6 +1234,13 @@ func (k *tsmKeyIterator) merge() {
}

func (k *tsmKeyIterator) Read() (string, int64, int64, []byte, error) {
// See if compactions were disabled while we were running.
select {
case <-k.interrupt:
return "", 0, 0, nil, errCompactionAborted
default:
}

if len(k.merged) == 0 {
return "", 0, 0, nil, k.err
}
Expand All @@ -1224,9 +1266,10 @@ type cacheKeyIterator struct {
size int
order []string

i int
blocks [][]cacheBlock
ready []chan struct{}
i int
blocks [][]cacheBlock
ready []chan struct{}
interrupt chan struct{}
}

type cacheBlock struct {
Expand All @@ -1237,7 +1280,7 @@ type cacheBlock struct {
}

// NewCacheKeyIterator returns a new KeyIterator from a Cache.
func NewCacheKeyIterator(cache *Cache, size int) KeyIterator {
func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIterator {
keys := cache.Keys()

chans := make([]chan struct{}, len(keys))
Expand All @@ -1246,12 +1289,13 @@ func NewCacheKeyIterator(cache *Cache, size int) KeyIterator {
}

cki := &cacheKeyIterator{
i: -1,
size: size,
cache: cache,
order: keys,
ready: chans,
blocks: make([][]cacheBlock, len(keys)),
i: -1,
size: size,
cache: cache,
order: keys,
ready: chans,
blocks: make([][]cacheBlock, len(keys)),
interrupt: interrupt,
}
go cki.encode()
return cki
Expand Down Expand Up @@ -1330,6 +1374,13 @@ func (c *cacheKeyIterator) Next() bool {
}

func (c *cacheKeyIterator) Read() (string, int64, int64, []byte, error) {
// See if snapshot compactions were disabled while we were running.
select {
case <-c.interrupt:
return "", 0, 0, nil, errCompactionAborted
default:
}

blk := c.blocks[c.i][0]
return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err
}
Expand Down
82 changes: 77 additions & 5 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ func TestTSMKeyIterator_Single(t *testing.T) {

r := MustTSMReader(dir, 1, writes)

iter, err := tsm1.NewTSMKeyIterator(1, false, r)
iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
Expand Down Expand Up @@ -890,7 +890,7 @@ func TestTSMKeyIterator_Duplicate(t *testing.T) {

r2 := MustTSMReader(dir, 2, writes2)

iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
Expand Down Expand Up @@ -951,7 +951,7 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
r2 := MustTSMReader(dir, 2, points2)
r2.Delete([]string{"cpu,host=A#!~#count"})

iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
Expand Down Expand Up @@ -993,6 +993,41 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
}
}

// Tests that the TSMKeyIterator will abort if the interrupt channel is closed
func TestTSMKeyIterator_Abort(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)

v1 := tsm1.NewValue(1, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1},
}

r := MustTSMReader(dir, 1, writes)

intC := make(chan struct{})
iter, err := tsm1.NewTSMKeyIterator(1, false, intC, r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}

var aborted bool
for iter.Next() {
// Abort
close(intC)

_, _, _, _, err := iter.Read()
if err == nil {
t.Fatalf("unexpected error read: %v", err)
}
aborted = err != nil
}

if !aborted {
t.Fatalf("iteration not aborted")
}
}

func TestCacheKeyIterator_Single(t *testing.T) {
v0 := tsm1.NewValue(1, 1.0)

Expand All @@ -1008,7 +1043,7 @@ func TestCacheKeyIterator_Single(t *testing.T) {
}
}

iter := tsm1.NewCacheKeyIterator(c, 1)
iter := tsm1.NewCacheKeyIterator(c, 1, nil)
var readValues bool
for iter.Next() {
key, _, _, block, err := iter.Read()
Expand Down Expand Up @@ -1056,7 +1091,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
}
}

iter := tsm1.NewCacheKeyIterator(c, 1)
iter := tsm1.NewCacheKeyIterator(c, 1, nil)
var readValues bool
var chunk int
for iter.Next() {
Expand Down Expand Up @@ -1090,6 +1125,43 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
}
}

// Tests that the CacheKeyIterator will abort if the interrupt channel is closed
func TestCacheKeyIterator_Abort(t *testing.T) {
v0 := tsm1.NewValue(1, 1.0)

writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v0},
}

c := tsm1.NewCache(0, "")

for k, v := range writes {
if err := c.Write(k, v); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
}

intC := make(chan struct{})

iter := tsm1.NewCacheKeyIterator(c, 1, intC)

var aborted bool
for iter.Next() {
//Abort
close(intC)

_, _, _, _, err := iter.Read()
if err == nil {
t.Fatalf("unexpected error read: %v", err)
}
aborted = err != nil
}

if !aborted {
t.Fatalf("iteration not aborted")
}
}

func TestDefaultPlanner_Plan_Min(t *testing.T) {
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
Expand Down
2 changes: 2 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func (e *Engine) enableSnapshotCompactions() {
return
}

e.Compactor.EnableSnapshots()
quit := make(chan struct{})
e.snapDone = quit
e.snapWG.Add(1)
Expand All @@ -304,6 +305,7 @@ func (e *Engine) disableSnapshotCompactions() {
if e.snapDone != nil {
close(e.snapDone)
e.snapDone = nil
e.Compactor.DisableSnapshots()
}

e.mu.Unlock()
Expand Down

0 comments on commit 42b82e7

Please sign in to comment.