Skip to content

Commit

Permalink
*: placement policy ref will be converted to direct options when reco…
Browse files Browse the repository at this point in the history
…ver or flashback table (#30705)
  • Loading branch information
lcwangchao authored Dec 16, 2021
1 parent daf525a commit ad740a6
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 0 deletions.
90 changes: 90 additions & 0 deletions ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2055,3 +2055,93 @@ func (s *testDBSuite6) TestPDFail(c *C) {
" PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p1` */)"))
checkAllBundlesNotChange(c, existBundles)
}

func (s *testDBSuite6) TestRecoverTableWithPlacementPolicy(c *C) {
clearAllBundles(c)
failpoint.Enable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed", `return`)
defer func(originGC bool) {
failpoint.Disable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed")
if originGC {
ddl.EmulatorGCEnable()
} else {
ddl.EmulatorGCDisable()
}
}(ddl.IsEmulatorGCEnable())
ddl.EmulatorGCDisable()

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop placement policy if exists p1")
tk.MustExec("drop placement policy if exists p2")
tk.MustExec("drop placement policy if exists p3")
tk.MustExec("drop table if exists tp1, tp2")

safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
ON DUPLICATE KEY
UPDATE variable_value = '%[1]s'`
tk.MustExec(fmt.Sprintf(safePointSQL, "20060102-15:04:05 -0700 MST"))

tk.MustExec("create placement policy p1 primary_region='r1' regions='r1,r2'")
defer tk.MustExec("drop placement policy if exists p1")

tk.MustExec("create placement policy p2 primary_region='r2' regions='r2,r3'")
defer tk.MustExec("drop placement policy if exists p2")

tk.MustExec("create placement policy p3 primary_region='r3' regions='r3,r4'")
defer tk.MustExec("drop placement policy if exists p3")

// test recover
tk.MustExec(`CREATE TABLE tp1 (id INT) placement policy p1 PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100) placement policy p2,
PARTITION p1 VALUES LESS THAN (1000),
PARTITION p2 VALUES LESS THAN (10000) placement policy p3
);`)
defer tk.MustExec("drop table if exists tp1")

tk.MustExec("drop table tp1")
tk.MustExec("recover table tp1")
tk.MustQuery("show create table tp1").Check(testkit.Rows("tp1 CREATE TABLE `tp1` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp1")

// test flashback
tk.MustExec(`CREATE TABLE tp2 (id INT) placement policy p1 PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100) placement policy p2,
PARTITION p1 VALUES LESS THAN (1000),
PARTITION p2 VALUES LESS THAN (10000) placement policy p3
);`)
defer tk.MustExec("drop table if exists tp2")

tk.MustExec("drop table tp1")
tk.MustExec("drop table tp2")
tk.MustExec("flashback table tp2")
tk.MustQuery("show create table tp2").Check(testkit.Rows("tp2 CREATE TABLE `tp2` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp2")

// test recover after police drop
tk.MustExec("drop table tp2")
tk.MustExec("drop placement policy p1")
tk.MustExec("drop placement policy p2")
tk.MustExec("drop placement policy p3")

tk.MustExec("flashback table tp2 to tp3")
tk.MustQuery("show create table tp3").Check(testkit.Rows("tp3 CREATE TABLE `tp3` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp3")
}
13 changes: 13 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,19 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagDisableGC
}

bundles, err := placement.NewFullTableBundles(t, tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

// Send the placement bundle to PD.
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}

job.SchemaState = model.StateWriteOnly
tblInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfo(t, job, tblInfo, false)
Expand Down
43 changes: 43 additions & 0 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,10 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
return err
}

if tblInfo, err = recoverTablePlacement(m, tblInfo); err != nil {
return err
}

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
Expand All @@ -635,6 +639,40 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
return err
}

// recoverTablePlacement is used when recover/flashback table.
// It will replace the placement policy of table with the direct options because the original policy may be deleted
func recoverTablePlacement(snapshotMeta *meta.Meta, tblInfo *model.TableInfo) (*model.TableInfo, error) {
if ref := tblInfo.PlacementPolicyRef; ref != nil {
policy, err := snapshotMeta.GetPolicy(ref.ID)
if err != nil {
return nil, errors.Trace(err)
}

tblInfo.PlacementPolicyRef = nil
tblInfo.DirectPlacementOpts = policy.PlacementSettings
}

if tblInfo.Partition != nil {
for idx := range tblInfo.Partition.Definitions {
def := &tblInfo.Partition.Definitions[idx]
ref := def.PlacementPolicyRef
if ref == nil {
continue
}

policy, err := snapshotMeta.GetPolicy(ref.ID)
if err != nil {
return nil, errors.Trace(err)
}

def.PlacementPolicyRef = nil
def.DirectPlacementOpts = policy.PlacementSettings
}
}

return tblInfo, nil
}

func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
job, err := t.GetHistoryDDLJob(s.JobID)
if err != nil {
Expand Down Expand Up @@ -787,6 +825,11 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
if err != nil {
return err
}

if tblInfo, err = recoverTablePlacement(m, tblInfo); err != nil {
return err
}

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
Expand Down

0 comments on commit ad740a6

Please sign in to comment.