diff --git a/CHANGELOG.md b/CHANGELOG.md
index e1edeb0d3b9..79e6f10928c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
 - [#8495](https://github.com/influxdata/influxdb/pull/8495): Improve CLI connection warnings
 - [#9084](https://github.com/influxdata/influxdb/pull/9084): Handle high cardinality deletes in TSM engine
 - [#9162](https://github.com/influxdata/influxdb/pull/9162): Improve inmem index startup performance for high cardinality.
+- [#8491](https://github.com/influxdata/influxdb/pull/8491): Add further tsi support for streaming/copying shards.
 
 ### Bugfixes
 
@@ -93,8 +94,8 @@ refuse to open, and will most likely see the following error message:
 - [#8897](https://github.com/influxdata/influxdb/pull/8897): Add message pack format for query responses.
 - [#8886](https://github.com/influxdata/influxdb/pull/8886): Improved compaction scheduling
 - [#8690](https://github.com/influxdata/influxdb/issues/8690): Implicitly decide on a lower limit for fill queries when none is present.
-- [#8947](https://github.com/influxdata/influxdb/pull/8947): Add `EXPLAIN ANALYZE` command, which produces a detailed execution plan of a `SELECT` statement.
-- [#8963](https://github.com/influxdata/influxdb/pull/8963): Streaming inmem2tsi conversion.
+- [#8947](https://github.com/influxdata/influxdb/pull/8947): Add `EXPLAIN ANALYZE` command, which produces a detailed execution plan of a `SELECT` statement.
+- [#8963](https://github.com/influxdata/influxdb/pull/8963): Streaming inmem2tsi conversion.
 - [#8995](https://github.com/influxdata/influxdb/pull/8995): Sort & validate TSI key value insertion.
 - [#8968](https://github.com/influxdata/influxdb/issues/8968): Make client errors more helpful on downstream errs. Thanks @darkliquid!
 - [#8984](https://github.com/influxdata/influxdb/pull/8984): EXACT and estimated CARDINALITY queries.
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index b9f2db4b6e1..0fa48fd870b 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -984,7 +984,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
 	for {
 		sequence++
 
 		// New TSM files are written to a temp file and renamed when fully completed.
-		fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.tmp", generation, sequence, TSMFileExtension))
+		fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.%s", generation, sequence, TSMFileExtension, TmpTSMFileExtension))
 		// Write as much as possible to this file
 		err := c.write(fileName, iter)
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 7b382a67741..520fee6a237 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -125,6 +125,8 @@ const (
 type Engine struct {
 	mu sync.RWMutex
 
+	index tsdb.Index
+
 	// The following group of fields is used to track the state of level compactions within the
 	// Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is
 	// used to signal those goroutines to shutdown. Every request to disable level compactions will
@@ -147,7 +149,6 @@ type Engine struct {
 	traceLogger  *zap.Logger // Logger to be used when trace-logging is on.
 	traceLogging bool
 
-	index    tsdb.Index
 	fieldset *tsdb.MeasurementFieldSet
 
 	WAL *WAL
@@ -579,7 +580,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
 
 // DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
 func (e *Engine) DiskSize() int64 {
-	return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes()
+	return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() + e.index.DiskSizeBytes()
 }
 
 // Open opens and initializes the engine.
@@ -652,6 +653,9 @@ func (e *Engine) WithLogger(log *zap.Logger) {
 }
 
 // LoadMetadataIndex loads the shard metadata into memory.
+//
+// Note, it is not safe to call LoadMetadataIndex concurrently. LoadMetadataIndex
+// should only be called when initialising a new Engine.
 func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
 	now := time.Now()
 
@@ -736,10 +740,6 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
 		return err
 	}
 
-	if err := e.index.SnapshotTo(path); err != nil {
-		return err
-	}
-
 	tw := tar.NewWriter(w)
 	defer tw.Close()
 
@@ -845,6 +845,7 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
 			return nil, err
 		}
 
+		// The filestore will only handle tsm files. Other file types will be ignored.
 		if err := e.FileStore.Replace(nil, newFiles); err != nil {
 			return nil, err
 		}
@@ -857,13 +858,14 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
 
 	// Load any new series keys to the index
 	readers := make([]chan seriesKey, 0, len(newFiles))
+	ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
 	for _, f := range newFiles {
 		ch := make(chan seriesKey, 1)
 		readers = append(readers, ch)
 
 		// If asNew is true, the files created from readFileFromBackup will be new ones
 		// having a temp extension.
-		f = strings.TrimSuffix(f, ".tmp")
+		f = strings.TrimSuffix(f, ext)
 
 		fd, err := os.Open(f)
 		if err != nil {
@@ -928,9 +930,7 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as
 		filename = fmt.Sprintf("%09d-%09d.%s", e.FileStore.NextGeneration(), 1, TSMFileExtension)
 	}
 
-	destPath := filepath.Join(e.path, filename)
-	tmp := destPath + ".tmp"
-
+	tmp := fmt.Sprintf("%s.%s", filepath.Join(e.path, filename), TmpTSMFileExtension)
 	// Create new file on disk.
 	f, err := os.OpenFile(tmp, os.O_CREATE|os.O_RDWR, 0666)
 	if err != nil {
@@ -1424,8 +1424,13 @@ func (e *Engine) CreateSnapshot() (string, error) {
 	e.mu.RLock()
 	defer e.mu.RUnlock()
 
+	path, err := e.FileStore.CreateSnapshot()
+	if err != nil {
+		return "", err
+	}
-	return e.FileStore.CreateSnapshot()
+	// Generate a snapshot of the index.
+	return path, e.index.SnapshotTo(path)
 }
 
 // writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments.
@@ -1843,9 +1848,10 @@ func (e *Engine) cleanup() error {
 		return err
 	}
 
+	ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
 	for _, f := range allfiles {
 		// Check to see if there are any `.tmp` directories that were left over from failed shard snapshots
-		if f.IsDir() && strings.HasSuffix(f.Name(), ".tmp") {
+		if f.IsDir() && strings.HasSuffix(f.Name(), ext) {
 			if err := os.RemoveAll(filepath.Join(e.path, f.Name())); err != nil {
 				return fmt.Errorf("error removing tmp snapshot directory %q: %s", f.Name(), err)
 			}
diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go
index 6b60b1827c3..fae6a3b3284 100644
--- a/tsdb/engine/tsm1/file_store.go
+++ b/tsdb/engine/tsm1/file_store.go
@@ -23,6 +23,11 @@ import (
 	"go.uber.org/zap"
 )
 
+const (
+	// The extension used to describe temporary snapshot files.
+	TmpTSMFileExtension = "tmp"
+)
+
 // TSMFile represents an on-disk TSM file.
 type TSMFile interface {
 	// Path returns the underlying file path for the TSMFile. If the file
@@ -433,8 +438,9 @@ func (f *FileStore) Open() error {
 	if err != nil {
 		return err
 	}
+	ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
 	for _, fi := range tmpfiles {
-		if fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp") {
+		if fi.IsDir() && strings.HasSuffix(fi.Name(), ext) {
 			ss := strings.Split(filepath.Base(fi.Name()), ".")
 			if len(ss) == 2 {
 				if i, err := strconv.Atoi(ss[0]); err != nil {
@@ -635,16 +641,20 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
 	f.mu.RUnlock()
 
 	updated := make([]TSMFile, 0, len(newFiles))
+	tsmTmpExt := fmt.Sprintf("%s.%s", TSMFileExtension, TmpTSMFileExtension)
 
 	// Rename all the new files to make them live on restart
 	for _, file := range newFiles {
 		var newName = file
-		if strings.HasSuffix(file, ".tmp") {
+		if strings.HasSuffix(file, tsmTmpExt) {
 			// The new TSM files have a tmp extension. First rename them.
 			newName = file[:len(file)-4]
 			if err := os.Rename(file, newName); err != nil {
 				return err
 			}
+		} else if !strings.HasSuffix(file, TSMFileExtension) {
+			// This isn't a .tsm or .tsm.tmp file.
+			continue
 		}
 
 		fd, err := os.Open(newName)
@@ -701,7 +711,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
 			deletes = append(deletes, file.Path())
 
 			// Rename the TSM file used by this reader
-			tempPath := file.Path() + ".tmp"
+			tempPath := fmt.Sprintf("%s.%s", file.Path(), TmpTSMFileExtension)
 			if err := file.Rename(tempPath); err != nil {
 				return err
 			}
@@ -958,7 +968,7 @@ func (f *FileStore) CreateSnapshot() (string, error) {
 	defer f.mu.RUnlock()
 
 	// get a tmp directory name
-	tmpPath := fmt.Sprintf("%s/%d.tmp", f.dir, f.currentTempDirID)
+	tmpPath := fmt.Sprintf("%s/%d.%s", f.dir, f.currentTempDirID, TmpTSMFileExtension)
 	err := os.Mkdir(tmpPath, 0777)
 	if err != nil {
 		return "", err
diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go
index 15923170ecb..ae68282051c 100644
--- a/tsdb/engine/tsm1/file_store_test.go
+++ b/tsdb/engine/tsm1/file_store_test.go
@@ -2434,7 +2434,7 @@ func TestFileStore_Replace(t *testing.T) {
 	}
 
 	// Replace requires assumes new files have a .tmp extension
-	replacement := files[2] + ".tmp"
+	replacement := fmt.Sprintf("%s.%s", files[2], tsm1.TmpTSMFileExtension)
 	os.Rename(files[2], replacement)
 
 	fs := tsm1.NewFileStore(dir)
@@ -2663,9 +2663,9 @@ func TestFileStore_Stats(t *testing.T) {
 		"mem": []tsm1.Value{tsm1.NewValue(0, 1.0)},
 	})
-	replacement := files[2] + "-foo" + ".tmp" // Assumes new files have a .tmp extension
+	replacement := fmt.Sprintf("%s.%s.%s", files[2], tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension) // The replacement name must keep the tsm extension so Replace will load it.
 	if err := os.Rename(newFile, replacement); err != nil {
-
+		t.Fatalf("rename: %v", err)
 	}
 
 	// Replace 3 w/ 1
 	if err := fs.Replace(files, []string{replacement}); err != nil {
@@ -2675,7 +2675,7 @@ func TestFileStore_Stats(t *testing.T) {
 	var found bool
 	stats = fs.Stats()
 	for _, stat := range stats {
-		if strings.HasSuffix(stat.Path, "-foo") {
+		if strings.HasSuffix(stat.Path, fmt.Sprintf("%s.%s.%s", tsm1.TSMFileExtension, tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension)) {
 			found = true
 		}
 	}
diff --git a/tsdb/index.go b/tsdb/index.go
index 67bed830ea4..c5651afbee1 100644
--- a/tsdb/index.go
+++ b/tsdb/index.go
@@ -53,6 +53,9 @@ type Index interface {
 	// Creates hard links inside path for snapshotting.
 	SnapshotTo(path string) error
 
+	// Size of the index on disk, if applicable.
+	DiskSizeBytes() int64
+
 	// To be removed w/ tsi1.
 	SetFieldName(measurement []byte, name string)
 	AssignShard(k string, shardID uint64)
diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go
index c47082ebdc1..d8eb0873eb4 100644
--- a/tsdb/index/inmem/inmem.go
+++ b/tsdb/index/inmem/inmem.go
@@ -816,6 +816,9 @@ func (i *Index) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator,
 // SnapshotTo is a no-op since this is an in-memory index.
 func (i *Index) SnapshotTo(path string) error { return nil }
 
+// DiskSizeBytes always returns zero bytes, since this is an in-memory index.
+func (i *Index) DiskSizeBytes() int64 { return 0 }
+
 // AssignShard update the index to indicate that series k exists in the given shardID.
 func (i *Index) AssignShard(k string, shardID uint64) {
 	ss, _ := i.Series([]byte(k))
diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go
index d26b71b763f..36b13f6d056 100644
--- a/tsdb/index/tsi1/file_set.go
+++ b/tsdb/index/tsi1/file_set.go
@@ -18,10 +18,11 @@ import (
 
 // FileSet represents a collection of files.
 type FileSet struct {
-	levels   []CompactionLevel
-	files    []File
-	filters  []*bloom.Filter // per-level filters
-	database string
+	levels       []CompactionLevel
+	files        []File
+	filters      []*bloom.Filter // per-level filters
+	database     string
+	manifestSize int64 // Size of the manifest file in bytes.
 }
 
 // NewFileSet returns a new instance of FileSet.
@@ -74,6 +75,15 @@ func (fs *FileSet) PrependLogFile(f *LogFile) *FileSet {
 	}
 }
 
+// Size returns the on-disk size of the FileSet.
+func (fs *FileSet) Size() int64 {
+	var total int64
+	for _, f := range fs.files {
+		total += f.Size()
+	}
+	return total + int64(fs.manifestSize)
+}
+
 // MustReplace swaps a list of files for a single file and returns a new file set.
 // The caller should always guarantee that the files exist and are contiguous.
 func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet {
@@ -1098,6 +1108,9 @@ type File interface {
 	// Reference counting.
 	Retain()
 	Release()
+
+	// Size of file on disk
+	Size() int64
 }
 
 type Files []File
diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go
index bbf79e152c0..55b7d4a599a 100644
--- a/tsdb/index/tsi1/index.go
+++ b/tsdb/index/tsi1/index.go
@@ -141,9 +141,9 @@ func (i *Index) Open() error {
 	}
 
 	// Read manifest file.
-	m, err := ReadManifestFile(filepath.Join(i.Path, ManifestFileName))
+	m, err := ReadManifestFile(i.ManifestPath())
 	if os.IsNotExist(err) {
-		m = NewManifest()
+		m = NewManifest(i.ManifestPath())
 	} else if err != nil {
 		return err
 	}
@@ -189,6 +189,7 @@ func (i *Index) Open() error {
 	if err != nil {
 		return err
 	}
+	fs.manifestSize = m.size
 	i.fileSet = fs
 
 	// Set initial sequnce number.
@@ -309,6 +310,7 @@ func (i *Index) Manifest() *Manifest {
 		Levels:  i.levels,
 		Files:   make([]string, len(i.fileSet.files)),
 		Version: i.version,
+		path:    i.ManifestPath(),
 	}
 
 	for j, f := range i.fileSet.files {
@@ -318,14 +320,11 @@ func (i *Index) Manifest() *Manifest {
 	return m
 }
 
-// writeManifestFile writes the manifest to the appropriate file path.
-func (i *Index) writeManifestFile() error {
-	return WriteManifestFile(i.ManifestPath(), i.Manifest())
-}
-
 // WithLogger sets the logger for the index.
 func (i *Index) WithLogger(logger *zap.Logger) {
+	i.mu.Lock()
 	i.logger = logger.With(zap.String("index", "tsi"))
+	i.mu.Unlock()
 }
 
 // SetFieldSet sets a shared field set from the engine.
@@ -365,10 +364,12 @@ func (i *Index) prependActiveLogFile() error {
 	i.fileSet = i.fileSet.PrependLogFile(f)
 
 	// Write new manifest.
-	if err := i.writeManifestFile(); err != nil {
+	m := i.Manifest()
+	if err = m.Write(); err != nil {
 		// TODO: Close index if write fails.
 		return err
 	}
+	i.fileSet.manifestSize = m.size
 
 	return nil
 }
@@ -863,6 +864,13 @@ func (i *Index) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet
 	return sortedTagsSets, nil
 }
 
+// DiskSizeBytes returns the size of the index on disk.
+func (i *Index) DiskSizeBytes() int64 {
+	fs := i.RetainFileSet()
+	defer fs.Release()
+	return fs.Size()
+}
+
 // SnapshotTo creates hard links to the file set into path.
 func (i *Index) SnapshotTo(path string) error {
 	i.mu.Lock()
@@ -1035,10 +1043,13 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) {
 		i.fileSet = i.fileSet.MustReplace(IndexFiles(files).Files(), file)
 
 		// Write new manifest.
-		if err := i.writeManifestFile(); err != nil {
+		var err error
+		m := i.Manifest()
+		if err = m.Write(); err != nil {
 			// TODO: Close index if write fails.
 			return err
 		}
+		i.fileSet.manifestSize = m.size
 		return nil
 	}(); err != nil {
 		logger.Error("cannot write manifest", zap.Error(err))
@@ -1168,10 +1179,13 @@ func (i *Index) compactLogFile(logFile *LogFile) {
 		i.fileSet = i.fileSet.MustReplace([]File{logFile}, file)
 
 		// Write new manifest.
-		if err := i.writeManifestFile(); err != nil {
+		var err error
+		m := i.Manifest()
+		if err = m.Write(); err != nil {
 			// TODO: Close index if write fails.
 			return err
 		}
+		i.fileSet.manifestSize = m.size
 		return nil
 	}(); err != nil {
 		logger.Error("cannot update manifest", zap.Error(err))
@@ -1329,18 +1343,20 @@ func ParseFilename(name string) (level, id int) {
 
 // Manifest represents the list of log & index files that make up the index.
 // The files are listed in time order, not necessarily ID order.
 type Manifest struct {
-	Levels []CompactionLevel `json:"levels,omitempty"`
-	Files  []string          `json:"files,omitempty"`
+	Levels  []CompactionLevel `json:"levels,omitempty"`
+	Files   []string          `json:"files,omitempty"`
+	Version int               `json:"version,omitempty"` // Version should be updated whenever the TSI format has changed.
 
-	// Version should be updated whenever the TSI format has changed.
-	Version int `json:"version,omitempty"`
+	size int64  // Holds the on-disk size of the manifest.
+	path string // location on disk of the manifest.
 }
 
 // NewManifest returns a new instance of Manifest with default compaction levels.
-func NewManifest() *Manifest {
+func NewManifest(path string) *Manifest {
 	m := &Manifest{
 		Levels:  make([]CompactionLevel, len(DefaultCompactionLevels)),
 		Version: Version,
+		path:    path,
 	}
 	copy(m.Levels, DefaultCompactionLevels[:])
 	return m
@@ -1368,6 +1384,18 @@ func (m *Manifest) Validate() error {
 }
 
-// ReadManifestFile reads a manifest from a file path.
+// Write writes the manifest file to the provided path.
+func (m *Manifest) Write() error {
+	buf, err := json.MarshalIndent(m, "", "  ")
+	if err != nil {
+		return err
+	}
+	buf = append(buf, '\n')
+	m.size = int64(len(buf))
+	return ioutil.WriteFile(m.path, buf, 0666)
+}
+
+// ReadManifestFile reads a manifest from a file path and returns the manifest
+// along with its size and any error.
 func ReadManifestFile(path string) (*Manifest, error) {
 	buf, err := ioutil.ReadFile(path)
 	if err != nil {
@@ -1379,25 +1408,13 @@ func ReadManifestFile(path string) (*Manifest, error) {
 	if err := json.Unmarshal(buf, &m); err != nil {
 		return nil, err
 	}
+	// Set the size of the manifest.
+	m.size = int64(len(buf))
+	m.path = path
 
 	return &m, nil
 }
 
-// WriteManifestFile writes a manifest to a file path.
-func WriteManifestFile(path string, m *Manifest) error {
-	buf, err := json.MarshalIndent(m, "", "  ")
-	if err != nil {
-		return err
-	}
-	buf = append(buf, '\n')
-
-	if err := ioutil.WriteFile(path, buf, 0666); err != nil {
-		return err
-	}
-
-	return nil
-}
-
 func joinIntSlice(a []int, sep string) string {
 	other := make([]string, len(a))
 	for i := range a {
diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go
index e6460d66ed7..022ab20bf28 100644
--- a/tsdb/index/tsi1/index_test.go
+++ b/tsdb/index/tsi1/index_test.go
@@ -220,10 +220,10 @@ func TestIndex_Open(t *testing.T) {
 			idx = NewIndex()
 
 			// Manually create a MANIFEST file for an incompatible index version.
 			mpath := filepath.Join(idx.Path, tsi1.ManifestFileName)
-			m := tsi1.NewManifest()
+			m := tsi1.NewManifest(mpath)
 			m.Levels = nil
 			m.Version = v // Set example MANIFEST version.
-			if err := tsi1.WriteManifestFile(mpath, m); err != nil {
+			if err := m.Write(); err != nil {
 				t.Fatal(err)
 			}
 
@@ -254,6 +254,29 @@ func TestIndex_Manifest(t *testing.T) {
 	})
 }
 
+func TestIndex_DiskSizeBytes(t *testing.T) {
+	idx := MustOpenIndex()
+	defer idx.Close()
+
+	// Add series to index.
+	if err := idx.CreateSeriesSliceIfNotExists([]Series{
+		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
+		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
+		{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"region": "north"})},
+		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "west", "country": "us"})},
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify the on-disk size is the same in each stage.
+	expSize := int64(520) // 419 bytes for MANIFEST and 101 bytes for index file
+	idx.Run(t, func(t *testing.T) {
+		if got, exp := idx.DiskSizeBytes(), expSize; got != exp {
+			t.Fatalf("got %d bytes, expected %d", got, exp)
+		}
+	})
+}
+
 // Index is a test wrapper for tsi1.Index.
 type Index struct {
 	*tsi1.Index
diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go
index 5dae24b6c82..d8e32d7ba97 100644
--- a/tsdb/index/tsi1/log_file.go
+++ b/tsdb/index/tsi1/log_file.go
@@ -13,13 +13,12 @@ import (
 	"sync"
 	"time"
 
-	"github.com/influxdata/influxdb/pkg/estimator/hll"
-	"github.com/influxdata/influxdb/tsdb"
-
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/pkg/bloom"
 	"github.com/influxdata/influxdb/pkg/estimator"
+	"github.com/influxdata/influxdb/pkg/estimator/hll"
 	"github.com/influxdata/influxdb/pkg/mmap"
+	"github.com/influxdata/influxdb/tsdb"
 	"github.com/influxdata/influxql"
 )
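Note (editor addition, not part of the patch): the disk-size accounting introduced above follows one pattern end to end — every index `File` reports its on-disk size, `FileSet.Size()` sums those sizes and adds the cached manifest size, and `Engine.DiskSize()` adds that total to the TSM and WAL byte counts. The sketch below is a minimal, self-contained illustration of that pattern only; the `file`, `fileSet`, and `engine` types here are hypothetical stand-ins, not the influxdb implementations.

```go
package main

import "fmt"

// file is a hypothetical stand-in for a single on-disk index file.
type file struct {
	size int64
}

// Size reports the file's on-disk size in bytes.
func (f file) Size() int64 { return f.size }

// fileSet mirrors the pattern used by tsi1.FileSet: the sum of all file
// sizes plus the cached size of the MANIFEST written alongside them.
type fileSet struct {
	files        []file
	manifestSize int64
}

// Size returns the on-disk size of the whole set.
func (fs fileSet) Size() int64 {
	var total int64
	for _, f := range fs.files {
		total += f.Size()
	}
	return total + fs.manifestSize
}

// engine is a hypothetical stand-in for the TSM engine: its reported disk
// size is now the sum of the TSM data, the WAL segments, and the index.
type engine struct {
	tsmBytes, walBytes int64
	index              fileSet
}

// DiskSize aggregates all three on-disk components.
func (e engine) DiskSize() int64 {
	return e.tsmBytes + e.walBytes + e.index.Size()
}

func main() {
	e := engine{
		tsmBytes: 4096,
		walBytes: 1024,
		// 101-byte index file plus a 419-byte manifest, echoing the
		// figures asserted in TestIndex_DiskSizeBytes above.
		index: fileSet{files: []file{{size: 101}}, manifestSize: 419},
	}
	fmt.Println(e.DiskSize()) // 4096 + 1024 + 101 + 419 = 5640
}
```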