Skip to content

Commit

Permalink
Allow queries to complete before closing TSM files
Browse files Browse the repository at this point in the history
If a query was running against a file being compacted, we closed the file
and the query ended wherever it had read up to.  This could result in
queries that randomly lost data, even though running them again showed
the full results.

We now use a reference counting approach and move the in-use files out
of the way in the filestore and allow the queries to complete against
the old tsm files.  The new files are installed and new queries will
use them.

Fixes #5501
  • Loading branch information
jwilder committed Jul 20, 2016
1 parent 16f5ff3 commit 0c9175e
Show file tree
Hide file tree
Showing 7 changed files with 410 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#6819](https://github.com/influxdata/influxdb/issues/6819): Database unresponsive after DROP MEASUREMENT
- [#6796](https://github.com/influxdata/influxdb/issues/6796): Out of Memory Error when Dropping Measurement
- [#6946](https://github.com/influxdata/influxdb/issues/6946): Duplicate data for the same timestamp
- [#5501](https://github.com/influxdata/influxdb/issues/5501): Queries against files that have just been compacted need to point to new files

## v0.13.0 [2016-05-12]

Expand Down
133 changes: 130 additions & 3 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,22 @@ type TSMFile interface {
// Size returns the size of the file on disk in bytes.
Size() uint32

// Rename renames the existing TSM file to a new name and replaces the mmap backing slice using the new
// file name. Index and Reader state are not re-initialized.
Rename(path string) error

// Remove deletes the file from the filesystem
Remove() error

// InUse returns true if the file is currently in use by queries
InUse() bool

// Ref records that this file is actively in use
Ref()

// Unref records that this file is no longer in use
Unref()

// Stats returns summary information about the TSM file.
Stats() FileStat

Expand All @@ -113,7 +126,8 @@ type FileStore struct {
Logger *log.Logger
traceLogging bool

stats *FileStoreStatistics
stats *FileStoreStatistics
purger *purger

currentTempDirID int
}
Expand All @@ -140,18 +154,24 @@ func (f FileStat) ContainsKey(key string) bool {
}

// NewFileStore returns a FileStore for the given directory. The store and its
// purger share a single logger that writes to stderr with the "[filestore] "
// prefix; use SetLogOutput to redirect both.
func NewFileStore(dir string) *FileStore {
	// Build the logger once so the FileStore and the purger log to the same
	// destination. Setting Logger twice in the literal (once with a fresh
	// log.New and once with this variable) is a duplicate-field compile error.
	logger := log.New(os.Stderr, "[filestore] ", log.LstdFlags)
	return &FileStore{
		dir:          dir,
		lastModified: time.Now(),
		Logger:       logger,
		stats:        &FileStoreStatistics{},
		purger: &purger{
			files:  map[string]TSMFile{},
			logger: logger,
		},
	}
}

// SetLogOutput points the file store's logger, and the purger's logger, at w.
// It must not be called after the Open method has been called.
func (f *FileStore) SetLogOutput(w io.Writer) {
	logger := log.New(w, "[filestore] ", log.LstdFlags)

	// Keep the purger on the same logger as the store itself.
	f.Logger = logger
	f.purger.logger = logger
}

// FileStoreStatistics keeps statistics about the file store.
Expand Down Expand Up @@ -485,12 +505,43 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
}

// We need to prune our set of active files now
var active []TSMFile
var active, inuse []TSMFile
for _, file := range updated {
keep := true
for _, remove := range oldFiles {
if remove == file.Path() {
keep = false

// If queries are running against this file, then we need to move it out of the
// way and let them complete. We'll then delete the original file to avoid
// blocking callers upstream. If the process crashes, the temp file is
// cleaned up at startup automatically.
if file.InUse() {
// Copy all the tombstones related to this TSM file
var deletes []string
for _, t := range file.TombstoneFiles() {
deletes = append(deletes, t.Path)
}
deletes = append(deletes, file.Path())

// Rename the TSM file used by this reader
tempPath := file.Path() + ".tmp"
if err := file.Rename(tempPath); err != nil {
return err
}

// Remove the old file and tombstones. We can't use the normal TSMReader.Remove()
// because it now refers to our temp file which we can't remove.
for _, f := range deletes {
if err := os.RemoveAll(f); err != nil {
return err
}
}

inuse = append(inuse, file)
break
}

if err := file.Close(); err != nil {
return err
}
Expand All @@ -511,6 +562,9 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
return err
}

// Tell the purger about our in-use files we need to remove
f.purger.add(inuse)

f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
Expand Down Expand Up @@ -718,6 +772,9 @@ type KeyCursor struct {
// If this is true, we need to scan the duplicate blocks and dedup the points
// as query time until they are compacted.
duplicates bool

// The distinct set of TSM files referenced by the cursor
refs map[string]TSMFile
}

type location struct {
Expand Down Expand Up @@ -773,6 +830,7 @@ func newKeyCursor(fs *FileStore, key string, t int64, ascending bool) *KeyCursor
fs: fs,
seeks: fs.locations(key, t, ascending),
ascending: ascending,
refs: map[string]TSMFile{},
}
c.duplicates = c.hasOverlappingBlocks()

Expand All @@ -782,12 +840,25 @@ func newKeyCursor(fs *FileStore, key string, t int64, ascending bool) *KeyCursor
sort.Sort(descLocations(c.seeks))
}

// Determine the distinct set of TSM files in use and mark them as in-use
for _, f := range c.seeks {
if _, ok := c.refs[f.r.Path()]; !ok {
f.r.Ref()
c.refs[f.r.Path()] = f.r
}
}

c.seek(t)
return c
}

// Close removes all references on the cursor.
func (c *KeyCursor) Close() {
// Remove all of our in-use references since we're done
for _, f := range c.refs {
f.Unref()
}

c.buf = nil
c.seeks = nil
c.fs = nil
Expand Down Expand Up @@ -974,6 +1045,62 @@ func (c *KeyCursor) filterBooleanValues(tombstones []TimeRange, values BooleanVa
return values
}

// purger tracks TSM files that have been replaced but could not be removed
// right away, and closes and removes them from a background goroutine once
// they report that they are no longer in use.
type purger struct {
	// logger receives close/remove failures from the background sweep.
	logger *log.Logger

	// mu is held whenever files or running is read or written.
	mu sync.RWMutex
	// files holds the files awaiting removal, keyed by their path.
	files map[string]TSMFile
	// running is true while the background purge goroutine is active.
	running bool
}

// add registers files for removal, keyed by each file's path, and then makes
// sure the background purge loop is running.
func (p *purger) add(files []TSMFile) {
	p.mu.Lock()
	for i := range files {
		pending := files[i]
		p.files[pending.Path()] = pending
	}
	p.mu.Unlock()

	// purge takes the lock itself, so it has to be called after releasing it.
	p.purge()
}

// purge starts the background sweep goroutine unless one is already running.
// The goroutine repeatedly walks the pending files, closing and removing every
// file that is no longer in use, and exits once nothing is left to purge.
func (p *purger) purge() {
	p.mu.Lock()
	alreadyRunning := p.running
	p.running = true
	p.mu.Unlock()

	// Only one sweep goroutine may exist at a time.
	if alreadyRunning {
		return
	}

	go func() {
		for {
			p.mu.Lock()
			for path, file := range p.files {
				// Queries are still reading this file; try again next pass.
				if file.InUse() {
					continue
				}

				if err := file.Close(); err != nil {
					p.logger.Printf("purge: close file: %v", err)
					continue
				}

				if err := file.Remove(); err != nil {
					p.logger.Printf("purge: remove file: %v", err)
					continue
				}

				// Closed and removed: stop tracking it.
				delete(p.files, path)
			}

			// Clear the running flag under the same lock hold that observed
			// the empty set, so a concurrent add either sees running == false
			// or its file is picked up by this loop.
			drained := len(p.files) == 0
			if drained {
				p.running = false
			}
			p.mu.Unlock()

			if drained {
				return
			}
			time.Sleep(time.Second)
		}
	}()
}

type tsmReaders []TSMFile

func (a tsmReaders) Len() int { return len(a) }
Expand Down
84 changes: 84 additions & 0 deletions tsdb/engine/tsm1/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
Expand Down Expand Up @@ -2020,6 +2021,89 @@ func TestFileStore_Remove(t *testing.T) {
}
}

// TestFileStore_Replace verifies that Replace can swap out TSM files while a
// cursor still holds references to them: the cursor must be able to finish
// reading from the old files, the file store must only report the new file,
// and the old files must be cleaned up once the cursor is closed.
func TestFileStore_Replace(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)

	// Create 3 TSM files...
	data := []keyValues{
		{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}},
		{"cpu", []tsm1.Value{tsm1.NewValue(1, 2.0)}},
		{"cpu", []tsm1.Value{tsm1.NewValue(2, 3.0)}},
	}

	files, err := newFileDir(dir, data...)
	if err != nil {
		fatal(t, "creating test files", err)
	}

	// Replace assumes new files have a .tmp extension
	replacement := files[2] + ".tmp"
	if err := os.Rename(files[2], replacement); err != nil {
		fatal(t, "renaming replacement file", err)
	}

	fs := tsm1.NewFileStore(dir)
	if err := fs.Open(); err != nil {
		fatal(t, "opening file store", err)
	}
	defer fs.Close()

	if got, exp := fs.Count(), 2; got != exp {
		t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
	}

	// Should record references to the two existing TSM files
	cur := fs.KeyCursor("cpu", 0, true)

	// Should move the existing files out of the way, but allow query to complete
	if err := fs.Replace(files[:2], []string{replacement}); err != nil {
		t.Fatalf("replace: %v", err)
	}

	if got, exp := fs.Count(), 1; got != exp {
		t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
	}

	// There should be two blocks (1 in each file)
	tdec := &tsm1.TimeDecoder{}
	vdec := &tsm1.FloatDecoder{}
	buf := make([]tsm1.FloatValue, 10)
	for i := 0; i < 2; i++ {
		cur.Next()
		values, err := cur.ReadFloatBlock(tdec, vdec, &buf)
		if err != nil {
			t.Fatalf("read float block %d: %v", i, err)
		}
		if got, exp := len(values), 1; got != exp {
			t.Fatalf("value len mismatch: got %v, exp %v", got, exp)
		}
	}

	// No more blocks for this cursor. Only the length is asserted here, as in
	// the original test; the error for an exhausted cursor is not checked.
	cur.Next()
	values, _ := cur.ReadFloatBlock(tdec, vdec, &buf)
	if got, exp := len(values), 0; got != exp {
		t.Fatalf("value len mismatch: got %v, exp %v", got, exp)
	}

	// Release the references (files should get evicted by purger shortly)
	cur.Close()

	// Replace renames in-use files to "<path>.tmp" and deletes the original
	// paths itself, so the originals must already be gone.
	for _, path := range files[:2] {
		if _, err := os.Stat(path); !os.IsNotExist(err) {
			t.Fatalf("stat file: %v", err)
		}
	}

	// The renamed copies are removed asynchronously by the purger, which
	// sleeps for a second between sweeps. Poll with a deadline instead of a
	// single fixed sleep so the check does not race the purger's timer.
	deadline := time.Now().Add(5 * time.Second)
	for _, path := range files[:2] {
		tmp := path + ".tmp"
		for {
			_, err := os.Stat(tmp)
			if os.IsNotExist(err) {
				break
			}
			if time.Now().After(deadline) {
				t.Fatalf("purger did not remove %s: stat err: %v", tmp, err)
			}
			time.Sleep(10 * time.Millisecond)
		}
	}

	// Make sure the new file exists
	if _, err := os.Stat(files[2]); err != nil {
		t.Fatalf("stat file: %v", err)
	}
}

func TestFileStore_Open_Deleted(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
Expand Down
8 changes: 8 additions & 0 deletions tsdb/engine/tsm1/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func (c *floatAscendingCursor) nextTSM() {
c.tsm.keyCursor.Next()
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
if len(c.tsm.values) == 0 {
c.tsm.keyCursor.Close()
return
}
c.tsm.pos = 0
Expand Down Expand Up @@ -493,6 +494,7 @@ func (c *floatDescendingCursor) nextTSM() {
c.tsm.keyCursor.Next()
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
if len(c.tsm.values) == 0 {
c.tsm.keyCursor.Close()
return
}
c.tsm.pos = len(c.tsm.values) - 1
Expand Down Expand Up @@ -791,6 +793,7 @@ func (c *integerAscendingCursor) nextTSM() {
c.tsm.keyCursor.Next()
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
if len(c.tsm.values) == 0 {
c.tsm.keyCursor.Close()
return
}
c.tsm.pos = 0
Expand Down Expand Up @@ -903,6 +906,7 @@ func (c *integerDescendingCursor) nextTSM() {
c.tsm.keyCursor.Next()
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
if len(c.tsm.values) == 0 {
c.tsm.keyCursor.Close()
return
}
c.tsm.pos = len(c.tsm.values) - 1
Expand Down Expand Up @@ -1201,6 +1205,7 @@ func (c *stringAscendingCursor) nextTSM() {
c.tsm.keyCursor.Next()
c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
if len(c.tsm.values) == 0 {
c.tsm.keyCursor.Close()
return
}
c.tsm.pos = 0
Expand Down Expand Up @@ -1313,6 +1318,7 @@ func (c *stringDescendingCursor) nextTSM() {
c.tsm.keyCursor.Next()
c.tsm.values, _ = c.tsm.keyCursor.ReadStringBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
if len(c.tsm.values) == 0 {
c.tsm.keyCursor.Close()
return
}
c.tsm.pos = len(c.tsm.values) - 1
Expand Down Expand Up @@ -1611,6 +1617,7 @@ func (c *booleanAscendingCursor) nextTSM() {
c.tsm.keyCursor.Next()
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
if len(c.tsm.values) == 0 {
c.tsm.keyCursor.Close()
return
}
c.tsm.pos = 0
Expand Down Expand Up @@ -1723,6 +1730,7 @@ func (c *booleanDescendingCursor) nextTSM() {
c.tsm.keyCursor.Next()
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanBlock(&c.tsm.tdec, &c.tsm.vdec, &c.tsm.buf)
if len(c.tsm.values) == 0 {
c.tsm.keyCursor.Close()
return
}
c.tsm.pos = len(c.tsm.values) - 1
Expand Down
Loading

0 comments on commit 0c9175e

Please sign in to comment.