Skip to content

Commit

Permalink
Merge pull request #7084 from influxdata/jw-tombstones
Browse files Browse the repository at this point in the history
Tombstone memory improvements
  • Loading branch information
jwilder authored Jul 29, 2016
2 parents 644693a + 5576e7f commit 37674d2
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 94 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#7088](https://github.com/influxdata/influxdb/pull/7088): Fix UDP pointsRx being incremented twice.
- [#7080](https://github.com/influxdata/influxdb/pull/7080): Ensure IDs can't clash when managing Continuous Queries.
- [#6990](https://github.com/influxdata/influxdb/issues/6990): Fix panic parsing empty key
- [#7084](https://github.com/influxdata/influxdb/pull/7084): Tombstone memory improvements

## v0.13.0 [2016-05-12]

Expand Down
2 changes: 2 additions & 0 deletions influxql/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ func (a auxIteratorFields) sendError(err error) {

// DrainIterator reads all points from an iterator.
func DrainIterator(itr Iterator) {
defer itr.Close()
switch itr := itr.(type) {
case FloatIterator:
for p, _ := itr.Next(); p != nil; p, _ = itr.Next() {
Expand All @@ -534,6 +535,7 @@ func DrainIterator(itr Iterator) {

// DrainIterators reads all points from all iterators.
func DrainIterators(itrs []Iterator) {
defer Iterators(itrs).Close()
for {
var hasData bool

Expand Down
27 changes: 21 additions & 6 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ func (e *Engine) SetCompactionsEnabled(enabled bool) {

// Wait for compaction goroutines to exit
e.wg.Wait()

if err := e.cleanup(); err != nil {
e.logger.Printf("error cleaning up temp file: %v", err)
}
}
}

Expand Down Expand Up @@ -627,17 +631,21 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {

// keyMap is used to see if a given key should be deleted. seriesKey
// are the measurement + tagset (minus separate & field)
keyMap := map[string]struct{}{}
keyMap := make(map[string]int, len(seriesKeys))
for _, k := range seriesKeys {
keyMap[k] = struct{}{}
keyMap[k] = 0
}

var deleteKeys []string
deleteKeys := make([]string, 0, len(seriesKeys))
// go through the keys in the file store
if err := e.FileStore.WalkKeys(func(k string, _ byte) error {
seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
if _, ok := keyMap[seriesKey]; ok {

// Keep track if we've added this key since WalkKeys can return keys
// we've seen before
if v, ok := keyMap[seriesKey]; ok && v == 0 {
deleteKeys = append(deleteKeys, k)
keyMap[seriesKey] += 1
}
return nil
}); err != nil {
Expand All @@ -648,8 +656,12 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
return err
}

// reset the counts
for k := range keyMap {
keyMap[k] = 0
}
// find the keys in the cache and remove them
walKeys := make([]string, 0)
walKeys := deleteKeys[:0]
e.Cache.RLock()
s := e.Cache.Store()
for k, _ := range s {
Expand Down Expand Up @@ -1065,6 +1077,10 @@ func (e *Engine) cleanup() error {
}
}

return e.cleanupTempTSMFiles()
}

func (e *Engine) cleanupTempTSMFiles() error {
files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", CompactionTempExtension)))
if err != nil {
return fmt.Errorf("error getting compaction temp files: %s", err.Error())
Expand All @@ -1075,7 +1091,6 @@ func (e *Engine) cleanup() error {
return fmt.Errorf("error removing temp compaction files: %v", err)
}
}

return nil
}

Expand Down
45 changes: 34 additions & 11 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,16 +331,12 @@ func (f *FileStore) Delete(keys []string) error {
// DeleteRange removes the values for keys between min and max.
func (f *FileStore) DeleteRange(keys []string, min, max int64) error {
f.mu.Lock()
defer f.mu.Unlock()

f.lastModified = time.Now()
f.mu.Unlock()

for _, file := range f.files {
if err := file.DeleteRange(keys, min, max); err != nil {
return err
}
}
return nil
return f.walkFiles(func(tsm TSMFile) error {
return tsm.DeleteRange(keys, min, max)
})
}

func (f *FileStore) Open() error {
Expand Down Expand Up @@ -557,7 +553,7 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
}

inuse = append(inuse, file)
break
continue
}

if err := file.Close(); err != nil {
Expand Down Expand Up @@ -633,6 +629,34 @@ func (f *FileStore) BlockCount(path string, idx int) int {
return 0
}

// walkFiles calls fn for every files in filestore in parallel
func (f *FileStore) walkFiles(fn func(f TSMFile) error) error {
f.mu.RLock()
defer f.mu.RUnlock()

// struct to hold the result of opening each reader in a goroutine

errC := make(chan error, len(f.files))
for _, f := range f.files {
go func(tsm TSMFile) {
if err := fn(tsm); err != nil {
errC <- fmt.Errorf("file %s: %s", tsm.Path(), err)
return
}

errC <- nil
}(f)
}

for i := 0; i < cap(errC); i++ {
res := <-errC
if res != nil {
return res
}
}
return nil
}

// locations returns the files and index blocks for a key and time. ascending indicates
// whether the key will be scan in ascending time order or descenging time order.
// This function assumes the read-lock has been taken.
Expand Down Expand Up @@ -736,9 +760,8 @@ func (f *FileStore) CreateSnapshot() (string, error) {
}
// Check for tombstones and link those as well
for _, tf := range tsmf.TombstoneFiles() {
tfpath := filepath.Join(f.dir, tf.Path)
newpath := filepath.Join(tmpPath, filepath.Base(tf.Path))
if err := os.Link(tfpath, newpath); err != nil {
if err := os.Link(tf.Path, newpath); err != nil {
return "", fmt.Errorf("error creating tombstone hard link: %q", err)
}
}
Expand Down
65 changes: 61 additions & 4 deletions tsdb/engine/tsm1/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type cursor interface {
// cursorAt provides a bufferred cursor interface.
// This required for literal value cursors which don't have a time value.
type cursorAt interface {
close() error
peek() (k int64, v interface{})
nextAt(seek int64) interface{}
}
Expand All @@ -47,6 +48,10 @@ func newBufCursor(cur cursor, ascending bool) *bufCursor {
return &bufCursor{cur: cur, ascending: ascending}
}

func (c *bufCursor) close() error {
return c.cur.close()
}

// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor.
func (c *bufCursor) next() (int64, interface{}) {
if c.buf.filled {
Expand Down Expand Up @@ -229,7 +234,18 @@ func (itr *floatIterator) Stats() influxql.IteratorStats {
}

// Close closes the iterator.
func (itr *floatIterator) Close() error { return itr.cur.close() }
func (itr *floatIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
for _, c := range itr.conds.curs {
c.close()
}
if itr.cur != nil {
return itr.cur.close()
}
return nil
}

// floatLimitIterator
type floatLimitIterator struct {
Expand Down Expand Up @@ -518,6 +534,7 @@ type floatLiteralCursor struct {
value float64
}

func (c *floatLiteralCursor) close() error { return nil }
func (c *floatLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *floatLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *floatLiteralCursor) nextAt(seek int64) interface{} { return c.value }
Expand All @@ -526,6 +543,7 @@ func (c *floatLiteralCursor) nextAt(seek int64) interface{} { return c.value }
// It doesn't not have a time value so it can only be used with nextAt().
type floatNilLiteralCursor struct{}

func (c *floatNilLiteralCursor) close() error { return nil }
func (c *floatNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*float64)(nil) }
func (c *floatNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*float64)(nil) }
func (c *floatNilLiteralCursor) nextAt(seek int64) interface{} { return (*float64)(nil) }
Expand Down Expand Up @@ -651,7 +669,18 @@ func (itr *integerIterator) Stats() influxql.IteratorStats {
}

// Close closes the iterator.
func (itr *integerIterator) Close() error { return itr.cur.close() }
func (itr *integerIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
for _, c := range itr.conds.curs {
c.close()
}
if itr.cur != nil {
return itr.cur.close()
}
return nil
}

// integerLimitIterator
type integerLimitIterator struct {
Expand Down Expand Up @@ -940,6 +969,7 @@ type integerLiteralCursor struct {
value int64
}

func (c *integerLiteralCursor) close() error { return nil }
func (c *integerLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *integerLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *integerLiteralCursor) nextAt(seek int64) interface{} { return c.value }
Expand All @@ -948,6 +978,7 @@ func (c *integerLiteralCursor) nextAt(seek int64) interface{} { return c.value
// It doesn't not have a time value so it can only be used with nextAt().
type integerNilLiteralCursor struct{}

func (c *integerNilLiteralCursor) close() error { return nil }
func (c *integerNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*int64)(nil) }
func (c *integerNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*int64)(nil) }
func (c *integerNilLiteralCursor) nextAt(seek int64) interface{} { return (*int64)(nil) }
Expand Down Expand Up @@ -1073,7 +1104,18 @@ func (itr *stringIterator) Stats() influxql.IteratorStats {
}

// Close closes the iterator.
func (itr *stringIterator) Close() error { return itr.cur.close() }
func (itr *stringIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
for _, c := range itr.conds.curs {
c.close()
}
if itr.cur != nil {
return itr.cur.close()
}
return nil
}

// stringLimitIterator
type stringLimitIterator struct {
Expand Down Expand Up @@ -1362,6 +1404,7 @@ type stringLiteralCursor struct {
value string
}

func (c *stringLiteralCursor) close() error { return nil }
func (c *stringLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *stringLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *stringLiteralCursor) nextAt(seek int64) interface{} { return c.value }
Expand All @@ -1370,6 +1413,7 @@ func (c *stringLiteralCursor) nextAt(seek int64) interface{} { return c.value }
// It doesn't not have a time value so it can only be used with nextAt().
type stringNilLiteralCursor struct{}

func (c *stringNilLiteralCursor) close() error { return nil }
func (c *stringNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*string)(nil) }
func (c *stringNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*string)(nil) }
func (c *stringNilLiteralCursor) nextAt(seek int64) interface{} { return (*string)(nil) }
Expand Down Expand Up @@ -1495,7 +1539,18 @@ func (itr *booleanIterator) Stats() influxql.IteratorStats {
}

// Close closes the iterator.
func (itr *booleanIterator) Close() error { return itr.cur.close() }
func (itr *booleanIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
for _, c := range itr.conds.curs {
c.close()
}
if itr.cur != nil {
return itr.cur.close()
}
return nil
}

// booleanLimitIterator
type booleanLimitIterator struct {
Expand Down Expand Up @@ -1784,6 +1839,7 @@ type booleanLiteralCursor struct {
value bool
}

func (c *booleanLiteralCursor) close() error { return nil }
func (c *booleanLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *booleanLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *booleanLiteralCursor) nextAt(seek int64) interface{} { return c.value }
Expand All @@ -1792,6 +1848,7 @@ func (c *booleanLiteralCursor) nextAt(seek int64) interface{} { return c.value
// It doesn't not have a time value so it can only be used with nextAt().
type booleanNilLiteralCursor struct{}

func (c *booleanNilLiteralCursor) close() error { return nil }
func (c *booleanNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*bool)(nil) }
func (c *booleanNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*bool)(nil) }
func (c *booleanNilLiteralCursor) nextAt(seek int64) interface{} { return (*bool)(nil) }
Expand Down
20 changes: 19 additions & 1 deletion tsdb/engine/tsm1/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type cursor interface {
// cursorAt provides a bufferred cursor interface.
// This required for literal value cursors which don't have a time value.
type cursorAt interface {
close() error
peek() (k int64, v interface{})
nextAt(seek int64) interface{}
}
Expand All @@ -40,6 +41,10 @@ func newBufCursor(cur cursor, ascending bool) *bufCursor {
return &bufCursor{cur: cur, ascending: ascending}
}

func (c *bufCursor) close() error {
return c.cur.close()
}

// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor.
func (c *bufCursor) next() (int64, interface{}) {
if c.buf.filled {
Expand Down Expand Up @@ -225,7 +230,18 @@ func (itr *{{.name}}Iterator) Stats() influxql.IteratorStats {
}

// Close closes the iterator.
func (itr *{{.name}}Iterator) Close() error { return itr.cur.close() }
func (itr *{{.name}}Iterator) Close() error {
for _, c := range itr.aux {
c.close()
}
for _, c := range itr.conds.curs {
c.close()
}
if itr.cur != nil {
return itr.cur.close()
}
return nil
}

// {{.name}}LimitIterator
type {{.name}}LimitIterator struct {
Expand Down Expand Up @@ -514,6 +530,7 @@ type {{.name}}LiteralCursor struct {
value {{.Type}}
}

func (c *{{.name}}LiteralCursor) close() error { return nil }
func (c *{{.name}}LiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *{{.name}}LiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *{{.name}}LiteralCursor) nextAt(seek int64) interface{} { return c.value }
Expand All @@ -523,6 +540,7 @@ func (c *{{.name}}LiteralCursor) nextAt(seek int64) interface{} { return c.value
// It doesn't not have a time value so it can only be used with nextAt().
type {{.name}}NilLiteralCursor struct {}

func (c *{{.name}}NilLiteralCursor) close() error { return nil }
func (c *{{.name}}NilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*{{.Type}})(nil) }
func (c *{{.name}}NilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*{{.Type}})(nil) }
func (c *{{.name}}NilLiteralCursor) nextAt(seek int64) interface{} { return (*{{.Type}})(nil) }
Expand Down
Loading

0 comments on commit 37674d2

Please sign in to comment.