From 12a2ff7fac69b5677db7ffea012a2fa44706c4bc Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Tue, 13 Jun 2017 17:48:21 +0100
Subject: [PATCH 1/8] Add support for TSI shard streaming and shard size

This commit firstly ensures that a shard's size on disk is accurately
reported when using the tsi1 index, by including the on-disk size of the
tsi1 index in the calculation.

Secondly, this commit adds support for shard streaming/copying when using
the tsi1 index. Prior to this, a tsi1 index would not be correctly restored
when streaming shards.
---
 CHANGELOG.md                        |   4 +-
 tsdb/engine/tsm1/compact.go         |   2 +-
 tsdb/engine/tsm1/engine.go          |  36 +++++++---
 tsdb/engine/tsm1/file_store.go      |  18 +++--
 tsdb/engine/tsm1/file_store_test.go |   8 +-
 tsdb/index.go                       |   3 +
 tsdb/index/inmem/inmem.go           |   3 +
 tsdb/index/tsi1/file_set.go         |  21 ++++--
 tsdb/index/tsi1/index.go            | 107 +++++++++++++++++++++-------
 tsdb/index/tsi1/index_test.go       |  86 +++++++++++++++++++++-
 tsdb/index/tsi1/log_file.go         |   5 +-
 11 files changed, 235 insertions(+), 58 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e1edeb0d3b9..79e6f10928c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
 - [#8495](https://github.com/influxdata/influxdb/pull/8495): Improve CLI connection warnings
 - [#9084](https://github.com/influxdata/influxdb/pull/9084): Handle high cardinality deletes in TSM engine
 - [#9162](https://github.com/influxdata/influxdb/pull/9162): Improve inmem index startup performance for high cardinality.
+- [#8491](https://github.com/influxdata/influxdb/pull/8491): Add further tsi support for streaming/copying shards.

 ### Bugfixes

@@ -93,8 +94,7 @@ refuse to open, and will most likely see the following error message:
 - [#8897](https://github.com/influxdata/influxdb/pull/8897): Add message pack format for query responses.
 - [#8886](https://github.com/influxdata/influxdb/pull/8886): Improved compaction scheduling
 - [#8690](https://github.com/influxdata/influxdb/issues/8690): Implicitly decide on a lower limit for fill queries when none is present.
-- [#8947](https://github.com/influxdata/influxdb/pull/8947): Add `EXPLAIN ANALYZE` command, which produces a detailed execution plan of a `SELECT` statement.
-- [#8963](https://github.com/influxdata/influxdb/pull/8963): Streaming inmem2tsi conversion.
+- [#8947](https://github.com/influxdata/influxdb/pull/8947): Add `EXPLAIN ANALYZE` command, which produces a detailed execution plan of a `SELECT` statement.- [#8963](https://github.com/influxdata/influxdb/pull/8963): Streaming inmem2tsi conversion.
 - [#8995](https://github.com/influxdata/influxdb/pull/8995): Sort & validate TSI key value insertion.
 - [#8968](https://github.com/influxdata/influxdb/issues/8968): Make client errors more helpful on downstream errs. Thanks @darkliquid!
 - [#8984](https://github.com/influxdata/influxdb/pull/8984): EXACT and estimated CARDINALITY queries.
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index b9f2db4b6e1..0fa48fd870b 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -984,7 +984,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
 	for {
 		sequence++
 		// New TSM files are written to a temp file and renamed when fully completed.
- fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.tmp", generation, sequence, TSMFileExtension)) + fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.%s", generation, sequence, TSMFileExtension, TmpTSMFileExtension)) // Write as much as possible to this file err := c.write(fileName, iter) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 7b382a67741..f2c4c9a14f6 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -579,7 +579,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic { // DiskSize returns the total size in bytes of all TSM and WAL segments on disk. func (e *Engine) DiskSize() int64 { - return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() + return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() + e.index.DiskSizeBytes() } // Open opens and initializes the engine. @@ -736,10 +736,6 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error { return err } - if err := e.index.SnapshotTo(path); err != nil { - return err - } - tw := tar.NewWriter(w) defer tw.Close() @@ -845,6 +841,8 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error { return nil, err } + // The filestore will only handle tsm files. Other file types will be + // ignored. if err := e.FileStore.Replace(nil, newFiles); err != nil { return nil, err } @@ -855,15 +853,27 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error { return err } + if idx, ok := e.index.(*tsi1.Index); ok { + // Initialise new index. + e.mu.Lock() + if e.index, err = idx.ReplaceIndex(newFiles); err != nil { + e.mu.Unlock() + return err + } + e.mu.Unlock() + return nil + } + // Load any new series keys to the index readers := make([]chan seriesKey, 0, len(newFiles)) + ext := fmt.Sprintf(".%s", TmpTSMFileExtension) for _, f := range newFiles { ch := make(chan seriesKey, 1) readers = append(readers, ch) // If asNew is true, the files created from readFileFromBackup will be new ones // having a temp extension. - f = strings.TrimSuffix(f, ".tmp") + f = strings.TrimSuffix(f, ext) fd, err := os.Open(f) if err != nil { @@ -928,9 +938,7 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as filename = fmt.Sprintf("%09d-%09d.%s", e.FileStore.NextGeneration(), 1, TSMFileExtension) } - destPath := filepath.Join(e.path, filename) - tmp := destPath + ".tmp" - + tmp := fmt.Sprintf("%s.%s", filepath.Join(e.path, filename), TmpTSMFileExtension) // Create new file on disk. f, err := os.OpenFile(tmp, os.O_CREATE|os.O_RDWR, 0666) if err != nil { @@ -1424,8 +1432,13 @@ func (e *Engine) CreateSnapshot() (string, error) { e.mu.RLock() defer e.mu.RUnlock() + path, err := e.FileStore.CreateSnapshot() + if err != nil { + return "", err + } - return e.FileStore.CreateSnapshot() + // Generate a snapshot of the index. + return path, e.index.SnapshotTo(path) } // writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments. 
@@ -1843,9 +1856,10 @@ func (e *Engine) cleanup() error { return err } + ext := fmt.Sprintf(".%s", TmpTSMFileExtension) for _, f := range allfiles { // Check to see if there are any `.tmp` directories that were left over from failed shard snapshots - if f.IsDir() && strings.HasSuffix(f.Name(), ".tmp") { + if f.IsDir() && strings.HasSuffix(f.Name(), ext) { if err := os.RemoveAll(filepath.Join(e.path, f.Name())); err != nil { return fmt.Errorf("error removing tmp snapshot directory %q: %s", f.Name(), err) } diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 6b60b1827c3..fae6a3b3284 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -23,6 +23,11 @@ import ( "go.uber.org/zap" ) +const ( + // The extension used to describe temporary snapshot files. + TmpTSMFileExtension = "tmp" +) + // TSMFile represents an on-disk TSM file. type TSMFile interface { // Path returns the underlying file path for the TSMFile. If the file @@ -433,8 +438,9 @@ func (f *FileStore) Open() error { if err != nil { return err } + ext := fmt.Sprintf(".%s", TmpTSMFileExtension) for _, fi := range tmpfiles { - if fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp") { + if fi.IsDir() && strings.HasSuffix(fi.Name(), ext) { ss := strings.Split(filepath.Base(fi.Name()), ".") if len(ss) == 2 { if i, err := strconv.Atoi(ss[0]); err != nil { @@ -635,16 +641,20 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF f.mu.RUnlock() updated := make([]TSMFile, 0, len(newFiles)) + tsmTmpExt := fmt.Sprintf("%s.%s", TSMFileExtension, TmpTSMFileExtension) // Rename all the new files to make them live on restart for _, file := range newFiles { var newName = file - if strings.HasSuffix(file, ".tmp") { + if strings.HasSuffix(file, tsmTmpExt) { // The new TSM files have a tmp extension. First rename them. newName = file[:len(file)-4] if err := os.Rename(file, newName); err != nil { return err } + } else if !strings.HasSuffix(file, TSMFileExtension) { + // This isn't a .tsm or .tsm.tmp file. 
+ continue } fd, err := os.Open(newName) @@ -701,7 +711,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF deletes = append(deletes, file.Path()) // Rename the TSM file used by this reader - tempPath := file.Path() + ".tmp" + tempPath := fmt.Sprintf("%s.%s", file.Path(), TmpTSMFileExtension) if err := file.Rename(tempPath); err != nil { return err } @@ -958,7 +968,7 @@ func (f *FileStore) CreateSnapshot() (string, error) { defer f.mu.RUnlock() // get a tmp directory name - tmpPath := fmt.Sprintf("%s/%d.tmp", f.dir, f.currentTempDirID) + tmpPath := fmt.Sprintf("%s/%d.%s", f.dir, f.currentTempDirID, TmpTSMFileExtension) err := os.Mkdir(tmpPath, 0777) if err != nil { return "", err diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index 15923170ecb..ae68282051c 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -2434,7 +2434,7 @@ func TestFileStore_Replace(t *testing.T) { } // Replace requires assumes new files have a .tmp extension - replacement := files[2] + ".tmp" + replacement := fmt.Sprintf("%s.%s", files[2], tsm1.TmpTSMFileExtension) os.Rename(files[2], replacement) fs := tsm1.NewFileStore(dir) @@ -2663,9 +2663,9 @@ func TestFileStore_Stats(t *testing.T) { "mem": []tsm1.Value{tsm1.NewValue(0, 1.0)}, }) - replacement := files[2] + "-foo" + ".tmp" // Assumes new files have a .tmp extension + replacement := fmt.Sprintf("%s.%s.%s", files[2], tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension) // Assumes new files have a .tmp extension if err := os.Rename(newFile, replacement); err != nil { - + t.Fatalf("rename: %v", err) } // Replace 3 w/ 1 if err := fs.Replace(files, []string{replacement}); err != nil { @@ -2675,7 +2675,7 @@ func TestFileStore_Stats(t *testing.T) { var found bool stats = fs.Stats() for _, stat := range stats { - if strings.HasSuffix(stat.Path, "-foo") { + if strings.HasSuffix(stat.Path, fmt.Sprintf("%s.%s.%s", tsm1.TSMFileExtension, tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension)) { found = true } } diff --git a/tsdb/index.go b/tsdb/index.go index 67bed830ea4..c5651afbee1 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -53,6 +53,9 @@ type Index interface { // Creates hard links inside path for snapshotting. SnapshotTo(path string) error + // Size of the index on disk, if applicable. + DiskSizeBytes() int64 + // To be removed w/ tsi1. SetFieldName(measurement []byte, name string) AssignShard(k string, shardID uint64) diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index c47082ebdc1..d8eb0873eb4 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -816,6 +816,9 @@ func (i *Index) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, // SnapshotTo is a no-op since this is an in-memory index. func (i *Index) SnapshotTo(path string) error { return nil } +// DiskSizeBytes always returns zero bytes, since this is an in-memory index. +func (i *Index) DiskSizeBytes() int64 { return 0 } + // AssignShard update the index to indicate that series k exists in the given shardID. func (i *Index) AssignShard(k string, shardID uint64) { ss, _ := i.Series([]byte(k)) diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index d26b71b763f..36b13f6d056 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -18,10 +18,11 @@ import ( // FileSet represents a collection of files. 
type FileSet struct { - levels []CompactionLevel - files []File - filters []*bloom.Filter // per-level filters - database string + levels []CompactionLevel + files []File + filters []*bloom.Filter // per-level filters + database string + manifestSize int64 // Size of the manifest file in bytes. } // NewFileSet returns a new instance of FileSet. @@ -74,6 +75,15 @@ func (fs *FileSet) PrependLogFile(f *LogFile) *FileSet { } } +// Size returns the on-disk size of the FileSet. +func (fs *FileSet) Size() int64 { + var total int64 + for _, f := range fs.files { + total += f.Size() + } + return total + int64(fs.manifestSize) +} + // MustReplace swaps a list of files for a single file and returns a new file set. // The caller should always guarantee that the files exist and are contiguous. func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet { @@ -1098,6 +1108,9 @@ type File interface { // Reference counting. Retain() Release() + + // Size of file on disk + Size() int64 } type Files []File diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index bbf79e152c0..88017230395 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -7,6 +7,7 @@ import ( "fmt" "io/ioutil" "os" + "path" "path/filepath" "regexp" "sort" @@ -141,9 +142,9 @@ func (i *Index) Open() error { } // Read manifest file. - m, err := ReadManifestFile(filepath.Join(i.Path, ManifestFileName)) + m, err := ReadManifestFile(i.ManifestPath()) if os.IsNotExist(err) { - m = NewManifest() + m = NewManifest(i.ManifestPath()) } else if err != nil { return err } @@ -189,6 +190,7 @@ func (i *Index) Open() error { if err != nil { return err } + fs.manifestSize = m.size i.fileSet = fs // Set initial sequnce number. @@ -215,6 +217,28 @@ func (i *Index) Open() error { return nil } +// ReplaceIndex returns a new index built using the provided file paths. +func (i *Index) ReplaceIndex(newFiles []string) (*Index, error) { + var base string + for _, pth := range newFiles { + if !strings.Contains(pth, "/index") { + continue // Not a tsi1 file path. + } + + if base == "" { + base = path.Dir(pth) + } + if err := os.Rename(pth, strings.TrimSuffix(pth, ".tmp")); err != nil { + return nil, err + } + } + + // Create, open and return the new index. + idx := NewIndex() + idx.Path = base + return idx, idx.Open() +} + // openLogFile opens a log file and appends it to the index. func (i *Index) openLogFile(path string) (*LogFile, error) { f := NewLogFile(path) @@ -309,6 +333,7 @@ func (i *Index) Manifest() *Manifest { Levels: i.levels, Files: make([]string, len(i.fileSet.files)), Version: i.version, + path: i.ManifestPath(), } for j, f := range i.fileSet.files { @@ -319,8 +344,18 @@ func (i *Index) Manifest() *Manifest { } // writeManifestFile writes the manifest to the appropriate file path. -func (i *Index) writeManifestFile() error { - return WriteManifestFile(i.ManifestPath(), i.Manifest()) +func (i *Index) writeManifestFile(m *Manifest) error { + buf, err := json.MarshalIndent(m, "", " ") + if err != nil { + return err + } + buf = append(buf, '\n') + + if err := ioutil.WriteFile(i.ManifestPath(), buf, 0666); err != nil { + return err + } + + return nil } // WithLogger sets the logger for the index. @@ -365,10 +400,12 @@ func (i *Index) prependActiveLogFile() error { i.fileSet = i.fileSet.PrependLogFile(f) // Write new manifest. - if err := i.writeManifestFile(); err != nil { + m := i.Manifest() + if err = m.Write(); err != nil { // TODO: Close index if write fails. 
return err } + i.fileSet.manifestSize = m.size return nil } @@ -863,6 +900,13 @@ func (i *Index) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet return sortedTagsSets, nil } +// DiskSizeBytes returns the size of the index on disk. +func (i *Index) DiskSizeBytes() int64 { + fs := i.RetainFileSet() + defer fs.Release() + return fs.Size() +} + // SnapshotTo creates hard links to the file set into path. func (i *Index) SnapshotTo(path string) error { i.mu.Lock() @@ -1035,10 +1079,13 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) { i.fileSet = i.fileSet.MustReplace(IndexFiles(files).Files(), file) // Write new manifest. - if err := i.writeManifestFile(); err != nil { + var err error + m := i.Manifest() + if err = m.Write(); err != nil { // TODO: Close index if write fails. return err } + i.fileSet.manifestSize = m.size return nil }(); err != nil { logger.Error("cannot write manifest", zap.Error(err)) @@ -1168,10 +1215,13 @@ func (i *Index) compactLogFile(logFile *LogFile) { i.fileSet = i.fileSet.MustReplace([]File{logFile}, file) // Write new manifest. - if err := i.writeManifestFile(); err != nil { + var err error + m := i.Manifest() + if err = m.Write(); err != nil { // TODO: Close index if write fails. return err } + i.fileSet.manifestSize = m.size return nil }(); err != nil { logger.Error("cannot update manifest", zap.Error(err)) @@ -1329,18 +1379,20 @@ func ParseFilename(name string) (level, id int) { // Manifest represents the list of log & index files that make up the index. // The files are listed in time order, not necessarily ID order. type Manifest struct { - Levels []CompactionLevel `json:"levels,omitempty"` - Files []string `json:"files,omitempty"` + Levels []CompactionLevel `json:"levels,omitempty"` + Files []string `json:"files,omitempty"` + Version int `json:"version,omitempty"` // Version should be updated whenever the TSI format has changed. - // Version should be updated whenever the TSI format has changed. - Version int `json:"version,omitempty"` + size int64 // Holds the on-disk size of the manifest. + path string // location on disk of the manifest. } // NewManifest returns a new instance of Manifest with default compaction levels. -func NewManifest() *Manifest { +func NewManifest(path string) *Manifest { m := &Manifest{ Levels: make([]CompactionLevel, len(DefaultCompactionLevels)), Version: Version, + path: path, } copy(m.Levels, DefaultCompactionLevels[:]) return m @@ -1368,6 +1420,19 @@ func (m *Manifest) Validate() error { } // ReadManifestFile reads a manifest from a file path. +// Write writes the manifest file to the provided path. +func (m *Manifest) Write() error { + buf, err := json.MarshalIndent(m, "", " ") + if err != nil { + return err + } + buf = append(buf, '\n') + m.size = int64(len(buf)) + return ioutil.WriteFile(m.path, buf, 0666) +} + +// ReadManifestFile reads a manifest from a file path and returns the manifest +// along with its size and any error. func ReadManifestFile(path string) (*Manifest, error) { buf, err := ioutil.ReadFile(path) if err != nil { @@ -1379,25 +1444,13 @@ func ReadManifestFile(path string) (*Manifest, error) { if err := json.Unmarshal(buf, &m); err != nil { return nil, err } + // Set the size of the manifest. + m.size = int64(len(buf)) + m.path = path return &m, nil } -// WriteManifestFile writes a manifest to a file path. 
-func WriteManifestFile(path string, m *Manifest) error { - buf, err := json.MarshalIndent(m, "", " ") - if err != nil { - return err - } - buf = append(buf, '\n') - - if err := ioutil.WriteFile(path, buf, 0666); err != nil { - return err - } - - return nil -} - func joinIntSlice(a []int, sep string) string { other := make([]string, len(a)) for i := range a { diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index e6460d66ed7..20c15a7d26d 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -7,6 +7,7 @@ import ( "path/filepath" "reflect" "regexp" + "strings" "testing" "github.com/influxdata/influxdb/models" @@ -220,10 +221,10 @@ func TestIndex_Open(t *testing.T) { idx = NewIndex() // Manually create a MANIFEST file for an incompatible index version. mpath := filepath.Join(idx.Path, tsi1.ManifestFileName) - m := tsi1.NewManifest() + m := tsi1.NewManifest(mpath) m.Levels = nil m.Version = v // Set example MANIFEST version. - if err := tsi1.WriteManifestFile(mpath, m); err != nil { + if err := m.Write(); err != nil { t.Fatal(err) } @@ -254,6 +255,87 @@ func TestIndex_Manifest(t *testing.T) { }) } +func TestIndex_ReplaceIndex(t *testing.T) { + idx := MustOpenIndex() + defer idx.Close() + + tmpBase := os.TempDir() + if err := os.MkdirAll(filepath.Join(tmpBase, "index"), 0777); err != nil { + t.Fatal(err) + } + + names := []string{"a.tsi.tmp", "b.tsl.tmp", "c.tsi.tmp", "not_a_tsi_file.tsm.tmp"} + newFiles := make([]string, 0, len(names)) + for i, name := range names { + var inIndex string + if i < len(names)-1 { + // First three files are in the index directory. + inIndex = "index" + } + + fullPath := filepath.Join(tmpBase, inIndex, name) + fd, err := os.Create(fullPath) + if err != nil { + t.Fatal(err) + } + fd.Close() + newFiles = append(newFiles, fullPath) + } + + idx2, err := idx.ReplaceIndex(newFiles) + if err != nil { + t.Fatal(err) + } + defer func() { os.RemoveAll(idx2.Path); idx2.Close() }() + + if got, exp := idx2.Path, filepath.Join(tmpBase, "index"); got != exp { + t.Fatalf("got index path of %s, expected %s", got, exp) + } + + for _, nf := range newFiles { + // Remove tmp extension as would be done when creating the index. + nf = strings.TrimSuffix(nf, ".tmp") + + // Determine existence. + _, err := os.Stat(nf) + if strings.Contains(nf, "not_a_tsi_file.tsm") { + // This file should not exist in the index. + if !os.IsNotExist(err) { + t.Fatalf("got %v, expected %v", err, os.ErrNotExist) + } + } else { + // This file should exist in the index + if err != nil { + t.Fatalf("got %v, expected for file %s", err, nf) + } + } + + } +} + +// Ensure index can delete a measurement and all related keys, values, & series. +func TestIndex_DiskSizeBytes(t *testing.T) { + idx := MustOpenIndex() + defer idx.Close() + + // Add series to index. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, + {Name: []byte("disk"), Tags: models.NewTags(map[string]string{"region": "north"})}, + {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "west", "country": "us"})}, + }); err != nil { + t.Fatal(err) + } + + // Verify on disk size is the same in each stage. + idx.Run(t, func(t *testing.T) { + if got, exp := idx.DiskSizeBytes(), int64(464); got != exp { + t.Fatalf("got %d bytes, expected %d", got, exp) + } + }) +} + // Index is a test wrapper for tsi1.Index. 
type Index struct { *tsi1.Index diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 5dae24b6c82..d8e32d7ba97 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -13,13 +13,12 @@ import ( "sync" "time" - "github.com/influxdata/influxdb/pkg/estimator/hll" - "github.com/influxdata/influxdb/tsdb" - "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/bloom" "github.com/influxdata/influxdb/pkg/estimator" + "github.com/influxdata/influxdb/pkg/estimator/hll" "github.com/influxdata/influxdb/pkg/mmap" + "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" ) From 67c67aeb34bd98cb3b765fe3db3b26ca367a717b Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 14 Jun 2017 17:33:41 +0100 Subject: [PATCH 2/8] Update test for Windows --- tsdb/index/tsi1/index.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 88017230395..07db40a314a 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -7,7 +7,6 @@ import ( "fmt" "io/ioutil" "os" - "path" "path/filepath" "regexp" "sort" @@ -219,14 +218,19 @@ func (i *Index) Open() error { // ReplaceIndex returns a new index built using the provided file paths. func (i *Index) ReplaceIndex(newFiles []string) (*Index, error) { - var base string + var ( + base string + indexMatch = fmt.Sprintf("%[1]s%s%[1]s", string(os.PathSeparator), "index") + ) + for _, pth := range newFiles { - if !strings.Contains(pth, "/index") { + if !strings.Contains(pth, indexMatch) || + (!strings.Contains(pth, LogFileExt) && !strings.Contains(pth, IndexFileExt)) { continue // Not a tsi1 file path. } if base == "" { - base = path.Dir(pth) + base = filepath.Dir(pth) } if err := os.Rename(pth, strings.TrimSuffix(pth, ".tmp")); err != nil { return nil, err From 368420c670de137b9261ec25538bcab99f60ac26 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Fri, 20 Oct 2017 12:59:57 +0100 Subject: [PATCH 3/8] Fix test due to index changes --- tsdb/index/tsi1/file_set.go | 1 + tsdb/index/tsi1/index.go | 1 + tsdb/index/tsi1/index_test.go | 4 ++-- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index 36b13f6d056..422fcc6f7a5 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -79,6 +79,7 @@ func (fs *FileSet) PrependLogFile(f *LogFile) *FileSet { func (fs *FileSet) Size() int64 { var total int64 for _, f := range fs.files { + fmt.Println("fs size", f.Size()) total += f.Size() } return total + int64(fs.manifestSize) diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 07db40a314a..5aeb227dc2c 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -1432,6 +1432,7 @@ func (m *Manifest) Write() error { } buf = append(buf, '\n') m.size = int64(len(buf)) + fmt.Println("size manifest is ", m.size) return ioutil.WriteFile(m.path, buf, 0666) } diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index 20c15a7d26d..a6d3199cd6c 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -313,7 +313,6 @@ func TestIndex_ReplaceIndex(t *testing.T) { } } -// Ensure index can delete a measurement and all related keys, values, & series. func TestIndex_DiskSizeBytes(t *testing.T) { idx := MustOpenIndex() defer idx.Close() @@ -329,8 +328,9 @@ func TestIndex_DiskSizeBytes(t *testing.T) { } // Verify on disk size is the same in each stage. 
+ expSize := int64(520) // 419 bytes for MANIFEST and 101 bytes for index file idx.Run(t, func(t *testing.T) { - if got, exp := idx.DiskSizeBytes(), int64(464); got != exp { + if got, exp := idx.DiskSizeBytes(), expSize; got != exp { t.Fatalf("got %d bytes, expected %d", got, exp) } }) From abae36f992c0a5d3cccb1494845f1c416ed1f2bf Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Fri, 20 Oct 2017 13:07:18 +0100 Subject: [PATCH 4/8] Ensure all index fields set --- tsdb/index/tsi1/index.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 5aeb227dc2c..17617a103a1 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -239,7 +239,17 @@ func (i *Index) ReplaceIndex(newFiles []string) (*Index, error) { // Create, open and return the new index. idx := NewIndex() + + // Set fields on the new Index. + i.mu.RLock() + idx.logger = i.logger + idx.ShardID = i.ShardID + idx.Database = i.Database idx.Path = base + idx.version = i.version + idx.options = i.options + i.mu.RUnlock() + return idx, idx.Open() } @@ -364,7 +374,9 @@ func (i *Index) writeManifestFile(m *Manifest) error { // WithLogger sets the logger for the index. func (i *Index) WithLogger(logger *zap.Logger) { + i.mu.Lock() i.logger = logger.With(zap.String("index", "tsi")) + i.mu.Unlock() } // SetFieldSet sets a shared field set from the engine. From 38e0dd695f1dcdffea702808bab88fa9d9e372c0 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Fri, 20 Oct 2017 17:34:36 +0100 Subject: [PATCH 5/8] Allow concurrent access to Engine Index --- tsdb/engine/tsm1/engine.go | 91 ++++++++++++++++++++----------------- tsdb/index/tsi1/file_set.go | 1 - 2 files changed, 49 insertions(+), 43 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index f2c4c9a14f6..d9db41eef4a 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -123,7 +123,8 @@ const ( // Engine represents a storage engine with compressed blocks. type Engine struct { - mu sync.RWMutex + mu sync.RWMutex + _index tsdb.Index // The following group of fields is used to track the state of level compactions within the // Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is @@ -147,7 +148,6 @@ type Engine struct { traceLogger *zap.Logger // Logger to be used when trace-logging is on. 
traceLogging bool - index tsdb.Index fieldset *tsdb.MeasurementFieldSet WAL *WAL @@ -197,7 +197,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string, id: id, database: database, path: path, - index: idx, + _index: idx, logger: logger, traceLogger: logger, traceLogging: opt.Config.TraceLoggingEnabled, @@ -417,19 +417,19 @@ func (e *Engine) ScheduleFullCompaction() error { func (e *Engine) Path() string { return e.path } func (e *Engine) SetFieldName(measurement []byte, name string) { - e.index.SetFieldName(measurement, name) + e.index().SetFieldName(measurement, name) } func (e *Engine) MeasurementExists(name []byte) (bool, error) { - return e.index.MeasurementExists(name) + return e.index().MeasurementExists(name) } func (e *Engine) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { - return e.index.MeasurementNamesByExpr(auth, expr) + return e.index().MeasurementNamesByExpr(auth, expr) } func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { - return e.index.MeasurementNamesByRegex(re) + return e.index().MeasurementNamesByRegex(re) } // MeasurementFields returns the measurement fields for a measurement. @@ -442,11 +442,11 @@ func (e *Engine) MeasurementFieldSet() *tsdb.MeasurementFieldSet { } func (e *Engine) HasTagKey(name, key []byte) (bool, error) { - return e.index.HasTagKey(name, key) + return e.index().HasTagKey(name, key) } func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { - return e.index.MeasurementTagKeysByExpr(name, expr) + return e.index().MeasurementTagKeysByExpr(name, expr) } // TagKeyHasAuthorizedSeries determines if there exist authorized series for the @@ -465,28 +465,28 @@ func (e *Engine) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, k // slice. // func (e *Engine) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { - return e.index.MeasurementTagKeyValuesByExpr(auth, name, keys, expr, keysSorted) + return e.index().MeasurementTagKeyValuesByExpr(auth, name, keys, expr, keysSorted) } func (e *Engine) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { - return e.index.ForEachMeasurementTagKey(name, fn) + return e.index().ForEachMeasurementTagKey(name, fn) } func (e *Engine) TagKeyCardinality(name, key []byte) int { - return e.index.TagKeyCardinality(name, key) + return e.index().TagKeyCardinality(name, key) } // SeriesN returns the unique number of series in the index. func (e *Engine) SeriesN() int64 { - return e.index.SeriesN() + return e.index().SeriesN() } func (e *Engine) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { - return e.index.SeriesSketches() + return e.index().SeriesSketches() } func (e *Engine) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { - return e.index.MeasurementsSketches() + return e.index().MeasurementsSketches() } // LastModified returns the time when this shard was last modified. @@ -579,7 +579,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic { // DiskSize returns the total size in bytes of all TSM and WAL segments on disk. func (e *Engine) DiskSize() int64 { - return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() + e.index.DiskSizeBytes() + return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() + e.index().DiskSizeBytes() } // Open opens and initializes the engine. 
@@ -601,7 +601,7 @@ func (e *Engine) Open() error { e.fieldset = fields e.mu.Unlock() - e.index.SetFieldSet(fields) + e.index().SetFieldSet(fields) if err := e.WAL.Open(); err != nil { return err @@ -639,6 +639,12 @@ func (e *Engine) Close() error { return e.WAL.Close() } +func (e *Engine) index() tsdb.Index { + e.mu.RLock() + defer e.mu.RUnlock() + return e._index +} + // WithLogger sets the logger for the engine. func (e *Engine) WithLogger(log *zap.Logger) { e.logger = log.With(zap.String("engine", "tsm1")) @@ -656,11 +662,13 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { now := time.Now() // Save reference to index for iterator creation. - e.index = index + e.mu.Lock() + e._index = index + e.mu.Unlock() // If we have the cached fields index on disk and we're using TSI, we // can skip scanning all the TSM files. - if e.index.Type() != inmem.IndexName && !e.fieldset.IsEmpty() { + if e.index().Type() != inmem.IndexName && !e.fieldset.IsEmpty() { return nil } @@ -841,8 +849,7 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error { return nil, err } - // The filestore will only handle tsm files. Other file types will be - // ignored. + // The filestore will only handle tsm files. Other file types will be ignored. if err := e.FileStore.Replace(nil, newFiles); err != nil { return nil, err } @@ -853,10 +860,10 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error { return err } - if idx, ok := e.index.(*tsi1.Index); ok { + if idx, ok := e.index().(*tsi1.Index); ok { // Initialise new index. e.mu.Lock() - if e.index, err = idx.ReplaceIndex(newFiles); err != nil { + if e._index, err = idx.ReplaceIndex(newFiles); err != nil { e.mu.Unlock() return err } @@ -971,9 +978,9 @@ func (e *Engine) addToIndexFromKey(key []byte, fieldType influxql.DataType) erro } // Build in-memory index, if necessary. - if e.index.Type() == inmem.IndexName { + if e.index().Type() == inmem.IndexName { tags, _ := models.ParseTags(seriesKey) - if err := e.index.InitializeSeries(seriesKey, name, tags); err != nil { + if err := e.index().InitializeSeries(seriesKey, name, tags); err != nil { return err } } @@ -1054,7 +1061,7 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro // Ensure that the index does not compact away the measurement or series we're // going to delete before we're done with them. - if tsiIndex, ok := e.index.(*tsi1.Index); ok { + if tsiIndex, ok := e.index().(*tsi1.Index); ok { fs := tsiIndex.RetainFileSet() defer fs.Release() } @@ -1102,7 +1109,7 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro batch = batch[:0] } - e.index.Rebuild() + e.index().Rebuild() return nil } @@ -1278,7 +1285,7 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { continue } - if err := e.index.UnassignShard(string(k), e.id, ts); err != nil { + if err := e.index().UnassignShard(string(k), e.id, ts); err != nil { return err } } @@ -1332,7 +1339,7 @@ func (e *Engine) DeleteMeasurement(name []byte) error { // DeleteMeasurement deletes a measurement and all related series. func (e *Engine) deleteMeasurement(name []byte) error { // Attempt to find the series keys. 
- itr, err := e.index.MeasurementSeriesKeysByExprIterator(name, nil) + itr, err := e.index().MeasurementSeriesKeysByExprIterator(name, nil) if err != nil { return err } else if itr != nil { @@ -1343,24 +1350,24 @@ func (e *Engine) deleteMeasurement(name []byte) error { // ForEachMeasurementName iterates over each measurement name in the engine. func (e *Engine) ForEachMeasurementName(fn func(name []byte) error) error { - return e.index.ForEachMeasurementName(fn) + return e.index().ForEachMeasurementName(fn) } func (e *Engine) MeasurementSeriesKeysByExprIterator(name []byte, expr influxql.Expr) (tsdb.SeriesIterator, error) { - return e.index.MeasurementSeriesKeysByExprIterator(name, expr) + return e.index().MeasurementSeriesKeysByExprIterator(name, expr) } // MeasurementSeriesKeysByExpr returns a list of series keys matching expr. func (e *Engine) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { - return e.index.MeasurementSeriesKeysByExpr(name, expr) + return e.index().MeasurementSeriesKeysByExpr(name, expr) } func (e *Engine) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error { - return e.index.CreateSeriesListIfNotExists(keys, names, tagsSlice) + return e.index().CreateSeriesListIfNotExists(keys, names, tagsSlice) } func (e *Engine) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error { - return e.index.CreateSeriesIfNotExists(key, name, tags) + return e.index().CreateSeriesIfNotExists(key, name, tags) } // WriteTo is not implemented. @@ -1438,7 +1445,7 @@ func (e *Engine) CreateSnapshot() (string, error) { } // Generate a snapshot of the index. - return path, e.index.SnapshotTo(path) + return path, e._index.SnapshotTo(path) // index already under RLock. } // writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments. @@ -1943,14 +1950,14 @@ func (e *Engine) CreateIterator(ctx context.Context, measurement string, opt que func (e *Engine) createCallIterator(ctx context.Context, measurement string, call *influxql.Call, opt query.IteratorOptions) ([]query.Iterator, error) { ref, _ := call.Args[0].(*influxql.VarRef) - if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil { + if exists, err := e.index().MeasurementExists([]byte(measurement)); err != nil { return nil, err } else if !exists { return nil, nil } // Determine tagsets for this measurement based on dimensions and filters. - tagSets, err := e.index.TagSets([]byte(measurement), opt) + tagSets, err := e.index().TagSets([]byte(measurement), opt) if err != nil { return nil, err } @@ -2013,14 +2020,14 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal func (e *Engine) createVarRefIterator(ctx context.Context, measurement string, opt query.IteratorOptions) ([]query.Iterator, error) { ref, _ := opt.Expr.(*influxql.VarRef) - if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil { + if exists, err := e.index().MeasurementExists([]byte(measurement)); err != nil { return nil, err } else if !exists { return nil, nil } // Determine tagsets for this measurement based on dimensions and filters. 
- tagSets, err := e.index.TagSets([]byte(measurement), opt) + tagSets, err := e.index().TagSets([]byte(measurement), opt) if err != nil { return nil, err } @@ -2475,14 +2482,14 @@ func matchTagValues(tags models.Tags, condition influxql.Expr) []string { func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) { // Determine if this measurement exists. If it does not, then no shards are // accessed to begin with. - if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil { + if exists, err := e.index().MeasurementExists([]byte(measurement)); err != nil { return query.IteratorCost{}, err } else if !exists { return query.IteratorCost{}, nil } // Determine all of the tag sets for this query. - tagSets, err := e.index.TagSets([]byte(measurement), opt) + tagSets, err := e.index().TagSets([]byte(measurement), opt) if err != nil { return query.IteratorCost{}, err } @@ -2545,7 +2552,7 @@ func (e *Engine) seriesCost(seriesKey, field string, tmin, tmax int64) query.Ite } func (e *Engine) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) { - return e.index.SeriesPointIterator(opt) + return e.index().SeriesPointIterator(opt) } // SeriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID. diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index 422fcc6f7a5..36b13f6d056 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -79,7 +79,6 @@ func (fs *FileSet) PrependLogFile(f *LogFile) *FileSet { func (fs *FileSet) Size() int64 { var total int64 for _, f := range fs.files { - fmt.Println("fs size", f.Size()) total += f.Size() } return total + int64(fs.manifestSize) From 041a3837bec54ca884a6b5346a69beee4ffc418e Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Fri, 20 Oct 2017 18:57:30 +0100 Subject: [PATCH 6/8] Ensure index can track fields --- tsdb/engine/tsm1/engine.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index d9db41eef4a..b52b87e23e6 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -867,6 +867,8 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error { e.mu.Unlock() return err } + + e._index.SetFieldSet(e.fieldset) e.mu.Unlock() return nil } From b10249a9b39f9935f87a03e82770774be9072f2d Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 28 Nov 2017 15:58:35 +0000 Subject: [PATCH 7/8] Fix rebase --- tsdb/engine/tsm1/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index b52b87e23e6..22bb36b3851 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -452,7 +452,7 @@ func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[ // TagKeyHasAuthorizedSeries determines if there exist authorized series for the // provided measurement name and tag key. func (e *Engine) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool { - return e.index.TagKeyHasAuthorizedSeries(auth, name, key) + return e.index().TagKeyHasAuthorizedSeries(auth, name, key) } // MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression. 
From 81976bca59e526d99db1976b04c496c3776de254 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 28 Nov 2017 17:49:57 +0000 Subject: [PATCH 8/8] Refactor based on new design --- tsdb/engine/tsm1/engine.go | 103 ++++++++++++++-------------------- tsdb/index/tsi1/index.go | 53 ----------------- tsdb/index/tsi1/index_test.go | 59 ------------------- 3 files changed, 43 insertions(+), 172 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 22bb36b3851..520fee6a237 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -123,8 +123,9 @@ const ( // Engine represents a storage engine with compressed blocks. type Engine struct { - mu sync.RWMutex - _index tsdb.Index + mu sync.RWMutex + + index tsdb.Index // The following group of fields is used to track the state of level compactions within the // Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is @@ -197,7 +198,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string, id: id, database: database, path: path, - _index: idx, + index: idx, logger: logger, traceLogger: logger, traceLogging: opt.Config.TraceLoggingEnabled, @@ -417,19 +418,19 @@ func (e *Engine) ScheduleFullCompaction() error { func (e *Engine) Path() string { return e.path } func (e *Engine) SetFieldName(measurement []byte, name string) { - e.index().SetFieldName(measurement, name) + e.index.SetFieldName(measurement, name) } func (e *Engine) MeasurementExists(name []byte) (bool, error) { - return e.index().MeasurementExists(name) + return e.index.MeasurementExists(name) } func (e *Engine) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { - return e.index().MeasurementNamesByExpr(auth, expr) + return e.index.MeasurementNamesByExpr(auth, expr) } func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { - return e.index().MeasurementNamesByRegex(re) + return e.index.MeasurementNamesByRegex(re) } // MeasurementFields returns the measurement fields for a measurement. @@ -442,17 +443,17 @@ func (e *Engine) MeasurementFieldSet() *tsdb.MeasurementFieldSet { } func (e *Engine) HasTagKey(name, key []byte) (bool, error) { - return e.index().HasTagKey(name, key) + return e.index.HasTagKey(name, key) } func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { - return e.index().MeasurementTagKeysByExpr(name, expr) + return e.index.MeasurementTagKeysByExpr(name, expr) } // TagKeyHasAuthorizedSeries determines if there exist authorized series for the // provided measurement name and tag key. func (e *Engine) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool { - return e.index().TagKeyHasAuthorizedSeries(auth, name, key) + return e.index.TagKeyHasAuthorizedSeries(auth, name, key) } // MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression. @@ -465,28 +466,28 @@ func (e *Engine) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, k // slice. 
// func (e *Engine) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { - return e.index().MeasurementTagKeyValuesByExpr(auth, name, keys, expr, keysSorted) + return e.index.MeasurementTagKeyValuesByExpr(auth, name, keys, expr, keysSorted) } func (e *Engine) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { - return e.index().ForEachMeasurementTagKey(name, fn) + return e.index.ForEachMeasurementTagKey(name, fn) } func (e *Engine) TagKeyCardinality(name, key []byte) int { - return e.index().TagKeyCardinality(name, key) + return e.index.TagKeyCardinality(name, key) } // SeriesN returns the unique number of series in the index. func (e *Engine) SeriesN() int64 { - return e.index().SeriesN() + return e.index.SeriesN() } func (e *Engine) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { - return e.index().SeriesSketches() + return e.index.SeriesSketches() } func (e *Engine) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { - return e.index().MeasurementsSketches() + return e.index.MeasurementsSketches() } // LastModified returns the time when this shard was last modified. @@ -579,7 +580,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic { // DiskSize returns the total size in bytes of all TSM and WAL segments on disk. func (e *Engine) DiskSize() int64 { - return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() + e.index().DiskSizeBytes() + return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() + e.index.DiskSizeBytes() } // Open opens and initializes the engine. @@ -601,7 +602,7 @@ func (e *Engine) Open() error { e.fieldset = fields e.mu.Unlock() - e.index().SetFieldSet(fields) + e.index.SetFieldSet(fields) if err := e.WAL.Open(); err != nil { return err @@ -639,12 +640,6 @@ func (e *Engine) Close() error { return e.WAL.Close() } -func (e *Engine) index() tsdb.Index { - e.mu.RLock() - defer e.mu.RUnlock() - return e._index -} - // WithLogger sets the logger for the engine. func (e *Engine) WithLogger(log *zap.Logger) { e.logger = log.With(zap.String("engine", "tsm1")) @@ -658,17 +653,18 @@ func (e *Engine) WithLogger(log *zap.Logger) { } // LoadMetadataIndex loads the shard metadata into memory. +// +// Note, it not safe to call LoadMetadataIndex concurrently. LoadMetadataIndex +// should only be called when initialising a new Engine. func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { now := time.Now() // Save reference to index for iterator creation. - e.mu.Lock() - e._index = index - e.mu.Unlock() + e.index = index // If we have the cached fields index on disk and we're using TSI, we // can skip scanning all the TSM files. - if e.index().Type() != inmem.IndexName && !e.fieldset.IsEmpty() { + if e.index.Type() != inmem.IndexName && !e.fieldset.IsEmpty() { return nil } @@ -860,19 +856,6 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error { return err } - if idx, ok := e.index().(*tsi1.Index); ok { - // Initialise new index. 
- e.mu.Lock() - if e._index, err = idx.ReplaceIndex(newFiles); err != nil { - e.mu.Unlock() - return err - } - - e._index.SetFieldSet(e.fieldset) - e.mu.Unlock() - return nil - } - // Load any new series keys to the index readers := make([]chan seriesKey, 0, len(newFiles)) ext := fmt.Sprintf(".%s", TmpTSMFileExtension) @@ -980,9 +963,9 @@ func (e *Engine) addToIndexFromKey(key []byte, fieldType influxql.DataType) erro } // Build in-memory index, if necessary. - if e.index().Type() == inmem.IndexName { + if e.index.Type() == inmem.IndexName { tags, _ := models.ParseTags(seriesKey) - if err := e.index().InitializeSeries(seriesKey, name, tags); err != nil { + if err := e.index.InitializeSeries(seriesKey, name, tags); err != nil { return err } } @@ -1063,7 +1046,7 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro // Ensure that the index does not compact away the measurement or series we're // going to delete before we're done with them. - if tsiIndex, ok := e.index().(*tsi1.Index); ok { + if tsiIndex, ok := e.index.(*tsi1.Index); ok { fs := tsiIndex.RetainFileSet() defer fs.Release() } @@ -1111,7 +1094,7 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro batch = batch[:0] } - e.index().Rebuild() + e.index.Rebuild() return nil } @@ -1287,7 +1270,7 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { continue } - if err := e.index().UnassignShard(string(k), e.id, ts); err != nil { + if err := e.index.UnassignShard(string(k), e.id, ts); err != nil { return err } } @@ -1341,7 +1324,7 @@ func (e *Engine) DeleteMeasurement(name []byte) error { // DeleteMeasurement deletes a measurement and all related series. func (e *Engine) deleteMeasurement(name []byte) error { // Attempt to find the series keys. - itr, err := e.index().MeasurementSeriesKeysByExprIterator(name, nil) + itr, err := e.index.MeasurementSeriesKeysByExprIterator(name, nil) if err != nil { return err } else if itr != nil { @@ -1352,24 +1335,24 @@ func (e *Engine) deleteMeasurement(name []byte) error { // ForEachMeasurementName iterates over each measurement name in the engine. func (e *Engine) ForEachMeasurementName(fn func(name []byte) error) error { - return e.index().ForEachMeasurementName(fn) + return e.index.ForEachMeasurementName(fn) } func (e *Engine) MeasurementSeriesKeysByExprIterator(name []byte, expr influxql.Expr) (tsdb.SeriesIterator, error) { - return e.index().MeasurementSeriesKeysByExprIterator(name, expr) + return e.index.MeasurementSeriesKeysByExprIterator(name, expr) } // MeasurementSeriesKeysByExpr returns a list of series keys matching expr. func (e *Engine) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { - return e.index().MeasurementSeriesKeysByExpr(name, expr) + return e.index.MeasurementSeriesKeysByExpr(name, expr) } func (e *Engine) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error { - return e.index().CreateSeriesListIfNotExists(keys, names, tagsSlice) + return e.index.CreateSeriesListIfNotExists(keys, names, tagsSlice) } func (e *Engine) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error { - return e.index().CreateSeriesIfNotExists(key, name, tags) + return e.index.CreateSeriesIfNotExists(key, name, tags) } // WriteTo is not implemented. @@ -1447,7 +1430,7 @@ func (e *Engine) CreateSnapshot() (string, error) { } // Generate a snapshot of the index. - return path, e._index.SnapshotTo(path) // index already under RLock. 
+ return path, e.index.SnapshotTo(path) } // writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments. @@ -1952,14 +1935,14 @@ func (e *Engine) CreateIterator(ctx context.Context, measurement string, opt que func (e *Engine) createCallIterator(ctx context.Context, measurement string, call *influxql.Call, opt query.IteratorOptions) ([]query.Iterator, error) { ref, _ := call.Args[0].(*influxql.VarRef) - if exists, err := e.index().MeasurementExists([]byte(measurement)); err != nil { + if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil { return nil, err } else if !exists { return nil, nil } // Determine tagsets for this measurement based on dimensions and filters. - tagSets, err := e.index().TagSets([]byte(measurement), opt) + tagSets, err := e.index.TagSets([]byte(measurement), opt) if err != nil { return nil, err } @@ -2022,14 +2005,14 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal func (e *Engine) createVarRefIterator(ctx context.Context, measurement string, opt query.IteratorOptions) ([]query.Iterator, error) { ref, _ := opt.Expr.(*influxql.VarRef) - if exists, err := e.index().MeasurementExists([]byte(measurement)); err != nil { + if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil { return nil, err } else if !exists { return nil, nil } // Determine tagsets for this measurement based on dimensions and filters. - tagSets, err := e.index().TagSets([]byte(measurement), opt) + tagSets, err := e.index.TagSets([]byte(measurement), opt) if err != nil { return nil, err } @@ -2484,14 +2467,14 @@ func matchTagValues(tags models.Tags, condition influxql.Expr) []string { func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) { // Determine if this measurement exists. If it does not, then no shards are // accessed to begin with. - if exists, err := e.index().MeasurementExists([]byte(measurement)); err != nil { + if exists, err := e.index.MeasurementExists([]byte(measurement)); err != nil { return query.IteratorCost{}, err } else if !exists { return query.IteratorCost{}, nil } // Determine all of the tag sets for this query. - tagSets, err := e.index().TagSets([]byte(measurement), opt) + tagSets, err := e.index.TagSets([]byte(measurement), opt) if err != nil { return query.IteratorCost{}, err } @@ -2554,7 +2537,7 @@ func (e *Engine) seriesCost(seriesKey, field string, tmin, tmax int64) query.Ite } func (e *Engine) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) { - return e.index().SeriesPointIterator(opt) + return e.index.SeriesPointIterator(opt) } // SeriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID. diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 17617a103a1..55b7d4a599a 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -216,43 +216,6 @@ func (i *Index) Open() error { return nil } -// ReplaceIndex returns a new index built using the provided file paths. -func (i *Index) ReplaceIndex(newFiles []string) (*Index, error) { - var ( - base string - indexMatch = fmt.Sprintf("%[1]s%s%[1]s", string(os.PathSeparator), "index") - ) - - for _, pth := range newFiles { - if !strings.Contains(pth, indexMatch) || - (!strings.Contains(pth, LogFileExt) && !strings.Contains(pth, IndexFileExt)) { - continue // Not a tsi1 file path. 
- } - - if base == "" { - base = filepath.Dir(pth) - } - if err := os.Rename(pth, strings.TrimSuffix(pth, ".tmp")); err != nil { - return nil, err - } - } - - // Create, open and return the new index. - idx := NewIndex() - - // Set fields on the new Index. - i.mu.RLock() - idx.logger = i.logger - idx.ShardID = i.ShardID - idx.Database = i.Database - idx.Path = base - idx.version = i.version - idx.options = i.options - i.mu.RUnlock() - - return idx, idx.Open() -} - // openLogFile opens a log file and appends it to the index. func (i *Index) openLogFile(path string) (*LogFile, error) { f := NewLogFile(path) @@ -357,21 +320,6 @@ func (i *Index) Manifest() *Manifest { return m } -// writeManifestFile writes the manifest to the appropriate file path. -func (i *Index) writeManifestFile(m *Manifest) error { - buf, err := json.MarshalIndent(m, "", " ") - if err != nil { - return err - } - buf = append(buf, '\n') - - if err := ioutil.WriteFile(i.ManifestPath(), buf, 0666); err != nil { - return err - } - - return nil -} - // WithLogger sets the logger for the index. func (i *Index) WithLogger(logger *zap.Logger) { i.mu.Lock() @@ -1444,7 +1392,6 @@ func (m *Manifest) Write() error { } buf = append(buf, '\n') m.size = int64(len(buf)) - fmt.Println("size manifest is ", m.size) return ioutil.WriteFile(m.path, buf, 0666) } diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index a6d3199cd6c..022ab20bf28 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -7,7 +7,6 @@ import ( "path/filepath" "reflect" "regexp" - "strings" "testing" "github.com/influxdata/influxdb/models" @@ -255,64 +254,6 @@ func TestIndex_Manifest(t *testing.T) { }) } -func TestIndex_ReplaceIndex(t *testing.T) { - idx := MustOpenIndex() - defer idx.Close() - - tmpBase := os.TempDir() - if err := os.MkdirAll(filepath.Join(tmpBase, "index"), 0777); err != nil { - t.Fatal(err) - } - - names := []string{"a.tsi.tmp", "b.tsl.tmp", "c.tsi.tmp", "not_a_tsi_file.tsm.tmp"} - newFiles := make([]string, 0, len(names)) - for i, name := range names { - var inIndex string - if i < len(names)-1 { - // First three files are in the index directory. - inIndex = "index" - } - - fullPath := filepath.Join(tmpBase, inIndex, name) - fd, err := os.Create(fullPath) - if err != nil { - t.Fatal(err) - } - fd.Close() - newFiles = append(newFiles, fullPath) - } - - idx2, err := idx.ReplaceIndex(newFiles) - if err != nil { - t.Fatal(err) - } - defer func() { os.RemoveAll(idx2.Path); idx2.Close() }() - - if got, exp := idx2.Path, filepath.Join(tmpBase, "index"); got != exp { - t.Fatalf("got index path of %s, expected %s", got, exp) - } - - for _, nf := range newFiles { - // Remove tmp extension as would be done when creating the index. - nf = strings.TrimSuffix(nf, ".tmp") - - // Determine existence. - _, err := os.Stat(nf) - if strings.Contains(nf, "not_a_tsi_file.tsm") { - // This file should not exist in the index. - if !os.IsNotExist(err) { - t.Fatalf("got %v, expected %v", err, os.ErrNotExist) - } - } else { - // This file should exist in the index - if err != nil { - t.Fatalf("got %v, expected for file %s", err, nf) - } - } - - } -} - func TestIndex_DiskSizeBytes(t *testing.T) { idx := MustOpenIndex() defer idx.Close()