diff --git a/pkg/restore/client.go b/pkg/restore/client.go index e5c0570e5..7988cff24 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -222,12 +222,22 @@ func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error { sort.Slice(ddlJobs, func(i, j int) bool { return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion }) + + for _, job := range ddlJobs { + log.Debug("pre-execute ddl jobs", + zap.String("db", job.SchemaName), + zap.String("query", job.Query), + zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion)) + } for _, job := range ddlJobs { err := rc.db.ExecDDL(rc.ctx, job) if err != nil { return errors.Trace(err) } - log.Info("execute ddl query", zap.String("db", job.SchemaName), zap.String("query", job.Query)) + log.Info("execute ddl query", + zap.String("db", job.SchemaName), + zap.String("query", job.Query), + zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion)) } return nil } diff --git a/pkg/restore/db.go b/pkg/restore/db.go index a00257ba0..8c09af16f 100644 --- a/pkg/restore/db.go +++ b/pkg/restore/db.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sort" "strings" "github.com/pingcap/errors" @@ -40,20 +41,24 @@ func NewDB(store kv.Storage) (*DB, error) { // ExecDDL executes the query of a ddl job. func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { - switchDbSQL := fmt.Sprintf("use %s;", ddlJob.SchemaName) - _, err := db.se.Execute(ctx, switchDbSQL) - if err != nil { - log.Error("switch db failed", - zap.String("query", switchDbSQL), - zap.String("db", ddlJob.SchemaName), - zap.Error(err)) - return errors.Trace(err) + var err error + if ddlJob.BinlogInfo.TableInfo != nil { + switchDbSQL := fmt.Sprintf("use %s;", ddlJob.SchemaName) + _, err = db.se.Execute(ctx, switchDbSQL) + if err != nil { + log.Error("switch db failed", + zap.String("query", switchDbSQL), + zap.String("db", ddlJob.SchemaName), + zap.Error(err)) + return errors.Trace(err) + } } _, err = db.se.Execute(ctx, ddlJob.Query) if err != nil { log.Error("execute ddl query failed", zap.String("query", ddlJob.Query), zap.String("db", ddlJob.SchemaName), + zap.Int64("historySchemaVersion", ddlJob.BinlogInfo.SchemaVersion), zap.Error(err)) } return errors.Trace(err) @@ -131,3 +136,64 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { func (db *DB) Close() { db.se.Close() } + +// FilterDDLJobs filters ddl jobs +func FilterDDLJobs(allDDLJobs []*model.Job, tables []*utils.Table) (ddlJobs []*model.Job) { + // Sort the ddl jobs by schema version in descending order. + sort.Slice(allDDLJobs, func(i, j int) bool { + return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion + }) + dbs := getDatabases(tables) + for _, db := range dbs { + // These maps is for solving some corner case. + // e.g. let "t=2" indicates that the id of database "t" is 2, if the ddl execution sequence is: + // rename "a" to "b"(a=1) -> drop "b"(b=1) -> create "b"(b=2) -> rename "b" to "a"(a=2) + // Which we cannot find the "create" DDL by name and id directly. + // To cover †his case, we must find all names and ids the database/table ever had. + dbIDs := make(map[int64]bool) + dbIDs[db.ID] = true + dbNames := make(map[string]bool) + dbNames[db.Name.String()] = true + for _, job := range allDDLJobs { + if job.BinlogInfo.DBInfo != nil { + if dbIDs[job.SchemaID] || dbNames[job.BinlogInfo.DBInfo.Name.String()] { + ddlJobs = append(ddlJobs, job) + // The the jobs executed with the old id, like the step 2 in the example above. + dbIDs[job.SchemaID] = true + // For the jobs executed after rename, like the step 3 in the example above. + dbNames[job.BinlogInfo.DBInfo.Name.String()] = true + } + } + } + } + + for _, table := range tables { + tableIDs := make(map[int64]bool) + tableIDs[table.Info.ID] = true + tableNames := make(map[string]bool) + tableNames[table.Info.Name.String()] = true + for _, job := range allDDLJobs { + if job.BinlogInfo.TableInfo != nil { + if tableIDs[job.TableID] || tableNames[job.BinlogInfo.TableInfo.Name.String()] { + ddlJobs = append(ddlJobs, job) + tableIDs[job.TableID] = true + // For truncate table, the id may be changed + tableIDs[job.BinlogInfo.TableInfo.ID] = true + tableNames[job.BinlogInfo.TableInfo.Name.String()] = true + } + } + } + } + return ddlJobs +} + +func getDatabases(tables []*utils.Table) (dbs []*model.DBInfo) { + dbIDs := make(map[int64]bool) + for _, table := range tables { + if !dbIDs[table.Db.ID] { + dbs = append(dbs, table.Db) + dbIDs[table.Db.ID] = true + } + } + return +} diff --git a/pkg/restore/db_test.go b/pkg/restore/db_test.go index 22af09b80..0151b4da6 100644 --- a/pkg/restore/db_test.go +++ b/pkg/restore/db_test.go @@ -12,6 +12,7 @@ import ( "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" + "github.com/pingcap/br/pkg/backup" "github.com/pingcap/br/pkg/utils" ) @@ -25,19 +26,18 @@ func (s *testRestoreSchemaSuite) SetUpSuite(c *C) { var err error s.mock, err = utils.NewMockCluster() c.Assert(err, IsNil) + c.Assert(s.mock.Start(), IsNil) } func TestT(t *testing.T) { TestingT(t) } func (s *testRestoreSchemaSuite) TearDownSuite(c *C) { + s.mock.Stop() testleak.AfterTest(c)() } func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { - c.Assert(s.mock.Start(), IsNil) - defer s.mock.Stop() - tk := testkit.NewTestKit(c, s.mock.Storage) tk.MustExec("use test") tk.MustExec("set @@sql_mode=''") @@ -92,3 +92,39 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { c.Assert(err, IsNil, Commentf("Error query auto inc id: %s", err)) c.Assert(autoIncID, Equals, uint64(globalAutoID+100)) } + +func (s *testRestoreSchemaSuite) TestFilterDDLJobs(c *C) { + tk := testkit.NewTestKit(c, s.mock.Storage) + tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;") + tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);") + lastTs, err := s.mock.GetOracle().GetTimestamp(context.Background()) + c.Assert(err, IsNil, Commentf("Error get last ts: %s", err)) + tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;") + tk.MustExec("DROP TABLE test_db.test_table1;") + tk.MustExec("DROP DATABASE test_db;") + tk.MustExec("CREATE DATABASE test_db;") + tk.MustExec("USE test_db;") + tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));") + tk.MustExec("RENAME TABLE test_table1 to test_table;") + tk.MustExec("TRUNCATE TABLE test_table;") + + ts, err := s.mock.GetOracle().GetTimestamp(context.Background()) + c.Assert(err, IsNil, Commentf("Error get ts: %s", err)) + allDDLJobs, err := backup.GetBackupDDLJobs(s.mock.Domain, lastTs, ts) + c.Assert(err, IsNil, Commentf("Error get ddl jobs: %s", err)) + infoSchema, err := s.mock.Domain.GetSnapshotInfoSchema(ts) + c.Assert(err, IsNil, Commentf("Error get snapshot info schema: %s", err)) + dbInfo, ok := infoSchema.SchemaByName(model.NewCIStr("test_db")) + c.Assert(ok, IsTrue, Commentf("DB info not exist")) + tableInfo, err := infoSchema.TableByName(model.NewCIStr("test_db"), model.NewCIStr("test_table")) + c.Assert(err, IsNil, Commentf("Error get table info: %s", err)) + tables := []*utils.Table{{ + Db: dbInfo, + Info: tableInfo.Meta(), + }} + ddlJobs := FilterDDLJobs(allDDLJobs, tables) + for _, job := range ddlJobs { + c.Logf("get ddl job: %s", job.Query) + } + c.Assert(len(ddlJobs), Equals, 7) +} diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 1c9528a9b..599dcb478 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -2,12 +2,10 @@ package task import ( "context" - "sort" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/log" - "github.com/pingcap/parser/model" "github.com/pingcap/tidb-tools/pkg/filter" "github.com/spf13/pflag" "go.uber.org/zap" @@ -105,7 +103,7 @@ func RunRestore(c context.Context, cmdName string, cfg *RestoreConfig) error { return err } } - ddlJobs, err := filterDDLJobs(client, cfg) + ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables) if err != nil { return err } @@ -207,48 +205,6 @@ func filterRestoreFiles( return } -func filterDDLJobs(client *restore.Client, cfg *RestoreConfig) ([]*model.Job, error) { - tableFilter, err := filter.New(cfg.CaseSensitive, &cfg.Filter) - if err != nil { - return nil, err - } - allDDLJobs := client.GetDDLJobs() - // Sort the ddl jobs by schema version in descending order. - sort.Slice(allDDLJobs, func(i, j int) bool { - return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion - }) - - dbIDs := make(map[int64]bool) - tableIDs := make(map[int64]bool) - ddlJobs := make([]*model.Job, 0) - for _, db := range client.GetDatabases() { - for _, table := range db.Tables { - if !tableFilter.Match(&filter.Table{Schema: db.Info.Name.O, Name: table.Info.Name.O}) { - continue - } - dbIDs[db.Info.ID] = true - tableIDs[table.Info.ID] = true - } - } - - for _, job := range allDDLJobs { - if dbIDs[job.SchemaID] || - (job.BinlogInfo.DBInfo != nil && dbIDs[job.BinlogInfo.DBInfo.ID]) || - tableIDs[job.TableID] || - (job.BinlogInfo.TableInfo != nil && tableIDs[job.BinlogInfo.TableInfo.ID]) { - dbIDs[job.SchemaID] = true - if job.BinlogInfo.DBInfo != nil { - dbIDs[job.BinlogInfo.DBInfo.ID] = true - } - if job.BinlogInfo.TableInfo != nil { - tableIDs[job.BinlogInfo.TableInfo.ID] = true - } - ddlJobs = append(ddlJobs, job) - } - } - return ddlJobs, err -} - // restorePreWork executes some prepare work before restore func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) ([]string, error) { if client.IsOnline() { diff --git a/tests/br_incremental_ddl/run.sh b/tests/br_incremental_ddl/run.sh index 43a126e39..d9a88709b 100755 --- a/tests/br_incremental_ddl/run.sh +++ b/tests/br_incremental_ddl/run.sh @@ -36,15 +36,11 @@ run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $T echo "run ddls..." run_sql "RENAME TABLE ${DB}.${TABLE} to ${DB}.${TABLE}1;" run_sql "DROP TABLE ${DB}.${TABLE}1;" - -run_sql "RENAME DATABASE ${DB} to ${DB}1;" -run_sql "DROP DATABASE ${DB}1;" -run_sql "CREATE DATABASE ${DB}1;" -run_sql "RENAME DATABASE ${DB}1 to ${DB};" - - +run_sql "DROP DATABASE ${DB};" +run_sql "CREATE DATABASE ${DB};" run_sql "CREATE TABLE ${DB}.${TABLE}1 (c2 CHAR(255));" run_sql "RENAME TABLE ${DB}.${TABLE}1 to ${DB}.${TABLE};" +run_sql "TRUNCATE TABLE ${DB}.${TABLE};" # insert records for i in $(seq $ROW_COUNT); do run_sql "INSERT INTO ${DB}.${TABLE}(c2) VALUES ('$i');"