Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming inmem2tsi conversion. #8963

Merged
merged 3 commits into from
Oct 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
- [#8886](https://github.com/influxdata/influxdb/pull/8886): Improved compaction scheduling
- [#8690](https://github.com/influxdata/influxdb/issues/8690): Implicitly decide on a lower limit for fill queries when none is present.
- [#8947](https://github.com/influxdata/influxdb/pull/8947): Add `EXPLAIN ANALYZE` command, which produces a detailed execution plan of a `SELECT` statement.
- [#8963](https://github.com/influxdata/influxdb/pull/8963): Streaming inmem2tsi conversion.

### Bugfixes

Expand Down
178 changes: 117 additions & 61 deletions cmd/influx_inspect/inmem2tsi/inmem2tsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
"github.com/uber-go/zap"
)
Expand All @@ -20,7 +21,9 @@ import (
type Command struct {
Stderr io.Writer
Stdout io.Writer
Logger zap.Logger

Verbose bool
Logger zap.Logger
}

// NewCommand returns a new instance of Command.
Expand All @@ -35,59 +38,49 @@ func NewCommand() *Command {
// Run executes the command.
func (cmd *Command) Run(args ...string) error {
fs := flag.NewFlagSet("inmem2tsi", flag.ExitOnError)
path := fs.String("path", "", "data path")
walPath := fs.String("wal-path", "", "WAL path")
verbose := fs.Bool("v", false, "verbose")
dataDir := fs.String("datadir", "", "shard data directory")
walDir := fs.String("waldir", "", "shard WAL directory")
fs.BoolVar(&cmd.Verbose, "v", false, "verbose")
fs.SetOutput(cmd.Stdout)
fs.Usage = cmd.printUsage
if err := fs.Parse(args); err != nil {
return err
} else if fs.NArg() > 0 || *path == "" || *walPath == "" {
cmd.printUsage()
} else if fs.NArg() > 0 || *dataDir == "" || *walDir == "" {
return flag.ErrHelp
}

if *verbose {
cmd.Logger = zap.New(
zap.NewTextEncoder(),
zap.Output(os.Stderr),
)
}
cmd.Logger = zap.New(
zap.NewTextEncoder(),
zap.Output(os.Stderr),
)

return cmd.run(*path, *walPath, *verbose)
return cmd.run(*dataDir, *walDir)
}

func (cmd *Command) run(path, walPath string, verbose bool) error {
func (cmd *Command) run(dataDir, walDir string) error {
// Check if shard already has a TSI index.
indexPath := filepath.Join(path, "index")
indexPath := filepath.Join(dataDir, "index")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This confused me because in your flags section you have "data directory" and "wal directory", which I interpreted as ~/.influxdb/data and ~/.influxdb/wal. I guess by data and wal directories you mean the data and wal database directories?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added clarification in 22a0e42.

cmd.Logger.Info("checking index path", zap.String("path", indexPath))
if _, err := os.Stat(indexPath); !os.IsNotExist(err) {
return errors.New("tsi1 index already exists")
}

cmd.Logger.Info("opening shard", zap.String("path", path), zap.String("wal-path", walPath))
cmd.Logger.Info("opening shard",
zap.String("datadir", dataDir),
zap.String("waldir", walDir),
)

// Open shard at path.
sh := tsdb.NewShard(0, path, walPath, tsdb.EngineOptions{
EngineVersion: tsdb.DefaultEngine,
IndexVersion: inmem.IndexName,
InmemIndex: inmem.NewIndex(""),
})
if err := sh.Open(); err != nil {
// Find shard files.
tsmPaths, err := cmd.collectTSMFiles(dataDir)
if err != nil {
return err
}
defer sh.CloseFast()

cmd.Logger.Info("reading in-memory index")

// Retrieve in-memory index reference.
inmemIndex, ok := sh.Index().(*inmem.ShardIndex)
if !ok {
return fmt.Errorf("invalid source index type: %T", sh.Index())
walPaths, err := cmd.collectWALFiles(walDir)
if err != nil {
return err
}

// Remove temporary index files if this is being re-run.
tmpPath := filepath.Join(path, ".index")
tmpPath := filepath.Join(dataDir, ".index")
cmd.Logger.Info("cleaning up partial index from previous run, if any")
if err := os.RemoveAll(tmpPath); err != nil {
return err
Expand All @@ -103,61 +96,124 @@ func (cmd *Command) run(path, walPath string, verbose bool) error {
}
defer tsiIndex.Close()

cmd.Logger.Info("iterating over measurements")

// Iterate over each series & insert into new index.
if err := inmemIndex.ForEachMeasurementName(func(name []byte) error {
cmd.Logger.Info("processing measurement", zap.String("name", string(name)))

mm, err := inmemIndex.Measurement(name)
if err != nil {
// Write out tsm1 files.
cmd.Logger.Info("iterating over tsm files")
for _, path := range tsmPaths {
cmd.Logger.Info("processing tsm file", zap.String("path", path))
if err := cmd.processTSMFile(tsiIndex, path); err != nil {
return err
} else if mm == nil {
return nil
}
}

// Write out wal files.
cmd.Logger.Info("building cache from wal files")
cache := tsm1.NewCache(tsdb.DefaultCacheMaxMemorySize, "")
loader := tsm1.NewCacheLoader(walPaths)
loader.WithLogger(cmd.Logger)
if err := loader.Load(cache); err != nil {
return err
}

cmd.Logger.Info("iterating over cache")
for _, key := range cache.Keys() {
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
name, tags := models.ParseKey(seriesKey)

if err := mm.ForEachSeriesByExpr(nil, func(tags models.Tags) error {
if cmd.Verbose {
cmd.Logger.Info("series", zap.String("name", string(name)), zap.String("tags", tags.String()))
if err := tsiIndex.CreateSeriesIfNotExists(nil, name, tags); err != nil {
return fmt.Errorf("cannot create series: %s %s (%s)", name, tags.String(), err)
}
return nil
}); err != nil {
return err
}
return nil

}); err != nil {
return err
if err := tsiIndex.CreateSeriesIfNotExists(nil, []byte(name), tags); err != nil {
return fmt.Errorf("cannot create series: %s %s (%s)", name, tags.String(), err)
}
}

cmd.Logger.Info("compacting index")

// Attempt to compact the index & wait for all compactions to complete.
cmd.Logger.Info("compacting index")
tsiIndex.Compact()
tsiIndex.Wait()

cmd.Logger.Info("closing tsi index")

// Close TSI index.
cmd.Logger.Info("closing tsi index")
if err := tsiIndex.Close(); err != nil {
return err
}

cmd.Logger.Info("moving tsi to permanent location")

// Rename TSI to standard path.
cmd.Logger.Info("moving tsi to permanent location")
if err := os.Rename(tmpPath, indexPath); err != nil {
return err
}

return nil
}

func (cmd *Command) processTSMFile(index *tsi1.Index, path string) error {
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()

r, err := tsm1.NewTSMReader(f)
if err != nil {
cmd.Logger.Warn("unable to read, skipping", zap.String("path", path), zap.Error(err))
return nil
}
defer r.Close()

for i := 0; i < r.KeyCount(); i++ {
key, _ := r.KeyAt(i)
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
name, tags := models.ParseKey(seriesKey)

if cmd.Verbose {
cmd.Logger.Info("series", zap.String("name", string(name)), zap.String("tags", tags.String()))
}

if err := index.CreateSeriesIfNotExists(nil, []byte(name), tags); err != nil {
return fmt.Errorf("cannot create series: %s %s (%s)", name, tags.String(), err)
}
}
return nil
}

func (cmd *Command) collectTSMFiles(path string) ([]string, error) {
fis, err := ioutil.ReadDir(path)
if err != nil {
return nil, err
}

var paths []string
for _, fi := range fis {
if filepath.Ext(fi.Name()) != "."+tsm1.TSMFileExtension {
continue
}
paths = append(paths, filepath.Join(path, fi.Name()))
}
return paths, nil
}

func (cmd *Command) collectWALFiles(path string) ([]string, error) {
fis, err := ioutil.ReadDir(path)
if err != nil {
return nil, err
}

var paths []string
for _, fi := range fis {
if filepath.Ext(fi.Name()) != "."+tsm1.WALFileExtension {
continue
}
paths = append(paths, filepath.Join(path, fi.Name()))
}
return paths, nil
}

func (cmd *Command) printUsage() {
usage := `Converts a shard from an in-memory index to a TSI index.

Usage: influx_inspect inmem2tsi -path DATA_PATH -wal-path WAL_PATH
Usage: influx_inspect inmem2tsi [-v] -datadir DATADIR -waldir WALDIR
`

fmt.Fprintf(cmd.Stdout, usage)
Expand Down