Skip to content

Commit

Permalink
br: fix duplicated table check would block incremental restore (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
RidRisR authored Sep 3, 2024
1 parent 2b54db5 commit 649dce7
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 5 deletions.
2 changes: 1 addition & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if cfg.WithSysTable {
client.InitFullClusterRestore(cfg.ExplicitFilter)
}
} else if checkpointFirstRun && cfg.CheckRequirements {
} else if client.IsFull() && checkpointFirstRun && cfg.CheckRequirements {
if err := checkTableExistence(ctx, mgr, tables, g); err != nil {
schedulersRemovable = true
return errors.Trace(err)
Expand Down
182 changes: 178 additions & 4 deletions tests/realtikvtest/brietest/brie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,17 @@ func TestExistedTables(t *testing.T) {
sqlTmp := strings.ReplaceAll(tmp, "'", "''")
executor.ResetGlobalBRIEQueueForTest()
tk.MustExec("use test;")
for i := 0; i < 10; i++ {
for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("create table %s(pk int primary key auto_increment, v varchar(255));", tableName))
tk.MustExec(fmt.Sprintf("insert into %s(v) values %s;", tableName, strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",")))
}

// full backup
done := make(chan struct{})
go func() {
defer close(done)
backupQuery := fmt.Sprintf("BACKUP DATABASE * TO 'local://%s'", sqlTmp)
backupQuery := fmt.Sprintf("BACKUP DATABASE * TO 'local://%s/full'", sqlTmp)
_ = tk.MustQuery(backupQuery)
}()
select {
Expand All @@ -159,7 +160,7 @@ func TestExistedTables(t *testing.T) {
done = make(chan struct{})
go func() {
defer close(done)
restoreQuery := fmt.Sprintf("RESTORE DATABASE * FROM 'local://%s'", sqlTmp)
restoreQuery := fmt.Sprintf("RESTORE DATABASE * FROM 'local://%s/full'", sqlTmp)
res, err := tk.Exec(restoreQuery)
require.NoError(t, err)

Expand All @@ -172,7 +173,180 @@ func TestExistedTables(t *testing.T) {
case <-done:
}

for i := 0; i < 10; i++ {
// db level backup
done = make(chan struct{})
go func() {
defer close(done)
backupQuery := fmt.Sprintf("BACKUP DATABASE test TO 'local://%s/db'", sqlTmp)
_ = tk.MustQuery(backupQuery)
}()
select {
case <-time.After(20 * time.Second):
t.Fatal("Backup operation exceeded")
case <-done:
}

done = make(chan struct{})
go func() {
defer close(done)
restoreQuery := fmt.Sprintf("Restore DATABASE test FROM 'local://%s/db'", sqlTmp)
res, err := tk.Exec(restoreQuery)
require.NoError(t, err)

_, err = session.ResultSetToStringSlice(context.Background(), tk.Session(), res)
require.ErrorContains(t, err, "table already exists")
}()
select {
case <-time.After(20 * time.Second):
t.Fatal("Restore operation exceeded")
case <-done:
}

for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("drop table %s;", tableName))
}
}

// full backup * -> incremental backup * -> restore full backup * -> restore incremental backup *
func TestExistedTablesOfIncremental(t *testing.T) {
tk := initTestKit(t)
tmp := makeTempDirForBackup(t)
sqlTmp := strings.ReplaceAll(tmp, "'", "''")
executor.ResetGlobalBRIEQueueForTest()
tk.MustExec("use test;")
for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("create table %s(pk int primary key auto_increment, v varchar(255));", tableName))
tk.MustExec(fmt.Sprintf("insert into %s(v) values %s;", tableName, strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",")))
}

// full backup
backupQuery := fmt.Sprintf("BACKUP DATABASE * TO 'local://%s/full'", sqlTmp)
res := tk.MustQuery(backupQuery)
backupTs := res.Rows()[0][2].(string)

// write incremental data
for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("insert into %s(v) values %s;", tableName, strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",")))
}

// incremental backup
IncrementalBackupQuery := fmt.Sprintf("BACKUP DATABASE * TO 'local://%s/incremental' last_backup=%s", sqlTmp, backupTs)
_ = tk.MustQuery(IncrementalBackupQuery)

