diff --git a/.github/ISSUE_TEMPLATE/bug-report.md b/.github/ISSUE_TEMPLATE/bug-report.md new file mode 100644 index 000000000..872699919 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug-report.md @@ -0,0 +1,29 @@ +--- +name: "🐛 Bug Report" +about: Something isn't working as expected +title: '' +labels: 'bug' +--- + +Please answer these questions before submitting your issue. Thanks! + +1. What did you do? +If possible, provide a recipe for reproducing the error. + + +2. What did you expect to see? + + + +3. What did you see instead? + + + +4. What version of BR and TiDB/TiKV/PD are you using? + + diff --git a/.github/ISSUE_TEMPLATE/feature-request.md b/.github/ISSUE_TEMPLATE/feature-request.md new file mode 100644 index 000000000..e895af84d --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature-request.md @@ -0,0 +1,19 @@ +--- +name: "🚀 Feature Request" +about: I have a suggestion +labels: enhancement +--- + +## Feature Request + +### Describe the problem your feature request solves: + + +### Describe the feature you'd like: + + +### Describe alternatives you've considered: + + +### Teachability, Documentation, Adoption, Migration Strategy: + diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 000000000..0f6ae8de0 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,37 @@ + + +### What problem does this PR solve? + + +### What is changed and how it works? + + +### Check List + +Tests + + - Unit test + - Integration test + - Manual test (add detailed scripts or steps below) + - No code + +Code changes + + - Has exported function/method change + - Has exported variable/fields change + - Has interface methods change + - Has persistent data change + +Side effects + + - Possible performance regression + - Increased code complexity + - Breaking backward compatibility + +Related changes + + - Need to cherry-pick to the release branch + - Need to update the documentation + - Need to be included in the release note diff --git a/.golangci.yml b/.golangci.yml index 969cac759..1b025678e 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -9,7 +9,8 @@ issues: text: "Potential HTTP request made with variable url" linters: - gosec - - path: .go - text: "Use of weak random number generator" + # TODO Remove it. + - path: split_client.go + text: "SA1019:" linters: - - gosec + - staticcheck diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 000000000..6db4cfd21 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,18 @@ +# BR (Backup and Restore) Change Log +All notable changes to this project are documented in this file. +See also, +- [TiDB Changelog](https://github.com/pingcap/tidb/blob/master/CHANGELOG.md), +- [TiKV Changelog](https://github.com/tikv/tikv/blob/master/CHANGELOG.md), +- [PD Changelog](https://github.com/pingcap/pd/blob/master/CHANGELOG.md).
+ +## [3.1.0-beta.1] - 2020-01-10 + +- Fix the inaccurate backup progress information [#127](https://github.com/pingcap/br/pull/127) +- Improve the performance of splitting Regions [#122](https://github.com/pingcap/br/pull/122) +- Add the backup and restore feature for partitioned tables [#137](https://github.com/pingcap/br/pull/137) +- Add the feature of automatically scheduling PD schedulers [#123](https://github.com/pingcap/br/pull/123) +- Fix the issue that data is overwritten after non `PKIsHandle` tables are restored [#139](https://github.com/pingcap/br/pull/139) + +## [3.1.0-beta] - 2019-12-20 + +Initial release of the distributed backup and restore tool diff --git a/Makefile b/Makefile index a03cedc54..839a27b9e 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,8 @@ build_for_integration_test: -o bin/br.test # build key locker GO111MODULE=on go build -race -o bin/locker tests/br_key_locked/*.go + # build gc + GO111MODULE=on go build -race -o bin/gc tests/br_z_gc_safepoint/*.go test: GO111MODULE=on go test -race -tags leak ./... diff --git a/cmd/backup.go b/cmd/backup.go index a9a7bdb00..d65582949 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -18,10 +18,12 @@ import ( ) const ( - flagBackupTimeago = "timeago" - flagBackupRateLimit = "ratelimit" - flagBackupConcurrency = "concurrency" - flagBackupChecksum = "checksum" + flagBackupTimeago = "timeago" + flagBackupRateLimit = "ratelimit" + flagBackupRateLimitUnit = "ratelimit-unit" + flagBackupConcurrency = "concurrency" + flagBackupChecksum = "checksum" + flagLastBackupTS = "lastbackupts" ) type backupContext struct { @@ -44,6 +46,13 @@ func defineBackupFlags(flagSet *pflag.FlagSet) { flagBackupConcurrency, "", 4, "The size of thread pool on each node that execute the backup task") flagSet.BoolP(flagBackupChecksum, "", true, "Run checksum after backup") + flagSet.Uint64P(flagLastBackupTS, "", 0, "The last backup ts, used for incremental backup") + _ = flagSet.MarkHidden(flagLastBackupTS) + + // Test only flag.
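+ // The rate limit in the backup protocol is expressed in bytes per second, so the value sent to TiKV is ratelimit * ratelimit-unit; e.g. --ratelimit=4 with the default unit utils.MB yields a 4 MB/s limit per node.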
+ flagSet.Uint64P( + flagBackupRateLimitUnit, "", utils.MB, "The unit of rate limit of the backup task") + _ = flagSet.MarkHidden(flagBackupRateLimitUnit) } func runBackup(flagSet *pflag.FlagSet, cmdName string, bc backupContext) error { @@ -65,6 +74,11 @@ func runBackup(flagSet *pflag.FlagSet, cmdName string, bc backupContext) error { if err != nil { return err } + ratelimitUnit, err := flagSet.GetUint64(flagBackupRateLimitUnit) + if err != nil { + return err + } + ratelimit *= ratelimitUnit concurrency, err := flagSet.GetUint32(flagBackupConcurrency) if err != nil { @@ -80,6 +94,11 @@ func runBackup(flagSet *pflag.FlagSet, cmdName string, bc backupContext) error { return err } + lastBackupTS, err := flagSet.GetUint64(flagLastBackupTS) + if err != nil { + return err + } + u, err := storage.ParseBackendFromFlags(flagSet, FlagStorage) if err != nil { return err } @@ -133,7 +152,7 @@ func runBackup(flagSet *pflag.FlagSet, cmdName string, bc backupContext) error { updateCh := utils.StartProgress( ctx, cmdName, int64(approximateRegions), !HasLogFile()) err = client.BackupRanges( - ctx, ranges, backupTS, ratelimit, concurrency, updateCh, bc.isRawKv, bc.cf) + ctx, ranges, lastBackupTS, backupTS, ratelimit, concurrency, updateCh, bc.isRawKv, bc.cf) if err != nil { return err } diff --git a/cmd/cmd.go b/cmd/cmd.go index e97adec43..468c35232 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/storage" + "github.com/pingcap/br/pkg/utils" ) var ( @@ -52,12 +53,18 @@ const ( FlagSlowLogFile = "slow-log-file" flagDatabase = "db" + flagTable = "table" - flagTable = "table" + flagVersion = "version" + flagVersionShort = "V" ) // AddFlags adds flags to the given cmd. func AddFlags(cmd *cobra.Command) { + cmd.Version = utils.BRInfo() + cmd.Flags().BoolP(flagVersion, flagVersionShort, false, "Display version information about BR") + cmd.SetVersionTemplate("{{printf \"%s\" .Version}}\n") + cmd.PersistentFlags().StringP(FlagPD, "u", "127.0.0.1:2379", "PD address") cmd.PersistentFlags().String(FlagCA, "", "CA certificate path for TLS connection") cmd.PersistentFlags().String(FlagCert, "", "Certificate path for TLS connection") diff --git a/cmd/restore.go b/cmd/restore.go index 71708629d..4f66e47de 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -63,15 +63,13 @@ func NewRestoreCommand() *cobra.Command { "Run checksum after restore") command.PersistentFlags().BoolP("online", "", false, "Whether online when restore") + // TODO remove hidden flag if it's stable + _ = command.PersistentFlags().MarkHidden("online") return command } func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error { - pdAddr, err := flagSet.GetString(FlagPD) - if err != nil { - return errors.Trace(err) - } ctx, cancel := context.WithCancel(GetDefaultContext()) defer cancel() @@ -113,6 +111,9 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error case len(dbName) != 0 && len(tableName) == 0: // database restore db := client.GetDatabase(dbName) + if db == nil { + return errors.Errorf("database %s not found in backup", dbName) + } err = client.CreateDatabase(db.Schema) if err != nil { return errors.Trace(err) } @@ -124,19 +125,28 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error case len(dbName) != 0 && len(tableName) != 0: // table restore db := client.GetDatabase(dbName) + if db == nil { + return errors.Errorf("database %s not found in backup", dbName) + } err =
client.CreateDatabase(db.Schema) if err != nil { return errors.Trace(err) } table := db.GetTable(tableName) + if table == nil { + return errors.Errorf("table %s not found in backup", tableName) + } files = table.Files - tables = utils.Tables{table} + tables = append(tables, table) default: return errors.New("must set db when table was set") } - + var newTS uint64 + if client.IsIncremental() { + newTS, err = client.GetTS(ctx) + if err != nil { + return err + } + } summary.CollectInt("restore files", len(files)) - rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), tables) + rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), tables, newTS) if err != nil { return errors.Trace(err) } @@ -159,11 +169,19 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error log.Error("split regions failed", zap.Error(err)) return errors.Trace(err) } - pdAddrs := strings.Split(pdAddr, ",") - err = client.ResetTS(pdAddrs) - if err != nil { - log.Error("reset pd TS failed", zap.Error(err)) - return errors.Trace(err) + + if !client.IsIncremental() { + var pdAddr string + pdAddr, err = flagSet.GetString(FlagPD) + if err != nil { + return errors.Trace(err) + } + pdAddrs := strings.Split(pdAddr, ",") + err = client.ResetTS(pdAddrs) + if err != nil { + log.Error("reset pd TS failed", zap.Error(err)) + return errors.Trace(err) + } } removedSchedulers, err := RestorePrepareWork(ctx, client, mgr) diff --git a/cmd/validate.go b/cmd/validate.go index 390f4963f..8ba72b372 100644 --- a/cmd/validate.go +++ b/cmd/validate.go @@ -6,7 +6,7 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" - "sort" + "fmt" "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" @@ -15,8 +15,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/parser/model" "github.com/pingcap/pd/pkg/mock/mockid" - restore_util "github.com/pingcap/tidb-tools/pkg/restore-util" - "github.com/pingcap/tidb/tablecodec" "github.com/spf13/cobra" "go.uber.org/zap" @@ -29,7 +27,7 @@ import ( func NewValidateCommand() *cobra.Command { meta := &cobra.Command{ Use: "validate ", - Short: "commands to check backup data", + Short: "commands to check/debug backup data", PersistentPreRunE: func(c *cobra.Command, args []string) error { if err := Init(c); err != nil { return err } @@ -41,6 +39,10 @@ } meta.AddCommand(newCheckSumCommand()) meta.AddCommand(newBackupMetaCommand()) + meta.AddCommand(decodeBackupMetaCommand()) + meta.AddCommand(encodeBackupMetaCommand()) + meta.Hidden = true + return meta } @@ -118,7 +120,8 @@ func newCheckSumCommand() *cobra.Command { return errors.Errorf(` backup data checksum failed: %s may be changed calculated sha256 is %s, -origin sha256 is %s`, file.Name, hex.EncodeToString(s[:]), hex.EncodeToString(file.Sha256)) +origin sha256 is %s`, + file.Name, hex.EncodeToString(s[:]), hex.EncodeToString(file.Sha256)) } } log.Info("table info", zap.Stringer("table", tblInfo.Name), @@ -183,27 +186,26 @@ func newBackupMetaCommand() *cobra.Command { tables = append(tables, db.Tables...)
} // Check if the ranges of files overlapped - rangeTree := restore_util.NewRangeTree() + rangeTree := restore.NewRangeTree() for _, file := range files { - if out := rangeTree.InsertRange(restore_util.Range{ + if out := rangeTree.InsertRange(restore.Range{ StartKey: file.GetStartKey(), EndKey: file.GetEndKey(), }); out != nil { log.Error( "file ranges overlapped", - zap.Stringer("out", out.(*restore_util.Range)), + zap.Stringer("out", out.(*restore.Range)), zap.Stringer("file", file), ) } } - sort.Sort(utils.Tables(tables)) tableIDAllocator := mockid.NewIDAllocator() // Advance table ID allocator to the offset. for offset := uint64(0); offset < tableIDOffset; offset++ { _, _ = tableIDAllocator.Alloc() // Ignore error } - rewriteRules := &restore_util.RewriteRules{ + rewriteRules := &restore.RewriteRules{ Table: make([]*import_sstpb.RewriteRule, 0), Data: make([]*import_sstpb.RewriteRule, 0), } @@ -223,20 +225,11 @@ func newBackupMetaCommand() *cobra.Command { Name: indexInfo.Name, } } - // TODO: support table partition - rules := restore.GetRewriteRules(newTable, table.Schema) + rules := restore.GetRewriteRules(newTable, table.Schema, 0) rewriteRules.Table = append(rewriteRules.Table, rules.Table...) rewriteRules.Data = append(rewriteRules.Data, rules.Data...) tableIDMap[table.Schema.ID] = int64(tableID) } - for oldID, newID := range tableIDMap { - if _, ok := tableIDMap[oldID+1]; !ok { - rewriteRules.Table = append(rewriteRules.Table, &import_sstpb.RewriteRule{ - OldKeyPrefix: tablecodec.EncodeTablePrefix(oldID + 1), - NewKeyPrefix: tablecodec.EncodeTablePrefix(newID + 1), - }) - } - } // Validate rewrite rules for _, file := range files { err = restore.ValidateFileRewriteRule(file, rewriteRules) @@ -253,3 +246,103 @@ func newBackupMetaCommand() *cobra.Command { command.Hidden = true return command } + +func decodeBackupMetaCommand() *cobra.Command { + decodeBackupMetaCmd := &cobra.Command{ + Use: "decode", + Short: "decode backupmeta to json", + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(GetDefaultContext()) + defer cancel() + u, err := storage.ParseBackendFromFlags(cmd.Flags(), FlagStorage) + if err != nil { + return errors.Trace(err) + } + s, err := storage.Create(ctx, u) + if err != nil { + return errors.Trace(err) + } + metaData, err := s.Read(ctx, utils.MetaFile) + if err != nil { + return errors.Trace(err) + } + + backupMeta := &backup.BackupMeta{} + err = proto.Unmarshal(metaData, backupMeta) + if err != nil { + return errors.Trace(err) + } + backupMetaJSON, err := json.Marshal(backupMeta) + if err != nil { + return errors.Trace(err) + } + + err = s.Write(ctx, utils.MetaJSONFile, backupMetaJSON) + if err != nil { + return errors.Trace(err) + } + + field, err := cmd.Flags().GetString("field") + if err != nil { + log.Error("get field flag failed", zap.Error(err)) + return err + } + switch field { + case "start-version": + fmt.Println(backupMeta.StartVersion) + case "end-version": + fmt.Println(backupMeta.EndVersion) + } + return nil + }, + } + + decodeBackupMetaCmd.Flags().String("field", "", "decode specified field") + + return decodeBackupMetaCmd +} + +func encodeBackupMetaCommand() *cobra.Command { + encodeBackupMetaCmd := &cobra.Command{ + Use: "encode", + Short: "encode backupmeta json file to backupmeta", + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(GetDefaultContext()) + defer cancel() + u, err := storage.ParseBackendFromFlags(cmd.Flags(), FlagStorage) + if err != nil { + return 
errors.Trace(err) + } + s, err := storage.Create(ctx, u) + if err != nil { + return errors.Trace(err) + } + metaData, err := s.Read(ctx, utils.MetaJSONFile) + if err != nil { + return errors.Trace(err) + } + + backupMetaJSON := &backup.BackupMeta{} + err = json.Unmarshal(metaData, backupMetaJSON) + if err != nil { + return errors.Trace(err) + } + backupMeta, err := proto.Marshal(backupMetaJSON) + if err != nil { + return errors.Trace(err) + } + + fileName := utils.MetaFile + if ok, _ := s.FileExists(ctx, fileName); ok { + // Do not overwrite origin meta file + fileName += "_from_json" + } + err = s.Write(ctx, fileName, backupMeta) + if err != nil { + return errors.Trace(err) + } + return nil + }, + } + return encodeBackupMetaCmd +} diff --git a/cmd/version.go b/cmd/version.go deleted file mode 100644 index af4f7d386..000000000 --- a/cmd/version.go +++ /dev/null @@ -1,20 +0,0 @@ -package cmd - -import ( - "github.com/spf13/cobra" - - "github.com/pingcap/br/pkg/utils" -) - -// NewVersionCommand returns a restore subcommand -func NewVersionCommand() *cobra.Command { - bp := &cobra.Command{ - Use: "version", - Short: "output version information", - Args: cobra.NoArgs, - Run: func(cmd *cobra.Command, args []string) { - utils.PrintBRInfo() - }, - } - return bp -} diff --git a/go.mod b/go.mod index 916c180d1..aed6974c3 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,6 @@ require ( github.com/pingcap/parser v0.0.0-20191210060830-bdf23a7ade01 github.com/pingcap/pd v1.1.0-beta.0.20191212045800-234784c7a9c5 github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834 - github.com/pingcap/tidb-tools v3.1.0-beta.0.20191218124733-15882555e4c9+incompatible github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33 github.com/prometheus/client_golang v1.0.0 github.com/sirupsen/logrus v1.4.2 diff --git a/go.sum b/go.sum index 7fecef7b2..461c4ef12 100644 --- a/go.sum +++ b/go.sum @@ -266,6 +266,7 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rG github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191030021250-51b332bcb20b/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20191121022655-4c654046831d/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20191202044712-32be31591b03 h1:IyJl+qesVPf3UfFFmKtX69y1K5KC8uXlot3U0QgH7V4= github.com/pingcap/kvproto v0.0.0-20191202044712-32be31591b03/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7 h1:thLL2vFObG8vxBCkAmfAbLVBPfXUkBSXaVxppStCrL0= github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= @@ -282,9 +283,8 @@ github.com/pingcap/sysutil v0.0.0-20191126040022-986c5b3ed9a3 h1:HCNif3lukL83gNC github.com/pingcap/sysutil v0.0.0-20191126040022-986c5b3ed9a3/go.mod h1:Futrrmuw98pEsbEmoPsjw8aKLCmixwHEmT2rF+AsXGw= github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834 h1:eNf7bDY39moIzzcs5+PhLLW0BM2D2yrzFbjW/X42y0s= github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834/go.mod h1:VWx47QOXISBHHtZeWrDQlBOdbvth9TE9gei6QpoqJ4g= +github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible h1:H1jg0aDWz2SLRh3hNBo2HFtnuHtudIUvBumU7syRkic= github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tidb-tools 
v3.1.0-beta.0.20191218124733-15882555e4c9+incompatible h1:YpY7LEombB+E3bhfrS1xjN3p/J5TmeEBTCfTNCUQ/Qo= -github.com/pingcap/tidb-tools v3.1.0-beta.0.20191218124733-15882555e4c9+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33 h1:cTSaVv1hue17BCPqt+sURADTFSMpSD26ZuvKRyYIjJs= github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/main.go b/main.go index 8ae652f86..103699614 100644 --- a/main.go +++ b/main.go @@ -47,7 +47,6 @@ func main() { cmd.AddFlags(rootCmd) cmd.SetDefaultContext(ctx) rootCmd.AddCommand( - cmd.NewVersionCommand(), cmd.NewValidateCommand(), cmd.NewBackupCommand(), cmd.NewRestoreCommand(), diff --git a/pkg/backup/client.go b/pkg/backup/client.go index feccecc2a..6c4c8ee8c 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/ranger" "go.uber.org/zap" @@ -71,31 +72,31 @@ func (bc *Client) GetTS(ctx context.Context, timeAgo string) (uint64, error) { if err != nil { return 0, errors.Trace(err) } + backupTS := oracle.ComposeTS(p, l) if timeAgo != "" { duration, err := time.ParseDuration(timeAgo) if err != nil { return 0, errors.Trace(err) } - t := duration.Nanoseconds() / int64(time.Millisecond) - log.Info("backup time ago", zap.Int64("MillisecondsAgo", t)) - - // check backup time do not exceed GCSafePoint - safePoint, err := GetGCSafePoint(ctx, bc.mgr.GetPDClient()) - if err != nil { - return 0, errors.Trace(err) + if duration <= 0 { + return 0, errors.New("negative timeago is not allowed") } - if p-t < safePoint.Physical { - return 0, errors.New("given backup time exceed GCSafePoint") + log.Info("backup time ago", zap.Duration("timeago", duration)) + + backupTime := oracle.GetTimeFromTS(backupTS) + backupAgo := backupTime.Add(-duration) + if backupTS < oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) { + return 0, errors.New("backup ts overflow please choose a smaller timeago") } - p -= t + backupTS = oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) } - ts := utils.Timestamp{ - Physical: p, - Logical: l, + // check backup time do not exceed GCSafePoint + err = CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), backupTS) + if err != nil { + return 0, errors.Trace(err) } - backupTS := utils.EncodeTs(ts) log.Info("backup encode timestamp", zap.Uint64("BackupTS", backupTS)) return backupTS, nil } @@ -279,8 +280,9 @@ LoadDb: func (bc *Client) BackupRanges( ctx context.Context, ranges []Range, + lastBackupTS uint64, backupTS uint64, - rate uint64, + rateLimit uint64, concurrency uint32, updateCh chan<- struct{}, isRawKv bool, @@ -298,7 +300,7 @@ func (bc *Client) BackupRanges( go func() { for _, r := range ranges { err := bc.backupRange( - ctx, r.StartKey, r.EndKey, backupTS, rate, concurrency, updateCh, isRawKv, cf) + ctx, r.StartKey, r.EndKey, lastBackupTS, backupTS, rateLimit, concurrency, updateCh, isRawKv, cf) if err != nil { errCh <- err return @@ -341,8 +343,9 @@ func (bc *Client) BackupRanges( func (bc *Client) backupRange( ctx context.Context, startKey, endKey []byte, + lastBackupTS uint64, backupTS uint64, - rateMBs uint64, + rateLimit uint64, concurrency uint32, updateCh chan<- struct{}, isRawKv 
bool, @@ -359,12 +362,10 @@ func (bc *Client) backupRange( summary.CollectSuccessUnit(key, elapsed) } }() - // The unit of rate limit in protocol is bytes per second. - rateLimit := rateMBs * 1024 * 1024 log.Info("backup started", zap.Binary("StartKey", startKey), zap.Binary("EndKey", endKey), - zap.Uint64("RateLimit", rateMBs), + zap.Uint64("RateLimit", rateLimit), zap.Uint32("Concurrency", concurrency)) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -374,11 +375,12 @@ func (bc *Client) backupRange( if err != nil { return errors.Trace(err) } + req := backup.BackupRequest{ ClusterId: bc.clusterID, StartKey: startKey, EndKey: endKey, - StartVersion: 0, // Zero start version means full backup. + StartVersion: lastBackupTS, EndVersion: backupTS, StorageBackend: bc.backend, RateLimit: rateLimit, @@ -398,13 +400,13 @@ func (bc *Client) backupRange( // Find and backup remaining ranges. // TODO: test fine grained backup. err = bc.fineGrainedBackup( - ctx, startKey, endKey, + ctx, startKey, endKey, lastBackupTS, backupTS, rateLimit, concurrency, results, updateCh) if err != nil { return err } - bc.backupMeta.StartVersion = backupTS + bc.backupMeta.StartVersion = lastBackupTS bc.backupMeta.EndVersion = backupTS bc.backupMeta.IsRawKv = isRawKv if bc.backupMeta.IsRawKv { @@ -416,10 +418,11 @@ func (bc *Client) backupRange( zap.String("cf", cf)) } else { log.Info("backup time range", - zap.Reflect("StartVersion", backupTS), + zap.Reflect("StartVersion", lastBackupTS), zap.Reflect("EndVersion", backupTS)) } + results.tree.Ascend(func(i btree.Item) bool { r := i.(*Range) bc.backupMeta.Files = append(bc.backupMeta.Files, r.Files...) @@ -461,6 +464,7 @@ func (bc *Client) findRegionLeader( func (bc *Client) fineGrainedBackup( ctx context.Context, startKey, endKey []byte, + lastBackupTS uint64, backupTS uint64, rateLimit uint64, concurrency uint32, @@ -492,7 +496,7 @@ func (bc *Client) fineGrainedBackup( defer wg.Done() for rg := range retry { backoffMs, err := - bc.handleFineGrained(ctx, boFork, rg, backupTS, rateLimit, concurrency, respCh) + bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, rateLimit, concurrency, respCh) if err != nil { errCh <- err return @@ -624,6 +628,7 @@ func (bc *Client) handleFineGrained( ctx context.Context, bo *tikv.Backoffer, rg Range, + lastBackupTS uint64, backupTS uint64, rateLimit uint64, concurrency uint32, @@ -635,11 +640,12 @@ func (bc *Client) handleFineGrained( } storeID := leader.GetStoreId() max := 0 + req := backup.BackupRequest{ ClusterId: bc.clusterID, StartKey: rg.StartKey, // TODO: the range may cross region. EndKey: rg.EndKey, - StartVersion: 0, // Zero start version means full backup. 
+ StartVersion: lastBackupTS, EndVersion: backupTS, StorageBackend: bc.backend, RateLimit: rateLimit, @@ -718,7 +724,7 @@ func (bc *Client) FastChecksum() (bool, error) { start := time.Now() defer func() { elapsed := time.Since(start) - summary.CollectDuration("backup checksum", elapsed) + summary.CollectDuration("backup fast checksum", elapsed) }() dbs, err := utils.LoadBackupTables(&bc.backupMeta) diff --git a/pkg/backup/client_test.go b/pkg/backup/client_test.go index 6971026d5..44ca1ad5a 100644 --- a/pkg/backup/client_test.go +++ b/pkg/backup/client_test.go @@ -10,11 +10,11 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/mocktikv" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/br/pkg/conn" - "github.com/pingcap/br/pkg/utils" ) type testBackup struct { @@ -61,7 +61,7 @@ func (r *testBackup) TestGetTS(c *C) { currentTs := time.Now().UnixNano() / int64(time.Millisecond) ts, err := r.backupClient.GetTS(r.ctx, timeAgo) c.Assert(err, IsNil) - pdTs := utils.DecodeTs(ts).Physical + pdTs := oracle.ExtractPhysical(ts) duration := int(currentTs - pdTs) c.Assert(duration, Greater, expectedDuration-deviation) c.Assert(duration, Less, expectedDuration+deviation) @@ -72,27 +72,30 @@ func (r *testBackup) TestGetTS(c *C) { currentTs = time.Now().UnixNano() / int64(time.Millisecond) ts, err = r.backupClient.GetTS(r.ctx, timeAgo) c.Assert(err, IsNil) - pdTs = utils.DecodeTs(ts).Physical + pdTs = oracle.ExtractPhysical(ts) duration = int(currentTs - pdTs) c.Assert(duration, Greater, expectedDuration-deviation) c.Assert(duration, Less, expectedDuration+deviation) // timeago = "-1m" timeAgo = "-1m" - expectedDuration = -60000 - currentTs = time.Now().UnixNano() / int64(time.Millisecond) - ts, err = r.backupClient.GetTS(r.ctx, timeAgo) - c.Assert(err, IsNil) - pdTs = utils.DecodeTs(ts).Physical - duration = int(currentTs - pdTs) - c.Assert(duration, Greater, expectedDuration-deviation) - c.Assert(duration, Less, expectedDuration+deviation) + _, err = r.backupClient.GetTS(r.ctx, timeAgo) + c.Assert(err, ErrorMatches, "negative timeago is not allowed") - // timeago = "1000000h" exceed GCSafePoint - // because GCSafePoint in mockPDClient is 0 + // timeago = "1000000h" overflows timeAgo = "1000000h" _, err = r.backupClient.GetTS(r.ctx, timeAgo) - c.Assert(err, ErrorMatches, "given backup time exceed GCSafePoint") + c.Assert(err, ErrorMatches, "backup ts overflow.*") + + // timeago = "10h" exceed GCSafePoint + p, l, err := r.backupClient.mgr.GetPDClient().GetTS(r.ctx) + c.Assert(err, IsNil) + now := oracle.ComposeTS(p, l) + _, err = r.backupClient.mgr.GetPDClient().UpdateGCSafePoint(r.ctx, now) + c.Assert(err, IsNil) + timeAgo = "10h" + _, err = r.backupClient.GetTS(r.ctx, timeAgo) + c.Assert(err, ErrorMatches, "GC safepoint [0-9]+ exceed TS [0-9]+") } func (r *testBackup) TestBuildTableRange(c *C) { diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index bc24a01ba..bb73bc7d9 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -7,32 +7,29 @@ import ( "github.com/pingcap/log" pd "github.com/pingcap/pd/client" "go.uber.org/zap" - - "github.com/pingcap/br/pkg/utils" ) -// GetGCSafePoint returns the current gc safe point. +// getGCSafePoint returns the current gc safe point. // TODO: Some cluster may not enable distributed GC. 
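+// Updating the safe point with 0 is effectively a read-only probe: PD only ever +// advances the safe point, so the call leaves it unchanged and returns the current value.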
-func GetGCSafePoint(ctx context.Context, pdClient pd.Client) (utils.Timestamp, error) { +func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { safePoint, err := pdClient.UpdateGCSafePoint(ctx, 0) if err != nil { - return utils.Timestamp{}, err + return 0, err } - return utils.DecodeTs(safePoint), nil + return safePoint, nil } // CheckGCSafepoint checks whether the ts is older than GC safepoint. // Note: It ignores errors other than exceed GC safepoint. func CheckGCSafepoint(ctx context.Context, pdClient pd.Client, ts uint64) error { // TODO: use PDClient.GetGCSafePoint instead once PD client exports it. - safePoint, err := GetGCSafePoint(ctx, pdClient) + safePoint, err := getGCSafePoint(ctx, pdClient) if err != nil { log.Warn("fail to get GC safe point", zap.Error(err)) return nil } - safePointTS := utils.EncodeTs(safePoint) - if ts <= safePointTS { - return errors.Errorf("GC safepoint %d exceed TS %d", safePointTS, ts) + if ts <= safePoint { + return errors.Errorf("GC safepoint %d exceed TS %d", safePoint, ts) } return nil } diff --git a/pkg/backup/schema.go b/pkg/backup/schema.go index d21aac633..66e4beec7 100644 --- a/pkg/backup/schema.go +++ b/pkg/backup/schema.go @@ -16,6 +16,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/checksum" + "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" ) @@ -112,8 +113,9 @@ func (pending *Schemas) Start( } pending.wg.Wait() close(pending.backupSchemaCh) - log.Info("Backup Checksum", + log.Info("backup checksum", zap.Duration("take", time.Since(startAll))) + summary.CollectDuration("backup checksum", time.Since(startAll)) }() } diff --git a/pkg/checksum/executor.go b/pkg/checksum/executor.go index 08c68680a..2ca5cf66d 100644 --- a/pkg/checksum/executor.go +++ b/pkg/checksum/executor.go @@ -59,14 +59,26 @@ func buildChecksumRequest( } reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1)) - rs, err := buildRequest(newTable, newTable.ID, oldTable, startTS) + var oldTableID int64 + if oldTable != nil { + oldTableID = oldTable.Schema.ID + } + rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS) if err != nil { return nil, err } reqs = append(reqs, rs...) 
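+ // For partitioned tables, match each new partition to its original partition by + // partition name, so the checksum request of a partition rewrites keys back to + // the partition it was backed up from.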
for _, partDef := range partDefs { - rs, err := buildRequest(newTable, partDef.ID, oldTable, startTS) + var oldPartID int64 + if oldTable != nil { + for _, oldPartDef := range oldTable.Schema.Partition.Definitions { + if oldPartDef.Name == partDef.Name { + oldPartID = oldPartDef.ID + } + } + } + rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS) if err != nil { return nil, errors.Trace(err) } @@ -80,10 +92,11 @@ func buildRequest( tableInfo *model.TableInfo, tableID int64, oldTable *utils.Table, + oldTableID int64, startTS uint64, ) ([]*kv.Request, error) { reqs := make([]*kv.Request, 0) - req, err := buildTableRequest(tableID, oldTable, startTS) + req, err := buildTableRequest(tableID, oldTable, oldTableID, startTS) if err != nil { return nil, err } @@ -93,13 +106,11 @@ func buildRequest( if indexInfo.State != model.StatePublic { continue } - var oldTableID int64 var oldIndexInfo *model.IndexInfo if oldTable != nil { for _, oldIndex := range oldTable.Schema.Indices { if oldIndex.Name == indexInfo.Name { oldIndexInfo = oldIndex - oldTableID = oldTable.Schema.ID break } } @@ -124,12 +135,13 @@ func buildRequest( func buildTableRequest( tableID int64, oldTable *utils.Table, + oldTableID int64, startTS uint64, ) (*kv.Request, error) { var rule *tipb.ChecksumRewriteRule if oldTable != nil { rule = &tipb.ChecksumRewriteRule{ - OldPrefix: tablecodec.GenTableRecordPrefix(oldTable.Schema.ID), + OldPrefix: tablecodec.GenTableRecordPrefix(oldTableID), NewPrefix: tablecodec.GenTableRecordPrefix(tableID), } } diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 273ca7568..3030ba857 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math" - "sort" "sync" "time" @@ -14,10 +13,9 @@ import ( "github.com/pingcap/log" "github.com/pingcap/parser/model" pd "github.com/pingcap/pd/client" - restore_util "github.com/pingcap/tidb-tools/pkg/restore-util" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/store/tikv/oracle" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -109,7 +107,7 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup. rc.databases = databases rc.backupMeta = backupMeta - metaClient := restore_util.NewClient(rc.pdClient) + metaClient := NewSplitClient(rc.pdClient) importClient := NewImportClient(metaClient) rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend, rc.rateLimit) return nil @@ -131,11 +129,7 @@ func (rc *Client) GetTS(ctx context.Context) (uint64, error) { if err != nil { return 0, errors.Trace(err) } - ts := utils.Timestamp{ - Physical: p, - Logical: l, - } - restoreTS := utils.EncodeTs(ts) + restoreTS := oracle.ComposeTS(p, l) return restoreTS, nil } @@ -193,17 +187,13 @@ func (rc *Client) CreateDatabase(db *model.DBInfo) error { func (rc *Client) CreateTables( dom *domain.Domain, tables []*utils.Table, -) (*restore_util.RewriteRules, []*model.TableInfo, error) { - rewriteRules := &restore_util.RewriteRules{ + newTS uint64, +) (*RewriteRules, []*model.TableInfo, error) { + rewriteRules := &RewriteRules{ Table: make([]*import_sstpb.RewriteRule, 0), Data: make([]*import_sstpb.RewriteRule, 0), } newTables := make([]*model.TableInfo, 0, len(tables)) - // Sort the tables by id for ensuring the new tables has same id ordering as the old tables. 
- // We require this constrain since newTableID of tableID+1 must be not bigger - // than newTableID of tableID. - sort.Sort(utils.Tables(tables)) - tableIDMap := make(map[int64]int64) for _, table := range tables { err := rc.db.CreateTable(rc.ctx, table) if err != nil { @@ -213,21 +203,11 @@ func (rc *Client) CreateTables( if err != nil { return nil, nil, err } - rules := GetRewriteRules(newTableInfo, table.Schema) - tableIDMap[table.Schema.ID] = newTableInfo.ID + rules := GetRewriteRules(newTableInfo, table.Schema, newTS) rewriteRules.Table = append(rewriteRules.Table, rules.Table...) rewriteRules.Data = append(rewriteRules.Data, rules.Data...) newTables = append(newTables, newTableInfo) } - // If tableID + 1 has already exist, then we don't need to add a new rewrite rule for it. - for oldID, newID := range tableIDMap { - if _, ok := tableIDMap[oldID+1]; !ok { - rewriteRules.Table = append(rewriteRules.Table, &import_sstpb.RewriteRule{ - OldKeyPrefix: tablecodec.EncodeTablePrefix(oldID + 1), - NewKeyPrefix: tablecodec.EncodeTablePrefix(newID + 1), - }) - } - } return rewriteRules, newTables, nil } @@ -251,7 +231,7 @@ func (rc *Client) setSpeedLimit() error { // RestoreTable tries to restore the data of a table. func (rc *Client) RestoreTable( table *utils.Table, - rewriteRules *restore_util.RewriteRules, + rewriteRules *RewriteRules, updateCh chan<- struct{}, ) (err error) { start := time.Now() @@ -266,6 +246,7 @@ func (rc *Client) RestoreTable( summary.CollectSuccessUnit(key, elapsed) } }() + log.Debug("start to restore table", zap.Stringer("table", table.Schema.Name), zap.Stringer("db", table.Db.Name), @@ -274,12 +255,11 @@ func (rc *Client) RestoreTable( errCh := make(chan error, len(table.Files)) wg := new(sync.WaitGroup) defer close(errCh) - // We should encode the rewrite rewriteRules before using it to import files - encodedRules := encodeRewriteRules(rewriteRules) err = rc.setSpeedLimit() if err != nil { return err } + for _, file := range table.Files { wg.Add(1) fileReplica := file @@ -289,7 +269,7 @@ func (rc *Client) RestoreTable( select { case <-rc.ctx.Done(): errCh <- nil - case errCh <- rc.fileImporter.Import(fileReplica, encodedRules): + case errCh <- rc.fileImporter.Import(fileReplica, rewriteRules): updateCh <- struct{}{} } }) @@ -319,7 +299,7 @@ func (rc *Client) RestoreTable( // RestoreDatabase tries to restore the data of a database func (rc *Client) RestoreDatabase( db *utils.Database, - rewriteRules *restore_util.RewriteRules, + rewriteRules *RewriteRules, updateCh chan<- struct{}, ) (err error) { start := time.Now() @@ -355,7 +335,7 @@ func (rc *Client) RestoreDatabase( // RestoreAll tries to restore all the data of backup files. 
func (rc *Client) RestoreAll( - rewriteRules *restore_util.RewriteRules, + rewriteRules *RewriteRules, updateCh chan<- struct{}, ) (err error) { start := time.Now() @@ -520,3 +500,9 @@ func (rc *Client) ValidateChecksum( log.Info("validate checksum passed!!") return nil } + +// IsIncremental returns whether this backup is incremental +func (rc *Client) IsIncremental() bool { + return !(rc.backupMeta.StartVersion == rc.backupMeta.EndVersion || + rc.backupMeta.StartVersion == 0) +} diff --git a/pkg/restore/client_test.go b/pkg/restore/client_test.go index d4ea4af4f..5007f1281 100644 --- a/pkg/restore/client_test.go +++ b/pkg/restore/client_test.go @@ -66,7 +66,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) { }, } } - rules, newTables, err := client.CreateTables(s.mock.Domain, tables) + rules, newTables, err := client.CreateTables(s.mock.Domain, tables, 0) c.Assert(err, IsNil) for _, nt := range newTables { c.Assert(nt.Name.String(), Matches, "test[0-3]") @@ -85,7 +85,6 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) { for i := 0; i < len(tables); i++ { c.Assert(oldTableIDExist[int64(i)], IsTrue, Commentf("table rule does not exist")) - c.Assert(oldTableIDExist[int64(i+1)], IsTrue, Commentf("table rule does not exist")) } } diff --git a/pkg/restore/db.go b/pkg/restore/db.go index 5811605ef..b114b7629 100644 --- a/pkg/restore/db.go +++ b/pkg/restore/db.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "strings" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -77,6 +78,11 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { return errors.Trace(err) } createSQL := buf.String() + // Insert `IF NOT EXISTS` statement to skip the created tables + words := strings.SplitN(createSQL, " ", 3) + if len(words) > 2 && strings.ToUpper(words[0]) == "CREATE" && strings.ToUpper(words[1]) == "TABLE" { + createSQL = "CREATE TABLE IF NOT EXISTS " + words[2] + } _, err = db.se.Execute(ctx, createSQL) if err != nil { log.Error("create table failed", @@ -86,6 +92,19 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { zap.Error(err)) return errors.Trace(err) } + alterAutoIncIDSQL := fmt.Sprintf( + "alter table %s auto_increment = %d", + escapeTableName(schema.Name), + schema.AutoIncID) + _, err = db.se.Execute(ctx, alterAutoIncIDSQL) + if err != nil { + log.Error("alter AutoIncID failed", + zap.String("SQL", alterAutoIncIDSQL), + zap.Stringer("db", table.Db.Name), + zap.Stringer("table", table.Schema.Name), + zap.Error(err)) + return errors.Trace(err) + } return nil } diff --git a/pkg/restore/db_test.go b/pkg/restore/db_test.go index aa015d640..9583f7f8c 100644 --- a/pkg/restore/db_test.go +++ b/pkg/restore/db_test.go @@ -41,23 +41,23 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { tk := testkit.NewTestKit(c, s.mock.Storage) tk.MustExec("use test") tk.MustExec("set @@sql_mode=''") - tk.MustExec("drop table if exists t;") + tk.MustExec("drop table if exists `\"t\"`;") // Test SQL Mode - tk.MustExec("create table t (" + - "a int not null auto_increment," + + tk.MustExec("create table `\"t\"` (" + + "a int not null," + "time timestamp not null default '0000-00-00 00:00:00'," + "primary key (a));", ) - tk.MustExec("insert into t values (10, '0000-00-00 00:00:00');") + tk.MustExec("insert into `\"t\"` values (10, '0000-00-00 00:00:00');") // Query the current AutoIncID - autoIncID, err := strconv.ParseUint(tk.MustQuery("admin show t next_row_id").Rows()[0][3].(string), 10, 64) + autoIncID, err := 
strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) c.Assert(err, IsNil, Commentf("Error query auto inc id: %s", err)) // Get schemas of db and table info, err := s.mock.Domain.GetSnapshotInfoSchema(math.MaxUint64) c.Assert(err, IsNil, Commentf("Error get snapshot info schema: %s", err)) dbInfo, exists := info.SchemaByName(model.NewCIStr("test")) c.Assert(exists, IsTrue, Commentf("Error get db info")) - tableInfo, err := info.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + tableInfo, err := info.TableByName(model.NewCIStr("test"), model.NewCIStr("\"t\"")) c.Assert(err, IsNil, Commentf("Error get table info: %s", err)) table := utils.Table{ Schema: tableInfo.Meta(), @@ -88,7 +88,7 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { c.Assert(err, IsNil, Commentf("Error create table: %s %s", err, s.mock.DSN)) tk.MustExec("use test") // Check if AutoIncID is altered successfully - autoIncID, err = strconv.ParseUint(tk.MustQuery("admin show t next_row_id").Rows()[0][3].(string), 10, 64) + autoIncID, err = strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) c.Assert(err, IsNil, Commentf("Error query auto inc id: %s", err)) c.Assert(autoIncID, Equals, uint64(globalAutoID+100)) } diff --git a/pkg/restore/import.go b/pkg/restore/import.go index d03da3d96..77273ebab 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -11,7 +11,7 @@ import ( "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/log" - restore_util "github.com/pingcap/tidb-tools/pkg/restore-util" + "github.com/pingcap/pd/pkg/codec" "go.uber.org/zap" "google.golang.org/grpc" @@ -59,12 +59,12 @@ type ImporterClient interface { type importClient struct { mu sync.Mutex - metaClient restore_util.Client + metaClient SplitClient clients map[uint64]import_sstpb.ImportSSTClient } // NewImportClient returns a new ImporterClient -func NewImportClient(metaClient restore_util.Client) ImporterClient { +func NewImportClient(metaClient SplitClient) ImporterClient { return &importClient{ metaClient: metaClient, clients: make(map[uint64]import_sstpb.ImportSSTClient), @@ -132,7 +132,7 @@ func (ic *importClient) getImportClient( // FileImporter used to import a file to TiKV. type FileImporter struct { - metaClient restore_util.Client + metaClient SplitClient importClient ImporterClient backend *backup.StorageBackend rateLimit uint64 @@ -144,7 +144,7 @@ type FileImporter struct { // NewFileImporter returns a new file importClient. func NewFileImporter( ctx context.Context, - metaClient restore_util.Client, + metaClient SplitClient, importClient ImporterClient, backend *backup.StorageBackend, rateLimit uint64, @@ -162,46 +162,47 @@ func NewFileImporter( // Import tries to import a file. // All rules must contain encoded keys. 
-func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_util.RewriteRules) error { +func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRules) error { + log.Debug("import file", zap.Stringer("file", file)) // Rewrite the start key and end key of file to scan regions - scanStartKey, startRule := rewriteRawKeyWithEncodedRules(file.GetStartKey(), rewriteRules) - if startRule == nil { - log.Error("cannot find a rewrite rule for file start key", zap.Stringer("file", file)) - return errRewriteRuleNotFound - } - scanEndKey, endRule := rewriteRawKeyWithEncodedRules(file.GetEndKey(), rewriteRules) - if endRule == nil { - log.Error("cannot find a rewrite rule for file end key", zap.Stringer("file", file)) - return errRewriteRuleNotFound + startKey, endKey, err := rewriteFileKeys(file, rewriteRules) + if err != nil { + return err } - err := withRetry(func() error { + log.Debug("rewrite file keys", + zap.Stringer("file", file), + zap.Binary("startKey", startKey), + zap.Binary("endKey", endKey), + ) + err = withRetry(func() error { ctx, cancel := context.WithTimeout(importer.ctx, importScanResgionTime) defer cancel() // Scan regions covered by the file range - regionInfos, err := importer.metaClient.ScanRegions(ctx, scanStartKey, scanEndKey, 0) - if err != nil { - return errors.Trace(err) + regionInfos, err1 := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0) + if err1 != nil { + return errors.Trace(err1) } + log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos))) // Try to download and ingest the file in every region for _, regionInfo := range regionInfos { var downloadMeta *import_sstpb.SSTMeta info := regionInfo // Try to download file. err = withRetry(func() error { - var err error + var err2 error var isEmpty bool - downloadMeta, isEmpty, err = importer.downloadSST(info, file, rewriteRules) - if err != nil { + downloadMeta, isEmpty, err2 = importer.downloadSST(info, file, rewriteRules) + if err2 != nil { - if err != errRewriteRuleNotFound { + if err2 != errRewriteRuleNotFound { log.Warn("download file failed", zap.Stringer("file", file), zap.Stringer("region", info.Region), - zap.ByteString("scanStartKey", scanStartKey), - zap.ByteString("scanEndKey", scanEndKey), - zap.Error(err), + zap.Binary("startKey", startKey), + zap.Binary("endKey", endKey), + zap.Error(err2), ) } - return err + return err2 } if isEmpty { log.Info( @@ -253,27 +254,46 @@ func (importer *FileImporter) setDownloadSpeedLimit(storeID uint64) error { } func (importer *FileImporter) downloadSST( - regionInfo *restore_util.RegionInfo, + regionInfo *RegionInfo, file *backup.File, - rewriteRules *restore_util.RewriteRules, + rewriteRules *RewriteRules, ) (*import_sstpb.SSTMeta, bool, error) { id, err := uuid.New().MarshalBinary() if err != nil { return nil, true, errors.Trace(err) } - regionRule := findRegionRewriteRule(regionInfo.Region, rewriteRules) + // Assume one region maps to one rewrite rule + _, key, err := codec.DecodeBytes(regionInfo.Region.GetStartKey()) + if err != nil { + return nil, true, err + } + regionRule := matchNewPrefix(key, rewriteRules) if regionRule == nil { + log.Debug("cannot find rewrite rule, skip region", + zap.Stringer("region", regionInfo.Region), + zap.Array("tableRule", rules(rewriteRules.Table)), + zap.Array("dataRule", rules(rewriteRules.Data)), + zap.Binary("key", key), + ) return nil, true, errRewriteRuleNotFound } - sstMeta := getSSTMetaFromFile(id, file, regionInfo.Region, regionRule) + rule := import_sstpb.RewriteRule{ + OldKeyPrefix:
encodeKeyPrefix(regionRule.GetOldKeyPrefix()), + NewKeyPrefix: encodeKeyPrefix(regionRule.GetNewKeyPrefix()), + } + sstMeta := getSSTMetaFromFile(id, file, regionInfo.Region, &rule) sstMeta.RegionId = regionInfo.Region.GetId() sstMeta.RegionEpoch = regionInfo.Region.GetRegionEpoch() req := &import_sstpb.DownloadRequest{ Sst: sstMeta, StorageBackend: importer.backend, Name: file.GetName(), - RewriteRule: *regionRule, + RewriteRule: rule, } + log.Debug("download SST", + zap.Stringer("sstMeta", &sstMeta), + zap.Stringer("region", regionInfo.Region), + ) var resp *import_sstpb.DownloadResponse for _, peer := range regionInfo.Region.GetPeers() { resp, err = importer.importClient.DownloadSST(importer.ctx, peer.GetStoreId(), req) @@ -290,8 +310,8 @@ } func (importer *FileImporter) ingestSST( - fileMeta *import_sstpb.SSTMeta, - regionInfo *restore_util.RegionInfo, + sstMeta *import_sstpb.SSTMeta, + regionInfo *RegionInfo, ) error { leader := regionInfo.Leader if leader == nil { @@ -304,8 +324,9 @@ } req := &import_sstpb.IngestRequest{ Context: reqCtx, - Sst: fileMeta, + Sst: sstMeta, } + log.Debug("ingest SST", zap.Stringer("sstMeta", sstMeta)) resp, err := importer.importClient.IngestSST(importer.ctx, leader.GetStoreId(), req) if err != nil { return err diff --git a/pkg/restore/range.go b/pkg/restore/range.go new file mode 100644 index 000000000..f3914539e --- /dev/null +++ b/pkg/restore/range.go @@ -0,0 +1,148 @@ +package restore + +import ( + "bytes" + "fmt" + + "github.com/google/btree" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/tablecodec" + "go.uber.org/zap" +) + +// Range represents a range of keys. +type Range struct { + StartKey []byte + EndKey []byte +} + +// String formats a range to a string +func (r *Range) String() string { + return fmt.Sprintf("[%x %x]", r.StartKey, r.EndKey) +} + +// Less compares a range with a btree.Item +func (r *Range) Less(than btree.Item) bool { + t := than.(*Range) + return len(r.EndKey) != 0 && bytes.Compare(r.EndKey, t.StartKey) <= 0 +} + +// contains returns whether a key is included in the range.
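+// The range is left-closed and right-open: [StartKey, EndKey); an empty EndKey +// means unbounded, so a range with StartKey "a" and an empty EndKey contains +// every key not less than "a".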
+func (r *Range) contains(key []byte) bool { + start, end := r.StartKey, r.EndKey + return bytes.Compare(key, start) >= 0 && + (len(end) == 0 || bytes.Compare(key, end) < 0) +} + +// sortRanges checks that the ranges do not overlap and sorts them +func sortRanges(ranges []Range, rewriteRules *RewriteRules) ([]Range, error) { + rangeTree := NewRangeTree() + for _, rg := range ranges { + if rewriteRules != nil { + startID := tablecodec.DecodeTableID(rg.StartKey) + endID := tablecodec.DecodeTableID(rg.EndKey) + var rule *import_sstpb.RewriteRule + if startID == endID { + rg.StartKey, rule = replacePrefix(rg.StartKey, rewriteRules) + if rule == nil { + log.Warn("cannot find rewrite rule", zap.Binary("key", rg.StartKey)) + } else { + log.Debug( + "rewrite start key", + zap.Binary("key", rg.StartKey), + zap.Stringer("rule", rule)) + } + rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules) + if rule == nil { + log.Warn("cannot find rewrite rule", zap.Binary("key", rg.EndKey)) + } else { + log.Debug( + "rewrite end key", + zap.Binary("key", rg.EndKey), + zap.Stringer("rule", rule)) + } + } else { + log.Warn("table id does not match", + zap.Binary("startKey", rg.StartKey), + zap.Binary("endKey", rg.EndKey), + zap.Int64("startID", startID), + zap.Int64("endID", endID)) + return nil, errors.New("table id does not match") + } + } + if out := rangeTree.InsertRange(rg); out != nil { + return nil, errors.Errorf("ranges overlapped: %s, %s", out, rg) + } + } + sortedRanges := make([]Range, 0, len(ranges)) + rangeTree.Ascend(func(rg *Range) bool { + if rg == nil { + return false + } + sortedRanges = append(sortedRanges, *rg) + return true + }) + return sortedRanges, nil +} + +// RangeTree stores the ranges in an orderly manner. +// All the ranges it stores do not overlap. +type RangeTree struct { + tree *btree.BTree +} + +// NewRangeTree returns a new RangeTree. +func NewRangeTree() *RangeTree { + return &RangeTree{tree: btree.New(32)} +} + +// Find returns the range in the tree that contains the key, +// or nil if there is none. +func (rt *RangeTree) Find(key []byte) *Range { + var ret *Range + r := &Range{ + StartKey: key, + } + rt.tree.DescendLessOrEqual(r, func(i btree.Item) bool { + ret = i.(*Range) + return false + }) + if ret == nil || !ret.contains(key) { + return nil + } + return ret +} + +// InsertRange inserts a range into the range tree. +// It returns nil if the range is inserted successfully, +// or the overlapped range that was replaced if there is one. +func (rt *RangeTree) InsertRange(rg Range) btree.Item { + return rt.tree.ReplaceOrInsert(&rg) +} + +// RangeIterator allows callers of Ascend to iterate in-order over portions of +// the tree. When this function returns false, iteration will stop and the +// associated Ascend function will immediately return. +type RangeIterator func(rg *Range) bool + +// Ascend calls the iterator for every value in the tree within [first, last], +// until the iterator returns false. +func (rt *RangeTree) Ascend(iterator RangeIterator) { + rt.tree.Ascend(func(i btree.Item) bool { + return iterator(i.(*Range)) + }) +} + +// RegionInfo includes a region and the leader of the region. +type RegionInfo struct { + Region *metapb.Region + Leader *metapb.Peer +} + +// RewriteRules contains rules for rewriting keys of tables.
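+// For example, restoring old table 1 as new table 4 yields a data rule with +// OldKeyPrefix tablecodec.GenTableRecordPrefix(1) and NewKeyPrefix +// tablecodec.GenTableRecordPrefix(4), which replacePrefix uses to map the +// record keys of table 1 onto table 4.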
+type RewriteRules struct { + Table []*import_sstpb.RewriteRule + Data []*import_sstpb.RewriteRule +} diff --git a/pkg/restore/range_test.go b/pkg/restore/range_test.go new file mode 100644 index 000000000..a9edc5b82 --- /dev/null +++ b/pkg/restore/range_test.go @@ -0,0 +1,75 @@ +package restore + +import ( + "bytes" + + . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/tidb/tablecodec" +) + +type testRangeSuite struct{} + +var _ = Suite(&testRangeSuite{}) + +type rangeEquals struct { + *CheckerInfo +} + +var RangeEquals Checker = &rangeEquals{ + &CheckerInfo{Name: "RangeEquals", Params: []string{"obtained", "expected"}}, +} + +func (checker *rangeEquals) Check(params []interface{}, names []string) (result bool, error string) { + obtained := params[0].([]Range) + expected := params[1].([]Range) + if len(obtained) != len(expected) { + return false, "" + } + for i := range obtained { + if !bytes.Equal(obtained[i].StartKey, expected[i].StartKey) || + !bytes.Equal(obtained[i].EndKey, expected[i].EndKey) { + return false, "" + } + } + return true, "" +} + +func (s *testRangeSuite) TestSortRange(c *C) { + dataRules := []*import_sstpb.RewriteRule{ + {OldKeyPrefix: tablecodec.GenTableRecordPrefix(1), NewKeyPrefix: tablecodec.GenTableRecordPrefix(4)}, + {OldKeyPrefix: tablecodec.GenTableRecordPrefix(2), NewKeyPrefix: tablecodec.GenTableRecordPrefix(5)}, + } + rewriteRules := &RewriteRules{ + Table: make([]*import_sstpb.RewriteRule, 0), + Data: dataRules, + } + ranges1 := []Range{ + {append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...), + append(tablecodec.GenTableRecordPrefix(1), []byte("bbb")...)}, + } + rs1, err := sortRanges(ranges1, rewriteRules) + c.Assert(err, IsNil, Commentf("sort range1 failed: %v", err)) + c.Assert(rs1, RangeEquals, []Range{ + {append(tablecodec.GenTableRecordPrefix(4), []byte("aaa")...), + append(tablecodec.GenTableRecordPrefix(4), []byte("bbb")...)}, + }) + + ranges2 := []Range{ + {append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...), + append(tablecodec.GenTableRecordPrefix(2), []byte("bbb")...)}, + } + _, err = sortRanges(ranges2, rewriteRules) + c.Assert(err, ErrorMatches, ".*table id does not match.*") + + ranges3 := initRanges() + rewriteRules1 := initRewriteRules() + rs3, err := sortRanges(ranges3, rewriteRules1) + c.Assert(err, IsNil, Commentf("sort range3 failed: %v", err)) + c.Assert(rs3, RangeEquals, []Range{ + {[]byte("bbd"), []byte("bbf")}, + {[]byte("bbf"), []byte("bbj")}, + {[]byte("xxa"), []byte("xxe")}, + {[]byte("xxe"), []byte("xxz")}, + }) +} diff --git a/pkg/restore/split.go b/pkg/restore/split.go new file mode 100644 index 000000000..31b23a60f --- /dev/null +++ b/pkg/restore/split.go @@ -0,0 +1,305 @@ +package restore + +import ( + "bytes" + "context" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/util/codec" + "go.uber.org/zap" +) + +// Constants for split retry machinery. +const ( + SplitRetryTimes = 32 + SplitRetryInterval = 50 * time.Millisecond + SplitMaxRetryInterval = time.Second + + SplitCheckMaxRetryTimes = 64 + SplitCheckInterval = 8 * time.Millisecond + SplitMaxCheckInterval = time.Second + + ScatterWaitMaxRetryTimes = 64 + ScatterWaitInterval = 50 * time.Millisecond + ScatterMaxWaitInterval = time.Second + + ScatterWaitUpperInterval = 180 * time.Second +) + +// RegionSplitter is an executor of region split by rules.
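+// A minimal usage sketch (the restore path builds the SplitClient from its PD client): +// +//	splitter := NewRegionSplitter(NewSplitClient(pdClient)) +//	err := splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) {})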
+type RegionSplitter struct { + client SplitClient +} + +// NewRegionSplitter returns a new RegionSplitter. +func NewRegionSplitter(client SplitClient) *RegionSplitter { + return &RegionSplitter{ + client: client, + } +} + +// OnSplitFunc is called before split a range. +type OnSplitFunc func(key [][]byte) + +// Split executes a region split. It will split regions by the rewrite rules, +// then it will split regions by the end key of each range. +// tableRules includes the prefix of a table, since some ranges may have +// a prefix with record sequence or index sequence. +// note: all ranges and rewrite rules must have raw key. +func (rs *RegionSplitter) Split( + ctx context.Context, + ranges []Range, + rewriteRules *RewriteRules, + onSplit OnSplitFunc, +) error { + if len(ranges) == 0 { + return nil + } + startTime := time.Now() + // Sort the range for getting the min and max key of the ranges + sortedRanges, err := sortRanges(ranges, rewriteRules) + if err != nil { + return errors.Trace(err) + } + minKey := codec.EncodeBytes([]byte{}, sortedRanges[0].StartKey) + maxKey := codec.EncodeBytes([]byte{}, sortedRanges[len(sortedRanges)-1].EndKey) + for _, rule := range rewriteRules.Table { + if bytes.Compare(minKey, rule.GetNewKeyPrefix()) > 0 { + minKey = rule.GetNewKeyPrefix() + } + if bytes.Compare(maxKey, rule.GetNewKeyPrefix()) < 0 { + maxKey = rule.GetNewKeyPrefix() + } + } + for _, rule := range rewriteRules.Data { + if bytes.Compare(minKey, rule.GetNewKeyPrefix()) > 0 { + minKey = rule.GetNewKeyPrefix() + } + if bytes.Compare(maxKey, rule.GetNewKeyPrefix()) < 0 { + maxKey = rule.GetNewKeyPrefix() + } + } + interval := SplitRetryInterval + scatterRegions := make([]*RegionInfo, 0) +SplitRegions: + for i := 0; i < SplitRetryTimes; i++ { + var regions []*RegionInfo + regions, err = rs.client.ScanRegions(ctx, minKey, maxKey, 0) + if err != nil { + return errors.Trace(err) + } + if len(regions) == 0 { + log.Warn("cannot scan any region") + return nil + } + splitKeyMap := getSplitKeys(rewriteRules, sortedRanges, regions) + regionMap := make(map[uint64]*RegionInfo) + for _, region := range regions { + regionMap[region.Region.GetId()] = region + } + for regionID, keys := range splitKeyMap { + var newRegions []*RegionInfo + newRegions, err = rs.splitAndScatterRegions(ctx, regionMap[regionID], keys) + if err != nil { + interval = 2 * interval + if interval > SplitMaxRetryInterval { + interval = SplitMaxRetryInterval + } + time.Sleep(interval) + if i > 3 { + log.Warn("splitting regions failed, retry it", zap.Error(err)) + } + continue SplitRegions + } + scatterRegions = append(scatterRegions, newRegions...) 
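+ // onSplit is invoked with each batch of keys used to split a region, e.g. so the caller can drive a progress indicator.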
+			onSplit(keys)
+		}
+		break
+	}
+	if err != nil {
+		return errors.Trace(err)
+	}
+	log.Info("splitting regions done, wait for scattering regions",
+		zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
+	startTime = time.Now()
+	scatterCount := 0
+	for _, region := range scatterRegions {
+		rs.waitForScatterRegion(ctx, region)
+		if time.Since(startTime) > ScatterWaitUpperInterval {
+			break
+		}
+		scatterCount++
+	}
+	if scatterCount == len(scatterRegions) {
+		log.Info("waiting for scattering regions done",
+			zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
+	} else {
+		log.Warn("waiting for scattering regions timeout",
+			zap.Int("scatterCount", scatterCount),
+			zap.Int("regions", len(scatterRegions)),
+			zap.Duration("take", time.Since(startTime)))
+	}
+	return nil
+}
+
+func (rs *RegionSplitter) hasRegion(ctx context.Context, regionID uint64) (bool, error) {
+	regionInfo, err := rs.client.GetRegionByID(ctx, regionID)
+	if err != nil {
+		return false, err
+	}
+	return regionInfo != nil, nil
+}
+
+func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) {
+	resp, err := rs.client.GetOperator(ctx, regionID)
+	if err != nil {
+		return false, err
+	}
+	// Heartbeat may not be sent to PD yet.
+	if respErr := resp.GetHeader().GetError(); respErr != nil {
+		if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
+			return true, nil
+		}
+		return false, errors.Errorf("get operator error: %s", respErr.GetType())
+	}
+	retryTimes := ctx.Value(retryTimes).(int)
+	if retryTimes > 3 {
+		log.Warn("get operator", zap.Uint64("regionID", regionID), zap.Stringer("resp", resp))
+	}
+	// If the current operator of the region is not 'scatter-region', we can
+	// assume that 'scatter-region' has finished or timed out.
+	ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING
+	return ok, nil
+}
+
+func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) {
+	interval := SplitCheckInterval
+	for i := 0; i < SplitCheckMaxRetryTimes; i++ {
+		ok, err := rs.hasRegion(ctx, regionID)
+		if err != nil {
+			log.Warn("wait for split failed", zap.Error(err))
+			return
+		}
+		if ok {
+			break
+		}
+		interval = 2 * interval
+		if interval > SplitMaxCheckInterval {
+			interval = SplitMaxCheckInterval
+		}
+		time.Sleep(interval)
+	}
+}
+
+type retryTimeKey struct{}
+
+var retryTimes = new(retryTimeKey)
+
+func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *RegionInfo) {
+	interval := ScatterWaitInterval
+	regionID := regionInfo.Region.GetId()
+	for i := 0; i < ScatterWaitMaxRetryTimes; i++ {
+		ctx1 := context.WithValue(ctx, retryTimes, i)
+		ok, err := rs.isScatterRegionFinished(ctx1, regionID)
+		if err != nil {
+			log.Warn("scatter region failed: the region does not exist",
+				zap.Stringer("region", regionInfo.Region))
+			return
+		}
+		if ok {
+			break
+		}
+		interval = 2 * interval
+		if interval > ScatterMaxWaitInterval {
+			interval = ScatterMaxWaitInterval
+		}
+		time.Sleep(interval)
+	}
+}
+
+func (rs *RegionSplitter) splitAndScatterRegions(
+	ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
+) ([]*RegionInfo, error) {
+	newRegions, err := rs.client.BatchSplitRegions(ctx, regionInfo, keys)
+	if err != nil {
+		return nil, err
+	}
+	for _, region := range newRegions {
+		// Wait for a while until the region split succeeds.
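+		// waitForSplit polls GetRegionByID with a doubling interval capped at
+		// SplitMaxCheckInterval; scattering is best-effort, so its failure is
+		// only logged.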
+		rs.waitForSplit(ctx, region.Region.Id)
+		if err = rs.client.ScatterRegion(ctx, region); err != nil {
+			log.Warn("scatter region failed", zap.Stringer("region", region.Region), zap.Error(err))
+		}
+	}
+	return newRegions, nil
+}
+
+// getSplitKeys checks whether the regions should be split by the new prefixes
+// of the rewrite rules and the end keys of the ranges, and groups the split
+// keys by region ID.
+func getSplitKeys(rewriteRules *RewriteRules, ranges []Range, regions []*RegionInfo) map[uint64][][]byte {
+	splitKeyMap := make(map[uint64][][]byte)
+	checkKeys := make([][]byte, 0)
+	for _, rule := range rewriteRules.Table {
+		checkKeys = append(checkKeys, rule.GetNewKeyPrefix())
+	}
+	for _, rule := range rewriteRules.Data {
+		checkKeys = append(checkKeys, rule.GetNewKeyPrefix())
+	}
+	for _, rg := range ranges {
+		checkKeys = append(checkKeys, rg.EndKey)
+	}
+	for _, key := range checkKeys {
+		if region := needSplit(key, regions); region != nil {
+			splitKeys, ok := splitKeyMap[region.Region.GetId()]
+			if !ok {
+				splitKeys = make([][]byte, 0, 1)
+			}
+			splitKeyMap[region.Region.GetId()] = append(splitKeys, key)
+		}
+	}
+	return splitKeyMap
+}
+
+// needSplit checks whether a key needs to be split; if so, it returns the
+// region to split.
+func needSplit(splitKey []byte, regions []*RegionInfo) *RegionInfo {
+	// If splitKey is the max key.
+	if len(splitKey) == 0 {
+		return nil
+	}
+	splitKey = codec.EncodeBytes([]byte{}, splitKey)
+	for _, region := range regions {
+		// If splitKey is the boundary of the region.
+		if bytes.Equal(splitKey, region.Region.GetStartKey()) {
+			return nil
+		}
+		// If splitKey is in the region.
+		if bytes.Compare(splitKey, region.Region.GetStartKey()) > 0 && beforeEnd(splitKey, region.Region.GetEndKey()) {
+			return region
+		}
+	}
+	return nil
+}
+
+func beforeEnd(key []byte, end []byte) bool {
+	return bytes.Compare(key, end) < 0 || len(end) == 0
+}
+
+func replacePrefix(s []byte, rewriteRules *RewriteRules) ([]byte, *import_sstpb.RewriteRule) {
+	// Search the data rules first.
+	for _, rule := range rewriteRules.Data {
+		if bytes.HasPrefix(s, rule.GetOldKeyPrefix()) {
+			return append(append([]byte{}, rule.GetNewKeyPrefix()...), s[len(rule.GetOldKeyPrefix()):]...), rule
+		}
+	}
+	for _, rule := range rewriteRules.Table {
+		if bytes.HasPrefix(s, rule.GetOldKeyPrefix()) {
+			return append(append([]byte{}, rule.GetNewKeyPrefix()...), s[len(rule.GetOldKeyPrefix()):]...), rule
+		}
+	}
+
+	return s, nil
+}
diff --git a/pkg/restore/split_client.go b/pkg/restore/split_client.go
new file mode 100644
index 000000000..8a618a191
--- /dev/null
+++ b/pkg/restore/split_client.go
@@ -0,0 +1,353 @@
+package restore
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"path"
+	"strconv"
+	"strings"
+	"sync"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/kvrpcpb"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/pingcap/kvproto/pkg/tikvpb"
+	pd "github.com/pingcap/pd/client"
+	"github.com/pingcap/pd/server/schedule/placement"
+	"google.golang.org/grpc"
+)
+
+// SplitClient is an external client used by RegionSplitter.
+type SplitClient interface {
+	// GetStore gets a store by a store id.
+	GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error)
+	// GetRegion gets a region which includes a specified key.
+	GetRegion(ctx context.Context, key []byte) (*RegionInfo, error)
+	// GetRegionByID gets a region by a region id.
+	GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error)
+	// SplitRegion splits a region at the given key. If the key is not included
+	// in the region, it returns nil.
+	// note: the key should not be encoded
+	SplitRegion(ctx context.Context, regionInfo *RegionInfo, key []byte) (*RegionInfo, error)
+	// BatchSplitRegions splits a region at a batch of keys.
+	// note: the keys should not be encoded
+	BatchSplitRegions(ctx context.Context, regionInfo *RegionInfo, keys [][]byte) ([]*RegionInfo, error)
+	// ScatterRegion scatters a specified region.
+	ScatterRegion(ctx context.Context, regionInfo *RegionInfo) error
+	// GetOperator gets the status of the operator of the specified region.
+	GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
+	// ScanRegions gets a list of regions, starting from the region that contains key.
+	// limit caps the maximum number of regions returned.
+	ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error)
+	// GetPlacementRule loads a placement rule from PD.
+	GetPlacementRule(ctx context.Context, groupID, ruleID string) (placement.Rule, error)
+	// SetPlacementRule inserts or updates a placement rule in PD.
+	SetPlacementRule(ctx context.Context, rule placement.Rule) error
+	// DeletePlacementRule removes a placement rule from PD.
+	DeletePlacementRule(ctx context.Context, groupID, ruleID string) error
+	// SetStoresLabel adds or updates the specified label of the stores. If
+	// labelValue is empty, it clears the label.
+	SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error
+}
+
+// pdClient is a wrapper of the pd client, used by RegionSplitter.
+type pdClient struct {
+	mu         sync.Mutex
+	client     pd.Client
+	storeCache map[uint64]*metapb.Store
+}
+
+// NewSplitClient returns a client used by RegionSplitter.
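+// It caches every store it fetches from PD, so repeated splits against the
+// same store do not cost extra GetStore RPCs.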
+func NewSplitClient(client pd.Client) SplitClient {
+	return &pdClient{
+		client:     client,
+		storeCache: make(map[uint64]*metapb.Store),
+	}
+}
+
+func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	store, ok := c.storeCache[storeID]
+	if ok {
+		return store, nil
+	}
+	store, err := c.client.GetStore(ctx, storeID)
+	if err != nil {
+		return nil, err
+	}
+	c.storeCache[storeID] = store
+	return store, nil
+}
+
+func (c *pdClient) GetRegion(ctx context.Context, key []byte) (*RegionInfo, error) {
+	region, leader, err := c.client.GetRegion(ctx, key)
+	if err != nil {
+		return nil, err
+	}
+	if region == nil {
+		return nil, nil
+	}
+	return &RegionInfo{
+		Region: region,
+		Leader: leader,
+	}, nil
+}
+
+func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) {
+	region, leader, err := c.client.GetRegionByID(ctx, regionID)
+	if err != nil {
+		return nil, err
+	}
+	if region == nil {
+		return nil, nil
+	}
+	return &RegionInfo{
+		Region: region,
+		Leader: leader,
+	}, nil
+}
+
+func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key []byte) (*RegionInfo, error) {
+	var peer *metapb.Peer
+	if regionInfo.Leader != nil {
+		peer = regionInfo.Leader
+	} else {
+		if len(regionInfo.Region.Peers) == 0 {
+			return nil, errors.New("region does not have peer")
+		}
+		peer = regionInfo.Region.Peers[0]
+	}
+	storeID := peer.GetStoreId()
+	store, err := c.GetStore(ctx, storeID)
+	if err != nil {
+		return nil, err
+	}
+	conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Close()
+
+	client := tikvpb.NewTikvClient(conn)
+	resp, err := client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{
+		Context: &kvrpcpb.Context{
+			RegionId:    regionInfo.Region.Id,
+			RegionEpoch: regionInfo.Region.RegionEpoch,
+			Peer:        peer,
+		},
+		SplitKey: key,
+	})
+	if err != nil {
+		return nil, err
+	}
+	if resp.RegionError != nil {
+		return nil, errors.Errorf("split region failed: region=%v, key=%x, err=%v", regionInfo.Region, key, resp.RegionError)
+	}
+
+	// BUG: Left is deprecated, it may be nil even if the split succeeds!
+	// Assume the new region is the left one.
+	newRegion := resp.GetLeft()
+	if newRegion == nil {
+		regions := resp.GetRegions()
+		for _, r := range regions {
+			if bytes.Equal(r.GetStartKey(), regionInfo.Region.GetStartKey()) {
+				newRegion = r
+				break
+			}
+		}
+	}
+	if newRegion == nil {
+		return nil, errors.New("split region failed: new region is nil")
+	}
+	var leader *metapb.Peer
+	// Assume the leaders will be at the same store.
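+	// (Regions created by a split inherit the parent's peer set, so the peer
+	// on the old leader's store is the most likely leader of the new region.)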
+	if regionInfo.Leader != nil {
+		for _, p := range newRegion.GetPeers() {
+			if p.GetStoreId() == regionInfo.Leader.GetStoreId() {
+				leader = p
+				break
+			}
+		}
+	}
+	return &RegionInfo{
+		Region: newRegion,
+		Leader: leader,
+	}, nil
+}
+
+func (c *pdClient) BatchSplitRegions(
+	ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
+) ([]*RegionInfo, error) {
+	var peer *metapb.Peer
+	if regionInfo.Leader != nil {
+		peer = regionInfo.Leader
+	} else {
+		if len(regionInfo.Region.Peers) == 0 {
+			return nil, errors.New("region does not have peer")
+		}
+		peer = regionInfo.Region.Peers[0]
+	}
+
+	storeID := peer.GetStoreId()
+	store, err := c.GetStore(ctx, storeID)
+	if err != nil {
+		return nil, err
+	}
+	conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Close()
+	client := tikvpb.NewTikvClient(conn)
+	resp, err := client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{
+		Context: &kvrpcpb.Context{
+			RegionId:    regionInfo.Region.Id,
+			RegionEpoch: regionInfo.Region.RegionEpoch,
+			Peer:        peer,
+		},
+		SplitKeys: keys,
+	})
+	if err != nil {
+		return nil, err
+	}
+	if resp.RegionError != nil {
+		return nil, errors.Errorf("split region failed: region=%v, err=%v", regionInfo.Region, resp.RegionError)
+	}
+
+	regions := resp.GetRegions()
+	newRegionInfos := make([]*RegionInfo, 0, len(regions))
+	for _, region := range regions {
+		// Skip the original region.
+		if region.GetId() == regionInfo.Region.GetId() {
+			continue
+		}
+		var leader *metapb.Peer
+		// Assume the leaders will be at the same store.
+		if regionInfo.Leader != nil {
+			for _, p := range region.GetPeers() {
+				if p.GetStoreId() == regionInfo.Leader.GetStoreId() {
+					leader = p
+					break
+				}
+			}
+		}
+		newRegionInfos = append(newRegionInfos, &RegionInfo{
+			Region: region,
+			Leader: leader,
+		})
+	}
+	return newRegionInfos, nil
+}
+
+func (c *pdClient) ScatterRegion(ctx context.Context, regionInfo *RegionInfo) error {
+	return c.client.ScatterRegion(ctx, regionInfo.Region.GetId())
+}
+
+func (c *pdClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
+	return c.client.GetOperator(ctx, regionID)
+}
+
+func (c *pdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) {
+	regions, leaders, err := c.client.ScanRegions(ctx, key, endKey, limit)
+	if err != nil {
+		return nil, err
+	}
+	regionInfos := make([]*RegionInfo, 0, len(regions))
+	for i := range regions {
+		regionInfos = append(regionInfos, &RegionInfo{
+			Region: regions[i],
+			Leader: leaders[i],
+		})
+	}
+	return regionInfos, nil
+}
+
+func (c *pdClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (placement.Rule, error) {
+	var rule placement.Rule
+	addr := c.getPDAPIAddr()
+	if addr == "" {
+		return rule, errors.New("failed to get placement rule: no PD leader")
+	}
+	req, _ := http.NewRequestWithContext(ctx, "GET", addr+path.Join("/pd/api/v1/config/rule", groupID, ruleID), nil)
+	res, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return rule, errors.WithStack(err)
+	}
+	b, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		return rule, errors.WithStack(err)
+	}
+	res.Body.Close()
+	err = json.Unmarshal(b, &rule)
+	if err != nil {
+		return rule, errors.WithStack(err)
+	}
+	return rule, nil
+}
+
+func (c *pdClient) SetPlacementRule(ctx context.Context, rule placement.Rule) error {
+	addr := c.getPDAPIAddr()
+	if addr == "" {
+		return errors.New("failed to set placement rule: no PD leader")
+	}
+	m, _ := json.Marshal(rule)
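+	// POST the JSON-encoded rule to PD's placement-rule HTTP API.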
+	req, _ := http.NewRequestWithContext(ctx, "POST", addr+path.Join("/pd/api/v1/config/rule"), bytes.NewReader(m))
+	res, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+	return errors.Trace(res.Body.Close())
+}
+
+func (c *pdClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error {
+	addr := c.getPDAPIAddr()
+	if addr == "" {
+		return errors.New("failed to delete placement rule: no PD leader")
+	}
+	req, _ := http.NewRequestWithContext(ctx, "DELETE", addr+path.Join("/pd/api/v1/config/rule", groupID, ruleID), nil)
+	res, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+	return errors.Trace(res.Body.Close())
+}
+
+func (c *pdClient) SetStoresLabel(
+	ctx context.Context, stores []uint64, labelKey, labelValue string,
+) error {
+	b := []byte(fmt.Sprintf(`{"%s": "%s"}`, labelKey, labelValue))
+	addr := c.getPDAPIAddr()
+	if addr == "" {
+		return errors.New("failed to add stores labels: no leader")
+	}
+	for _, id := range stores {
+		req, _ := http.NewRequestWithContext(
+			ctx, "POST",
+			addr+path.Join("/pd/api/v1/store", strconv.FormatUint(id, 10), "label"),
+			bytes.NewReader(b),
+		)
+		res, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return errors.WithStack(err)
+		}
+		err = res.Body.Close()
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+	return nil
+}
+
+func (c *pdClient) getPDAPIAddr() string {
+	addr := c.client.GetLeaderAddr()
+	if addr != "" && !strings.HasPrefix(addr, "http") {
+		addr = "http://" + addr
+	}
+	return strings.TrimRight(addr, "/")
+}
diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go
new file mode 100644
index 000000000..509c4cfa0
--- /dev/null
+++ b/pkg/restore/split_test.go
@@ -0,0 +1,301 @@
+package restore
+
+import (
+	"bytes"
+	"context"
+	"sync"
+
+	.
"github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/pd/server/schedule/placement" + "github.com/pingcap/tidb/util/codec" +) + +type testClient struct { + mu sync.RWMutex + stores map[uint64]*metapb.Store + regions map[uint64]*RegionInfo + nextRegionID uint64 +} + +func newTestClient(stores map[uint64]*metapb.Store, regions map[uint64]*RegionInfo, nextRegionID uint64) *testClient { + return &testClient{ + stores: stores, + regions: regions, + nextRegionID: nextRegionID, + } +} + +func (c *testClient) GetAllRegions() map[uint64]*RegionInfo { + c.mu.RLock() + defer c.mu.RUnlock() + return c.regions +} + +func (c *testClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { + c.mu.RLock() + defer c.mu.RUnlock() + store, ok := c.stores[storeID] + if !ok { + return nil, errors.Errorf("store not found") + } + return store, nil +} + +func (c *testClient) GetRegion(ctx context.Context, key []byte) (*RegionInfo, error) { + c.mu.RLock() + defer c.mu.RUnlock() + for _, region := range c.regions { + if bytes.Compare(key, region.Region.StartKey) >= 0 && + (len(region.Region.EndKey) == 0 || bytes.Compare(key, region.Region.EndKey) < 0) { + return region, nil + } + } + return nil, errors.Errorf("region not found: key=%s", string(key)) +} + +func (c *testClient) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) { + c.mu.RLock() + defer c.mu.RUnlock() + region, ok := c.regions[regionID] + if !ok { + return nil, errors.Errorf("region not found: id=%d", regionID) + } + return region, nil +} + +func (c *testClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key []byte) (*RegionInfo, error) { + c.mu.Lock() + defer c.mu.Unlock() + var target *RegionInfo + splitKey := codec.EncodeBytes([]byte{}, key) + for _, region := range c.regions { + if bytes.Compare(splitKey, region.Region.StartKey) >= 0 && + (len(region.Region.EndKey) == 0 || bytes.Compare(splitKey, region.Region.EndKey) < 0) { + target = region + } + } + if target == nil { + return nil, errors.Errorf("region not found: key=%s", string(key)) + } + newRegion := &RegionInfo{ + Region: &metapb.Region{ + Peers: target.Region.Peers, + Id: c.nextRegionID, + StartKey: target.Region.StartKey, + EndKey: splitKey, + }, + } + c.regions[c.nextRegionID] = newRegion + c.nextRegionID++ + target.Region.StartKey = splitKey + c.regions[target.Region.Id] = target + return newRegion, nil +} + +func (c *testClient) BatchSplitRegions( + ctx context.Context, regionInfo *RegionInfo, keys [][]byte, +) ([]*RegionInfo, error) { + c.mu.Lock() + defer c.mu.Unlock() + newRegions := make([]*RegionInfo, 0) + for _, key := range keys { + var target *RegionInfo + splitKey := codec.EncodeBytes([]byte{}, key) + for _, region := range c.regions { + if bytes.Compare(splitKey, region.Region.GetStartKey()) > 0 && + beforeEnd(splitKey, region.Region.GetEndKey()) { + target = region + } + } + if target == nil { + continue + } + newRegion := &RegionInfo{ + Region: &metapb.Region{ + Peers: target.Region.Peers, + Id: c.nextRegionID, + StartKey: target.Region.StartKey, + EndKey: splitKey, + }, + } + c.regions[c.nextRegionID] = newRegion + c.nextRegionID++ + target.Region.StartKey = splitKey + c.regions[target.Region.Id] = target + newRegions = append(newRegions, newRegion) + } + return newRegions, nil +} + +func (c *testClient) ScatterRegion(ctx context.Context, regionInfo *RegionInfo) 
error {
+	return nil
+}
+
+func (c *testClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
+	return &pdpb.GetOperatorResponse{
+		Header: new(pdpb.ResponseHeader),
+	}, nil
+}
+
+func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) {
+	regions := make([]*RegionInfo, 0)
+	for _, region := range c.regions {
+		if limit > 0 && len(regions) >= limit {
+			break
+		}
+		if (len(region.Region.GetEndKey()) != 0 && bytes.Compare(region.Region.GetEndKey(), key) <= 0) ||
+			bytes.Compare(region.Region.GetStartKey(), endKey) > 0 {
+			continue
+		}
+		regions = append(regions, region)
+	}
+	return regions, nil
+}
+
+func (c *testClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (r placement.Rule, err error) {
+	return
+}
+
+func (c *testClient) SetPlacementRule(ctx context.Context, rule placement.Rule) error {
+	return nil
+}
+
+func (c *testClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error {
+	return nil
+}
+
+func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error {
+	return nil
+}
+
+// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
+// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj)
+// rewrite rules: aa -> xx,  cc -> bb
+// expected regions after split:
+//   [, aay), [aay, bb), [bb, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
+//   [bbj, cca), [cca, xx), [xx, xxe), [xxe, xxz), [xxz, )
+func (s *testRestoreUtilSuite) TestSplit(c *C) {
+	client := initTestClient()
+	ranges := initRanges()
+	rewriteRules := initRewriteRules()
+	regionSplitter := NewRegionSplitter(client)
+
+	ctx := context.Background()
+	err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
+	if err != nil {
+		c.Assert(err, IsNil, Commentf("split regions failed: %v", err))
+	}
+	regions := client.GetAllRegions()
+	if !validateRegions(regions) {
+		for _, region := range regions {
+			c.Logf("region: %v\n", region.Region)
+		}
+		c.Log("got wrong result")
+		c.Fail()
+	}
+}
+
+// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
+func initTestClient() *testClient {
+	peers := make([]*metapb.Peer, 1)
+	peers[0] = &metapb.Peer{
+		Id:      1,
+		StoreId: 1,
+	}
+	keys := [6]string{"", "aay", "bba", "bbh", "cca", ""}
+	regions := make(map[uint64]*RegionInfo)
+	for i := uint64(1); i < 6; i++ {
+		startKey := []byte(keys[i-1])
+		if len(startKey) != 0 {
+			startKey = codec.EncodeBytes([]byte{}, startKey)
+		}
+		endKey := []byte(keys[i])
+		if len(endKey) != 0 {
+			endKey = codec.EncodeBytes([]byte{}, endKey)
+		}
+		regions[i] = &RegionInfo{
+			Region: &metapb.Region{
+				Id:       i,
+				Peers:    peers,
+				StartKey: startKey,
+				EndKey:   endKey,
+			},
+		}
+	}
+	stores := make(map[uint64]*metapb.Store)
+	stores[1] = &metapb.Store{
+		Id: 1,
+	}
+	return newTestClient(stores, regions, 6)
+}
+
+// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj)
+func initRanges() []Range {
+	var ranges [4]Range
+	ranges[0] = Range{
+		StartKey: []byte("aaa"),
+		EndKey:   []byte("aae"),
+	}
+	ranges[1] = Range{
+		StartKey: []byte("aae"),
+		EndKey:   []byte("aaz"),
+	}
+	ranges[2] = Range{
+		StartKey: []byte("ccd"),
+		EndKey:   []byte("ccf"),
+	}
+	ranges[3] = Range{
+		StartKey: []byte("ccf"),
+		EndKey:   []byte("ccj"),
+	}
+	return ranges[:]
+}
+
+func initRewriteRules() *RewriteRules {
+	var rules [2]*import_sstpb.RewriteRule
+	rules[0] = &import_sstpb.RewriteRule{
+		OldKeyPrefix: []byte("aa"),
+		NewKeyPrefix: []byte("xx"),
+	}
+	rules[1] =
&import_sstpb.RewriteRule{ + OldKeyPrefix: []byte("cc"), + NewKeyPrefix: []byte("bb"), + } + return &RewriteRules{ + Table: rules[:], + Data: rules[:], + } +} + +// expected regions after split: +// [, aay), [aay, bb), [bb, bba), [bba, bbf), [bbf, bbh), [bbh, bbj), +// [bbj, cca), [cca, xx), [xx, xxe), [xxe, xxz), [xxz, ) +func validateRegions(regions map[uint64]*RegionInfo) bool { + keys := [12]string{"", "aay", "bb", "bba", "bbf", "bbh", "bbj", "cca", "xx", "xxe", "xxz", ""} + if len(regions) != 11 { + return false + } +FindRegion: + for i := 1; i < 12; i++ { + for _, region := range regions { + startKey := []byte(keys[i-1]) + if len(startKey) != 0 { + startKey = codec.EncodeBytes([]byte{}, startKey) + } + endKey := []byte(keys[i]) + if len(endKey) != 0 { + endKey = codec.EncodeBytes([]byte{}, endKey) + } + if bytes.Equal(region.Region.GetStartKey(), startKey) && + bytes.Equal(region.Region.GetEndKey(), endKey) { + continue FindRegion + } + } + return false + } + return true +} diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 383bd1ff4..a2e9e3e38 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -13,7 +13,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/parser/model" - restore_util "github.com/pingcap/tidb-tools/pkg/restore-util" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" @@ -28,10 +27,16 @@ type files []*backup.File func (fs files) MarshalLogArray(arr zapcore.ArrayEncoder) error { for i := range fs { - err := arr.AppendReflected(fs[i]) - if err != nil { - return err - } + arr.AppendString(fs[i].String()) + } + return nil +} + +type rules []*import_sstpb.RewriteRule + +func (rs rules) MarshalLogArray(arr zapcore.ArrayEncoder) error { + for i := range rs { + arr.AppendString(rs[i].String()) } return nil } @@ -66,31 +71,54 @@ func (alloc *idAllocator) NextGlobalAutoID(tableID int64) (int64, error) { } // GetRewriteRules returns the rewrite rule of the new table and the old table. 
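+// For example, restoring an old table with ID 1 as a new table with ID 4
+// yields a table rule t1 -> t4 plus a data rule t1_r -> t4_r for the record
+// prefix, and one extra data rule per index whose name matches between the
+// old and new table definitions.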
-func GetRewriteRules(newTable *model.TableInfo, oldTable *model.TableInfo) *restore_util.RewriteRules { - tableRules := make([]*import_sstpb.RewriteRule, 0, 1) - tableRules = append(tableRules, &import_sstpb.RewriteRule{ - OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTable.ID), - NewKeyPrefix: tablecodec.EncodeTablePrefix(newTable.ID), - }) - - dataRules := make([]*import_sstpb.RewriteRule, 0, len(oldTable.Indices)+1) - dataRules = append(dataRules, &import_sstpb.RewriteRule{ - OldKeyPrefix: append(tablecodec.EncodeTablePrefix(oldTable.ID), recordPrefixSep...), - NewKeyPrefix: append(tablecodec.EncodeTablePrefix(newTable.ID), recordPrefixSep...), - }) - +func GetRewriteRules( + newTable *model.TableInfo, + oldTable *model.TableInfo, + newTimeStamp uint64, +) *RewriteRules { + tableIDs := make(map[int64]int64) + tableIDs[oldTable.ID] = newTable.ID + if oldTable.Partition != nil { + for _, srcPart := range oldTable.Partition.Definitions { + for _, destPart := range newTable.Partition.Definitions { + if srcPart.Name == destPart.Name { + tableIDs[srcPart.ID] = destPart.ID + } + } + } + } + indexIDs := make(map[int64]int64) for _, srcIndex := range oldTable.Indices { for _, destIndex := range newTable.Indices { if srcIndex.Name == destIndex.Name { - dataRules = append(dataRules, &import_sstpb.RewriteRule{ - OldKeyPrefix: tablecodec.EncodeTableIndexPrefix(oldTable.ID, srcIndex.ID), - NewKeyPrefix: tablecodec.EncodeTableIndexPrefix(newTable.ID, destIndex.ID), - }) + indexIDs[srcIndex.ID] = destIndex.ID } } } - return &restore_util.RewriteRules{ + tableRules := make([]*import_sstpb.RewriteRule, 0) + dataRules := make([]*import_sstpb.RewriteRule, 0) + for oldTableID, newTableID := range tableIDs { + tableRules = append(tableRules, &import_sstpb.RewriteRule{ + OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), + NewKeyPrefix: tablecodec.EncodeTablePrefix(newTableID), + NewTimestamp: newTimeStamp, + }) + dataRules = append(dataRules, &import_sstpb.RewriteRule{ + OldKeyPrefix: append(tablecodec.EncodeTablePrefix(oldTableID), recordPrefixSep...), + NewKeyPrefix: append(tablecodec.EncodeTablePrefix(newTableID), recordPrefixSep...), + NewTimestamp: newTimeStamp, + }) + for oldIndexID, newIndexID := range indexIDs { + dataRules = append(dataRules, &import_sstpb.RewriteRule{ + OldKeyPrefix: tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexID), + NewKeyPrefix: tablecodec.EncodeTableIndexPrefix(newTableID, newIndexID), + NewTimestamp: newTimeStamp, + }) + } + } + + return &RewriteRules{ Table: tableRules, Data: dataRules, } @@ -167,9 +195,9 @@ func withRetry( // ValidateFileRanges checks and returns the ranges of the files. 
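+// It also rejects any file whose start key and end key decode to different
+// table IDs, since such a file cannot be covered by a single rewrite rule.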
 func ValidateFileRanges(
 	files []*backup.File,
-	rewriteRules *restore_util.RewriteRules,
-) ([]restore_util.Range, error) {
-	ranges := make([]restore_util.Range, 0, len(files))
+	rewriteRules *RewriteRules,
+) ([]Range, error) {
+	ranges := make([]Range, 0, len(files))
 	fileAppended := make(map[string]bool)
 
 	for _, file := range files {
@@ -179,7 +207,16 @@ func ValidateFileRanges(
 		if err != nil {
 			return nil, err
 		}
-		ranges = append(ranges, restore_util.Range{
+		startID := tablecodec.DecodeTableID(file.GetStartKey())
+		endID := tablecodec.DecodeTableID(file.GetEndKey())
+		if startID != endID {
+			log.Error("table ids do not match",
+				zap.Int64("startID", startID),
+				zap.Int64("endID", endID),
+				zap.Stringer("file", file))
+			return nil, errors.New("table ids do not match")
+		}
+		ranges = append(ranges, Range{
 			StartKey: file.GetStartKey(),
 			EndKey:   file.GetEndKey(),
 		})
@@ -190,10 +227,10 @@
 }
 
 // ValidateFileRewriteRule uses rewrite rules to validate the ranges of a file
-func ValidateFileRewriteRule(file *backup.File, rewriteRules *restore_util.RewriteRules) error {
+func ValidateFileRewriteRule(file *backup.File, rewriteRules *RewriteRules) error {
 	// Check if the start key has a matched rewrite key
-	_, startRule := rewriteRawKeyWithOriginalRules(file.GetStartKey(), rewriteRules)
-	if startRule == nil {
+	_, startRule := rewriteRawKey(file.GetStartKey(), rewriteRules)
+	if rewriteRules != nil && startRule == nil {
 		tableID := tablecodec.DecodeTableID(file.GetStartKey())
 		log.Error(
 			"cannot find rewrite rule for file start key",
@@ -203,8 +240,8 @@ func ValidateFileRewriteRule(file *backup.File, rewriteRules *restore_util.Rewri
 		return errors.Errorf("cannot find rewrite rule")
 	}
 	// Check if the end key has a matched rewrite key
-	_, endRule := rewriteRawKeyWithOriginalRules(file.GetEndKey(), rewriteRules)
-	if endRule == nil {
+	_, endRule := rewriteRawKey(file.GetEndKey(), rewriteRules)
+	if rewriteRules != nil && endRule == nil {
 		tableID := tablecodec.DecodeTableID(file.GetEndKey())
 		log.Error(
 			"cannot find rewrite rule for file end key",
@@ -230,92 +267,45 @@ func ValidateFileRewriteRule(file *backup.File, rewriteRules *restore_util.Rewri
 	return nil
 }
 
-// rules must be encoded
-func findRegionRewriteRule(
-	region *metapb.Region,
-	rewriteRules *restore_util.RewriteRules,
-) *import_sstpb.RewriteRule {
-	for _, rule := range rewriteRules.Data {
-		// regions may have the new prefix
-		if bytes.HasPrefix(region.GetStartKey(), rule.GetNewKeyPrefix()) {
-			return rule
-		}
-	}
-	return nil
-}
-
-func encodeRewriteRules(rewriteRules *restore_util.RewriteRules) *restore_util.RewriteRules {
-	encodedTableRules := make([]*import_sstpb.RewriteRule, 0, len(rewriteRules.Table))
-	encodedDataRules := make([]*import_sstpb.RewriteRule, 0, len(rewriteRules.Data))
-	for _, rule := range rewriteRules.Table {
-		encodedTableRules = append(encodedTableRules, &import_sstpb.RewriteRule{
-			OldKeyPrefix: encodeKeyPrefix(rule.GetOldKeyPrefix()),
-			NewKeyPrefix: encodeKeyPrefix(rule.GetNewKeyPrefix()),
-		})
+// Rewrites a raw key and returns an encoded key
+func rewriteRawKey(key []byte, rewriteRules *RewriteRules) ([]byte, *import_sstpb.RewriteRule) {
+	if rewriteRules == nil {
+		return codec.EncodeBytes([]byte{}, key), nil
 	}
-	for _, rule := range rewriteRules.Data {
-		encodedDataRules = append(encodedDataRules, &import_sstpb.RewriteRule{
-			OldKeyPrefix: encodeKeyPrefix(rule.GetOldKeyPrefix()),
-			NewKeyPrefix: encodeKeyPrefix(rule.GetNewKeyPrefix()),
-		})
-	}
-	return &restore_util.RewriteRules{
-
Table: encodedTableRules, - Data: encodedDataRules, + if len(key) > 0 { + rule := matchOldPrefix(key, rewriteRules) + ret := bytes.Replace(key, rule.GetOldKeyPrefix(), rule.GetNewKeyPrefix(), 1) + return codec.EncodeBytes([]byte{}, ret), rule } + return nil, nil } -func encodeKeyPrefix(key []byte) []byte { - encodedPrefix := make([]byte, 0) - ungroupedLen := len(key) % 8 - encodedPrefix = - append(encodedPrefix, codec.EncodeBytes([]byte{}, key[:len(key)-ungroupedLen])...) - return append(encodedPrefix[:len(encodedPrefix)-9], key[len(key)-ungroupedLen:]...) -} - -// Encode a raw key and find a encoded rewrite rule to rewrite it. -func rewriteRawKeyWithEncodedRules( - key []byte, encodedRules *restore_util.RewriteRules, -) ([]byte, *import_sstpb.RewriteRule) { - if len(key) > 0 { - ret := codec.EncodeBytes([]byte{}, key) - for _, rule := range encodedRules.Data { - // regions may have the new prefix - if bytes.HasPrefix(ret, rule.GetOldKeyPrefix()) { - return bytes.Replace(ret, rule.GetOldKeyPrefix(), rule.GetNewKeyPrefix(), 1), rule - } +func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.RewriteRule { + for _, rule := range rewriteRules.Data { + if bytes.HasPrefix(key, rule.GetOldKeyPrefix()) { + return rule } - for _, rule := range encodedRules.Table { - // regions may have the new prefix - if bytes.HasPrefix(ret, rule.GetOldKeyPrefix()) { - return bytes.Replace(ret, rule.GetOldKeyPrefix(), rule.GetNewKeyPrefix(), 1), rule - } + } + for _, rule := range rewriteRules.Table { + if bytes.HasPrefix(key, rule.GetOldKeyPrefix()) { + return rule } } - return []byte(""), nil + return nil } -// Encode a raw key and find a rewrite rule to rewrite it. -func rewriteRawKeyWithOriginalRules( - key []byte, rewriteRules *restore_util.RewriteRules, -) ([]byte, *import_sstpb.RewriteRule) { - if len(key) > 0 { - ret := make([]byte, len(key)) - copy(ret, key) - for _, rule := range rewriteRules.Data { - // regions may have the new prefix - if bytes.HasPrefix(ret, rule.GetOldKeyPrefix()) { - return bytes.Replace(ret, rule.GetOldKeyPrefix(), rule.GetNewKeyPrefix(), 1), rule - } +func matchNewPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.RewriteRule { + for _, rule := range rewriteRules.Data { + if bytes.HasPrefix(key, rule.GetNewKeyPrefix()) { + return rule } - for _, rule := range rewriteRules.Table { - // regions may have the new prefix - if bytes.HasPrefix(ret, rule.GetOldKeyPrefix()) { - return bytes.Replace(ret, rule.GetOldKeyPrefix(), rule.GetNewKeyPrefix(), 1), rule - } + } + for _, rule := range rewriteRules.Table { + if bytes.HasPrefix(key, rule.GetNewKeyPrefix()) { + return rule } } - return []byte(""), nil + return nil } func truncateTS(key []byte) []byte { @@ -328,8 +318,8 @@ func truncateTS(key []byte) []byte { func SplitRanges( ctx context.Context, client *Client, - ranges []restore_util.Range, - rewriteRules *restore_util.RewriteRules, + ranges []Range, + rewriteRules *RewriteRules, updateCh chan<- struct{}, ) error { start := time.Now() @@ -337,10 +327,52 @@ func SplitRanges( elapsed := time.Since(start) summary.CollectDuration("split region", elapsed) }() - splitter := restore_util.NewRegionSplitter(restore_util.NewClient(client.GetPDClient())) + splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient())) return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) { for range keys { updateCh <- struct{}{} } }) } + +func rewriteFileKeys(file *backup.File, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) { + startID := 
tablecodec.DecodeTableID(file.GetStartKey())
+	endID := tablecodec.DecodeTableID(file.GetEndKey())
+	var rule *import_sstpb.RewriteRule
+	if startID == endID {
+		startKey, rule = rewriteRawKey(file.GetStartKey(), rewriteRules)
+		if rewriteRules != nil && rule == nil {
+			err = errors.New("cannot find rewrite rule for start key")
+			return
+		}
+		endKey, rule = rewriteRawKey(file.GetEndKey(), rewriteRules)
+		if rewriteRules != nil && rule == nil {
+			err = errors.New("cannot find rewrite rule for end key")
+			return
+		}
+	} else {
+		log.Error("table ids do not match",
+			zap.Int64("startID", startID),
+			zap.Int64("endID", endID),
+			zap.Binary("startKey", startKey),
+			zap.Binary("endKey", endKey))
+		err = errors.New("illegal table id")
+	}
+	return
+}
+
+func encodeKeyPrefix(key []byte) []byte {
+	encodedPrefix := make([]byte, 0)
+	ungroupedLen := len(key) % 8
+	encodedPrefix = append(encodedPrefix, codec.EncodeBytes([]byte{}, key[:len(key)-ungroupedLen])...)
+	return append(encodedPrefix[:len(encodedPrefix)-9], key[len(key)-ungroupedLen:]...)
+}
+
+// escapeTableName escapes an identifier with backquotes for pretty-printing.
+// For instance, the identifier "foo `bar`" becomes "`foo ``bar```".
+func escapeTableName(cis model.CIStr) string {
+	quote := "`"
+	return quote + strings.Replace(cis.O, quote, quote+quote, -1) + quote
+}
diff --git a/pkg/restore/util_test.go b/pkg/restore/util_test.go
index d579510e4..bc4da9168 100644
--- a/pkg/restore/util_test.go
+++ b/pkg/restore/util_test.go
@@ -5,7 +5,6 @@ import (
 	"github.com/pingcap/kvproto/pkg/backup"
 	"github.com/pingcap/kvproto/pkg/import_sstpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
-	restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
 	"github.com/pingcap/tidb/tablecodec"
 )
@@ -34,7 +33,7 @@ func (s *testRestoreUtilSuite) TestGetSSTMetaFromFile(c *C) {
 }
 
 func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) {
-	rules := &restore_util.RewriteRules{
+	rules := &RewriteRules{
 		Table: []*import_sstpb.RewriteRule{&import_sstpb.RewriteRule{
 			OldKeyPrefix: []byte(tablecodec.EncodeTablePrefix(1)),
 			NewKeyPrefix: []byte(tablecodec.EncodeTablePrefix(2)),
@@ -54,10 +53,10 @@ func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) {
 	// Range is not overlap, no rule found.
 	_, err = ValidateFileRanges(
-		[]*backup.File{&backup.File{
+		[]*backup.File{{
 			Name:     "file_write.sst",
-			StartKey: []byte(tablecodec.EncodeTablePrefix(0)),
-			EndKey:   []byte(tablecodec.EncodeTablePrefix(1)),
+			StartKey: tablecodec.EncodeTablePrefix(0),
+			EndKey:   tablecodec.EncodeTablePrefix(1),
 		}},
 		rules,
 	)
@@ -65,10 +64,10 @@ func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) {
 	// No rule for end key.
 	_, err = ValidateFileRanges(
-		[]*backup.File{&backup.File{
+		[]*backup.File{{
 			Name:     "file_write.sst",
-			StartKey: []byte(tablecodec.EncodeTablePrefix(1)),
-			EndKey:   []byte(tablecodec.EncodeTablePrefix(2)),
+			StartKey: tablecodec.EncodeTablePrefix(1),
+			EndKey:   tablecodec.EncodeTablePrefix(2),
 		}},
 		rules,
 	)
@@ -76,29 +75,29 @@ func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) {
 	// Add a rule for end key.
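+	// Even with a rule for the end key, the file below still spans table 1
+	// and table 2, so ValidateFileRanges is expected to reject it.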
 	rules.Table = append(rules.Table, &import_sstpb.RewriteRule{
-		OldKeyPrefix: []byte(tablecodec.EncodeTablePrefix(2)),
-		NewKeyPrefix: []byte(tablecodec.EncodeTablePrefix(3)),
+		OldKeyPrefix: tablecodec.EncodeTablePrefix(2),
+		NewKeyPrefix: tablecodec.EncodeTablePrefix(3),
 	})
 	_, err = ValidateFileRanges(
-		[]*backup.File{&backup.File{
+		[]*backup.File{{
 			Name:     "file_write.sst",
-			StartKey: []byte(tablecodec.EncodeTablePrefix(1)),
-			EndKey:   []byte(tablecodec.EncodeTablePrefix(2)),
+			StartKey: tablecodec.EncodeTablePrefix(1),
+			EndKey:   tablecodec.EncodeTablePrefix(2),
 		}},
 		rules,
 	)
-	c.Assert(err, IsNil)
+	c.Assert(err, ErrorMatches, "table ids do not match")
 
 	// Add a bad rule for end key, after rewrite start key > end key.
 	rules.Table = append(rules.Table[:1], &import_sstpb.RewriteRule{
-		OldKeyPrefix: []byte(tablecodec.EncodeTablePrefix(2)),
-		NewKeyPrefix: []byte(tablecodec.EncodeTablePrefix(1)),
+		OldKeyPrefix: tablecodec.EncodeTablePrefix(2),
+		NewKeyPrefix: tablecodec.EncodeTablePrefix(1),
 	})
 	_, err = ValidateFileRanges(
-		[]*backup.File{&backup.File{
+		[]*backup.File{{
 			Name:     "file_write.sst",
-			StartKey: []byte(tablecodec.EncodeTablePrefix(1)),
-			EndKey:   []byte(tablecodec.EncodeTablePrefix(2)),
+			StartKey: tablecodec.EncodeTablePrefix(1),
+			EndKey:   tablecodec.EncodeTablePrefix(2),
 		}},
 		rules,
 	)
diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go
index 43a089bb1..a0df5b03e 100644
--- a/pkg/storage/gcs.go
+++ b/pkg/storage/gcs.go
@@ -65,6 +65,9 @@ You can get one from https://console.cloud.google.com/apis/credentials.`)
 	_ = flags.MarkHidden(gcsEndpointOption)
+	_ = flags.MarkHidden(gcsStorageClassOption)
+	_ = flags.MarkHidden(gcsPredefinedACL)
+	_ = flags.MarkHidden(gcsCredentialsFile)
 }
 
 func getBackendOptionsFromGCSFlags(flags *pflag.FlagSet) (options GCSBackendOptions, err error) {
diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go
index 301f52dca..5db54556c 100644
--- a/pkg/storage/s3.go
+++ b/pkg/storage/s3.go
@@ -108,6 +108,13 @@ func defineS3Flags(flags *pflag.FlagSet) {
 	flags.String(s3SSEOption, "", "Set the S3 server-side encryption algorithm, e.g. AES256")
 	flags.String(s3ACLOption, "", "Set the S3 canned ACLs, e.g. authenticated-read")
 	flags.String(s3ProviderOption, "", "Set the S3 provider, e.g. aws, alibaba, ceph")
+
+	_ = flags.MarkHidden(s3EndpointOption)
+	_ = flags.MarkHidden(s3RegionOption)
+	_ = flags.MarkHidden(s3StorageClassOption)
+	_ = flags.MarkHidden(s3SSEOption)
+	_ = flags.MarkHidden(s3ACLOption)
+	_ = flags.MarkHidden(s3ProviderOption)
 }
 
 func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOptions, err error) {
diff --git a/pkg/utils/schema.go b/pkg/utils/schema.go
index c306d8f64..67d28132f 100644
--- a/pkg/utils/schema.go
+++ b/pkg/utils/schema.go
@@ -17,6 +17,8 @@ import (
 const (
 	// MetaFile represents file name
 	MetaFile = "backupmeta"
+	// MetaJSONFile represents backup meta json file name
+	MetaJSONFile = "backupmeta.json"
 )
 
 // Table wraps the schema and files of a table.
@@ -29,21 +31,6 @@ type Table struct {
 	Files  []*backup.File
 }
 
-// Tables wraps a array of Table.
-type Tables []*Table
-
-func (tables Tables) Len() int {
-	return len(tables)
-}
-
-func (tables Tables) Less(i, j int) bool {
-	return tables[i].Schema.ID < tables[j].Schema.ID
-}
-
-func (tables Tables) Swap(i, j int) {
-	tables[i], tables[j] = tables[j], tables[i]
-}
-
 // Database wraps the schema and tables of a database.
type Database struct { Schema *model.DBInfo diff --git a/pkg/utils/tso.go b/pkg/utils/tso.go index 44c23fc48..a4ca5f5b5 100644 --- a/pkg/utils/tso.go +++ b/pkg/utils/tso.go @@ -8,36 +8,12 @@ import ( "strings" "github.com/pingcap/errors" - "github.com/pingcap/tidb/store/tikv/oracle" ) const ( resetTSURL = "/pd/api/v1/admin/reset-ts" ) -// Timestamp is composed by a physical unix timestamp and a logical timestamp. -type Timestamp struct { - Physical int64 - Logical int64 -} - -const physicalShiftBits = 18 - -// DecodeTs decodes Timestamp from a uint64 -func DecodeTs(ts uint64) Timestamp { - physical := oracle.ExtractPhysical(ts) - logical := ts - (uint64(physical) << physicalShiftBits) - return Timestamp{ - Physical: physical, - Logical: int64(logical), - } -} - -// EncodeTs encodes Timestamp into a uint64 -func EncodeTs(tp Timestamp) uint64 { - return uint64((tp.Physical << physicalShiftBits) + tp.Logical) -} - // ResetTS resets the timestamp of PD to a bigger value func ResetTS(pdAddr string, ts uint64) error { req, err := json.Marshal(struct { diff --git a/pkg/utils/tso_test.go b/pkg/utils/tso_test.go deleted file mode 100644 index 3e6ecd9e5..000000000 --- a/pkg/utils/tso_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package utils - -import ( - "math/rand" - "time" - - . "github.com/pingcap/check" -) - -type testTsoSuite struct{} - -var _ = Suite(&testTsoSuite{}) - -func (r *testTsoSuite) TestTimestampEncodeDecode(c *C) { - rand.Seed(time.Now().UnixNano()) - for i := 0; i < 10; i++ { - ts := rand.Uint64() - tp := DecodeTs(ts) - ts1 := EncodeTs(tp) - c.Assert(ts, DeepEquals, ts1) - } -} diff --git a/pkg/utils/unit.go b/pkg/utils/unit.go index 5f8009878..a12dcb6c2 100644 --- a/pkg/utils/unit.go +++ b/pkg/utils/unit.go @@ -2,7 +2,7 @@ package utils // unit of storage const ( - B = 1 << (iota * 10) + B = uint64(1) << (iota * 10) KB MB GB diff --git a/pkg/utils/unit_test.go b/pkg/utils/unit_test.go new file mode 100644 index 000000000..5b3c00530 --- /dev/null +++ b/pkg/utils/unit_test.go @@ -0,0 +1,17 @@ +package utils + +import ( + . "github.com/pingcap/check" +) + +type testUnitSuite struct{} + +var _ = Suite(&testUnitSuite{}) + +func (r *testUnitSuite) TestLoadBackupMeta(c *C) { + c.Assert(B, Equals, uint64(1)) + c.Assert(KB, Equals, uint64(1024)) + c.Assert(MB, Equals, uint64(1024*1024)) + c.Assert(GB, Equals, uint64(1024*1024*1024)) + c.Assert(TB, Equals, uint64(1024*1024*1024*1024)) +} diff --git a/pkg/utils/version.go b/pkg/utils/version.go index bed19ffa9..13a3c7a92 100644 --- a/pkg/utils/version.go +++ b/pkg/utils/version.go @@ -1,7 +1,9 @@ package utils import ( + "bytes" "fmt" + "runtime" "github.com/pingcap/log" "github.com/pingcap/tidb/util/israce" @@ -16,25 +18,30 @@ var ( BRBuildTS = "None" BRGitHash = "None" BRGitBranch = "None" + goVersion = runtime.Version() ) -// LogBRInfo prints the BR version information. +// LogBRInfo logs version information about BR. func LogBRInfo() { log.Info("Welcome to Backup & Restore (BR)") log.Info("BR", zap.String("release-version", BRReleaseVersion)) log.Info("BR", zap.String("git-hash", BRGitHash)) log.Info("BR", zap.String("git-branch", BRGitBranch)) + log.Info("BR", zap.String("go-version", goVersion)) log.Info("BR", zap.String("utc-build-time", BRBuildTS)) log.Info("BR", zap.Bool("race-enabled", israce.RaceEnabled)) } -// PrintBRInfo prints the BR version information without log info. 
-func PrintBRInfo() { - fmt.Println("Release Version:", BRReleaseVersion) - fmt.Println("Git Commit Hash:", BRGitHash) - fmt.Println("Git Branch:", BRGitBranch) - fmt.Println("UTC Build Time: ", BRBuildTS) - fmt.Println("Race Enabled: ", israce.RaceEnabled) +// BRInfo returns version information about BR. +func BRInfo() string { + buf := bytes.Buffer{} + fmt.Fprintf(&buf, "Release Version: %s\n", BRReleaseVersion) + fmt.Fprintf(&buf, "Git Commit Hash: %s\n", BRGitHash) + fmt.Fprintf(&buf, "Git Branch: %s\n", BRGitBranch) + fmt.Fprintf(&buf, "Go Version: %s\n", goVersion) + fmt.Fprintf(&buf, "UTC Build Time: %s\n", BRBuildTS) + fmt.Fprintf(&buf, "Race Enabled: %t", israce.RaceEnabled) + return buf.String() } // LogArguments prints origin command arguments diff --git a/tests/_utils/run_br b/tests/_utils/run_br index ca4a09069..0619e0ee6 100755 --- a/tests/_utils/run_br +++ b/tests/_utils/run_br @@ -17,4 +17,4 @@ set -eu TEST_DIR=/tmp/backup_restore_test -bin/br.test -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.$$.out.log" DEVEL "$@" +bin/br.test -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.$$.out.log" DEVEL "$@" -L "debug" diff --git a/tests/br_debug_meta/run.sh b/tests/br_debug_meta/run.sh new file mode 100644 index 000000000..1dcfccefe --- /dev/null +++ b/tests/br_debug_meta/run.sh @@ -0,0 +1,59 @@ +#!/bin/sh +# +# Copyright 2019 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +DB="$TEST_NAME" + +run_sql "CREATE DATABASE $DB;" + +run_sql "CREATE TABLE $DB.usertable1 ( \ + YCSB_KEY varchar(64) NOT NULL, \ + FIELD0 varchar(1) DEFAULT NULL, \ + PRIMARY KEY (YCSB_KEY) \ +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;" + +run_sql "INSERT INTO $DB.usertable1 VALUES (\"a\", \"b\");" +run_sql "INSERT INTO $DB.usertable1 VALUES (\"aa\", \"b\");" + +# backup table +echo "backup start..." +run_br --pd $PD_ADDR backup table --db $DB --table usertable1 -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 + +# Test validate decode +run_br validate decode -s "local://$TEST_DIR/$DB" + +# should generate backupmeta.json +if [ ! -f "$TEST_DIR/$DB/backupmeta.json" ]; then + echo "TEST: [$TEST_NAME] failed!" + exit 1 +fi + +# Test validate encode +run_br validate encode -s "local://$TEST_DIR/$DB" + +# should generate backupmeta_from_json +if [ ! -f "$TEST_DIR/$DB/backupmeta_from_json" ]; then + echo "TEST: [$TEST_NAME] failed!" + exit 1 +fi + +DIFF=$(diff $TEST_DIR/$DB/backupmeta_from_json $TEST_DIR/$DB/backupmeta) +if [ "$DIFF" != "" ] +then + echo "TEST: [$TEST_NAME] failed!" + exit 1 +fi + +run_sql "DROP DATABASE $DB;" diff --git a/tests/br_incremental/run.sh b/tests/br_incremental/run.sh new file mode 100755 index 000000000..bb6a42efb --- /dev/null +++ b/tests/br_incremental/run.sh @@ -0,0 +1,75 @@ +#!/bin/sh +# +# Copyright 2019 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+DB="$TEST_NAME"
+TABLE="usertable"
+
+run_sql "CREATE DATABASE $DB;"
+
+go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB
+
+row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
+
+# full backup
+echo "full backup start..."
+run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --ratelimit 5 --concurrency 4
+
+run_sql "DROP TABLE $DB.$TABLE;"
+
+# full restore
+echo "full restore start..."
+run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB" --pd $PD_ADDR
+
+row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
+
+if [ "$row_count_ori" -ne "$row_count_new" ];then
+    echo "TEST: [$TEST_NAME] full br failed!"
+    exit 1
+fi
+
+go-ycsb run mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB
+
+row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
+last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB" | tail -n1)
+
+# clean up data
+rm -rf $TEST_DIR/$DB
+
+# incremental backup
+echo "incremental backup start..."
+run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 --lastbackupts $last_backup_ts
+
+start_ts=$(br validate decode --field="start-version" -s "local://$TEST_DIR/$DB" | tail -n1)
+end_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB" | tail -n1)
+
+echo "start version: $start_ts, end version: $end_ts"
+
+# incremental restore
+echo "incremental restore start..."
+run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR
+
+row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
+
+echo "[original] row count: $row_count_ori, [after br] row count: $row_count_new"
+
+if [ "$row_count_ori" -eq "$row_count_new" ];then
+    echo "TEST: [$TEST_NAME] succeeded!"
+else
+    echo "TEST: [$TEST_NAME] failed!"
+    exit 1
+fi
+
+run_sql "DROP DATABASE $DB;"
diff --git a/tests/br_incremental/workload b/tests/br_incremental/workload
new file mode 100644
index 000000000..abb190620
--- /dev/null
+++ b/tests/br_incremental/workload
@@ -0,0 +1,12 @@
+recordcount=1000
+operationcount=1000
+workload=core
+
+readallfields=true
+
+readproportion=0
+updateproportion=0.5
+scanproportion=0
+insertproportion=0.5
+
+requestdistribution=uniform
\ No newline at end of file
diff --git a/tests/br_insert_after_restore/run.sh b/tests/br_insert_after_restore/run.sh
new file mode 100755
index 000000000..1c72db9ee
--- /dev/null
+++ b/tests/br_insert_after_restore/run.sh
@@ -0,0 +1,82 @@
+#!/bin/sh
+#
+# Copyright 2019 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+DB="$TEST_NAME"
+TABLE="usertable"
+ROW_COUNT=10
+PATH="tests/$TEST_NAME:bin:$PATH"
+
+insertRecords() {
+    for i in $(seq $1); do
+        run_sql "INSERT INTO $DB.$TABLE VALUES ('$i');"
+    done
+}
+
+createTable() {
+    run_sql "CREATE TABLE IF NOT EXISTS $DB.$TABLE (c1 CHAR(255));"
+}
+
+echo "load data..."
+echo "create database"
+run_sql "CREATE DATABASE IF NOT EXISTS $DB;"
+echo "create table"
+createTable
+echo "insert records"
+insertRecords $ROW_COUNT
+
+row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
+
+# backup full
+echo "backup start..."
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4
+
+run_sql "DROP DATABASE $DB;"
+
+# restore full
+echo "restore start..."
+run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR
+
+row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
+
+fail=false
+if [ "${row_count_ori}" != "${row_count_new}" ];then
+    fail=true
+    echo "TEST: [$TEST_NAME] fail on database $DB"
+fi
+echo "database $DB [original] row count: ${row_count_ori}, [after br] row count: ${row_count_new}"
+
+if $fail; then
+    echo "TEST: [$TEST_NAME] failed!"
+    exit 1
+fi
+
+# insert records
+insertRecords $ROW_COUNT
+row_count_insert=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
+fail=false
+if [ "${row_count_insert}" != "$(expr $row_count_new \* 2)" ];then
+    fail=true
+    echo "TEST: [$TEST_NAME] fail on inserting records to database $DB after restore: ${row_count_insert}"
+fi
+
+if $fail; then
+    echo "TEST: [$TEST_NAME] failed!"
+    exit 1
+else
+    echo "TEST: [$TEST_NAME] succeeded!"
+fi
+
+run_sql "DROP DATABASE $DB;"
diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh
index 579c630df..e25dd2eae 100644
--- a/tests/br_other/run.sh
+++ b/tests/br_other/run.sh
@@ -55,4 +55,5 @@ fi
 run_sql "DROP DATABASE $DB;"
 
 # Test version
-run_br version
+run_br --version
+run_br -V
diff --git a/tests/br_table_partition/prepare.sh b/tests/br_table_partition/prepare.sh
new file mode 100755
index 000000000..b2efc8e5a
--- /dev/null
+++ b/tests/br_table_partition/prepare.sh
@@ -0,0 +1,61 @@
+#!/bin/sh
+#
+# Copyright 2019 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+
+ROW_COUNT=100
+CONCURRENCY=8
+
+insertRecords() {
+    for i in $(seq $2 $3); do
+        run_sql "INSERT INTO $1 VALUES (\
+            $i, \
+            REPEAT(' ', 255), \
+            REPEAT(' ', 255), \
+            REPEAT(' ', 255), \
+            REPEAT(' ', 255)\
+        );"
+    done
+}
+
+createTable() {
+    run_sql "CREATE TABLE IF NOT EXISTS $DB.$TABLE$1 (\
+        c1 INT, \
+        c2 CHAR(255), \
+        c3 CHAR(255), \
+        c4 CHAR(255), \
+        c5 CHAR(255)) \
+        \
+        PARTITION BY RANGE(c1) ( \
+        PARTITION p0 VALUES LESS THAN (0), \
+        PARTITION p1 VALUES LESS THAN ($(expr $ROW_COUNT / 2)) \
+        );"
+    run_sql "ALTER TABLE $DB.$TABLE$1 \
+        ADD PARTITION (PARTITION p2 VALUES LESS THAN MAXVALUE);"
+}
+
+echo "load database $DB"
+run_sql "CREATE DATABASE IF NOT EXISTS $DB;"
+for i in $(seq $TABLE_COUNT); do
+    createTable "${i}" &
+done
+wait
+
+for i in $(seq $TABLE_COUNT); do
+    for j in $(seq $CONCURRENCY); do
+        insertRecords $DB.$TABLE${i} $(expr $ROW_COUNT / $CONCURRENCY \* $(expr $j - 1) + 1) $(expr $ROW_COUNT / $CONCURRENCY \* $j) &
+    done
+done
+wait
diff --git a/tests/br_table_partition/run.sh b/tests/br_table_partition/run.sh
new file mode 100755
index 000000000..fe0ce874b
--- /dev/null
+++ b/tests/br_table_partition/run.sh
@@ -0,0 +1,61 @@
+#!/bin/sh
+#
+# Copyright 2019 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+DB="$TEST_NAME"
+TABLE="usertable"
+TABLE_COUNT=16
+PATH="tests/$TEST_NAME:bin:$PATH"
+
+echo "load data..."
+DB=$DB TABLE=$TABLE TABLE_COUNT=$TABLE_COUNT prepare.sh
+
+for i in $(seq $TABLE_COUNT); do
+    row_count_ori[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE${i};" | awk '/COUNT/{print $2}')
+done
+
+# backup full
+echo "backup start..."
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4
+
+run_sql "DROP DATABASE $DB;"
+
+# restore full
+echo "restore start..."
+run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR
+
+for i in $(seq $TABLE_COUNT); do
+    row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE${i};" | awk '/COUNT/{print $2}')
+done
+
+fail=false
+for i in $(seq $TABLE_COUNT); do
+    if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then
+        fail=true
+        echo "TEST: [$TEST_NAME] fail on table $DB.$TABLE${i}"
+    fi
+    echo "table $DB.$TABLE${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}"
+done
+
+if $fail; then
+    echo "TEST: [$TEST_NAME] failed!"
+    exit 1
+else
+    echo "TEST: [$TEST_NAME] succeeded!"
+fi
+
+run_sql "DROP DATABASE $DB;"
diff --git a/tests/br_z_gc_safepoint/gc.go b/tests/br_z_gc_safepoint/gc.go
new file mode 100644
index 000000000..a18367259
--- /dev/null
+++ b/tests/br_z_gc_safepoint/gc.go
@@ -0,0 +1,64 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Test backup with a backup ts exceeding the GC safe point.
+
+package main
+
+import (
+	"context"
+	"flag"
+	"time"
+
+	"github.com/pingcap/log"
+	pd "github.com/pingcap/pd/client"
+	"github.com/pingcap/tidb/store/tikv/oracle"
+	"go.uber.org/zap"
+)
+
+var (
+	pdAddr   = flag.String("pd", "", "PD address")
+	gcOffset = flag.Duration("gc-offset", time.Second*10,
+		"Set GC safe point to current time - gc-offset, default: 10s")
+)
+
+func main() {
+	flag.Parse()
+	if *pdAddr == "" {
+		log.Fatal("pd address is empty")
+	}
+	if *gcOffset == time.Duration(0) {
+		log.Fatal("zero gc-offset is not allowed")
+	}
+
+	timeout := time.Second * 10
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	pdclient, err := pd.NewClientWithContext(ctx, []string{*pdAddr}, pd.SecurityOption{})
+	if err != nil {
+		log.Fatal("create pd client failed", zap.Error(err))
+	}
+	p, l, err := pdclient.GetTS(ctx)
+	if err != nil {
+		log.Fatal("get ts failed", zap.Error(err))
+	}
+	now := oracle.ComposeTS(p, l)
+	nowMinusOffset := oracle.GetTimeFromTS(now).Add(-*gcOffset)
+	newSP := oracle.ComposeTS(oracle.GetPhysical(nowMinusOffset), 0)
+	_, err = pdclient.UpdateGCSafePoint(ctx, newSP)
+	if err != nil {
+		log.Fatal("update GC safe point failed", zap.Error(err))
+	}
+
+	log.Info("update GC safe point", zap.Uint64("SP", newSP), zap.Uint64("now", now))
+}
diff --git a/tests/br_z_gc_safepoint/run.sh b/tests/br_z_gc_safepoint/run.sh
new file mode 100755
index 000000000..916ca1fa8
--- /dev/null
+++ b/tests/br_z_gc_safepoint/run.sh
@@ -0,0 +1,46 @@
+#!/bin/sh
+#
+# Copyright 2019 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Test whether BR fails fast when the backup ts exceeds the GC safe point.
+# The test is named br_*z*_gc_safepoint because it issues lots of writes
+# and slows down other tests by changing the GC safe point; the z prefix
+# makes it run last.
+
+set -eu
+
+DB="$TEST_NAME"
+TABLE="usertable"
+
+run_sql "CREATE DATABASE $DB;"
+
+go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB
+
+row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
+
+# After 10 seconds, advance the GC safe point to (now - 5s), past the backup start ts.
+sleep 10 && bin/gc -pd $PD_ADDR -gc-offset "5s" &
+
+# Set the rate limit to 1 byte/second; the backup cannot finish within 10s,
+# so it is expected to fail with a GC safe point error.
+backup_gc_fail=0
+echo "backup start (expect fail)..."
+run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1
+
+if [ "$backup_gc_fail" -ne "1" ];then
+    echo "TEST: [$TEST_NAME] failed!"
+ exit 1 +fi + +run_sql "DROP TABLE $DB.$TABLE;" diff --git a/tests/br_z_gc_safepoint/workload b/tests/br_z_gc_safepoint/workload new file mode 100644 index 000000000..448ca3c1a --- /dev/null +++ b/tests/br_z_gc_safepoint/workload @@ -0,0 +1,12 @@ +recordcount=1000 +operationcount=0 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform \ No newline at end of file
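A note on the safe point arithmetic in tests/br_z_gc_safepoint/gc.go: a PD
TSO timestamp packs a physical wall-clock part (milliseconds) and a logical
counter into one uint64, which is why the tool rebuilds the safe point with
oracle.ComposeTS(oracle.GetPhysical(nowMinusOffset), 0). The standalone Go
sketch below mirrors that layout for illustration only; it assumes the
conventional 18-bit logical suffix and reimplements composeTS, getPhysical,
and getTimeFromTS locally rather than importing the real oracle package.

package main

import (
	"fmt"
	"time"
)

// Assumed TSO layout: ts = (physical milliseconds << 18) | logical.
const physicalShiftBits = 18

func composeTS(physical, logical int64) uint64 {
	return uint64((physical << physicalShiftBits) + logical)
}

func getPhysical(t time.Time) int64 {
	return t.UnixNano() / int64(time.Millisecond)
}

func getTimeFromTS(ts uint64) time.Time {
	ms := int64(ts >> physicalShiftBits)
	return time.Unix(ms/1000, (ms%1000)*int64(time.Millisecond))
}

func main() {
	gcOffset := 5 * time.Second // same offset the test passes to bin/gc

	// Mirror gc.go: compose "now" from its physical part, subtract the
	// offset, and build a new safe point with a zero logical component.
	nowTS := composeTS(getPhysical(time.Now()), 0)
	nowMinusOffset := getTimeFromTS(nowTS).Add(-gcOffset)
	newSP := composeTS(getPhysical(nowMinusOffset), 0)

	fmt.Printf("now ts: %d, new safe point: %d\n", nowTS, newSP)
	// Prints exactly 5s: the new safe point lags "now" by gcOffset.
	fmt.Printf("safe point lags now by %v\n",
		getTimeFromTS(nowTS).Sub(getTimeFromTS(newSP)))
}

Since the safe point lands gc-offset behind the current time while the backup
ts was taken roughly 10 seconds earlier, the safe point overtakes the backup
ts and BR is expected to abort, which is exactly what run.sh asserts.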