Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
  • Loading branch information
5kbpers committed Feb 17, 2020
1 parent 4d5d966 commit a468099
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 64 deletions.
12 changes: 11 additions & 1 deletion pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,22 @@ func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {
sort.Slice(ddlJobs, func(i, j int) bool {
return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion
})

for _, job := range ddlJobs {
log.Debug("pre-execute ddl jobs",
zap.String("db", job.SchemaName),
zap.String("query", job.Query),
zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion))
}
for _, job := range ddlJobs {
err := rc.db.ExecDDL(rc.ctx, job)
if err != nil {
return errors.Trace(err)
}
log.Info("execute ddl query", zap.String("db", job.SchemaName), zap.String("query", job.Query))
log.Info("execute ddl query",
zap.String("db", job.SchemaName),
zap.String("query", job.Query),
zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion))
}
return nil
}
Expand Down
82 changes: 74 additions & 8 deletions pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"sort"
"strings"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -40,20 +41,24 @@ func NewDB(store kv.Storage) (*DB, error) {

// ExecDDL executes the query of a ddl job.
func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
switchDbSQL := fmt.Sprintf("use %s;", ddlJob.SchemaName)
_, err := db.se.Execute(ctx, switchDbSQL)
if err != nil {
log.Error("switch db failed",
zap.String("query", switchDbSQL),
zap.String("db", ddlJob.SchemaName),
zap.Error(err))
return errors.Trace(err)
var err error
if ddlJob.BinlogInfo.TableInfo != nil {
switchDbSQL := fmt.Sprintf("use %s;", ddlJob.SchemaName)
_, err = db.se.Execute(ctx, switchDbSQL)
if err != nil {
log.Error("switch db failed",
zap.String("query", switchDbSQL),
zap.String("db", ddlJob.SchemaName),
zap.Error(err))
return errors.Trace(err)
}
}
_, err = db.se.Execute(ctx, ddlJob.Query)
if err != nil {
log.Error("execute ddl query failed",
zap.String("query", ddlJob.Query),
zap.String("db", ddlJob.SchemaName),
zap.Int64("historySchemaVersion", ddlJob.BinlogInfo.SchemaVersion),
zap.Error(err))
}
return errors.Trace(err)
Expand Down Expand Up @@ -131,3 +136,64 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
func (db *DB) Close() {
db.se.Close()
}

// FilterDDLJobs filters ddl jobs
func FilterDDLJobs(allDDLJobs []*model.Job, tables []*utils.Table) (ddlJobs []*model.Job) {
// Sort the ddl jobs by schema version in descending order.
sort.Slice(allDDLJobs, func(i, j int) bool {
return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion
})
dbs := getDatabases(tables)
for _, db := range dbs {
// These maps is for solving some corner case.
// e.g. let "t=2" indicates that the id of database "t" is 2, if the ddl execution sequence is:
// rename "a" to "b"(a=1) -> drop "b"(b=1) -> create "b"(b=2) -> rename "b" to "a"(a=2)
// Which we cannot find the "create" DDL by name and id directly.
// To cover †his case, we must find all names and ids the database/table ever had.
dbIDs := make(map[int64]bool)
dbIDs[db.ID] = true
dbNames := make(map[string]bool)
dbNames[db.Name.String()] = true
for _, job := range allDDLJobs {
if job.BinlogInfo.DBInfo != nil {
if dbIDs[job.SchemaID] || dbNames[job.BinlogInfo.DBInfo.Name.String()] {
ddlJobs = append(ddlJobs, job)
// The the jobs executed with the old id, like the step 2 in the example above.
dbIDs[job.SchemaID] = true
// For the jobs executed after rename, like the step 3 in the example above.
dbNames[job.BinlogInfo.DBInfo.Name.String()] = true
}
}
}
}

for _, table := range tables {
tableIDs := make(map[int64]bool)
tableIDs[table.Info.ID] = true
tableNames := make(map[string]bool)
tableNames[table.Info.Name.String()] = true
for _, job := range allDDLJobs {
if job.BinlogInfo.TableInfo != nil {
if tableIDs[job.TableID] || tableNames[job.BinlogInfo.TableInfo.Name.String()] {
ddlJobs = append(ddlJobs, job)
tableIDs[job.TableID] = true
// For truncate table, the id may be changed
tableIDs[job.BinlogInfo.TableInfo.ID] = true
tableNames[job.BinlogInfo.TableInfo.Name.String()] = true
}
}
}
}
return ddlJobs
}

func getDatabases(tables []*utils.Table) (dbs []*model.DBInfo) {
dbIDs := make(map[int64]bool)
for _, table := range tables {
if !dbIDs[table.Db.ID] {
dbs = append(dbs, table.Db)
dbIDs[table.Db.ID] = true
}
}
return
}
42 changes: 39 additions & 3 deletions pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/utils"
)

Expand All @@ -25,19 +26,18 @@ func (s *testRestoreSchemaSuite) SetUpSuite(c *C) {
var err error
s.mock, err = utils.NewMockCluster()
c.Assert(err, IsNil)
c.Assert(s.mock.Start(), IsNil)
}
func TestT(t *testing.T) {
TestingT(t)
}

func (s *testRestoreSchemaSuite) TearDownSuite(c *C) {
s.mock.Stop()
testleak.AfterTest(c)()
}

func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) {
c.Assert(s.mock.Start(), IsNil)
defer s.mock.Stop()

tk := testkit.NewTestKit(c, s.mock.Storage)
tk.MustExec("use test")
tk.MustExec("set @@sql_mode=''")
Expand Down Expand Up @@ -92,3 +92,39 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) {
c.Assert(err, IsNil, Commentf("Error query auto inc id: %s", err))
c.Assert(autoIncID, Equals, uint64(globalAutoID+100))
}