// clean up tables
for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("drop table %s;", tableName))
}

// restore full backup
restoreQuery := fmt.Sprintf("RESTORE DATABASE * FROM 'local://%s/full'", sqlTmp)
_ = tk.MustQuery(restoreQuery)

// restore incremental backup
restoreIncrementalQuery := fmt.Sprintf("RESTORE DATABASE * FROM 'local://%s/incremental'", sqlTmp)
_ = tk.MustQuery(restoreIncrementalQuery)

for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("drop table %s;", tableName))
}
}

// full backup * -> incremental backup * -> restore full backup `test` -> restore incremental backup `test`
func TestExistedTablesOfIncremental_1(t *testing.T) {
tk := initTestKit(t)
tmp := makeTempDirForBackup(t)
sqlTmp := strings.ReplaceAll(tmp, "'", "''")
executor.ResetGlobalBRIEQueueForTest()
tk.MustExec("use test;")
for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("create table %s(pk int primary key auto_increment, v varchar(255));", tableName))
tk.MustExec(fmt.Sprintf("insert into %s(v) values %s;", tableName, strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",")))
}

// full backup
backupQuery := fmt.Sprintf("BACKUP DATABASE * TO 'local://%s/full'", sqlTmp)
res := tk.MustQuery(backupQuery)
backupTs := res.Rows()[0][2].(string)

// write incremental data
for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("insert into %s(v) values %s;", tableName, strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",")))
}

// incremental backup
IncrementalBackupQuery := fmt.Sprintf("BACKUP DATABASE * TO 'local://%s/incremental' last_backup=%s", sqlTmp, backupTs)
_ = tk.MustQuery(IncrementalBackupQuery)

// clean up tables
for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("drop table %s;", tableName))
}

// restore full backup
restoreQuery := fmt.Sprintf("RESTORE DATABASE test FROM 'local://%s/full'", sqlTmp)
_ = tk.MustQuery(restoreQuery)

// restore incremental backup
restoreIncrementalQuery := fmt.Sprintf("RESTORE DATABASE test FROM 'local://%s/incremental'", sqlTmp)
_ = tk.MustQuery(restoreIncrementalQuery)

for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("drop table %s;", tableName))
}
}

// full backup `test` -> incremental backup `test` -> restore full backup * -> restore incremental backup *
func TestExistedTablesOfIncremental_2(t *testing.T) {
tk := initTestKit(t)
tmp := makeTempDirForBackup(t)
sqlTmp := strings.ReplaceAll(tmp, "'", "''")
executor.ResetGlobalBRIEQueueForTest()
tk.MustExec("use test;")
for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("create table %s(pk int primary key auto_increment, v varchar(255));", tableName))
tk.MustExec(fmt.Sprintf("insert into %s(v) values %s;", tableName, strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",")))
}

// full backup
backupQuery := fmt.Sprintf("BACKUP DATABASE test TO 'local://%s/full'", sqlTmp)
res := tk.MustQuery(backupQuery)
backupTs := res.Rows()[0][2].(string)

// write incremental data
for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("insert into %s(v) values %s;", tableName, strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",")))
}

// incremental backup
IncrementalBackupQuery := fmt.Sprintf("BACKUP DATABASE test TO 'local://%s/incremental' last_backup=%s", sqlTmp, backupTs)
_ = tk.MustQuery(IncrementalBackupQuery)

// clean up tables
for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("drop table %s;", tableName))
}

// restore full backup
restoreQuery := fmt.Sprintf("RESTORE DATABASE * FROM 'local://%s/full'", sqlTmp)
_ = tk.MustQuery(restoreQuery)

// restore incremental backup
restoreIncrementalQuery := fmt.Sprintf("RESTORE DATABASE * FROM 'local://%s/incremental'", sqlTmp)
_ = tk.MustQuery(restoreIncrementalQuery)

for i := 0; i < 5; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("drop table %s;", tableName))
}
Expand Down

0 comments on commit 649dce7

Please sign in to comment.