From ff40d8203656788846df9fe9009b11e05d53e92f Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Thu, 6 Feb 2020 18:15:36 +0800 Subject: [PATCH 01/12] support backup&restore ddl Signed-off-by: 5kbpers --- cmd/backup.go | 7 ++++- cmd/restore.go | 56 ++++++++++++++++++++++++++++++++--- cmd/validate.go | 10 +++---- pkg/backup/client.go | 51 ++++++++++++++++++++++++++++++- pkg/checksum/executor.go | 8 ++--- pkg/checksum/executor_test.go | 2 +- pkg/restore/client.go | 51 +++++++++++++++++++++++++------ pkg/restore/client_test.go | 2 +- pkg/restore/db.go | 37 +++++++++++++++++------ pkg/restore/db_test.go | 8 ++--- pkg/utils/schema.go | 10 +++---- 11 files changed, 198 insertions(+), 44 deletions(-) diff --git a/cmd/backup.go b/cmd/backup.go index 73ae6106f..3113d66c6 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -116,6 +116,11 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error { return err } + ddlJobs, err := backup.GetBackupDDLJobs(mgr.GetDomain(), lastBackupTS, backupTS) + if err != nil { + return err + } + // The number of regions need to backup approximateRegions := 0 for _, r := range ranges { @@ -166,7 +171,7 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error { // Checksum has finished close(updateCh) - err = client.SaveBackupMeta(ctx) + err = client.SaveBackupMeta(ctx, ddlJobs) if err != nil { return err } diff --git a/cmd/restore.go b/cmd/restore.go index 4f66e47de..bd9112012 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -2,12 +2,14 @@ package cmd import ( "context" + "sort" "strings" "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/log" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb/session" "github.com/spf13/cobra" flag "github.com/spf13/pflag" @@ -92,6 +94,7 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error files := make([]*backup.File, 0) tables := make([]*utils.Table, 0) + ddlJobs := make([]*model.Job, 0) defer summary.Summary(cmdName) @@ -99,7 +102,7 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error case len(dbName) == 0 && len(tableName) == 0: // full restore for _, db := range client.GetDatabases() { - err = client.CreateDatabase(db.Schema) + err = client.CreateDatabase(db.Info) if err != nil { return errors.Trace(err) } @@ -108,13 +111,14 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error } tables = append(tables, db.Tables...) } + ddlJobs = client.GetDDLJobs() case len(dbName) != 0 && len(tableName) == 0: // database restore db := client.GetDatabase(dbName) if db == nil { return errors.Errorf("database %s not found in backup", dbName) } - err = client.CreateDatabase(db.Schema) + err = client.CreateDatabase(db.Info) if err != nil { return errors.Trace(err) } @@ -122,19 +126,58 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error files = append(files, table.Files...) } tables = db.Tables + allDDLJobs := client.GetDDLJobs() + // Sort the ddl jobs by schema version in descending order. + sort.Slice(allDDLJobs, func(i, j int) bool { + return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion + }) + // The map is for resolving some corner case. + // Let "t=2" indicates that the id of database "t" is 2, if there is a ddl execution sequence is: + // rename "a" to "b"(a=1) -> drop "b"(b=1) -> create "b"(b=2) -> rename "b" to "a"(a=2) + // Which we cannot find the "create" DDL by name and id. 
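+		// (ids are collected from the newest job to the oldest, so a match on the
+		// final id also pulls in the earlier ids the database once had)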
+		// To cover this case, we must find all ids the database ever had.
+		dbIDs := make(map[int64]bool)
+		for _, job := range allDDLJobs {
+			if job.SchemaID == db.Info.ID ||
+				(job.BinlogInfo.DBInfo != nil && job.BinlogInfo.DBInfo.ID == db.Info.ID) ||
+				dbIDs[job.SchemaID] {
+				dbIDs[job.SchemaID] = true
+				if job.BinlogInfo.DBInfo != nil {
+					dbIDs[job.BinlogInfo.DBInfo.ID] = true
+				}
+				ddlJobs = append(ddlJobs, job)
+			}
+		}
 	case len(dbName) != 0 && len(tableName) != 0:
 		// table restore
 		db := client.GetDatabase(dbName)
 		if db == nil {
 			return errors.Errorf("database %s not found in backup", dbName)
 		}
-		err = client.CreateDatabase(db.Schema)
+		err = client.CreateDatabase(db.Info)
 		if err != nil {
 			return errors.Trace(err)
 		}
 		table := db.GetTable(tableName)
 		files = table.Files
 		tables = append(tables, table)
+		allDDLJobs := client.GetDDLJobs()
+		// Sort the ddl jobs by schema version in descending order.
+		sort.Slice(allDDLJobs, func(i, j int) bool {
+			return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion
+		})
+		tableIDs := make(map[int64]bool)
+		for _, job := range allDDLJobs {
+			if job.SchemaID == table.Info.ID ||
+				(job.BinlogInfo.TableInfo != nil && job.BinlogInfo.TableInfo.ID == table.Info.ID) ||
+				tableIDs[job.SchemaID] {
+				tableIDs[job.SchemaID] = true
+				if job.BinlogInfo.TableInfo != nil {
+					tableIDs[job.BinlogInfo.TableInfo.ID] = true
+				}
+				ddlJobs = append(ddlJobs, job)
+			}
+		}
 	default:
 		return errors.New("must set db when table was set")
 	}
@@ -152,10 +195,15 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error
 	}
 	ranges, err := restore.ValidateFileRanges(files, rewriteRules)
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	summary.CollectInt("restore ranges", len(ranges))
 
+	err = client.ExecDDLs(ddlJobs)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
 	// Redirect to log if there is no log file to avoid unreadable output.
 	updateCh := utils.StartProgress(
 		ctx,
diff --git a/cmd/validate.go b/cmd/validate.go
index 8ba72b372..a8a70e91e 100644
--- a/cmd/validate.go
+++ b/cmd/validate.go
@@ -216,19 +216,19 @@ func newBackupMetaCommand() *cobra.Command {
 				newTable := new(model.TableInfo)
 				tableID, _ := tableIDAllocator.Alloc()
 				newTable.ID = int64(tableID)
-				newTable.Name = table.Schema.Name
-				newTable.Indices = make([]*model.IndexInfo, len(table.Schema.Indices))
-				for i, indexInfo := range table.Schema.Indices {
+				newTable.Name = table.Info.Name
+				newTable.Indices = make([]*model.IndexInfo, len(table.Info.Indices))
+				for i, indexInfo := range table.Info.Indices {
 					indexID, _ := indexIDAllocator.Alloc()
 					newTable.Indices[i] = &model.IndexInfo{
 						ID:   int64(indexID),
 						Name: indexInfo.Name,
 					}
 				}
-				rules := restore.GetRewriteRules(newTable, table.Schema, 0)
+				rules := restore.GetRewriteRules(newTable, table.Info, 0)
 				rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
 				rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
- tableIDMap[table.Schema.ID] = int64(tableID) + tableIDMap[table.Info.ID] = int64(tableID) } // Validate rewrite rules for _, file := range files { diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 5cba2d9bf..b0ccc9338 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" @@ -121,7 +122,12 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend } // SaveBackupMeta saves the current backup meta at the given path. -func (bc *Client) SaveBackupMeta(ctx context.Context) error { +func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) error { + ddlJobsData, err := json.Marshal(ddlJobs) + if err != nil { + return errors.Trace(err) + } + bc.backupMeta.Ddls = ddlJobsData backupMetaData, err := proto.Marshal(&bc.backupMeta) if err != nil { return errors.Trace(err) @@ -276,6 +282,49 @@ LoadDb: return ranges, backupSchemas, nil } +// GetBackupDDLJobs returns the ddl jobs are done in (lastBackupTS, backupTS] +func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*model.Job, error) { + snapMeta, err := dom.GetSnapshotMeta(backupTS) + if err != nil { + return nil, errors.Trace(err) + } + lastSnapMeta, err := dom.GetSnapshotMeta(lastBackupTS) + if err != nil { + return nil, errors.Trace(err) + } + lastSchemaVersion, err := lastSnapMeta.GetSchemaVersion() + if err != nil { + return nil, errors.Trace(err) + } + allJobs := make([]*model.Job, 0) + defaultJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.DefaultJobListKey) + if err != nil { + return nil, errors.Trace(err) + } + allJobs = append(allJobs, defaultJobs...) + addIndexJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey) + if err != nil { + return nil, errors.Trace(err) + } + allJobs = append(allJobs, addIndexJobs...) + historyJobs, err := snapMeta.GetAllHistoryDDLJobs() + if err != nil { + return nil, errors.Trace(err) + } + allJobs = append(allJobs, historyJobs...) + + completedJobs := make([]*model.Job, 0) + for _, job := range allJobs { + if job.State != model.JobStateDone || + job.BinlogInfo == nil || + job.BinlogInfo.SchemaVersion <= lastSchemaVersion { + continue + } + completedJobs = append(completedJobs, job) + } + return completedJobs, nil +} + // BackupRanges make a backup of the given key ranges. 
func (bc *Client) BackupRanges( ctx context.Context, diff --git a/pkg/checksum/executor.go b/pkg/checksum/executor.go index 2ca5cf66d..30e8f11c8 100644 --- a/pkg/checksum/executor.go +++ b/pkg/checksum/executor.go @@ -61,7 +61,7 @@ func buildChecksumRequest( reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1)) var oldTableID int64 if oldTable != nil { - oldTableID = oldTable.Schema.ID + oldTableID = oldTable.Info.ID } rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS) if err != nil { @@ -72,7 +72,7 @@ func buildChecksumRequest( for _, partDef := range partDefs { var oldPartID int64 if oldTable != nil { - for _, oldPartDef := range oldTable.Schema.Partition.Definitions { + for _, oldPartDef := range oldTable.Info.Partition.Definitions { if oldPartDef.Name == partDef.Name { oldPartID = oldPartDef.ID } @@ -108,7 +108,7 @@ func buildRequest( } var oldIndexInfo *model.IndexInfo if oldTable != nil { - for _, oldIndex := range oldTable.Schema.Indices { + for _, oldIndex := range oldTable.Info.Indices { if oldIndex.Name == indexInfo.Name { oldIndexInfo = oldIndex break @@ -117,7 +117,7 @@ func buildRequest( if oldIndexInfo == nil { log.Panic("index not found", zap.Reflect("table", tableInfo), - zap.Reflect("oldTable", oldTable.Schema), + zap.Reflect("oldTable", oldTable.Info), zap.Stringer("index", indexInfo.Name)) } } diff --git a/pkg/checksum/executor_test.go b/pkg/checksum/executor_test.go index ca68628e2..3e6d8078c 100644 --- a/pkg/checksum/executor_test.go +++ b/pkg/checksum/executor_test.go @@ -83,7 +83,7 @@ func (s *testChecksumSuite) TestChecksum(c *C) { // Test rewrite rules tk.MustExec("alter table t1 add index i2(a);") tableInfo1 = s.getTableInfo(c, "test", "t1") - oldTable := utils.Table{Schema: tableInfo1} + oldTable := utils.Table{Info: tableInfo1} exe2, err = NewExecutorBuilder(tableInfo2, math.MaxUint64). SetOldTable(&oldTable).Build() c.Assert(err, IsNil) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 3030ba857..1a814472c 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -2,8 +2,10 @@ package restore import ( "context" + "encoding/json" "fmt" "math" + "sort" "sync" "time" @@ -47,6 +49,7 @@ type Client struct { tableWorkerPool *utils.WorkerPool databases map[string]*utils.Database + ddlJobs []*model.Job backupMeta *backup.BackupMeta db *DB rateLimit uint64 @@ -104,7 +107,16 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup. if err != nil { return errors.Trace(err) } + var ddlJobs []*model.Job + err = json.Unmarshal(backupMeta.GetDdls(), &ddlJobs) + if err != nil { + return errors.Trace(err) + } rc.databases = databases + sort.Slice(ddlJobs, func(i, j int) bool { + return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion + }) + rc.ddlJobs = ddlJobs rc.backupMeta = backupMeta metaClient := NewSplitClient(rc.pdClient) @@ -161,6 +173,11 @@ func (rc *Client) GetDatabase(name string) *utils.Database { return rc.databases[name] } +// GetDDLJobs returns ddl jobs +func (rc *Client) GetDDLJobs() []*model.Job { + return rc.ddlJobs +} + // GetTableSchema returns the schema of a table from TiDB. 
func (rc *Client) GetTableSchema( dom *domain.Domain, @@ -199,11 +216,11 @@ func (rc *Client) CreateTables( if err != nil { return nil, nil, err } - newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Schema.Name) + newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Info.Name) if err != nil { return nil, nil, err } - rules := GetRewriteRules(newTableInfo, table.Schema, newTS) + rules := GetRewriteRules(newTableInfo, table.Info, newTS) rewriteRules.Table = append(rewriteRules.Table, rules.Table...) rewriteRules.Data = append(rewriteRules.Data, rules.Data...) newTables = append(newTables, newTableInfo) @@ -211,6 +228,22 @@ func (rc *Client) CreateTables( return rewriteRules, newTables, nil } +// ExecDDLs executes the queries of the ddl jobs. +func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error { + // Sort the ddl jobs by schema version in ascending order. + sort.Slice(ddlJobs, func(i, j int) bool { + return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion + }) + for _, job := range ddlJobs { + err := rc.db.ExecDDL(rc.ctx, job) + if err != nil { + return errors.Trace(err) + } + log.Info("execute ddl query", zap.String("db", job.SchemaName), zap.String("query", job.Query)) + } + return nil +} + func (rc *Client) setSpeedLimit() error { if !rc.hasSpeedLimited && rc.rateLimit != 0 { stores, err := rc.pdClient.GetAllStores(rc.ctx, pd.WithExcludeTombstone()) @@ -238,8 +271,8 @@ func (rc *Client) RestoreTable( defer func() { elapsed := time.Since(start) log.Info("restore table", - zap.Stringer("table", table.Schema.Name), zap.Duration("take", elapsed)) - key := fmt.Sprintf("%s.%s", table.Db.Name.String(), table.Schema.Name.String()) + zap.Stringer("table", table.Info.Name), zap.Duration("take", elapsed)) + key := fmt.Sprintf("%s.%s", table.Db.Name.String(), table.Info.Name.String()) if err != nil { summary.CollectFailureUnit(key, err) } else { @@ -248,7 +281,7 @@ func (rc *Client) RestoreTable( }() log.Debug("start to restore table", - zap.Stringer("table", table.Schema.Name), + zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.Db.Name), zap.Array("files", files(table.Files)), ) @@ -281,7 +314,7 @@ func (rc *Client) RestoreTable( wg.Wait() log.Error( "restore table failed", - zap.Stringer("table", table.Schema.Name), + zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.Db.Name), zap.Error(err), ) @@ -290,7 +323,7 @@ func (rc *Client) RestoreTable( } log.Info( "finish to restore table", - zap.Stringer("table", table.Schema.Name), + zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.Db.Name), ) return nil @@ -305,7 +338,7 @@ func (rc *Client) RestoreDatabase( start := time.Now() defer func() { elapsed := time.Since(start) - log.Info("Restore Database", zap.Stringer("db", db.Schema.Name), zap.Duration("take", elapsed)) + log.Info("Restore Database", zap.Stringer("db", db.Info.Name), zap.Duration("take", elapsed)) }() errCh := make(chan error, len(db.Tables)) wg := new(sync.WaitGroup) @@ -474,7 +507,7 @@ func (rc *Client) ValidateChecksum( checksumResp.TotalBytes != table.TotalBytes { log.Error("failed in validate checksum", zap.String("database", table.Db.Name.L), - zap.String("table", table.Schema.Name.L), + zap.String("table", table.Info.Name.L), zap.Uint64("origin tidb crc64", table.Crc64Xor), zap.Uint64("calculated crc64", checksumResp.Checksum), zap.Uint64("origin tidb total kvs", table.TotalKvs), diff --git a/pkg/restore/client_test.go b/pkg/restore/client_test.go index 5007f1281..3d608b3b9 100644 
--- a/pkg/restore/client_test.go +++ b/pkg/restore/client_test.go @@ -52,7 +52,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) { for i := len(tables) - 1; i >= 0; i-- { tables[i] = &utils.Table{ Db: dbSchema, - Schema: &model.TableInfo{ + Info: &model.TableInfo{ ID: int64(i), Name: model.NewCIStr("test" + strconv.Itoa(i)), Columns: []*model.ColumnInfo{{ diff --git a/pkg/restore/db.go b/pkg/restore/db.go index b114b7629..a00257ba0 100644 --- a/pkg/restore/db.go +++ b/pkg/restore/db.go @@ -38,6 +38,27 @@ func NewDB(store kv.Storage) (*DB, error) { }, nil } +// ExecDDL executes the query of a ddl job. +func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { + switchDbSQL := fmt.Sprintf("use %s;", ddlJob.SchemaName) + _, err := db.se.Execute(ctx, switchDbSQL) + if err != nil { + log.Error("switch db failed", + zap.String("query", switchDbSQL), + zap.String("db", ddlJob.SchemaName), + zap.Error(err)) + return errors.Trace(err) + } + _, err = db.se.Execute(ctx, ddlJob.Query) + if err != nil { + log.Error("execute ddl query failed", + zap.String("query", ddlJob.Query), + zap.String("db", ddlJob.SchemaName), + zap.Error(err)) + } + return errors.Trace(err) +} + // CreateDatabase executes a CREATE DATABASE SQL. func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { var buf bytes.Buffer @@ -49,16 +70,15 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { createSQL := buf.String() _, err = db.se.Execute(ctx, createSQL) if err != nil { - log.Error("create database failed", zap.String("SQL", createSQL), zap.Error(err)) - return errors.Trace(err) + log.Error("create database failed", zap.String("query", createSQL), zap.Error(err)) } - return nil + return errors.Trace(err) } // CreateTable executes a CREATE TABLE SQL. 
func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { var buf bytes.Buffer - schema := table.Schema + schema := table.Info err := executor.ConstructResultOfShowCreateTable(db.se, schema, newIDAllocator(schema.AutoIncID), &buf) if err != nil { log.Error( @@ -88,7 +108,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { log.Error("create table failed", zap.String("SQL", createSQL), zap.Stringer("db", table.Db.Name), - zap.Stringer("table", table.Schema.Name), + zap.Stringer("table", table.Info.Name), zap.Error(err)) return errors.Trace(err) } @@ -99,13 +119,12 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { _, err = db.se.Execute(ctx, alterAutoIncIDSQL) if err != nil { log.Error("alter AutoIncID failed", - zap.String("SQL", alterAutoIncIDSQL), + zap.String("query", alterAutoIncIDSQL), zap.Stringer("db", table.Db.Name), - zap.Stringer("table", table.Schema.Name), + zap.Stringer("table", table.Info.Name), zap.Error(err)) - return errors.Trace(err) } - return nil + return errors.Trace(err) } // Close closes the connection diff --git a/pkg/restore/db_test.go b/pkg/restore/db_test.go index 9583f7f8c..03a38a3a5 100644 --- a/pkg/restore/db_test.go +++ b/pkg/restore/db_test.go @@ -60,16 +60,16 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { tableInfo, err := info.TableByName(model.NewCIStr("test"), model.NewCIStr("\"t\"")) c.Assert(err, IsNil, Commentf("Error get table info: %s", err)) table := utils.Table{ - Schema: tableInfo.Meta(), - Db: dbInfo, + Info: tableInfo.Meta(), + Db: dbInfo, } // Get the next AutoIncID idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, false) - globalAutoID, err := idAlloc.NextGlobalAutoID(table.Schema.ID) + globalAutoID, err := idAlloc.NextGlobalAutoID(table.Info.ID) c.Assert(err, IsNil, Commentf("Error allocate next auto id")) c.Assert(autoIncID, Equals, uint64(globalAutoID)) // Alter AutoIncID to the next AutoIncID + 100 - table.Schema.AutoIncID = globalAutoID + 100 + table.Info.AutoIncID = globalAutoID + 100 db, err := NewDB(s.mock.Storage) c.Assert(err, IsNil, Commentf("Error create DB")) tk.MustExec("drop database if exists test;") diff --git a/pkg/utils/schema.go b/pkg/utils/schema.go index 67d28132f..0afe98e5b 100644 --- a/pkg/utils/schema.go +++ b/pkg/utils/schema.go @@ -24,7 +24,7 @@ const ( // Table wraps the schema and files of a table. type Table struct { Db *model.DBInfo - Schema *model.TableInfo + Info *model.TableInfo Crc64Xor uint64 TotalKvs uint64 TotalBytes uint64 @@ -33,14 +33,14 @@ type Table struct { // Database wraps the schema and tables of a database. type Database struct { - Schema *model.DBInfo + Info *model.DBInfo Tables []*Table } // GetTable returns a table of the database by name. 
func (db *Database) GetTable(name string) *Table { for _, table := range db.Tables { - if table.Schema.Name.String() == name { + if table.Info.Name.String() == name { return table } } @@ -61,7 +61,7 @@ func LoadBackupTables(meta *backup.BackupMeta) (map[string]*Database, error) { db, ok := databases[dbInfo.Name.String()] if !ok { db = &Database{ - Schema: dbInfo, + Info: dbInfo, Tables: make([]*Table, 0), } databases[dbInfo.Name.String()] = db @@ -94,7 +94,7 @@ func LoadBackupTables(meta *backup.BackupMeta) (map[string]*Database, error) { } table := &Table{ Db: dbInfo, - Schema: tableInfo, + Info: tableInfo, Crc64Xor: schema.Crc64Xor, TotalKvs: schema.TotalKvs, TotalBytes: schema.TotalBytes, From 20cf2b7e09c592af6598a6d0bd72a4c5ad9a152e Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Fri, 7 Feb 2020 14:54:17 +0800 Subject: [PATCH 02/12] integration tests Signed-off-by: 5kbpers --- tests/br_incremental_ddl/run.sh | 72 +++++++++++++++++++++++++++++ tests/br_incremental_index/run.sh | 76 +++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+) create mode 100755 tests/br_incremental_ddl/run.sh create mode 100755 tests/br_incremental_index/run.sh diff --git a/tests/br_incremental_ddl/run.sh b/tests/br_incremental_ddl/run.sh new file mode 100755 index 000000000..4f9750549 --- /dev/null +++ b/tests/br_incremental_ddl/run.sh @@ -0,0 +1,72 @@ +#!/bin/sh +# +# Copyright 2019 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +DB="$TEST_NAME" +TABLE="usertable" +ROW_COUNT=100 +PATH="tests/$TEST_NAME:bin:$PATH" + +echo "load data..." +# create database +run_sql "CREATE DATABASE IF NOT EXISTS $DB;" +# create table +run_sql "CREATE TABLE IF NOT EXISTS ${DB}.${TABLE} (\ + c1 INT, \ +);" +# insert records +for i in $(seq $ROW_COUNT); do + run_sql "INSERT INTO ${DB}.${TABLE}(c1) VALUES ($i);" +done + +# full backup +echo "full backup start..." +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 +# run ddls +echo "run ddls..." +run_sql "RENAME TABLE ${DB}.${TABLE} to ${DB}.${TABLE}1;" +run_sql "DROP TABLE ${DB}.${TABLE}1;" +run_sql "CREATE TABLE ${DB}.${TABLE}1 (c2 CHAR(255));" +run_sql "RENAME TABLE ${DB}.${TABLE}1 to ${DB}.${TABLE};" +# insert records +for i in $(seq $ROW_COUNT); do + run_sql "INSERT INTO ${DB}.${TABLE}(c2) VALUES ('$i');" +done +# incremental backup +echo "incremental backup start..." +last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB/full" | tail -n1) +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/inc" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 --lastbackupts $last_backup_ts + +run_sql "DROP DATABASE $DB;" +# full restore +echo "full restore start..." 
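+# restore the full backup first; the incremental restore below replays the DDLs and rows captured after it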
+run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR +row_count_full=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore +if [ "${row_count_full}" != "${ROW_COUNT}" ];then + echo "TEST: [$TEST_NAME] full restore fail on database $DB" + exit 1 +fi +# incremental restore +echo "incremental restore start..." +run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/inc" --pd $PD_ADDR +row_count_inc=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore +if [ "${row_count_inc}" != "${ROW_COUNT}" ];then + echo "TEST: [$TEST_NAME] incremental restore fail on database $DB" + exit 1 +fi + +run_sql "DROP DATABASE $DB;" diff --git a/tests/br_incremental_index/run.sh b/tests/br_incremental_index/run.sh new file mode 100755 index 000000000..dda658edf --- /dev/null +++ b/tests/br_incremental_index/run.sh @@ -0,0 +1,76 @@ +#!/bin/sh +# +# Copyright 2019 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +DB="$TEST_NAME" +TABLE="usertable" +ROW_COUNT=100 +PATH="tests/$TEST_NAME:bin:$PATH" + +echo "load data..." +# create database +run_sql "CREATE DATABASE IF NOT EXISTS $DB;" +# create table +run_sql "CREATE TABLE IF NOT EXISTS ${DB}.${TABLE} (\ + c1 INT, \ +);" +# insert records +for i in $(seq $ROW_COUNT); do + run_sql "INSERT INTO ${DB}.${TABLE} VALUES ($i);" +done + +# full backup +echo "backup full start..." +run_sql "CREATE INDEX idx_c1 ON ${DB}.${TABLE}(c1)" & +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/full" --ratelimit 5 --concurrency 4 +wait +# run ddls +echo "run ddls..." +ALTER TABLE ${DB}.${TABLE} ADD COLUMN c2 INT NOT NULL; +ALTER TABLE ${DB}.${TABLE} ADD COLUMN c3 INT NOT NULL; +ALTER TABLE ${DB}.${TABLE} DROP COLUMN c3 INT NOT NULL; +# incremental backup +echo "incremental backup start..." +last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB/full" | tail -n1) +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/inc" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 --lastbackupts $last_backup_ts + +run_sql "DROP DATABASE $DB;" +# full restore +echo "full restore start..." +run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR +row_count_full=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore +if [ "${row_count_full}" != "${ROW_COUNT}" ];then + echo "TEST: [$TEST_NAME] full restore fail on database $DB" + exit 1 +fi +# incremental restore +echo "incremental restore start..." 
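+# the incremental backup carries the ALTER TABLE jobs, so the restored table must gain column c2 before the two-value insert below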
+run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/inc" --pd $PD_ADDR +row_count_inc=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore +if [ "${row_count_inc}" != "${ROW_COUNT}" ];then + echo "TEST: [$TEST_NAME] incremental restore fail on database $DB" + exit 1 +fi +run_sql "INSERT INTO ${DB}.${TABLE} VALUES (1, 1);" +row_count_insert=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check insert count +if [ "${row_count_insert}" != "$(expr $row_count_inc + 1)" ];then + echo "TEST: [$TEST_NAME] insert record fail on database $DB" + exit 1 +fi + +run_sql "DROP DATABASE $DB;" From 40c2b3cd9c27db9e894849d317c4565395f9ac30 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 11 Feb 2020 11:07:12 +0800 Subject: [PATCH 03/12] update kvproto Signed-off-by: 5kbpers --- cmd/backup.go | 10 +++++++--- cmd/restore.go | 11 +++++------ go.mod | 2 +- go.sum | 13 +++++++++++-- pkg/backup/client.go | 19 +++++++++++++------ pkg/restore/client.go | 1 + pkg/restore/split.go | 6 +++++- 7 files changed, 43 insertions(+), 19 deletions(-) diff --git a/cmd/backup.go b/cmd/backup.go index 3113d66c6..5418213ef 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -5,6 +5,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/session" "github.com/spf13/cobra" @@ -116,9 +117,12 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error { return err } - ddlJobs, err := backup.GetBackupDDLJobs(mgr.GetDomain(), lastBackupTS, backupTS) - if err != nil { - return err + ddlJobs := make([]*model.Job, 0) + if lastBackupTS > 0 { + ddlJobs, err = backup.GetBackupDDLJobs(mgr.GetDomain(), lastBackupTS, backupTS) + if err != nil { + return err + } } // The number of regions need to backup diff --git a/cmd/restore.go b/cmd/restore.go index bd9112012..4dea2f79e 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -181,6 +181,7 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error default: return errors.New("must set db when table was set") } + summary.CollectInt("restore files", len(files)) var newTS uint64 if client.IsIncremental() { newTS, err = client.GetTS(ctx) @@ -188,21 +189,19 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error return err } } - summary.CollectInt("restore files", len(files)) - rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), tables, newTS) + err = client.ExecDDLs(ddlJobs) if err != nil { return errors.Trace(err) } - ranges, err := restore.ValidateFileRanges(files, rewriteRules) + rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), tables, newTS) if err != nil { return errors.Trace(err) } - summary.CollectInt("restore ranges", len(ranges)) - - err = client.ExecDDLs(ddlJobs) + ranges, err := restore.ValidateFileRanges(files, rewriteRules) if err != nil { return errors.Trace(err) } + summary.CollectInt("restore ranges", len(ranges)) // Redirect to log if there is no log file to avoid unreadable output. 
updateCh := utils.StartProgress( diff --git a/go.mod b/go.mod index 9951c2922..2da26a29a 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/onsi/gomega v1.7.1 // indirect github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4 github.com/pingcap/errors v0.11.4 - github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c + github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 github.com/pingcap/parser v0.0.0-20191210060830-bdf23a7ade01 github.com/pingcap/pd v1.1.0-beta.0.20191212045800-234784c7a9c5 diff --git a/go.sum b/go.sum index 085e00355..583329102 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,7 @@ github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdc github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/aws/aws-sdk-go v1.26.1 h1:JGQggXhOiNJIqsmbYUl3cYtJZUffeOWlHtxfzGK7WPI= github.com/aws/aws-sdk-go v1.26.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= @@ -168,6 +169,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf github.com/grpc-ecosystem/grpc-gateway v1.4.1/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/grpc-ecosystem/grpc-gateway v1.12.1 h1:zCy2xE9ablevUOrUZc3Dl72Dt+ya2FNAvC2yLYMHzi4= +github.com/grpc-ecosystem/grpc-gateway v1.12.1/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= @@ -268,8 +271,8 @@ github.com/pingcap/kvproto v0.0.0-20191030021250-51b332bcb20b/go.mod h1:WWLmULLO github.com/pingcap/kvproto v0.0.0-20191121022655-4c654046831d/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20191202044712-32be31591b03 h1:IyJl+qesVPf3UfFFmKtX69y1K5KC8uXlot3U0QgH7V4= github.com/pingcap/kvproto v0.0.0-20191202044712-32be31591b03/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c h1:CwVCq7XA/NvTQ6X9ZAhZlvcEvseUsHiPFQf2mL3LVl4= -github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162 h1:lsoIoCoXMpcHvW6jHcqP/prA4I6duAp1DVyG2ULz4bM= +github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/log 
v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= @@ -313,6 +316,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7 h1:FUL3b97ZY2EPqg2NbXKuMHs5pXJB9hjj1fDHnF2vl28= github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= @@ -458,6 +462,7 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191011234655-491137f69257 h1:ry8e2D+cwaV6hk7lb3aRTjjZo24shrbK0e11QEOkTIg= golang.org/x/net v0.0.0-20191011234655-491137f69257/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -524,6 +529,7 @@ golang.org/x/tools v0.0.0-20191107010934-f79515f33823 h1:akkRBeitX2EZP59KdtKw310 golang.org/x/tools v0.0.0-20191107010934-f79515f33823/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2 h1:EtTFh6h4SAKemS+CURDMTDIANuduG5zKEXShyy18bGA= golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFqUMtcTHnomIPS1iv3G4Ms= golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191213032237-7093a17b0467 h1:Jybbe55FT+YYZIJGWmJIA4ZGcglFuZOduakIW3+gHXY= golang.org/x/tools v0.0.0-20191213032237-7093a17b0467/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -554,6 +560,7 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20190905072037-92dd089d5514 h1:oFSK4421fpCKRrpzIpybyBVWyht05NegY9+L/3TLAZs= google.golang.org/genproto v0.0.0-20190905072037-92dd089d5514/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= +google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9 h1:6XzpBoANz1NqMNfDXzc2QmHmbb1vyMsvRfoP5rM+K1I= google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= @@ -563,6 +570,7 @@ 
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA= google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= @@ -586,6 +594,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/backup/client.go b/pkg/backup/client.go index b0ccc9338..0afd2fab6 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -135,7 +135,7 @@ func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) erro log.Debug("backup meta", zap.Reflect("meta", bc.backupMeta)) backendURL := storage.FormatBackendURL(bc.backend) - log.Info("save backup meta", zap.Stringer("path", &backendURL)) + log.Info("save backup meta", zap.Stringer("path", &backendURL), zap.Int("jobs", len(ddlJobs))) return bc.storage.Write(ctx, utils.MetaFile, backupMetaData) } @@ -301,27 +301,34 @@ func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*mod if err != nil { return nil, errors.Trace(err) } + log.Debug("get default jobs", zap.Int("jobs", len(defaultJobs))) allJobs = append(allJobs, defaultJobs...) addIndexJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey) if err != nil { return nil, errors.Trace(err) } + log.Debug("get add index jobs", zap.Int("jobs", len(addIndexJobs))) allJobs = append(allJobs, addIndexJobs...) historyJobs, err := snapMeta.GetAllHistoryDDLJobs() if err != nil { return nil, errors.Trace(err) } + log.Debug("get history jobs", zap.Int("jobs", len(historyJobs))) allJobs = append(allJobs, historyJobs...) 
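+	// Keep a job only if it has actually finished (Done or Synced) and its
+	// schema version is newer than the previous backup's snapshot.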
completedJobs := make([]*model.Job, 0) for _, job := range allJobs { - if job.State != model.JobStateDone || - job.BinlogInfo == nil || - job.BinlogInfo.SchemaVersion <= lastSchemaVersion { - continue + log.Debug("get job", + zap.String("query", job.Query), + zap.Int64("schemaVersion", job.BinlogInfo.SchemaVersion), + zap.Int64("lastSchemaVersion", lastSchemaVersion), + zap.Stringer("state", job.State)) + if (job.State == model.JobStateDone || job.State == model.JobStateSynced) && + (job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) { + completedJobs = append(completedJobs, job) } - completedJobs = append(completedJobs, job) } + log.Debug("get completed jobs", zap.Int("jobs", len(completedJobs))) return completedJobs, nil } diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 1a814472c..6059260fc 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -118,6 +118,7 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup. }) rc.ddlJobs = ddlJobs rc.backupMeta = backupMeta + log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs))) metaClient := NewSplitClient(rc.pdClient) importClient := NewImportClient(metaClient) diff --git a/pkg/restore/split.go b/pkg/restore/split.go index 31b23a60f..195329876 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -3,6 +3,7 @@ package restore import ( "bytes" "context" + "strings" "time" "github.com/pingcap/errors" @@ -105,6 +106,9 @@ SplitRegions: var newRegions []*RegionInfo newRegions, err = rs.splitAndScatterRegions(ctx, regionMap[regionID], keys) if err != nil { + if strings.Contains(err.Error(), "no valid key") { + continue + } interval = 2 * interval if interval > SplitMaxRetryInterval { interval = SplitMaxRetryInterval @@ -120,7 +124,7 @@ SplitRegions: } break } - if err != nil { + if err != nil && !strings.Contains(err.Error(), "no valid key") { return errors.Trace(err) } log.Info("splitting regions done, wait for scattering regions", From e5ae264f76b9fcf478aef85615b16cf6663a67ee Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 11 Feb 2020 11:08:05 +0800 Subject: [PATCH 04/12] fix integration tests Signed-off-by: 5kbpers --- tests/br_incremental_ddl/run.sh | 5 ++--- tests/br_incremental_index/run.sh | 10 ++++------ tests/config/tikv.toml | 1 + tests/run.sh | 2 ++ 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/br_incremental_ddl/run.sh b/tests/br_incremental_ddl/run.sh index 4f9750549..31593c7e6 100755 --- a/tests/br_incremental_ddl/run.sh +++ b/tests/br_incremental_ddl/run.sh @@ -23,9 +23,7 @@ echo "load data..." # create database run_sql "CREATE DATABASE IF NOT EXISTS $DB;" # create table -run_sql "CREATE TABLE IF NOT EXISTS ${DB}.${TABLE} (\ - c1 INT, \ -);" +run_sql "CREATE TABLE IF NOT EXISTS ${DB}.${TABLE} (c1 INT);" # insert records for i in $(seq $ROW_COUNT); do run_sql "INSERT INTO ${DB}.${TABLE}(c1) VALUES ($i);" @@ -68,5 +66,6 @@ if [ "${row_count_inc}" != "${ROW_COUNT}" ];then echo "TEST: [$TEST_NAME] incremental restore fail on database $DB" exit 1 fi +run_sql "INSERT INTO ${DB}.${TABLE}(c2) VALUES ('1');" run_sql "DROP DATABASE $DB;" diff --git a/tests/br_incremental_index/run.sh b/tests/br_incremental_index/run.sh index dda658edf..f4b4b9de7 100755 --- a/tests/br_incremental_index/run.sh +++ b/tests/br_incremental_index/run.sh @@ -23,9 +23,7 @@ echo "load data..." 
# create database run_sql "CREATE DATABASE IF NOT EXISTS $DB;" # create table -run_sql "CREATE TABLE IF NOT EXISTS ${DB}.${TABLE} (\ - c1 INT, \ -);" +run_sql "CREATE TABLE IF NOT EXISTS ${DB}.${TABLE} (c1 INT);" # insert records for i in $(seq $ROW_COUNT); do run_sql "INSERT INTO ${DB}.${TABLE} VALUES ($i);" @@ -38,9 +36,9 @@ run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/full" --ratelimit 5 - wait # run ddls echo "run ddls..." -ALTER TABLE ${DB}.${TABLE} ADD COLUMN c2 INT NOT NULL; -ALTER TABLE ${DB}.${TABLE} ADD COLUMN c3 INT NOT NULL; -ALTER TABLE ${DB}.${TABLE} DROP COLUMN c3 INT NOT NULL; +run_sql "ALTER TABLE ${DB}.${TABLE} ADD COLUMN c2 INT NOT NULL;"; +run_sql "ALTER TABLE ${DB}.${TABLE} ADD COLUMN c3 INT NOT NULL;"; +run_sql "ALTER TABLE ${DB}.${TABLE} DROP COLUMN c3;"; # incremental backup echo "incremental backup start..." last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB/full" | tail -n1) diff --git a/tests/config/tikv.toml b/tests/config/tikv.toml index e93a16597..73323d878 100644 --- a/tests/config/tikv.toml +++ b/tests/config/tikv.toml @@ -11,3 +11,4 @@ max-open-files = 4096 [raftstore] # true (default value) for high reliability, this can prevent data loss when power failure. sync-log = false +capacity = "10GB" \ No newline at end of file diff --git a/tests/run.sh b/tests/run.sh index 3cedc7093..a4edb762a 100755 --- a/tests/run.sh +++ b/tests/run.sh @@ -24,6 +24,7 @@ TIDB_ADDR="127.0.0.1:4000" TIDB_STATUS_ADDR="127.0.0.1:10080" # actaul tikv_addr are TIKV_ADDR${i} TIKV_ADDR="127.0.0.1:2016" +TIKV_STATUS_ADDR="127.0.0.1:2018" TIKV_COUNT=4 stop_services() { @@ -55,6 +56,7 @@ start_services() { bin/tikv-server \ --pd "$PD_ADDR" \ -A "$TIKV_ADDR$i" \ + --status-addr "$TIKV_STATUS_ADDR$i" \ --log-file "$TEST_DIR/tikv${i}.log" \ -C "tests/config/tikv.toml" \ -s "$TEST_DIR/tikv${i}" & From 07de065c981489da2ee7272864cd232f00e3d0f3 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 11 Feb 2020 11:54:20 +0800 Subject: [PATCH 05/12] reduce cyclomatic complexity of `runRestore` Signed-off-by: 5kbpers --- cmd/restore.go | 182 ++++++++++++++++++++++++++----------------------- 1 file changed, 96 insertions(+), 86 deletions(-) diff --git a/cmd/restore.go b/cmd/restore.go index 4dea2f79e..2ff8502ee 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -91,20 +91,106 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error if err != nil { return errors.Trace(err) } + files, tables, ddlJobs, err := getRestoreInfos(client, dbName, tableName) + if err != nil { + return errors.Trace(err) + } + + summary.CollectInt("restore files", len(files)) + defer summary.Summary(cmdName) + var newTS uint64 + if client.IsIncremental() { + newTS, err = client.GetTS(ctx) + if err != nil { + return err + } + } + err = client.ExecDDLs(ddlJobs) + if err != nil { + return errors.Trace(err) + } + rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), tables, newTS) + if err != nil { + return errors.Trace(err) + } + ranges, err := restore.ValidateFileRanges(files, rewriteRules) + if err != nil { + return errors.Trace(err) + } + summary.CollectInt("restore ranges", len(ranges)) + + // Redirect to log if there is no log file to avoid unreadable output. 
+ updateCh := utils.StartProgress( + ctx, + cmdName, + // Split/Scatter + Download/Ingest + int64(len(ranges)+len(files)), + !HasLogFile()) + + err = restore.SplitRanges(ctx, client, ranges, rewriteRules, updateCh) + if err != nil { + log.Error("split regions failed", zap.Error(err)) + return errors.Trace(err) + } + + if !client.IsIncremental() { + var pdAddr string + pdAddr, err = flagSet.GetString(FlagPD) + if err != nil { + return errors.Trace(err) + } + pdAddrs := strings.Split(pdAddr, ",") + err = client.ResetTS(pdAddrs) + if err != nil { + log.Error("reset pd TS failed", zap.Error(err)) + return errors.Trace(err) + } + } + + removedSchedulers, err := RestorePrepareWork(ctx, client, mgr) + if err != nil { + return errors.Trace(err) + } + + err = client.RestoreAll(rewriteRules, updateCh) + if err != nil { + return errors.Trace(err) + } + + err = RestorePostWork(ctx, client, mgr, removedSchedulers) + if err != nil { + return errors.Trace(err) + } + // Restore has finished. + close(updateCh) + + // Checksum + updateCh = utils.StartProgress( + ctx, "Checksum", int64(len(newTables)), !HasLogFile()) + err = client.ValidateChecksum( + ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh) + if err != nil { + return err + } + close(updateCh) + + return nil +} +func getRestoreInfos( + client *restore.Client, dbName, tableName string, +) ([]*backup.File, []*utils.Table, []*model.Job, error) { + var err error files := make([]*backup.File, 0) tables := make([]*utils.Table, 0) ddlJobs := make([]*model.Job, 0) - - defer summary.Summary(cmdName) - switch { case len(dbName) == 0 && len(tableName) == 0: // full restore for _, db := range client.GetDatabases() { err = client.CreateDatabase(db.Info) if err != nil { - return errors.Trace(err) + return nil, nil, nil, errors.Trace(err) } for _, table := range db.Tables { files = append(files, table.Files...) @@ -116,11 +202,11 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error // database restore db := client.GetDatabase(dbName) if db == nil { - return errors.Errorf("database %s not found in backup", dbName) + return nil, nil, nil, errors.Errorf("database %s not found in backup", dbName) } err = client.CreateDatabase(db.Info) if err != nil { - return errors.Trace(err) + return nil, nil, nil, errors.Trace(err) } for _, table := range db.Tables { files = append(files, table.Files...) 
@@ -152,11 +238,11 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error // table restore db := client.GetDatabase(dbName) if db == nil { - return errors.Errorf("database %s not found in backup", dbName) + return nil, nil, nil, errors.Errorf("database %s not found in backup", dbName) } err = client.CreateDatabase(db.Info) if err != nil { - return errors.Trace(err) + return nil, nil, nil, errors.Trace(err) } table := db.GetTable(tableName) files = table.Files @@ -179,86 +265,10 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error } } default: - return errors.New("must set db when table was set") - } - summary.CollectInt("restore files", len(files)) - var newTS uint64 - if client.IsIncremental() { - newTS, err = client.GetTS(ctx) - if err != nil { - return err - } - } - err = client.ExecDDLs(ddlJobs) - if err != nil { - return errors.Trace(err) - } - rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), tables, newTS) - if err != nil { - return errors.Trace(err) - } - ranges, err := restore.ValidateFileRanges(files, rewriteRules) - if err != nil { - return errors.Trace(err) + return nil, nil, nil, errors.New("must set db when table was set") } - summary.CollectInt("restore ranges", len(ranges)) - // Redirect to log if there is no log file to avoid unreadable output. - updateCh := utils.StartProgress( - ctx, - cmdName, - // Split/Scatter + Download/Ingest - int64(len(ranges)+len(files)), - !HasLogFile()) - - err = restore.SplitRanges(ctx, client, ranges, rewriteRules, updateCh) - if err != nil { - log.Error("split regions failed", zap.Error(err)) - return errors.Trace(err) - } - - if !client.IsIncremental() { - var pdAddr string - pdAddr, err = flagSet.GetString(FlagPD) - if err != nil { - return errors.Trace(err) - } - pdAddrs := strings.Split(pdAddr, ",") - err = client.ResetTS(pdAddrs) - if err != nil { - log.Error("reset pd TS failed", zap.Error(err)) - return errors.Trace(err) - } - } - - removedSchedulers, err := RestorePrepareWork(ctx, client, mgr) - if err != nil { - return errors.Trace(err) - } - - err = client.RestoreAll(rewriteRules, updateCh) - if err != nil { - return errors.Trace(err) - } - - err = RestorePostWork(ctx, client, mgr, removedSchedulers) - if err != nil { - return errors.Trace(err) - } - // Restore has finished. 
- close(updateCh) - - // Checksum - updateCh = utils.StartProgress( - ctx, "Checksum", int64(len(newTables)), !HasLogFile()) - err = client.ValidateChecksum( - ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh) - if err != nil { - return err - } - close(updateCh) - - return nil + return files, tables, ddlJobs, nil } func newFullRestoreCommand() *cobra.Command { From 3e485c0f1ccd27f8b68ea6706dcc12c3cb457835 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 11 Feb 2020 15:36:51 +0800 Subject: [PATCH 06/12] fix test Signed-off-by: 5kbpers --- tests/br_full_ddl/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/br_full_ddl/run.sh b/tests/br_full_ddl/run.sh index 3db1ecd60..1e40415d7 100755 --- a/tests/br_full_ddl/run.sh +++ b/tests/br_full_ddl/run.sh @@ -28,7 +28,7 @@ for i in $(seq $DDL_COUNT); do run_sql "USE $DB; ALTER TABLE $TABLE ADD INDEX (FIELD$i);" done -for i in $(sql $DDL_COUNT); do +for i in $(seq $DDL_COUNT); do if (( RANDOM % 2 )); then run_sql "USE $DB; ALTER TABLE $TABLE DROP INDEX FIELD$i;" fi From a46809969be013e9f2843b8661a2d83bef0969a4 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Mon, 17 Feb 2020 13:31:00 +0800 Subject: [PATCH 07/12] add unit test Signed-off-by: 5kbpers --- pkg/restore/client.go | 12 ++++- pkg/restore/db.go | 82 +++++++++++++++++++++++++++++---- pkg/restore/db_test.go | 42 +++++++++++++++-- pkg/task/restore.go | 46 +----------------- tests/br_incremental_ddl/run.sh | 10 ++-- 5 files changed, 128 insertions(+), 64 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index e5c0570e5..7988cff24 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -222,12 +222,22 @@ func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error { sort.Slice(ddlJobs, func(i, j int) bool { return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion }) + + for _, job := range ddlJobs { + log.Debug("pre-execute ddl jobs", + zap.String("db", job.SchemaName), + zap.String("query", job.Query), + zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion)) + } for _, job := range ddlJobs { err := rc.db.ExecDDL(rc.ctx, job) if err != nil { return errors.Trace(err) } - log.Info("execute ddl query", zap.String("db", job.SchemaName), zap.String("query", job.Query)) + log.Info("execute ddl query", + zap.String("db", job.SchemaName), + zap.String("query", job.Query), + zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion)) } return nil } diff --git a/pkg/restore/db.go b/pkg/restore/db.go index a00257ba0..8c09af16f 100644 --- a/pkg/restore/db.go +++ b/pkg/restore/db.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sort" "strings" "github.com/pingcap/errors" @@ -40,20 +41,24 @@ func NewDB(store kv.Storage) (*DB, error) { // ExecDDL executes the query of a ddl job. 
 func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
-	switchDbSQL := fmt.Sprintf("use %s;", ddlJob.SchemaName)
-	_, err := db.se.Execute(ctx, switchDbSQL)
-	if err != nil {
-		log.Error("switch db failed",
-			zap.String("query", switchDbSQL),
-			zap.String("db", ddlJob.SchemaName),
-			zap.Error(err))
-		return errors.Trace(err)
+	var err error
+	if ddlJob.BinlogInfo.TableInfo != nil {
+		switchDbSQL := fmt.Sprintf("use %s;", ddlJob.SchemaName)
+		_, err = db.se.Execute(ctx, switchDbSQL)
+		if err != nil {
+			log.Error("switch db failed",
+				zap.String("query", switchDbSQL),
+				zap.String("db", ddlJob.SchemaName),
+				zap.Error(err))
+			return errors.Trace(err)
+		}
 	}
 	_, err = db.se.Execute(ctx, ddlJob.Query)
 	if err != nil {
 		log.Error("execute ddl query failed",
 			zap.String("query", ddlJob.Query),
 			zap.String("db", ddlJob.SchemaName),
+			zap.Int64("historySchemaVersion", ddlJob.BinlogInfo.SchemaVersion),
 			zap.Error(err))
 	}
 	return errors.Trace(err)
@@ -131,3 +136,64 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
 func (db *DB) Close() {
 	db.se.Close()
 }
+
+// FilterDDLJobs filters ddl jobs
+func FilterDDLJobs(allDDLJobs []*model.Job, tables []*utils.Table) (ddlJobs []*model.Job) {
+	// Sort the ddl jobs by schema version in descending order.
+	sort.Slice(allDDLJobs, func(i, j int) bool {
+		return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion
+	})
+	dbs := getDatabases(tables)
+	for _, db := range dbs {
+		// These maps are for solving some corner cases.
+		// e.g. let "t=2" denote that the id of database "t" is 2; if the ddl execution sequence is:
+		// rename "a" to "b"(a=1) -> drop "b"(b=1) -> create "b"(b=2) -> rename "b" to "a"(a=2)
+		// then we cannot find the "create" DDL by name and id directly.
+		// To cover this case, we must find all names and ids the database/table ever had.
+		dbIDs := make(map[int64]bool)
+		dbIDs[db.ID] = true
+		dbNames := make(map[string]bool)
+		dbNames[db.Name.String()] = true
+		for _, job := range allDDLJobs {
+			if job.BinlogInfo.DBInfo != nil {
+				if dbIDs[job.SchemaID] || dbNames[job.BinlogInfo.DBInfo.Name.String()] {
+					ddlJobs = append(ddlJobs, job)
+					// For the jobs executed with the old id, like step 2 in the example above.
+					dbIDs[job.SchemaID] = true
+					// For the jobs executed after rename, like step 3 in the example above.
+ dbNames[job.BinlogInfo.DBInfo.Name.String()] = true + } + } + } + } + + for _, table := range tables { + tableIDs := make(map[int64]bool) + tableIDs[table.Info.ID] = true + tableNames := make(map[string]bool) + tableNames[table.Info.Name.String()] = true + for _, job := range allDDLJobs { + if job.BinlogInfo.TableInfo != nil { + if tableIDs[job.TableID] || tableNames[job.BinlogInfo.TableInfo.Name.String()] { + ddlJobs = append(ddlJobs, job) + tableIDs[job.TableID] = true + // For truncate table, the id may be changed + tableIDs[job.BinlogInfo.TableInfo.ID] = true + tableNames[job.BinlogInfo.TableInfo.Name.String()] = true + } + } + } + } + return ddlJobs +} + +func getDatabases(tables []*utils.Table) (dbs []*model.DBInfo) { + dbIDs := make(map[int64]bool) + for _, table := range tables { + if !dbIDs[table.Db.ID] { + dbs = append(dbs, table.Db) + dbIDs[table.Db.ID] = true + } + } + return +} diff --git a/pkg/restore/db_test.go b/pkg/restore/db_test.go index 22af09b80..0151b4da6 100644 --- a/pkg/restore/db_test.go +++ b/pkg/restore/db_test.go @@ -12,6 +12,7 @@ import ( "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" + "github.com/pingcap/br/pkg/backup" "github.com/pingcap/br/pkg/utils" ) @@ -25,19 +26,18 @@ func (s *testRestoreSchemaSuite) SetUpSuite(c *C) { var err error s.mock, err = utils.NewMockCluster() c.Assert(err, IsNil) + c.Assert(s.mock.Start(), IsNil) } func TestT(t *testing.T) { TestingT(t) } func (s *testRestoreSchemaSuite) TearDownSuite(c *C) { + s.mock.Stop() testleak.AfterTest(c)() } func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { - c.Assert(s.mock.Start(), IsNil) - defer s.mock.Stop() - tk := testkit.NewTestKit(c, s.mock.Storage) tk.MustExec("use test") tk.MustExec("set @@sql_mode=''") @@ -92,3 +92,39 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { c.Assert(err, IsNil, Commentf("Error query auto inc id: %s", err)) c.Assert(autoIncID, Equals, uint64(globalAutoID+100)) } + +func (s *testRestoreSchemaSuite) TestFilterDDLJobs(c *C) { + tk := testkit.NewTestKit(c, s.mock.Storage) + tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;") + tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);") + lastTs, err := s.mock.GetOracle().GetTimestamp(context.Background()) + c.Assert(err, IsNil, Commentf("Error get last ts: %s", err)) + tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;") + tk.MustExec("DROP TABLE test_db.test_table1;") + tk.MustExec("DROP DATABASE test_db;") + tk.MustExec("CREATE DATABASE test_db;") + tk.MustExec("USE test_db;") + tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));") + tk.MustExec("RENAME TABLE test_table1 to test_table;") + tk.MustExec("TRUNCATE TABLE test_table;") + + ts, err := s.mock.GetOracle().GetTimestamp(context.Background()) + c.Assert(err, IsNil, Commentf("Error get ts: %s", err)) + allDDLJobs, err := backup.GetBackupDDLJobs(s.mock.Domain, lastTs, ts) + c.Assert(err, IsNil, Commentf("Error get ddl jobs: %s", err)) + infoSchema, err := s.mock.Domain.GetSnapshotInfoSchema(ts) + c.Assert(err, IsNil, Commentf("Error get snapshot info schema: %s", err)) + dbInfo, ok := infoSchema.SchemaByName(model.NewCIStr("test_db")) + c.Assert(ok, IsTrue, Commentf("DB info not exist")) + tableInfo, err := infoSchema.TableByName(model.NewCIStr("test_db"), model.NewCIStr("test_table")) + c.Assert(err, IsNil, Commentf("Error get table info: %s", err)) + tables := []*utils.Table{{ + Db: dbInfo, + Info: tableInfo.Meta(), + }} + ddlJobs := FilterDDLJobs(allDDLJobs, tables) + 
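+	// All 7 DDLs issued after lastTs should be kept: the two renames, drop
+	// table, drop database, create database, create table, and truncate table.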
for _, job := range ddlJobs {
+		c.Logf("get ddl job: %s", job.Query)
+	}
+	c.Assert(len(ddlJobs), Equals, 7)
+}
diff --git a/pkg/task/restore.go b/pkg/task/restore.go
index 1c9528a9b..599dcb478 100644
--- a/pkg/task/restore.go
+++ b/pkg/task/restore.go
@@ -2,12 +2,10 @@ package task
 
 import (
 	"context"
-	"sort"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/backup"
 	"github.com/pingcap/log"
-	"github.com/pingcap/parser/model"
 	"github.com/pingcap/tidb-tools/pkg/filter"
 	"github.com/spf13/pflag"
 	"go.uber.org/zap"
@@ -105,7 +103,4 @@ func RunRestore(c context.Context, cmdName string, cfg *RestoreConfig) error {
 			return err
 		}
 	}
-	ddlJobs, err := filterDDLJobs(client, cfg)
-	if err != nil {
-		return err
-	}
+	ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables)
@@ -207,48 +205,6 @@ func filterRestoreFiles(
 	return
 }
 
-func filterDDLJobs(client *restore.Client, cfg *RestoreConfig) ([]*model.Job, error) {
-	tableFilter, err := filter.New(cfg.CaseSensitive, &cfg.Filter)
-	if err != nil {
-		return nil, err
-	}
-	allDDLJobs := client.GetDDLJobs()
-	// Sort the ddl jobs by schema version in descending order.
-	sort.Slice(allDDLJobs, func(i, j int) bool {
-		return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion
-	})
-
-	dbIDs := make(map[int64]bool)
-	tableIDs := make(map[int64]bool)
-	ddlJobs := make([]*model.Job, 0)
-	for _, db := range client.GetDatabases() {
-		for _, table := range db.Tables {
-			if !tableFilter.Match(&filter.Table{Schema: db.Info.Name.O, Name: table.Info.Name.O}) {
-				continue
-			}
-			dbIDs[db.Info.ID] = true
-			tableIDs[table.Info.ID] = true
-		}
-	}
-
-	for _, job := range allDDLJobs {
-		if dbIDs[job.SchemaID] ||
-			(job.BinlogInfo.DBInfo != nil && dbIDs[job.BinlogInfo.DBInfo.ID]) ||
-			tableIDs[job.TableID] ||
-			(job.BinlogInfo.TableInfo != nil && tableIDs[job.BinlogInfo.TableInfo.ID]) {
-			dbIDs[job.SchemaID] = true
-			if job.BinlogInfo.DBInfo != nil {
-				dbIDs[job.BinlogInfo.DBInfo.ID] = true
-			}
-			if job.BinlogInfo.TableInfo != nil {
-				tableIDs[job.BinlogInfo.TableInfo.ID] = true
-			}
-			ddlJobs = append(ddlJobs, job)
-		}
-	}
-	return ddlJobs, err
-}
-
 // restorePreWork executes some prepare work before restore
 func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) ([]string, error) {
 	if client.IsOnline() {
diff --git a/tests/br_incremental_ddl/run.sh b/tests/br_incremental_ddl/run.sh
index 43a126e39..d9a88709b 100755
--- a/tests/br_incremental_ddl/run.sh
+++ b/tests/br_incremental_ddl/run.sh
@@ -36,15 +36,11 @@ run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE --ratelimit 5 --concurrency 4
 echo "run ddls..."
run_sql "RENAME TABLE ${DB}.${TABLE} to ${DB}.${TABLE}1;" run_sql "DROP TABLE ${DB}.${TABLE}1;" - -run_sql "RENAME DATABASE ${DB} to ${DB}1;" -run_sql "DROP DATABASE ${DB}1;" -run_sql "CREATE DATABASE ${DB}1;" -run_sql "RENAME DATABASE ${DB}1 to ${DB};" - - +run_sql "DROP DATABASE ${DB};" +run_sql "CREATE DATABASE ${DB};" run_sql "CREATE TABLE ${DB}.${TABLE}1 (c2 CHAR(255));" run_sql "RENAME TABLE ${DB}.${TABLE}1 to ${DB}.${TABLE};" +run_sql "TRUNCATE TABLE ${DB}.${TABLE};" # insert records for i in $(seq $ROW_COUNT); do run_sql "INSERT INTO ${DB}.${TABLE}(c2) VALUES ('$i');" From 957f6bb02a8b82ce438351cb70c6b6873a4851bc Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 18 Feb 2020 21:20:11 +0800 Subject: [PATCH 08/12] fix tests Signed-off-by: 5kbpers --- tests/br_full_ddl/run.sh | 4 +-- tests/br_incremental/run.sh | 57 +++++++++++++------------------------ 2 files changed, 22 insertions(+), 39 deletions(-) diff --git a/tests/br_full_ddl/run.sh b/tests/br_full_ddl/run.sh index 1e40415d7..e50ef1ecf 100755 --- a/tests/br_full_ddl/run.sh +++ b/tests/br_full_ddl/run.sh @@ -36,7 +36,7 @@ done # backup full echo "backup start..." -br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG checksum_count=$(cat $LOG | grep "fast checksum success" | wc -l | xargs) @@ -50,7 +50,7 @@ run_sql "DROP DATABASE $DB;" # restore full echo "restore start..." -br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR +run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') diff --git a/tests/br_incremental/run.sh b/tests/br_incremental/run.sh index bb6a42efb..b6a6061de 100755 --- a/tests/br_incremental/run.sh +++ b/tests/br_incremental/run.sh @@ -20,55 +20,38 @@ TABLE="usertable" run_sql "CREATE DATABASE $DB;" go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB - -row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +row_count_ori_full=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') # full backup echo "full backup start..." -run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 - -run_sql "DROP TABLE $DB.$TABLE;" - -# full restore -echo "full restore start..." -run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB" --pd $PD_ADDR - -row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') - -if [ "$row_count_ori" -ne "$row_count_new" ];then - echo "TEST: [$TEST_NAME] full br failed!" - exit 1 -fi +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 go-ycsb run mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB -row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') -last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB" | tail -n1) - -# clean up data -rm -rf $TEST_DIR/$DB - # incremental backup echo "incremental backup start..." 
-run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 --lastbackupts $last_backup_ts
-
-start_ts=$(br validate decode --field="start-version" -s "local://$TEST_DIR/$DB" | tail -n1)
-end_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB" | tail -n1)
+last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB/full" | tail -n1)
+run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/inc" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 --lastbackupts $last_backup_ts
+row_count_ori_inc=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
 
-echo "start version: $start_ts, end version: $end_ts"
+run_sql "DROP DATABASE $DB;"
 
+# full restore
+echo "full restore start..."
+run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR
+row_count_full=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
+# check full restore
+if [ "${row_count_full}" != "${row_count_ori_full}" ];then
+    echo "TEST: [$TEST_NAME] full restore fail on database $DB"
+    exit 1
+fi
 # incremental restore
 echo "incremental restore start..."
-run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR
-
-row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
-
-echo "[original] row count: $row_count_ori, [after br] row count: $row_count_new"
-
-if [ "$row_count_ori" -eq "$row_count_new" ];then
-    echo "TEST: [$TEST_NAME] successed!"
-else
-    echo "TEST: [$TEST_NAME] failed!"
+run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/inc" --pd $PD_ADDR
+row_count_inc=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')
+# check incremental restore
+if [ "${row_count_inc}" != "${row_count_ori_inc}" ];then
+    echo "TEST: [$TEST_NAME] incremental restore fail on database $DB"
     exit 1
 fi

From c9301b18205a7cd29eecec55aa9d8cb5fdfd3ad6 Mon Sep 17 00:00:00 2001
From: 5kbpers
Date: Tue, 18 Feb 2020 21:20:39 +0800
Subject: [PATCH 09/12] disable fast checksum in incremental br

Signed-off-by: 5kbpers
---
 pkg/task/backup.go | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/pkg/task/backup.go b/pkg/task/backup.go
index c275e69db..e146450d6 100644
--- a/pkg/task/backup.go
+++ b/pkg/task/backup.go
@@ -148,12 +148,18 @@ func RunBackup(c context.Context, cmdName string, cfg *BackupConfig) error {
 		return err
 	}
 
-	valid, err := client.FastChecksum()
-	if err != nil {
-		return err
-	}
-	if !valid {
-		log.Error("backup FastChecksum mismatch!")
+	if cfg.LastBackupTS == 0 {
+		valid, err := client.FastChecksum()
+		if err != nil {
+			return err
+		}
+		if !valid {
+			log.Error("backup FastChecksum mismatch!")
+			return errors.Errorf("mismatched checksum")
+		}
+
+	} else {
+		log.Warn("Skip fast checksum in incremental backup")
 	}
 	// Checksum has finished
 	close(updateCh)

From 60187d7001610e8f576d225649b66d95b72717cc Mon Sep 17 00:00:00 2001
From: 5kbpers
Date: Tue, 18 Feb 2020 21:21:07 +0800
Subject: [PATCH 10/12] fix no valid key error

Signed-off-by: 5kbpers
---
 pkg/backup/client.go      |  5 -----
 pkg/restore/split.go      | 31 ++++++++++++++++++++++++++-----
 pkg/restore/split_test.go | 25 ++++++++++++++++++++++++-
 3 files changed, 50 insertions(+), 11 deletions(-)

diff --git a/pkg/backup/client.go b/pkg/backup/client.go
index 4b8aba9b4..6d6eff033 100644
--- a/pkg/backup/client.go
+++ b/pkg/backup/client.go
@@ -283,11 +283,6 @@ func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*model.Job, error) {
completedJobs := make([]*model.Job, 0) for _, job := range allJobs { - log.Debug("get job", - zap.String("query", job.Query), - zap.Int64("schemaVersion", job.BinlogInfo.SchemaVersion), - zap.Int64("lastSchemaVersion", lastSchemaVersion), - zap.Stringer("state", job.State)) if (job.State == model.JobStateDone || job.State == model.JobStateSynced) && (job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) { completedJobs = append(completedJobs, job) diff --git a/pkg/restore/split.go b/pkg/restore/split.go index 83a43cef7..d2660ef7c 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" ) @@ -104,10 +105,17 @@ SplitRegions: } for regionID, keys := range splitKeyMap { var newRegions []*RegionInfo - newRegions, err = rs.splitAndScatterRegions(ctx, regionMap[regionID], keys) + region := regionMap[regionID] + newRegions, err = rs.splitAndScatterRegions(ctx, region, keys) if err != nil { if strings.Contains(err.Error(), "no valid key") { - continue + for _, key := range keys { + log.Error("no valid key", + zap.Binary("startKey", region.Region.StartKey), + zap.Binary("endKey", region.Region.EndKey), + zap.Binary("key", codec.EncodeBytes([]byte{}, key))) + } + return errors.Trace(err) } interval = 2 * interval if interval > SplitMaxRetryInterval { @@ -119,12 +127,13 @@ SplitRegions: } continue SplitRegions } + log.Debug("split regions", zap.Stringer("region", region.Region), zap.ByteStrings("keys", keys)) scatterRegions = append(scatterRegions, newRegions...) onSplit(keys) } break } - if err != nil && !strings.Contains(err.Error(), "no valid key") { + if err != nil { return errors.Trace(err) } log.Info("splitting regions done, wait for scattering regions", @@ -254,7 +263,7 @@ func getSplitKeys(rewriteRules *RewriteRules, ranges []Range, regions []*RegionI checkKeys = append(checkKeys, rule.GetNewKeyPrefix()) } for _, rg := range ranges { - checkKeys = append(checkKeys, rg.EndKey) + checkKeys = append(checkKeys, truncateRowKey(rg.EndKey)) } for _, key := range checkKeys { if region := needSplit(key, regions); region != nil { @@ -263,7 +272,10 @@ func getSplitKeys(rewriteRules *RewriteRules, ranges []Range, regions []*RegionI splitKeys = make([][]byte, 0, 1) } splitKeyMap[region.Region.GetId()] = append(splitKeys, key) - log.Debug("get key for split region", zap.Binary("key", key), zap.Stringer("region", region.Region)) + log.Debug("get key for split region", + zap.Binary("key", key), + zap.Binary("startKey", region.Region.StartKey), + zap.Binary("endKey", region.Region.EndKey)) } } return splitKeyMap @@ -289,6 +301,15 @@ func needSplit(splitKey []byte, regions []*RegionInfo) *RegionInfo { return nil } +func truncateRowKey(key []byte) []byte { + if bytes.HasPrefix(key, []byte("t")) && + len(key) > tablecodec.RecordRowKeyLen && + bytes.HasPrefix(key[9:], []byte("_r")) { + return key[:tablecodec.RecordRowKeyLen] + } + return key +} + func beforeEnd(key []byte, end []byte) bool { return bytes.Compare(key, end) < 0 || len(end) == 0 } diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go index 509c4cfa0..3cc777854 100644 --- a/pkg/restore/split_test.go +++ b/pkg/restore/split_test.go @@ -280,7 +280,7 @@ func validateRegions(regions map[uint64]*RegionInfo) bool { return false } FindRegion: - for i := 1; i < 12; i++ { + for i := 1; i < 
len(keys); i++ { for _, region := range regions { startKey := []byte(keys[i-1]) if len(startKey) != 0 { @@ -299,3 +299,26 @@ FindRegion: } return true } + +func (s *testRestoreUtilSuite) TestNeedSplit(c *C) { + regions := []*RegionInfo{ + { + Region: &metapb.Region{ + StartKey: codec.EncodeBytes([]byte{}, []byte("b")), + EndKey: codec.EncodeBytes([]byte{}, []byte("d")), + }, + }, + } + // Out of region + c.Assert(needSplit([]byte("a"), regions), IsNil) + // Region start key + c.Assert(needSplit([]byte("b"), regions), IsNil) + // In region + region := needSplit([]byte("c"), regions) + c.Assert(region.Region.GetStartKey(), Equals, codec.EncodeBytes([]byte{}, []byte("b"))) + c.Assert(region.Region.GetEndKey(), Equals, codec.EncodeBytes([]byte{}, []byte("d"))) + // Region end key + c.Assert(needSplit([]byte("d"), regions), IsNil) + // Out of region + c.Assert(needSplit([]byte("e"), regions), IsNil) +} From a78f3caef2e8ad89e275cea12c41bffee82cafb2 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 18 Feb 2020 21:29:53 +0800 Subject: [PATCH 11/12] address lint Signed-off-by: 5kbpers --- pkg/restore/split_test.go | 4 ++-- pkg/task/backup.go | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go index 3cc777854..3ace5b8c8 100644 --- a/pkg/restore/split_test.go +++ b/pkg/restore/split_test.go @@ -315,8 +315,8 @@ func (s *testRestoreUtilSuite) TestNeedSplit(c *C) { c.Assert(needSplit([]byte("b"), regions), IsNil) // In region region := needSplit([]byte("c"), regions) - c.Assert(region.Region.GetStartKey(), Equals, codec.EncodeBytes([]byte{}, []byte("b"))) - c.Assert(region.Region.GetEndKey(), Equals, codec.EncodeBytes([]byte{}, []byte("d"))) + c.Assert(bytes.Compare(region.Region.GetStartKey(), codec.EncodeBytes([]byte{}, []byte("b"))), Equals, 0) + c.Assert(bytes.Compare(region.Region.GetEndKey(), codec.EncodeBytes([]byte{}, []byte("d"))), Equals, 0) // Region end key c.Assert(needSplit([]byte("d"), regions), IsNil) // Out of region diff --git a/pkg/task/backup.go b/pkg/task/backup.go index e146450d6..7019798d8 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -149,7 +149,8 @@ func RunBackup(c context.Context, cmdName string, cfg *BackupConfig) error { } if cfg.LastBackupTS == 0 { - valid, err := client.FastChecksum() + var valid bool + valid, err = client.FastChecksum() if err != nil { return err } From 39efe6d5af4af532de7490e0e69f6cc6c2e1bbb8 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Wed, 19 Feb 2020 11:21:05 +0800 Subject: [PATCH 12/12] address comments Signed-off-by: 5kbpers --- pkg/restore/client.go | 6 ------ pkg/restore/split.go | 10 ++++++++-- pkg/task/backup.go | 9 ++++++++- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 7988cff24..f45b3d510 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -223,12 +223,6 @@ func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error { return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion }) - for _, job := range ddlJobs { - log.Debug("pre-execute ddl jobs", - zap.String("db", job.SchemaName), - zap.String("query", job.Query), - zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion)) - } for _, job := range ddlJobs { err := rc.db.ExecDDL(rc.ctx, job) if err != nil { diff --git a/pkg/restore/split.go b/pkg/restore/split.go index d2660ef7c..378e256c6 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -301,10 +301,16 @@ func needSplit(splitKey []byte, 
regions []*RegionInfo) *RegionInfo { return nil } +var ( + tablePrefix = []byte{'t'} + idLen = 8 + recordPrefix = []byte("_r") +) + func truncateRowKey(key []byte) []byte { - if bytes.HasPrefix(key, []byte("t")) && + if bytes.HasPrefix(key, tablePrefix) && len(key) > tablecodec.RecordRowKeyLen && - bytes.HasPrefix(key[9:], []byte("_r")) { + bytes.HasPrefix(key[len(tablePrefix)+idLen:], recordPrefix) { return key[:tablecodec.RecordRowKeyLen] } return key diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 7019798d8..240754517 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -9,6 +9,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb-tools/pkg/filter" "github.com/spf13/pflag" + "go.uber.org/zap" "github.com/pingcap/br/pkg/backup" "github.com/pingcap/br/pkg/storage" @@ -101,6 +102,11 @@ func RunBackup(c context.Context, cmdName string, cfg *BackupConfig) error { ddlJobs := make([]*model.Job, 0) if cfg.LastBackupTS > 0 { + err = backup.CheckGCSafepoint(ctx, mgr.GetPDClient(), cfg.LastBackupTS) + if err != nil { + log.Error("Check gc safepoint for last backup ts failed", zap.Error(err)) + return err + } ddlJobs, err = backup.GetBackupDDLJobs(mgr.GetDomain(), cfg.LastBackupTS, backupTS) if err != nil { return err @@ -160,7 +166,8 @@ func RunBackup(c context.Context, cmdName string, cfg *BackupConfig) error { } } else { - log.Warn("Skip fast checksum in incremental backup") + // Since we don't support checksum for incremental data, fast checksum should be skipped. + log.Info("Skip fast checksum in incremental backup") } // Checksum has finished close(updateCh)
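
Notes for reviewers (illustrative material, not part of the patches above):

1. The intended restore-side flow, assembled from the hunks in this series.
   This is a sketch with error handling abbreviated, not the literal
   RunRestore body:

    // Replay schema changes before ingesting data files.
    allDDLJobs := client.GetDDLJobs()                    // DDL jobs recorded in backupmeta
    ddlJobs := restore.FilterDDLJobs(allDDLJobs, tables) // keep jobs touching the restored tables,
                                                         // following every id a table ever had
                                                         // (rename/truncate allocate new ids)
    if err := client.ExecDDLs(ddlJobs); err != nil {
        return errors.Trace(err)
    }
    // ...then validate file ranges, split/scatter regions, and ingest the
    // SST files as before.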
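2. Why truncateRowKey addresses the "no valid key" error: a TiDB record key
   has the layout t{table_id}_r{row_id}, exactly tablecodec.RecordRowKeyLen
   (1 + 8 + 2 + 8 = 19) bytes, and TiKV rejects a proposed split key that
   runs past the row id. A minimal standalone illustration, assuming the
   tablecodec API of the TiDB version pinned by this series
   (EncodeRowKeyWithHandle is used here for demonstration only):

    package main

    import (
        "fmt"

        "github.com/pingcap/tidb/tablecodec"
    )

    func main() {
        // Encode a record key for table 42, row handle 7.
        key := tablecodec.EncodeRowKeyWithHandle(42, 7)
        fmt.Println(len(key) == tablecodec.RecordRowKeyLen) // true (19 bytes)

        // A range end carrying extra suffix bytes is clipped back to a
        // valid row key, mirroring what truncateRowKey does above.
        long := append(append([]byte{}, key...), 0x00, 0x01)
        fmt.Println(len(long[:tablecodec.RecordRowKeyLen])) // 19
    }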
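3. Why RunBackup now checks the GC safepoint: an incremental backup scans the
   MVCC versions in (LastBackupTS, backupTS]; once the GC safepoint moves past
   LastBackupTS, those versions may already be compacted away and the delta
   would be silently incomplete. The guard, as wired in pkg/task/backup.go
   above (sketch, error logging omitted):

    if cfg.LastBackupTS > 0 {
        // Abort rather than produce an incomplete incremental backup.
        if err := backup.CheckGCSafepoint(ctx, mgr.GetPDClient(), cfg.LastBackupTS); err != nil {
            return err // lastbackupts is no longer covered by the GC safepoint
        }
    }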