Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create shard snapshot #6593

Merged
merged 1 commit into from
May 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ With this release InfluxDB is moving to Go v1.6.
- [#6534](https://github.com/influxdata/influxdb/pull/6534): Move to Go v1.6.2 (over Go v1.4.3)
- [#6522](https://github.com/influxdata/influxdb/pull/6522): Dump TSM files to line protocol
- [#6585](https://github.com/influxdata/influxdb/pull/6585): Parallelize iterators
- [#6502](https://github.com/influxdata/influxdb/pull/6502): Add ability to copy shard via rpc calls. Remove deprecated copier service..
- [#6502](https://github.com/influxdata/influxdb/pull/6502): Add ability to copy shard via rpc calls. Remove deprecated copier service.
- [#6593](https://github.com/influxdata/influxdb/pull/6593): Add ability to create snapshots of shards.

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Engine interface {
DeleteMeasurement(name string, seriesKeys []string) error
SeriesCount() (n int, err error)
MeasurementFields(measurement string) *MeasurementFields
CreateSnapshot() (string, error)

// Format will return the format for the engine
Format() EngineFormat
Expand Down
28 changes: 28 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"archive/tar"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
Expand Down Expand Up @@ -578,6 +579,19 @@ func (e *Engine) WriteSnapshot() error {
return e.writeSnapshotAndCommit(closedFiles, snapshot, compactor)
}

// CreateSnapshot flushes the engine's cache by calling WriteSnapshot and then
// asks the FileStore to create a snapshot while holding the engine's read lock.
// It returns the path reported by the FileStore (a temp directory that holds
// temporary hardlinks to the underlying shard files).
//
// If WriteSnapshot fails, that error is returned together with an empty path.
func (e *Engine) CreateSnapshot() (string, error) {
	if err := e.WriteSnapshot(); err != nil {
		// Propagate the failure; returning a nil error here would hand the
		// caller an empty path with no indication that anything went wrong.
		return "", err
	}

	e.mu.RLock()
	defer e.mu.RUnlock()

	return e.FileStore.CreateSnapshot()
}

// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments
func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, compactor *Compactor) (err error) {

Expand Down Expand Up @@ -797,6 +811,20 @@ func (e *Engine) cleanup() error {
return fmt.Errorf("error removing temp compaction files: %v", err)
}
}

allfiles, err := ioutil.ReadDir(e.path)
if err != nil {
return err
}
for _, f := range allfiles {
// Check to see if there are any `.tmp` directories that were left over from failed shard snapshots
if f.IsDir() && strings.HasSuffix(f.Name(), ".tmp") {
if err := os.Remove(f.Name()); err != nil {
return fmt.Errorf("error removing tmp snapshot directory %q: %s", f.Name(), err)
}
}
}

return nil
}

Expand Down
60 changes: 59 additions & 1 deletion tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"expvar"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
Expand All @@ -20,7 +21,7 @@ import (

type TSMFile interface {
// Path returns the underlying file path for the TSMFile. If the file
// has not be written or loaded from disk, the zero value is returne.
// has not been written or loaded from disk, the zero value is returned.
Path() string

// Read returns all the values in the block where time t resides
Expand Down Expand Up @@ -113,6 +114,8 @@ type FileStore struct {
traceLogging bool

statMap *expvar.Map

currentTempDirID int
}

type FileStat struct {
Expand Down Expand Up @@ -294,6 +297,24 @@ func (f *FileStore) Open() error {
return nil
}

// find the current max ID for temp directories
tmpfiles, err := ioutil.ReadDir(f.dir)
if err != nil {
return err
}
for _, fi := range tmpfiles {
if fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp") {
ss := strings.Split(filepath.Base(fi.Name()), ".")
if len(ss) == 2 {
if i, err := strconv.Atoi(ss[0]); err != nil {
if i > f.currentTempDirID {
f.currentTempDirID = i
}
}
}
}
}

files, err := filepath.Glob(filepath.Join(f.dir, fmt.Sprintf("*.%s", TSMFileExtension)))
if err != nil {
return err
Expand Down Expand Up @@ -589,6 +610,43 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
return locations
}

// CreateSnapshot creates a new "<id>.tmp" directory inside the file store's
// directory and hard links every current TSM file, along with each of its
// tombstone files, into it.
//
// It returns the path of the new directory. On any failure it returns an
// empty path and a non-nil error; if the directory had already been created
// it is removed again so that no half-populated snapshot is left behind.
func (f *FileStore) CreateSnapshot() (string, error) {
	files := f.Files()

	// Reserve the next ID and copy it while still holding the write lock, so
	// that a concurrent caller bumping the counter cannot change the directory
	// name this call is about to use.
	f.mu.Lock()
	f.currentTempDirID++
	id := f.currentTempDirID
	f.mu.Unlock()

	f.mu.RLock()
	defer f.mu.RUnlock()

	// get a tmp directory name
	tmpPath := filepath.Join(f.dir, fmt.Sprintf("%d.tmp", id))
	if err := os.Mkdir(tmpPath, 0777); err != nil {
		return "", err
	}

	if err := f.linkSnapshotFiles(files, tmpPath); err != nil {
		// The caller only receives an empty path on failure and therefore
		// cannot clean up; drop the partial snapshot here (best effort).
		os.RemoveAll(tmpPath)
		return "", err
	}

	return tmpPath, nil
}

// linkSnapshotFiles hard links each TSM file in files, and its tombstone
// files, into tmpPath.
func (f *FileStore) linkSnapshotFiles(files []TSMFile, tmpPath string) error {
	for _, tsmf := range files {
		newpath := filepath.Join(tmpPath, filepath.Base(tsmf.Path()))
		if err := os.Link(tsmf.Path(), newpath); err != nil {
			return fmt.Errorf("error creating tsm hard link: %q", err)
		}
		// Check for tombstones and link those as well
		for _, tf := range tsmf.TombstoneFiles() {
			// NOTE(review): this assumes tf.Path is relative to f.dir — confirm
			// against what TombstoneFiles actually reports.
			tfpath := filepath.Join(f.dir, tf.Path)
			newpath := filepath.Join(tmpPath, filepath.Base(tf.Path))
			if err := os.Link(tfpath, newpath); err != nil {
				return fmt.Errorf("error creating tombstone hard link: %q", err)
			}
		}
	}
	return nil
}

// ParseTSMFileName parses the generation and sequence from a TSM file name.
func ParseTSMFileName(name string) (int, int, error) {
base := filepath.Base(name)
Expand Down
53 changes: 53 additions & 0 deletions tsdb/engine/tsm1/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1538,7 +1538,60 @@ func TestFileStore_Stats(t *testing.T) {
if got, exp := len(stats), 3; got != exp {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}
}

// TestFileStore_CreateSnapshot adds three TSM files to a file store, issues a
// range delete, takes a snapshot, and checks that the snapshot directory is
// non-empty and contains an entry for every TSM file and tombstone file the
// store reports.
func TestFileStore_CreateSnapshot(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)
	store := tsm1.NewFileStore(dir)

	// Setup 3 files
	data := []keyValues{
		{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}},
		{"cpu", []tsm1.Value{tsm1.NewValue(1, 2.0)}},
		{"cpu", []tsm1.Value{tsm1.NewValue(2, 3.0)}},
	}

	files, err := newFiles(dir, data...)
	if err != nil {
		t.Fatalf("unexpected error creating files: %v", err)
	}
	store.Add(files...)

	// Create a tombstone
	if err := store.DeleteRange([]string{"cpu"}, 1, 1); err != nil {
		t.Fatalf("unexpected error delete range: %v", err)
	}

	snapshotDir, err := store.CreateSnapshot()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("temp file for hard links: %q", snapshotDir)

	entries, err := ioutil.ReadDir(snapshotDir)
	if err != nil {
		t.Fatal(err)
	}
	if len(entries) == 0 {
		t.Fatal("no files found")
	}

	// requireLinked fails the test when the snapshot directory has no entry
	// with the same base name as src.
	requireLinked := func(src string) {
		p := filepath.Join(snapshotDir, filepath.Base(src))
		t.Logf("checking for existence of hard link %q", p)
		if _, err := os.Stat(p); os.IsNotExist(err) {
			t.Fatalf("unable to find file %q", p)
		}
	}

	for _, f := range store.Files() {
		requireLinked(f.Path())
		for _, tf := range f.TombstoneFiles() {
			requireLinked(tf.Path)
		}
	}
}

func newFileDir(dir string, values ...keyValues) ([]string, error) {
Expand Down
8 changes: 8 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,14 @@ func (s *Shard) Restore(r io.Reader, basePath string) error {
return s.Open()
}

// CreateSnapshot asks the shard's engine to create a snapshot while holding
// the shard's read lock, and returns the engine's result unchanged: the path
// of a temp directory containing hard links to the underlying shard files,
// or an error.
func (s *Shard) CreateSnapshot() (string, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	path, err := s.engine.CreateSnapshot()
	return path, err
}

// Shards represents a sortable list of shards.
type Shards []*Shard

Expand Down
11 changes: 11 additions & 0 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,17 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
return nil
}

// CreateShardSnapshot looks up the shard with the given id and asks it to
// create a snapshot, returning the resulting path.
// It returns ErrShardNotFound when the store has no shard with that id.
// The caller is responsible for cleaning up (removing) the file path returned.
func (s *Store) CreateShardSnapshot(id uint64) (string, error) {
	if sh := s.Shard(id); sh != nil {
		return sh.CreateSnapshot()
	}
	return "", ErrShardNotFound
}

// DeleteShard removes a shard from disk.
func (s *Store) DeleteShard(shardID uint64) error {
s.mu.Lock()
Expand Down
23 changes: 23 additions & 0 deletions tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,29 @@ func TestStore_DeleteShard(t *testing.T) {
}
}

// Ensure the store can create a snapshot of a shard and reports a non-empty
// directory name for it.
func TestStore_CreateShardSnapShot(t *testing.T) {
	s := MustOpenStore()
	defer s.Close()

	// Create a new shard and verify that it exists.
	if err := s.CreateShard("db0", "rp0", 1); err != nil {
		t.Fatal(err)
	}
	if s.Shard(1) == nil {
		t.Fatalf("expected shard")
	}
	if s.DatabaseIndex("db0") == nil {
		t.Errorf("expected database index")
	}

	dir, err := s.CreateShardSnapshot(1)
	switch {
	case err != nil:
		t.Fatal(err)
	case dir == "":
		t.Fatal("empty directory name")
	}
}

// Ensure the store reports an error when it can't open a database directory.
func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
s := NewStore()
Expand Down