diff --git a/cmd/influxd/backup/backup.go b/cmd/influxd/backup/backup.go index 9cb5569bfc3..ee46f63b27c 100644 --- a/cmd/influxd/backup/backup.go +++ b/cmd/influxd/backup/backup.go @@ -17,6 +17,7 @@ import ( "time" "github.com/influxdata/influxdb/cmd/influxd/backup_util" + errors2 "github.com/influxdata/influxdb/pkg/errors" "github.com/influxdata/influxdb/services/snapshotter" "github.com/influxdata/influxdb/tcp" gzip "github.com/klauspost/pgzip" @@ -390,7 +391,7 @@ func (cmd *Command) backupResponsePaths(response *snapshotter.Response) error { // backupMetastore will backup the whole metastore on the host to the backup path // if useDB is non-empty, it will backup metadata only for the named database. -func (cmd *Command) backupMetastore() error { +func (cmd *Command) backupMetastore() (retErr error) { metastoreArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, backup_util.Metafile)) if err != nil { return err @@ -402,12 +403,12 @@ func (cmd *Command) backupMetastore() error { Type: snapshotter.RequestMetastoreBackup, } - err = cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error { + err = cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) (rErr error) { f, err := os.Open(file) if err != nil { return err } - defer f.Close() + defer errors2.Capture(&rErr, f.Close)() var magicByte [8]byte n, err := io.ReadFull(f, magicByte[:]) @@ -438,7 +439,7 @@ func (cmd *Command) backupMetastore() error { if cmd.portable { metaBytes, err := backup_util.GetMetaBytes(metastoreArchivePath) - defer os.Remove(metastoreArchivePath) + defer errors2.Capture(&retErr, func() error { return os.Remove(metastoreArchivePath) })() if err != nil { return err } @@ -510,13 +511,13 @@ func (cmd *Command) downloadAndVerify(req *snapshotter.Request, path string, val } // download downloads a snapshot of either the metastore or a shard from a host to a given path. -func (cmd *Command) download(req *snapshotter.Request, path string) error { +func (cmd *Command) download(req *snapshotter.Request, path string) (retErr error) { // Create local file to write to. f, err := os.Create(path) if err != nil { return fmt.Errorf("open temp file: %s", err) } - defer f.Close() + defer errors2.Capture(&retErr, f.Close)() min := 2 * time.Second for i := 0; i < 10; i++ { @@ -526,7 +527,7 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error { if err != nil { return err } - defer conn.Close() + defer errors2.Capture(&retErr, conn.Close)() _, err = conn.Write([]byte{byte(req.Type)}) if err != nil { diff --git a/cmd/influxd/backup_util/backup_util.go b/cmd/influxd/backup_util/backup_util.go index f9d77fb248f..e1c361508d3 100644 --- a/cmd/influxd/backup_util/backup_util.go +++ b/cmd/influxd/backup_util/backup_util.go @@ -14,6 +14,7 @@ import ( "sync/atomic" internal "github.com/influxdata/influxdb/cmd/influxd/backup_util/internal" + errors2 "github.com/influxdata/influxdb/pkg/errors" "github.com/influxdata/influxdb/services/snapshotter" "google.golang.org/protobuf/proto" ) @@ -55,11 +56,12 @@ func (ep *PortablePacker) UnmarshalBinary(data []byte) error { return nil } -func GetMetaBytes(fname string) ([]byte, error) { +func GetMetaBytes(fname string) (_ []byte, retErr error) { f, err := os.Open(fname) if err != nil { return []byte{}, err } + defer errors2.Capture(&retErr, f.Close)() var buf bytes.Buffer if _, err := io.Copy(&buf, f); err != nil { diff --git a/tsdb/engine/tsm1/copy_or_link_unix.go b/tsdb/engine/tsm1/copy_or_link_unix.go deleted file mode 100644 index 2ba7974ccdc..00000000000 --- a/tsdb/engine/tsm1/copy_or_link_unix.go +++ /dev/null @@ -1,17 +0,0 @@ -//go:build !windows -// +build !windows - -package tsm1 - -import ( - "fmt" - "os" -) - -// copyOrLink - allow substitution of a file copy for a hard link when running on Windows systems. -func copyOrLink(oldPath, newPath string) error { - if err := os.Link(oldPath, newPath); err != nil { - return fmt.Errorf("error creating hard link for backup from %s to %s: %q", oldPath, newPath, err) - } - return nil -} diff --git a/tsdb/engine/tsm1/copy_or_link_windows.go b/tsdb/engine/tsm1/copy_or_link_windows.go deleted file mode 100644 index fdccfbd5d73..00000000000 --- a/tsdb/engine/tsm1/copy_or_link_windows.go +++ /dev/null @@ -1,46 +0,0 @@ -//go:build windows -// +build windows - -package tsm1 - -import ( - "fmt" - "io" - "os" -) - -// copyOrLink - Windows does not permit deleting a file with open file handles, so -// instead of hard links, make temporary copies of files that can then be deleted. -func copyOrLink(oldPath, newPath string) (returnErr error) { - rfd, err := os.Open(oldPath) - if err != nil { - return fmt.Errorf("error opening file for backup %s: %q", oldPath, err) - } else { - defer func() { - if e := rfd.Close(); returnErr == nil && e != nil { - returnErr = fmt.Errorf("error closing source file for backup %s: %q", oldPath, e) - } - }() - } - fi, err := rfd.Stat() - if err != nil { - fmt.Errorf("error collecting statistics from file for backup %s: %q", oldPath, err) - } - wfd, err := os.OpenFile(newPath, os.O_RDWR|os.O_CREATE, fi.Mode()) - if err != nil { - return fmt.Errorf("error creating temporary file for backup %s: %q", newPath, err) - } else { - defer func() { - if e := wfd.Close(); returnErr == nil && e != nil { - returnErr = fmt.Errorf("error closing temporary file for backup %s: %q", newPath, e) - } - }() - } - if _, err := io.Copy(wfd, rfd); err != nil { - return fmt.Errorf("unable to copy file for backup from %s to %s: %q", oldPath, newPath, err) - } - if err := os.Chtimes(newPath, fi.ModTime(), fi.ModTime()); err != nil { - return fmt.Errorf("unable to set modification time on temporary backup file %s: %q", newPath, err) - } - return nil -} diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index d726151542e..efe4f8f9f11 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "io" "io/ioutil" "math" "os" @@ -15,6 +16,7 @@ import ( "strings" "sync" "sync/atomic" + "syscall" "time" "github.com/influxdata/influxdb/models" @@ -193,6 +195,8 @@ type FileStore struct { parseFileName ParseFileNameFunc obs tsdb.FileStoreObserver + + copyFiles bool } // FileStat holds information about a TSM file on disk. @@ -244,6 +248,7 @@ func NewFileStore(dir string) *FileStore { }, obs: noFileStoreObserver{}, parseFileName: DefaultParseFileName, + copyFiles: runtime.GOOS == "windows", } fs.purger.fileStore = fs return fs @@ -1072,12 +1077,14 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location { func (f *FileStore) MakeSnapshotLinks(destPath string, files []TSMFile) (returnErr error) { for _, tsmf := range files { newpath := filepath.Join(destPath, filepath.Base(tsmf.Path())) - if err := copyOrLink(tsmf.Path(), newpath); err != nil { + err := f.copyOrLink(tsmf.Path(), newpath) + if err != nil { return err } if tf := tsmf.TombstoneStats(); tf.TombstoneExists { newpath := filepath.Join(destPath, filepath.Base(tf.Path)) - if err := copyOrLink(tf.Path, newpath); err != nil { + err := f.copyOrLink(tf.Path, newpath) + if err != nil { return err } } @@ -1085,6 +1092,81 @@ func (f *FileStore) MakeSnapshotLinks(destPath string, files []TSMFile) (returnE return nil } +func (f *FileStore) copyOrLink(oldpath string, newpath string) error { + if f.copyFiles { + f.logger.Info("copying backup snapshots", zap.String("OldPath", oldpath), zap.String("NewPath", newpath)) + if err := f.copyNotLink(oldpath, newpath); err != nil { + return err + } + } else { + f.logger.Info("linking backup snapshots", zap.String("OldPath", oldpath), zap.String("NewPath", newpath)) + if err := f.linkNotCopy(oldpath, newpath); err != nil { + return err + } + } + return nil +} + +// copyNotLink - use file copies instead of hard links for 2 scenarios: +// Windows does not permit deleting a file with open file handles +// Azure does not support hard links in its default file system +func (f *FileStore) copyNotLink(oldPath, newPath string) (returnErr error) { + rfd, err := os.Open(oldPath) + if err != nil { + return fmt.Errorf("error opening file for backup %s: %q", oldPath, err) + } else { + defer func() { + if e := rfd.Close(); returnErr == nil && e != nil { + returnErr = fmt.Errorf("error closing source file for backup %s: %w", oldPath, e) + } + }() + } + fi, err := rfd.Stat() + if err != nil { + return fmt.Errorf("error collecting statistics from file for backup %s: %w", oldPath, err) + } + wfd, err := os.OpenFile(newPath, os.O_RDWR|os.O_CREATE, fi.Mode()) + if err != nil { + return fmt.Errorf("error creating temporary file for backup %s: %w", newPath, err) + } else { + defer func() { + if e := wfd.Close(); returnErr == nil && e != nil { + returnErr = fmt.Errorf("error closing temporary file for backup %s: %w", newPath, e) + } + }() + } + if _, err := io.Copy(wfd, rfd); err != nil { + return fmt.Errorf("unable to copy file for backup from %s to %s: %w", oldPath, newPath, err) + } + if err := os.Chtimes(newPath, fi.ModTime(), fi.ModTime()); err != nil { + return fmt.Errorf("unable to set modification time on temporary backup file %s: %w", newPath, err) + } + return nil +} + +// linkNotCopy - use hard links for backup snapshots +func (f *FileStore) linkNotCopy(oldPath, newPath string) error { + if err := os.Link(oldPath, newPath); err != nil { + if errors.Is(err, syscall.ENOTSUP) { + if fi, e := os.Stat(oldPath); e == nil && !fi.IsDir() { + f.logger.Info("file system does not support hard links, switching to copies for backup", zap.String("OldPath", oldPath), zap.String("NewPath", newPath)) + // Force future snapshots to copy + f.copyFiles = true + return f.copyNotLink(oldPath, newPath) + } else if e != nil { + // Stat failed + return fmt.Errorf("error creating hard link for backup, cannot determine if %s is a file or directory: %w", oldPath, e) + } else { + return fmt.Errorf("error creating hard link for backup - %s is a directory, not a file: %q", oldPath, err) + } + } else { + return fmt.Errorf("error creating hard link for backup from %s to %s: %w", oldPath, newPath, err) + } + } else { + return nil + } +} + // CreateSnapshot creates hardlinks for all tsm and tombstone files // in the path provided. func (f *FileStore) CreateSnapshot() (string, error) {