Skip to content

Commit

Permalink
Add support for TSI shard streaming and shard size
Browse files Browse the repository at this point in the history
This commit firstly ensures that a shard's size on disk is accurately
reported when using the tsi1 index, by including the on-disk size of the
tsi1 index in the calculation.

Secondly, this commit adds support for shard streaming/copying when using
the tsi1 index. Prior to this, a tsi1 index would not be correctly
restored when streaming shards.
  • Loading branch information
e-dard committed Jun 14, 2017
1 parent 0b4528b commit f20ba10
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 62 deletions.
12 changes: 8 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## v1.4.0 [unreleased]

### Features

- [#8491](https://github.com/influxdata/influxdb/pull/8491): Add further tsi support for streaming/copying shards.

### Bugfixes

- [#8480](https://github.com/influxdata/influxdb/pull/8480): Change the default stats interval to 1 second instead of 10 seconds.
Expand Down Expand Up @@ -33,15 +37,15 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio

* The top-level config `bind-address` now defaults to `localhost:8088`.
The previous default was just `:8088`, causing the backup and restore port to be bound on all available interfaces (i.e. including interfaces on the public internet).

The following new configuration options are available.

#### `[http]` Section

* `max-body-size` was added with a default of 25,000,000, but can be disabled by setting it to 0.
Specifies the maximum size (in bytes) of a client request body. When a client sends data that exceeds
the configured maximum size, a `413 Request Entity Too Large` HTTP response is returned.

#### `[continuous_queries]` Section

* `query-stats-enabled` was added with a default of `false`. When set to `true`, continuous query execution statistics are written to the default monitor store.
Expand Down
5 changes: 0 additions & 5 deletions tsdb/engine/tsm1/MANIFEST

This file was deleted.

2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
for {
sequence++
// New TSM files are written to a temp file and renamed when fully completed.
fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.tmp", generation, sequence, TSMFileExtension))
fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.%s", generation, sequence, TSMFileExtension, TmpTSMFileExtension))

// Write as much as possible to this file
err := c.write(fileName, iter)
Expand Down
43 changes: 29 additions & 14 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ import (
"sync/atomic"
"time"

"github.com/influxdata/influxdb/tsdb/index/inmem"

"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/bytesutil"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/tsdb"
_ "github.com/influxdata/influxdb/tsdb/index"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
"github.com/uber-go/zap"
)

Expand Down Expand Up @@ -428,7 +428,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {

// DiskSize returns the total size in bytes of all TSM segments, WAL segments,
// and the index (when the index keeps state on disk, e.g. tsi1) for this engine.
func (e *Engine) DiskSize() int64 {
	// The index contribution is zero for the in-memory index and the tsi1
	// on-disk size otherwise (see Index.DiskSizeBytes implementations).
	return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() + e.index.DiskSizeBytes()
}

// Open opens and initializes the engine.
Expand Down Expand Up @@ -557,10 +557,6 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
return err
}

if err := e.index.SnapshotTo(path); err != nil {
return err
}

tw := tar.NewWriter(w)
defer tw.Close()

Expand Down Expand Up @@ -666,6 +662,8 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
return nil, err
}

// The filestore will only handle tsm files. Other file types will be
// ignored.
if err := e.FileStore.Replace(nil, newFiles); err != nil {
return nil, err
}
Expand All @@ -676,15 +674,27 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
return err
}

if idx, ok := e.index.(*tsi1.Index); ok {
// Initialise new index.
e.mu.Lock()
if e.index, err = idx.ReplaceIndex(newFiles); err != nil {
e.mu.Unlock()
return err
}
e.mu.Unlock()
return nil
}

// Load any new series keys to the index
readers := make([]chan seriesKey, 0, len(newFiles))
ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
for _, f := range newFiles {
ch := make(chan seriesKey, 1)
readers = append(readers, ch)

// If asNew is true, the files created from readFileFromBackup will be new ones
// having a temp extension.
f = strings.TrimSuffix(f, ".tmp")
f = strings.TrimSuffix(f, ext)

fd, err := os.Open(f)
if err != nil {
Expand Down Expand Up @@ -749,9 +759,7 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as
filename = fmt.Sprintf("%09d-%09d.%s", e.FileStore.NextGeneration(), 1, TSMFileExtension)
}

destPath := filepath.Join(e.path, filename)
tmp := destPath + ".tmp"

tmp := fmt.Sprintf("%s.%s", filepath.Join(e.path, filename), TmpTSMFileExtension)
// Create new file on disk.
f, err := os.OpenFile(tmp, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
Expand Down Expand Up @@ -1093,9 +1101,15 @@ func (e *Engine) CreateSnapshot() (string, error) {
}

e.mu.RLock()
defer e.mu.RUnlock()
path, err := e.FileStore.CreateSnapshot()
if err != nil {
e.mu.RUnlock()
return "", err
}
e.mu.RUnlock()

return e.FileStore.CreateSnapshot()
// Generate a snapshot of the index.
return path, e.index.SnapshotTo(path)
}

// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments.
Expand Down Expand Up @@ -1426,9 +1440,10 @@ func (e *Engine) cleanup() error {
return err
}

ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
for _, f := range allfiles {
// Check to see if there are any `.tmp` directories that were left over from failed shard snapshots
if f.IsDir() && strings.HasSuffix(f.Name(), ".tmp") {
if f.IsDir() && strings.HasSuffix(f.Name(), ext) {
if err := os.RemoveAll(filepath.Join(e.path, f.Name())); err != nil {
return fmt.Errorf("error removing tmp snapshot directory %q: %s", f.Name(), err)
}
Expand Down
18 changes: 14 additions & 4 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
"github.com/uber-go/zap"
)

// TmpTSMFileExtension is the file extension ("tmp") given to temporary
// snapshot files before they are renamed into place.
const TmpTSMFileExtension = "tmp"

// TSMFile represents an on-disk TSM file.
type TSMFile interface {
// Path returns the underlying file path for the TSMFile. If the file
Expand Down Expand Up @@ -345,8 +350,9 @@ func (f *FileStore) Open() error {
if err != nil {
return err
}
ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
for _, fi := range tmpfiles {
if fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp") {
if fi.IsDir() && strings.HasSuffix(fi.Name(), ext) {
ss := strings.Split(filepath.Base(fi.Name()), ".")
if len(ss) == 2 {
if i, err := strconv.Atoi(ss[0]); err != nil {
Expand Down Expand Up @@ -515,16 +521,20 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
f.mu.RUnlock()

updated := make([]TSMFile, 0, len(newFiles))
tsmTmpExt := fmt.Sprintf("%s.%s", TSMFileExtension, TmpTSMFileExtension)

// Rename all the new files to make them live on restart
for _, file := range newFiles {
var newName = file
if strings.HasSuffix(file, ".tmp") {
if strings.HasSuffix(file, tsmTmpExt) {
// The new TSM files have a tmp extension. First rename them.
newName = file[:len(file)-4]
if err := os.Rename(file, newName); err != nil {
return err
}
} else if !strings.HasSuffix(file, TSMFileExtension) {
// This isn't a .tsm or .tsm.tmp file.
continue
}

fd, err := os.Open(newName)
Expand Down Expand Up @@ -577,7 +587,7 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
deletes = append(deletes, file.Path())

// Rename the TSM file used by this reader
tempPath := file.Path() + ".tmp"
tempPath := fmt.Sprintf("%s.%s", file.Path(), TmpTSMFileExtension)
if err := file.Rename(tempPath); err != nil {
return err
}
Expand Down Expand Up @@ -801,7 +811,7 @@ func (f *FileStore) CreateSnapshot() (string, error) {
defer f.mu.RUnlock()

// get a tmp directory name
tmpPath := fmt.Sprintf("%s/%d.tmp", f.dir, f.currentTempDirID)
tmpPath := fmt.Sprintf("%s/%d.%s", f.dir, f.currentTempDirID, TmpTSMFileExtension)
err := os.Mkdir(tmpPath, 0777)
if err != nil {
return "", err
Expand Down
8 changes: 4 additions & 4 deletions tsdb/engine/tsm1/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2061,7 +2061,7 @@ func TestFileStore_Replace(t *testing.T) {
}

// Replace requires assumes new files have a .tmp extension
replacement := files[2] + ".tmp"
replacement := fmt.Sprintf("%s.%s", files[2], tsm1.TmpTSMFileExtension)
os.Rename(files[2], replacement)

fs := tsm1.NewFileStore(dir)
Expand Down Expand Up @@ -2253,9 +2253,9 @@ func TestFileStore_Stats(t *testing.T) {
"mem": []tsm1.Value{tsm1.NewValue(0, 1.0)},
})

replacement := files[2] + "-foo" + ".tmp" // Assumes new files have a .tmp extension
replacement := fmt.Sprintf("%s.%s.%s", files[2], tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension) // Assumes new files have a .tmp extension
if err := os.Rename(newFile, replacement); err != nil {

t.Fatalf("rename: %v", err)
}
// Replace 3 w/ 1
if err := fs.Replace(files, []string{replacement}); err != nil {
Expand All @@ -2265,7 +2265,7 @@ func TestFileStore_Stats(t *testing.T) {
var found bool
stats = fs.Stats()
for _, stat := range stats {
if strings.HasSuffix(stat.Path, "-foo") {
if strings.HasSuffix(stat.Path, fmt.Sprintf("%s.%s.%s", tsm1.TSMFileExtension, tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension)) {
found = true
}
}
Expand Down
3 changes: 3 additions & 0 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type Index interface {
// Creates hard links inside path for snapshotting.
SnapshotTo(path string) error

// Size of the index on disk, if applicable.
DiskSizeBytes() int64

// To be removed w/ tsi1.
SetFieldName(measurement []byte, name string)
AssignShard(k string, shardID uint64)
Expand Down
3 changes: 3 additions & 0 deletions tsdb/index/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,9 @@ func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iter
// SnapshotTo is a no-op: an in-memory index has no on-disk files to hard-link
// into the snapshot directory, so there is nothing to do and no error to return.
func (i *Index) SnapshotTo(path string) error {
	return nil
}

// DiskSizeBytes reports the index's on-disk footprint, which is always zero
// because this index lives entirely in memory.
func (i *Index) DiskSizeBytes() int64 {
	return 0
}

// AssignShard update the index to indicate that series k exists in the given shardID.
func (i *Index) AssignShard(k string, shardID uint64) {
ss, _ := i.Series([]byte(k))
Expand Down
19 changes: 16 additions & 3 deletions tsdb/index/tsi1/file_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (

// FileSet represents a collection of files.
type FileSet struct {
	levels       []CompactionLevel
	files        []File
	filters      []*bloom.Filter // per-level filters
	manifestSize int64           // Size of the manifest file in bytes.
}

// NewFileSet returns a new instance of FileSet.
Expand Down Expand Up @@ -56,6 +57,15 @@ func (fs *FileSet) Release() {
}
}

// Size returns the on-disk size of the FileSet: the sum of each member
// file's size plus the size of the manifest file.
func (fs *FileSet) Size() int64 {
	sz := int64(fs.manifestSize)
	for _, file := range fs.files {
		sz += file.Size()
	}
	return sz
}

// Prepend returns a new file set with f added at the beginning.
func (fs *FileSet) Prepend(f File) (*FileSet, error) {
return NewFileSet(fs.levels, append([]File{f}, fs.files...))
Expand Down Expand Up @@ -942,6 +952,9 @@ type File interface {
// Reference counting.
Retain()
Release()

// Size of file on disk
Size() int64
}

type Files []File
Expand Down
Loading

0 comments on commit f20ba10

Please sign in to comment.