func (s *testRestoreSchemaSuite) TestFilterDDLJobs(c *C) {
tk := testkit.NewTestKit(c, s.mock.Storage)
tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")
lastTs, err := s.mock.GetOracle().GetTimestamp(context.Background())
c.Assert(err, IsNil, Commentf("Error get last ts: %s", err))
tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
tk.MustExec("DROP TABLE test_db.test_table1;")
tk.MustExec("DROP DATABASE test_db;")
tk.MustExec("CREATE DATABASE test_db;")
tk.MustExec("USE test_db;")
tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
tk.MustExec("RENAME TABLE test_table1 to test_table;")
tk.MustExec("TRUNCATE TABLE test_table;")

ts, err := s.mock.GetOracle().GetTimestamp(context.Background())
c.Assert(err, IsNil, Commentf("Error get ts: %s", err))
allDDLJobs, err := backup.GetBackupDDLJobs(s.mock.Domain, lastTs, ts)
c.Assert(err, IsNil, Commentf("Error get ddl jobs: %s", err))
infoSchema, err := s.mock.Domain.GetSnapshotInfoSchema(ts)
c.Assert(err, IsNil, Commentf("Error get snapshot info schema: %s", err))
dbInfo, ok := infoSchema.SchemaByName(model.NewCIStr("test_db"))
c.Assert(ok, IsTrue, Commentf("DB info not exist"))
tableInfo, err := infoSchema.TableByName(model.NewCIStr("test_db"), model.NewCIStr("test_table"))
c.Assert(err, IsNil, Commentf("Error get table info: %s", err))
tables := []*utils.Table{{
Db: dbInfo,
Info: tableInfo.Meta(),
}}
ddlJobs := FilterDDLJobs(allDDLJobs, tables)
for _, job := range ddlJobs {
c.Logf("get ddl job: %s", job.Query)
}
c.Assert(len(ddlJobs), Equals, 7)
}
46 changes: 1 addition & 45 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package task

import (
"context"
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/spf13/pflag"
"go.uber.org/zap"
Expand Down Expand Up @@ -105,7 +103,7 @@ func RunRestore(c context.Context, cmdName string, cfg *RestoreConfig) error {
return err
}
}
ddlJobs, err := filterDDLJobs(client, cfg)
ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables)
if err != nil {
return err
}
Expand Down Expand Up @@ -207,48 +205,6 @@ func filterRestoreFiles(
return
}

func filterDDLJobs(client *restore.Client, cfg *RestoreConfig) ([]*model.Job, error) {
tableFilter, err := filter.New(cfg.CaseSensitive, &cfg.Filter)
if err != nil {
return nil, err
}
allDDLJobs := client.GetDDLJobs()
// Sort the ddl jobs by schema version in descending order.
sort.Slice(allDDLJobs, func(i, j int) bool {
return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion
})

dbIDs := make(map[int64]bool)
tableIDs := make(map[int64]bool)
ddlJobs := make([]*model.Job, 0)
for _, db := range client.GetDatabases() {
for _, table := range db.Tables {
if !tableFilter.Match(&filter.Table{Schema: db.Info.Name.O, Name: table.Info.Name.O}) {
continue
}
dbIDs[db.Info.ID] = true
tableIDs[table.Info.ID] = true
}
}

for _, job := range allDDLJobs {
if dbIDs[job.SchemaID] ||
(job.BinlogInfo.DBInfo != nil && dbIDs[job.BinlogInfo.DBInfo.ID]) ||
tableIDs[job.TableID] ||
(job.BinlogInfo.TableInfo != nil && tableIDs[job.BinlogInfo.TableInfo.ID]) {
dbIDs[job.SchemaID] = true
if job.BinlogInfo.DBInfo != nil {
dbIDs[job.BinlogInfo.DBInfo.ID] = true
}
if job.BinlogInfo.TableInfo != nil {
tableIDs[job.BinlogInfo.TableInfo.ID] = true
}
ddlJobs = append(ddlJobs, job)
}
}
return ddlJobs, err
}

// restorePreWork executes some prepare work before restore
func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) ([]string, error) {
if client.IsOnline() {
Expand Down
10 changes: 3 additions & 7 deletions tests/br_incremental_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,11 @@ run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $T
echo "run ddls..."
run_sql "RENAME TABLE ${DB}.${TABLE} to ${DB}.${TABLE}1;"
run_sql "DROP TABLE ${DB}.${TABLE}1;"

run_sql "RENAME DATABASE ${DB} to ${DB}1;"
run_sql "DROP DATABASE ${DB}1;"
run_sql "CREATE DATABASE ${DB}1;"
run_sql "RENAME DATABASE ${DB}1 to ${DB};"


run_sql "DROP DATABASE ${DB};"
run_sql "CREATE DATABASE ${DB};"
run_sql "CREATE TABLE ${DB}.${TABLE}1 (c2 CHAR(255));"
run_sql "RENAME TABLE ${DB}.${TABLE}1 to ${DB}.${TABLE};"
run_sql "TRUNCATE TABLE ${DB}.${TABLE};"
# insert records
for i in $(seq $ROW_COUNT); do
run_sql "INSERT INTO ${DB}.${TABLE}(c2) VALUES ('$i');"
Expand Down

0 comments on commit a468099

Please sign in to comment.