Skip to content

Commit

Permalink
fix: extend snapshot copy to filesystems that cannot link (#22703)
Browse files Browse the repository at this point in the history
If os.Link fails with syscall.ENOTSUP, then the file
system does not support links, and we must make copies
to snapshot files for backup. We also automatically make
copies instead of link on Windows, because although it
makes links, their semantics are different from Linux.

closes #16739
  • Loading branch information
davidby-influx authored Oct 21, 2021
1 parent 0902650 commit d9b9e86
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 73 deletions.
15 changes: 8 additions & 7 deletions cmd/influxd/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/influxdata/influxdb/cmd/influxd/backup_util"
errors2 "github.com/influxdata/influxdb/pkg/errors"
"github.com/influxdata/influxdb/services/snapshotter"
"github.com/influxdata/influxdb/tcp"
gzip "github.com/klauspost/pgzip"
Expand Down Expand Up @@ -390,7 +391,7 @@ func (cmd *Command) backupResponsePaths(response *snapshotter.Response) error {

// backupMetastore will backup the whole metastore on the host to the backup path
// if useDB is non-empty, it will backup metadata only for the named database.
func (cmd *Command) backupMetastore() error {
func (cmd *Command) backupMetastore() (retErr error) {
metastoreArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, backup_util.Metafile))
if err != nil {
return err
Expand All @@ -402,12 +403,12 @@ func (cmd *Command) backupMetastore() error {
Type: snapshotter.RequestMetastoreBackup,
}

err = cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error {
err = cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) (rErr error) {
f, err := os.Open(file)
if err != nil {
return err
}
defer f.Close()
defer errors2.Capture(&rErr, f.Close)()

var magicByte [8]byte
n, err := io.ReadFull(f, magicByte[:])
Expand Down Expand Up @@ -438,7 +439,7 @@ func (cmd *Command) backupMetastore() error {

if cmd.portable {
metaBytes, err := backup_util.GetMetaBytes(metastoreArchivePath)
defer os.Remove(metastoreArchivePath)
defer errors2.Capture(&retErr, func() error { return os.Remove(metastoreArchivePath) })()
if err != nil {
return err
}
Expand Down Expand Up @@ -510,13 +511,13 @@ func (cmd *Command) downloadAndVerify(req *snapshotter.Request, path string, val
}

// download downloads a snapshot of either the metastore or a shard from a host to a given path.
func (cmd *Command) download(req *snapshotter.Request, path string) error {
func (cmd *Command) download(req *snapshotter.Request, path string) (retErr error) {
// Create local file to write to.
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("open temp file: %s", err)
}
defer f.Close()
defer errors2.Capture(&retErr, f.Close)()

min := 2 * time.Second
for i := 0; i < 10; i++ {
Expand All @@ -526,7 +527,7 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error {
if err != nil {
return err
}
defer conn.Close()
defer errors2.Capture(&retErr, conn.Close)()

_, err = conn.Write([]byte{byte(req.Type)})
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/influxd/backup_util/backup_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync/atomic"

internal "github.com/influxdata/influxdb/cmd/influxd/backup_util/internal"
errors2 "github.com/influxdata/influxdb/pkg/errors"
"github.com/influxdata/influxdb/services/snapshotter"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -55,11 +56,12 @@ func (ep *PortablePacker) UnmarshalBinary(data []byte) error {
return nil
}

func GetMetaBytes(fname string) ([]byte, error) {
func GetMetaBytes(fname string) (_ []byte, retErr error) {
f, err := os.Open(fname)
if err != nil {
return []byte{}, err
}
defer errors2.Capture(&retErr, f.Close)()

var buf bytes.Buffer
if _, err := io.Copy(&buf, f); err != nil {
Expand Down
17 changes: 0 additions & 17 deletions tsdb/engine/tsm1/copy_or_link_unix.go

This file was deleted.

46 changes: 0 additions & 46 deletions tsdb/engine/tsm1/copy_or_link_windows.go

This file was deleted.

86 changes: 84 additions & 2 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"os"
Expand All @@ -15,6 +16,7 @@ import (
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/influxdata/influxdb/models"
Expand Down Expand Up @@ -193,6 +195,8 @@ type FileStore struct {
parseFileName ParseFileNameFunc

obs tsdb.FileStoreObserver

copyFiles bool
}

// FileStat holds information about a TSM file on disk.
Expand Down Expand Up @@ -244,6 +248,7 @@ func NewFileStore(dir string) *FileStore {
},
obs: noFileStoreObserver{},
parseFileName: DefaultParseFileName,
copyFiles: runtime.GOOS == "windows",
}
fs.purger.fileStore = fs
return fs
Expand Down Expand Up @@ -1072,19 +1077,96 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
func (f *FileStore) MakeSnapshotLinks(destPath string, files []TSMFile) (returnErr error) {
for _, tsmf := range files {
newpath := filepath.Join(destPath, filepath.Base(tsmf.Path()))
if err := copyOrLink(tsmf.Path(), newpath); err != nil {
err := f.copyOrLink(tsmf.Path(), newpath)
if err != nil {
return err
}
if tf := tsmf.TombstoneStats(); tf.TombstoneExists {
newpath := filepath.Join(destPath, filepath.Base(tf.Path))
if err := copyOrLink(tf.Path, newpath); err != nil {
err := f.copyOrLink(tf.Path, newpath)
if err != nil {
return err
}
}
}
return nil
}

func (f *FileStore) copyOrLink(oldpath string, newpath string) error {
if f.copyFiles {
f.logger.Info("copying backup snapshots", zap.String("OldPath", oldpath), zap.String("NewPath", newpath))
if err := f.copyNotLink(oldpath, newpath); err != nil {
return err
}
} else {
f.logger.Info("linking backup snapshots", zap.String("OldPath", oldpath), zap.String("NewPath", newpath))
if err := f.linkNotCopy(oldpath, newpath); err != nil {
return err
}
}
return nil
}

// copyNotLink - use file copies instead of hard links for 2 scenarios:
// Windows does not permit deleting a file with open file handles
// Azure does not support hard links in its default file system
func (f *FileStore) copyNotLink(oldPath, newPath string) (returnErr error) {
rfd, err := os.Open(oldPath)
if err != nil {
return fmt.Errorf("error opening file for backup %s: %q", oldPath, err)
} else {
defer func() {
if e := rfd.Close(); returnErr == nil && e != nil {
returnErr = fmt.Errorf("error closing source file for backup %s: %w", oldPath, e)
}
}()
}
fi, err := rfd.Stat()
if err != nil {
return fmt.Errorf("error collecting statistics from file for backup %s: %w", oldPath, err)
}
wfd, err := os.OpenFile(newPath, os.O_RDWR|os.O_CREATE, fi.Mode())
if err != nil {
return fmt.Errorf("error creating temporary file for backup %s: %w", newPath, err)
} else {
defer func() {
if e := wfd.Close(); returnErr == nil && e != nil {
returnErr = fmt.Errorf("error closing temporary file for backup %s: %w", newPath, e)
}
}()
}
if _, err := io.Copy(wfd, rfd); err != nil {
return fmt.Errorf("unable to copy file for backup from %s to %s: %w", oldPath, newPath, err)
}
if err := os.Chtimes(newPath, fi.ModTime(), fi.ModTime()); err != nil {
return fmt.Errorf("unable to set modification time on temporary backup file %s: %w", newPath, err)
}
return nil
}

// linkNotCopy - use hard links for backup snapshots
func (f *FileStore) linkNotCopy(oldPath, newPath string) error {
if err := os.Link(oldPath, newPath); err != nil {
if errors.Is(err, syscall.ENOTSUP) {
if fi, e := os.Stat(oldPath); e == nil && !fi.IsDir() {
f.logger.Info("file system does not support hard links, switching to copies for backup", zap.String("OldPath", oldPath), zap.String("NewPath", newPath))
// Force future snapshots to copy
f.copyFiles = true
return f.copyNotLink(oldPath, newPath)
} else if e != nil {
// Stat failed
return fmt.Errorf("error creating hard link for backup, cannot determine if %s is a file or directory: %w", oldPath, e)
} else {
return fmt.Errorf("error creating hard link for backup - %s is a directory, not a file: %q", oldPath, err)
}
} else {
return fmt.Errorf("error creating hard link for backup from %s to %s: %w", oldPath, newPath, err)
}
} else {
return nil
}
}

// CreateSnapshot creates hardlinks for all tsm and tombstone files
// in the path provided.
func (f *FileStore) CreateSnapshot() (string, error) {
Expand Down

0 comments on commit d9b9e86

Please sign in to comment.