Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: placement policy ref will be converted to direct options when recover or flashback table #30705

Merged
merged 6 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2055,3 +2055,93 @@ func (s *testDBSuite6) TestPDFail(c *C) {
" PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p1` */)"))
checkAllBundlesNotChange(c, existBundles)
}

func (s *testDBSuite6) TestRecoverTableWithPlacementPolicy(c *C) {
clearAllBundles(c)
failpoint.Enable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed", `return`)
defer func(originGC bool) {
failpoint.Disable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed")
if originGC {
ddl.EmulatorGCEnable()
} else {
ddl.EmulatorGCDisable()
}
}(ddl.IsEmulatorGCEnable())
ddl.EmulatorGCDisable()

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop placement policy if exists p1")
tk.MustExec("drop placement policy if exists p2")
tk.MustExec("drop placement policy if exists p3")
tk.MustExec("drop table if exists tp1, tp2")

safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
ON DUPLICATE KEY
UPDATE variable_value = '%[1]s'`
tk.MustExec(fmt.Sprintf(safePointSQL, "20060102-15:04:05 -0700 MST"))

tk.MustExec("create placement policy p1 primary_region='r1' regions='r1,r2'")
defer tk.MustExec("drop placement policy if exists p1")

tk.MustExec("create placement policy p2 primary_region='r2' regions='r2,r3'")
defer tk.MustExec("drop placement policy if exists p2")

tk.MustExec("create placement policy p3 primary_region='r3' regions='r3,r4'")
defer tk.MustExec("drop placement policy if exists p3")

// test recover
tk.MustExec(`CREATE TABLE tp1 (id INT) placement policy p1 PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100) placement policy p2,
PARTITION p1 VALUES LESS THAN (1000),
PARTITION p2 VALUES LESS THAN (10000) placement policy p3
);`)
defer tk.MustExec("drop table if exists tp1")

tk.MustExec("drop table tp1")
tk.MustExec("recover table tp1")
tk.MustQuery("show create table tp1").Check(testkit.Rows("tp1 CREATE TABLE `tp1` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp1")

// test flashback
tk.MustExec(`CREATE TABLE tp2 (id INT) placement policy p1 PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100) placement policy p2,
PARTITION p1 VALUES LESS THAN (1000),
PARTITION p2 VALUES LESS THAN (10000) placement policy p3
);`)
defer tk.MustExec("drop table if exists tp2")

tk.MustExec("drop table tp1")
tk.MustExec("drop table tp2")
tk.MustExec("flashback table tp2")
tk.MustQuery("show create table tp2").Check(testkit.Rows("tp2 CREATE TABLE `tp2` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp2")

// test recover after police drop
tk.MustExec("drop table tp2")
tk.MustExec("drop placement policy p1")
tk.MustExec("drop placement policy p2")
tk.MustExec("drop placement policy p3")

tk.MustExec("flashback table tp2 to tp3")
tk.MustQuery("show create table tp3").Check(testkit.Rows("tp3 CREATE TABLE `tp3` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp3")
}
13 changes: 13 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,19 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagDisableGC
}

bundles, err := placement.NewFullTableBundles(t, tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

// Send the placement bundle to PD.
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}

job.SchemaState = model.StateWriteOnly
tblInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfo(t, job, tblInfo, false)
Expand Down
43 changes: 43 additions & 0 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,10 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
return err
}

if tblInfo, err = recoverTablePlacement(m, tblInfo); err != nil {
return err
}

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
Expand All @@ -635,6 +639,40 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
return err
}

// recoverTablePlacement is used when recover/flashback table.
// It will replace the placement policy of table with the direct options because the original policy may be deleted
func recoverTablePlacement(snapshotMeta *meta.Meta, tblInfo *model.TableInfo) (*model.TableInfo, error) {
if ref := tblInfo.PlacementPolicyRef; ref != nil {
policy, err := snapshotMeta.GetPolicy(ref.ID)
if err != nil {
return nil, errors.Trace(err)
}
djshow832 marked this conversation as resolved.
Show resolved Hide resolved

tblInfo.PlacementPolicyRef = nil
tblInfo.DirectPlacementOpts = policy.PlacementSettings
}

if tblInfo.Partition != nil {
for idx := range tblInfo.Partition.Definitions {
def := &tblInfo.Partition.Definitions[idx]
ref := def.PlacementPolicyRef
if ref == nil {
continue
}

policy, err := snapshotMeta.GetPolicy(ref.ID)
if err != nil {
return nil, errors.Trace(err)
}

def.PlacementPolicyRef = nil
def.DirectPlacementOpts = policy.PlacementSettings
}
}

return tblInfo, nil
}

func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
job, err := t.GetHistoryDDLJob(s.JobID)
if err != nil {
Expand Down Expand Up @@ -787,6 +825,11 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
if err != nil {
return err
}

if tblInfo, err = recoverTablePlacement(m, tblInfo); err != nil {
return err
}

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
Expand Down