Skip to content

Commit

Permalink
br: add more precise check for lock file (#30218)
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer authored Dec 2, 2021
1 parent 7746b6b commit 124f1fc
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 7 deletions.
32 changes: 25 additions & 7 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,9 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
"there may be some backup files in the path already, "+
"please specify a correct backup directory!", bc.storage.URI()+"/"+metautil.MetaFile)
}
exist, err = bc.storage.FileExists(ctx, metautil.LockFile)
err = CheckBackupStorageIsLocked(ctx, bc.storage)
if err != nil {
return errors.Annotatef(err, "error occurred when checking %s file", metautil.LockFile)
}
if exist {
return errors.Annotatef(berrors.ErrInvalidArgument, "backup lock file exists in %v, "+
"there may be some backup files in the path already, "+
"please specify a correct backup directory!", bc.storage.URI()+"/"+metautil.LockFile)
return err
}
bc.backend = backend
return nil
Expand All @@ -198,6 +193,29 @@ func (bc *Client) GetClusterID() uint64 {
return bc.clusterID
}

// CheckBackupStorageIsLocked checks whether backups is locked.
// which means we found other backup progress already write
// some data files into the same backup directory or cloud prefix.
func CheckBackupStorageIsLocked(ctx context.Context, s storage.ExternalStorage) error {
exist, err := s.FileExists(ctx, metautil.LockFile)
if err != nil {
return errors.Annotatef(err, "error occurred when checking %s file", metautil.LockFile)
}
if exist {
err = s.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
// should return error to break the walkDir when found lock file and other .sst files.
if strings.HasSuffix(path, ".sst") {
return errors.Annotatef(berrors.ErrInvalidArgument, "backup lock file and sst file exist in %v, "+
"there are some backup files in the path already, "+
"please specify a correct backup directory!", s.URI()+"/"+metautil.LockFile)
}
return nil
})
return err
}
return nil
}

// BuildTableRanges returns the key ranges encompassing the entire table,
// and its partitions if exists.
func BuildTableRanges(tbl *model.TableInfo) ([]kv.KeyRange, error) {
Expand Down
34 changes: 34 additions & 0 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ func (r *testBackup) SetUpSuite(c *C) {

}

func (r *testBackup) resetStorage(c *C) {
var err error
base := c.MkDir()
r.storage, err = storage.NewLocalStorage(base)
c.Assert(err, IsNil)
}

func (r *testBackup) TestGetTS(c *C) {
var (
err error
Expand Down Expand Up @@ -335,3 +342,30 @@ func (r *testBackup) TestskipUnsupportedDDLJob(c *C) {
c.Assert(err, IsNil)
c.Assert(len(allDDLJobs), Equals, 8)
}

func (r *testBackup) TestCheckBackupIsLocked(c *C) {
ctx := context.Background()

r.resetStorage(c)
// check passed with an empty storage
err := backup.CheckBackupStorageIsLocked(ctx, r.storage)
c.Assert(err, IsNil)

// check passed with only a lock file
err = r.storage.WriteFile(ctx, metautil.LockFile, nil)
c.Assert(err, IsNil)
err = backup.CheckBackupStorageIsLocked(ctx, r.storage)
c.Assert(err, IsNil)

// check passed with a lock file and other non-sst files.
err = r.storage.WriteFile(ctx, "1.txt", nil)
c.Assert(err, IsNil)
err = backup.CheckBackupStorageIsLocked(ctx, r.storage)
c.Assert(err, IsNil)

// check failed
err = r.storage.WriteFile(ctx, "1.sst", nil)
c.Assert(err, IsNil)
err = backup.CheckBackupStorageIsLocked(ctx, r.storage)
c.Assert(err, ErrorMatches, "backup lock file and sst file exist in(.+)")
}

0 comments on commit 124f1fc

Please sign in to comment.