Add support for TSI shard streaming and shard size #8491

Merged: 8 commits, Nov 29, 2017
4 changes: 2 additions & 2 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
- [#8495](https://github.com/influxdata/influxdb/pull/8495): Improve CLI connection warnings
- [#9084](https://github.com/influxdata/influxdb/pull/9084): Handle high cardinality deletes in TSM engine
- [#9162](https://github.com/influxdata/influxdb/pull/9162): Improve inmem index startup performance for high cardinality.
- [#8491](https://github.com/influxdata/influxdb/pull/8491): Add further tsi support for streaming/copying shards.

### Bugfixes

@@ -93,8 +94,7 @@ refuse to open, and will most likely see the following error message:
- [#8897](https://github.com/influxdata/influxdb/pull/8897): Add message pack format for query responses.
- [#8886](https://github.com/influxdata/influxdb/pull/8886): Improved compaction scheduling
- [#8690](https://github.com/influxdata/influxdb/issues/8690): Implicitly decide on a lower limit for fill queries when none is present.
- [#8947](https://github.com/influxdata/influxdb/pull/8947): Add `EXPLAIN ANALYZE` command, which produces a detailed execution plan of a `SELECT` statement.
- [#8963](https://github.com/influxdata/influxdb/pull/8963): Streaming inmem2tsi conversion.
- [#8947](https://github.com/influxdata/influxdb/pull/8947): Add `EXPLAIN ANALYZE` command, which produces a detailed execution plan of a `SELECT` statement.- [#8963](https://github.com/influxdata/influxdb/pull/8963): Streaming inmem2tsi conversion.
- [#8995](https://github.com/influxdata/influxdb/pull/8995): Sort & validate TSI key value insertion.
- [#8968](https://github.com/influxdata/influxdb/issues/8968): Make client errors more helpful on downstream errs. Thanks @darkliquid!
- [#8984](https://github.com/influxdata/influxdb/pull/8984): EXACT and estimated CARDINALITY queries.
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/compact.go
@@ -984,7 +984,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
for {
sequence++
// New TSM files are written to a temp file and renamed when fully completed.
fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.tmp", generation, sequence, TSMFileExtension))
fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.%s", generation, sequence, TSMFileExtension, TmpTSMFileExtension))

// Write as much as possible to this file
err := c.write(fileName, iter)
30 changes: 18 additions & 12 deletions tsdb/engine/tsm1/engine.go
@@ -125,6 +125,8 @@ const (
type Engine struct {
mu sync.RWMutex

index tsdb.Index

// The following group of fields is used to track the state of level compactions within the
// Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is
// used to signal those goroutines to shutdown. Every request to disable level compactions will
@@ -147,7 +149,6 @@ type Engine struct {
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
traceLogging bool

index tsdb.Index
fieldset *tsdb.MeasurementFieldSet

WAL *WAL
@@ -579,7 +580,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {

// DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
func (e *Engine) DiskSize() int64 {
return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes()
return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() + e.index.DiskSizeBytes()
}
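DiskSize now includes the index's on-disk footprint alongside the TSM and WAL sizes. For illustration only, here is a minimal sketch of one way an index implementation could satisfy the new DiskSizeBytes contract by walking its directory; indexDiskSize is a hypothetical helper, not the tsi1 implementation (which sums the sizes of its file set, as shown further down in this diff):

package sketch

import (
	"os"
	"path/filepath"
)

// indexDiskSize reports the on-disk footprint of an index directory by
// summing the sizes of every regular file beneath it. Sketch only.
func indexDiskSize(dir string) (int64, error) {
	var total int64
	err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() {
			total += info.Size()
		}
		return nil
	})
	return total, err
}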

// Open opens and initializes the engine.
@@ -652,6 +653,9 @@ func (e *Engine) WithLogger(log *zap.Logger) {
}

// LoadMetadataIndex loads the shard metadata into memory.
//
// Note, it is not safe to call LoadMetadataIndex concurrently. LoadMetadataIndex
// should only be called when initialising a new Engine.
func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
now := time.Now()

@@ -736,10 +740,6 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
return err
}

if err := e.index.SnapshotTo(path); err != nil {
return err
}

tw := tar.NewWriter(w)
defer tw.Close()

@@ -845,6 +845,7 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
return nil, err
}

// The filestore will only handle tsm files. Other file types will be ignored.
if err := e.FileStore.Replace(nil, newFiles); err != nil {
return nil, err
}
@@ -857,13 +858,14 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {

// Load any new series keys to the index
readers := make([]chan seriesKey, 0, len(newFiles))
ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
for _, f := range newFiles {
ch := make(chan seriesKey, 1)
readers = append(readers, ch)

// If asNew is true, the files created from readFileFromBackup will be new ones
// having a temp extension.
f = strings.TrimSuffix(f, ".tmp")
f = strings.TrimSuffix(f, ext)

fd, err := os.Open(f)
if err != nil {
@@ -928,9 +930,7 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as
filename = fmt.Sprintf("%09d-%09d.%s", e.FileStore.NextGeneration(), 1, TSMFileExtension)
}

destPath := filepath.Join(e.path, filename)
tmp := destPath + ".tmp"

tmp := fmt.Sprintf("%s.%s", filepath.Join(e.path, filename), TmpTSMFileExtension)
// Create new file on disk.
f, err := os.OpenFile(tmp, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
@@ -1424,8 +1424,13 @@ func (e *Engine) CreateSnapshot() (string, error) {

e.mu.RLock()
defer e.mu.RUnlock()
path, err := e.FileStore.CreateSnapshot()
if err != nil {
return "", err
}

return e.FileStore.CreateSnapshot()
// Generate a snapshot of the index.
return path, e.index.SnapshotTo(path)
}
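After this change a single CreateSnapshot call produces one directory holding both the hard-linked TSM files and the TSI index snapshot, which is what makes streaming or copying a whole shard straightforward. A rough usage sketch follows, assuming a caller that archives the snapshot and then deletes it; snapshotter, streamShard, and the archive callback are hypothetical names for illustration, not APIs from this PR:

package sketch

import (
	"io"
	"os"
)

// snapshotter is the slice of the engine API this sketch relies on.
type snapshotter interface {
	CreateSnapshot() (string, error)
}

// streamShard snapshots the shard, streams the snapshot directory through
// archive (for example a tar writer), and always removes the temporary directory.
func streamShard(eng snapshotter, w io.Writer, archive func(io.Writer, string) error) error {
	dir, err := eng.CreateSnapshot() // TSM hard links + index snapshot
	if err != nil {
		return err
	}
	defer os.RemoveAll(dir) // snapshot directories are temporary

	return archive(w, dir)
}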

// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments.
@@ -1843,9 +1848,10 @@ func (e *Engine) cleanup() error {
return err
}

ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
for _, f := range allfiles {
// Check to see if there are any `.tmp` directories that were left over from failed shard snapshots
if f.IsDir() && strings.HasSuffix(f.Name(), ".tmp") {
if f.IsDir() && strings.HasSuffix(f.Name(), ext) {
if err := os.RemoveAll(filepath.Join(e.path, f.Name())); err != nil {
return fmt.Errorf("error removing tmp snapshot directory %q: %s", f.Name(), err)
}
18 changes: 14 additions & 4 deletions tsdb/engine/tsm1/file_store.go
@@ -23,6 +23,11 @@ import (
"go.uber.org/zap"
)

const (
// The extension used to describe temporary snapshot files.
TmpTSMFileExtension = "tmp"
)
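The new constant replaces hard-coded ".tmp" strings and is always combined with TSMFileExtension when temporary TSM files are named. Below is a small sketch of that naming convention, mirroring the format strings used elsewhere in this diff; tmpFileName and finalFileName are illustrative helpers, not part of the PR:

package sketch

import (
	"fmt"
	"strings"
)

const (
	tsmExt    = "tsm" // TSMFileExtension
	tsmTmpExt = "tmp" // TmpTSMFileExtension
)

// tmpFileName mirrors the compactor's scheme: new TSM files are written as
// <generation>-<sequence>.tsm.tmp and renamed once fully written.
func tmpFileName(generation, sequence int) string {
	return fmt.Sprintf("%09d-%09d.%s.%s", generation, sequence, tsmExt, tsmTmpExt)
}

// finalFileName drops the trailing ".tmp", as overlay and replace do before
// opening the completed file.
func finalFileName(name string) string {
	return strings.TrimSuffix(name, "."+tsmTmpExt)
}

For example, tmpFileName(1, 2) yields 000000001-000000002.tsm.tmp, and finalFileName turns it back into 000000001-000000002.tsm.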

// TSMFile represents an on-disk TSM file.
type TSMFile interface {
// Path returns the underlying file path for the TSMFile. If the file
@@ -433,8 +438,9 @@ func (f *FileStore) Open() error {
if err != nil {
return err
}
ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
for _, fi := range tmpfiles {
if fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp") {
if fi.IsDir() && strings.HasSuffix(fi.Name(), ext) {
Contributor: This is missing the "." now.
Contributor Author: See a couple of lines up.

ss := strings.Split(filepath.Base(fi.Name()), ".")
if len(ss) == 2 {
if i, err := strconv.Atoi(ss[0]); err != nil {
@@ -635,16 +641,20 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
f.mu.RUnlock()

updated := make([]TSMFile, 0, len(newFiles))
tsmTmpExt := fmt.Sprintf("%s.%s", TSMFileExtension, TmpTSMFileExtension)

// Rename all the new files to make them live on restart
for _, file := range newFiles {
var newName = file
if strings.HasSuffix(file, ".tmp") {
if strings.HasSuffix(file, tsmTmpExt) {
Contributor: "." is missing as well.
Contributor Author: defined on 574

// The new TSM files have a tmp extension. First rename them.
newName = file[:len(file)-4]
if err := os.Rename(file, newName); err != nil {
return err
}
} else if !strings.HasSuffix(file, TSMFileExtension) {
Contributor: I think this should be TmpTSMFileExtension. New files passed in are always tmp ones.
Contributor Author: That check happens in the if above, which looks for tsmTmpExt (.tsm.tmp). This else if skips any non-tsm or non-tmp-tsm files, such as index files. In the if above we rename the tmp tsm file and drop the .tmp; below this else if we process the tsm file. Without this else if to skip non-tsm files, I believe we could get caught up on tsi index files.

// This isn't a .tsm or .tsm.tmp file.
continue
}

fd, err := os.Open(newName)
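To make the author's explanation above concrete, here is a standalone sketch of the suffix-based decision replace now makes for each incoming file; classify is a hypothetical helper used only for illustration, not code from the PR:

package sketch

import "strings"

const (
	tsmExt    = "tsm"
	tsmTmpExt = "tsm.tmp"
)

// classify covers the three cases: ".tsm.tmp" files are renamed to ".tsm"
// before being opened, plain ".tsm" files are opened as-is, and anything
// else (for example tsi index files) is skipped.
func classify(file string) string {
	switch {
	case strings.HasSuffix(file, tsmTmpExt):
		return "rename: drop the .tmp suffix, then open"
	case strings.HasSuffix(file, tsmExt):
		return "open as-is"
	default:
		return "skip: not a .tsm or .tsm.tmp file"
	}
}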
@@ -701,7 +711,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
deletes = append(deletes, file.Path())

// Rename the TSM file used by this reader
tempPath := file.Path() + ".tmp"
tempPath := fmt.Sprintf("%s.%s", file.Path(), TmpTSMFileExtension)
if err := file.Rename(tempPath); err != nil {
return err
}
@@ -958,7 +968,7 @@ func (f *FileStore) CreateSnapshot() (string, error) {
defer f.mu.RUnlock()

// get a tmp directory name
tmpPath := fmt.Sprintf("%s/%d.tmp", f.dir, f.currentTempDirID)
tmpPath := fmt.Sprintf("%s/%d.%s", f.dir, f.currentTempDirID, TmpTSMFileExtension)
err := os.Mkdir(tmpPath, 0777)
if err != nil {
return "", err
8 changes: 4 additions & 4 deletions tsdb/engine/tsm1/file_store_test.go
@@ -2434,7 +2434,7 @@ func TestFileStore_Replace(t *testing.T) {
}

// Replace requires assumes new files have a .tmp extension
replacement := files[2] + ".tmp"
replacement := fmt.Sprintf("%s.%s", files[2], tsm1.TmpTSMFileExtension)
os.Rename(files[2], replacement)

fs := tsm1.NewFileStore(dir)
@@ -2663,9 +2663,9 @@ func TestFileStore_Stats(t *testing.T) {
"mem": []tsm1.Value{tsm1.NewValue(0, 1.0)},
})

replacement := files[2] + "-foo" + ".tmp" // Assumes new files have a .tmp extension
replacement := fmt.Sprintf("%s.%s.%s", files[2], tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension) // Assumes new files have a .tmp extension
Contributor: Should the Tmp extension be at the end?
Contributor Author: @benbjohnson I changed the implementation to check for either .tsm.tmp or .tsm. The previous test created a filename ending in -foo.tmp, which didn't match the new filters, so I updated the test to use a filename that matches one of the filters.

if err := os.Rename(newFile, replacement); err != nil {
t.Fatalf("rename: %v", err)
}
// Replace 3 w/ 1
if err := fs.Replace(files, []string{replacement}); err != nil {
@@ -2675,7 +2675,7 @@
var found bool
stats = fs.Stats()
for _, stat := range stats {
if strings.HasSuffix(stat.Path, "-foo") {
if strings.HasSuffix(stat.Path, fmt.Sprintf("%s.%s.%s", tsm1.TSMFileExtension, tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension)) {
found = true
}
}
3 changes: 3 additions & 0 deletions tsdb/index.go
@@ -53,6 +53,9 @@ type Index interface {
// Creates hard links inside path for snapshotting.
SnapshotTo(path string) error

// Size of the index on disk, if applicable.
DiskSizeBytes() int64

// To be removed w/ tsi1.
SetFieldName(measurement []byte, name string)
AssignShard(k string, shardID uint64)
3 changes: 3 additions & 0 deletions tsdb/index/inmem/inmem.go
@@ -816,6 +816,9 @@ func (i *Index) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator,
// SnapshotTo is a no-op since this is an in-memory index.
func (i *Index) SnapshotTo(path string) error { return nil }

// DiskSizeBytes always returns zero bytes, since this is an in-memory index.
func (i *Index) DiskSizeBytes() int64 { return 0 }

// AssignShard update the index to indicate that series k exists in the given shardID.
func (i *Index) AssignShard(k string, shardID uint64) {
ss, _ := i.Series([]byte(k))
21 changes: 17 additions & 4 deletions tsdb/index/tsi1/file_set.go
@@ -18,10 +18,11 @@ import (

// FileSet represents a collection of files.
type FileSet struct {
levels []CompactionLevel
files []File
filters []*bloom.Filter // per-level filters
database string
levels []CompactionLevel
files []File
filters []*bloom.Filter // per-level filters
database string
manifestSize int64 // Size of the manifest file in bytes.
}

// NewFileSet returns a new instance of FileSet.
@@ -74,6 +75,15 @@ func (fs *FileSet) PrependLogFile(f *LogFile) *FileSet {
}
}

// Size returns the on-disk size of the FileSet.
func (fs *FileSet) Size() int64 {
var total int64
for _, f := range fs.files {
total += f.Size()
}
return total + int64(fs.manifestSize)
}
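Together with the Size method added to the File interface below, this gives the index a cheap way to answer the new DiskSizeBytes query: the total is simply the sum of the per-file sizes plus the manifest. A minimal restatement of that arithmetic follows, with sizer standing in for the parts of the file interface this sketch needs:

package sketch

// sizer is the minimal shape this sketch needs from a file: its on-disk size.
type sizer interface {
	Size() int64
}

// fileSetSize mirrors FileSet.Size above: per-file sizes plus the manifest.
func fileSetSize(files []sizer, manifestSize int64) int64 {
	var total int64
	for _, f := range files {
		total += f.Size()
	}
	return total + manifestSize
}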

// MustReplace swaps a list of files for a single file and returns a new file set.
// The caller should always guarantee that the files exist and are contiguous.
func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet {
@@ -1098,6 +1108,9 @@ type File interface {
// Reference counting.
Retain()
Release()

// Size of file on disk
Size() int64
}

type Files []File