Skip to content

Commit

Permalink
*: Table partition double write during Reorganize partition (part 2) …
Browse files Browse the repository at this point in the history
…| tidb-test=pr/2044 (#38508)

ref #15000, ref #38535
  • Loading branch information
mjonss authored Dec 30, 2022
1 parent 25df00e commit b259112
Show file tree
Hide file tree
Showing 7 changed files with 334 additions and 142 deletions.
56 changes: 56 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4554,3 +4554,59 @@ func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) {
tk.MustExec(`create table t (a int, b char) partition by hash (a) partitions 3`)
tk.MustContainErrMsg(`alter table t change a c int`, "[planner:1054]Unknown column 'a' in 'expression'")
}

func TestReorgPartitionConcurrent(t *testing.T) {
t.Skip("Needs PR 38460 as well")
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
schemaName := "ReorgPartConcurrent"
tk.MustExec("create database " + schemaName)
tk.MustExec("use " + schemaName)
tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` +
` partition by range (a) ` +
`(partition p0 values less than (10),` +
` partition p1 values less than (20),` +
` partition pMax values less than (MAXVALUE))`)
tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`)
dom := domain.GetDomain(tk.Session())
originHook := dom.DDL().GetHook()
defer dom.DDL().SetHook(originHook)
hook := &ddl.TestDDLCallback{Do: dom}
dom.DDL().SetHook(hook)

wait := make(chan bool)
defer close(wait)

injected := false
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateWriteReorganization && !injected {
injected = true
<-wait
<-wait
}
}
alterErr := make(chan error, 1)
go backgroundExec(store, schemaName, "alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))", alterErr)
wait <- true
tk.MustExec(`insert into t values (14, "14", 14),(15, "15",15)`)
wait <- true
require.NoError(t, <-alterErr)
tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+
"12 12 21",
"14 14 14",
"15 15 15"))
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
" `b` varchar(255) DEFAULT NULL,\n" +
" `c` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
" KEY `b` (`b`),\n" +
" KEY `c` (`c`,`b`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`a`)\n" +
"(PARTITION `p0` VALUES LESS THAN (10),\n" +
" PARTITION `p1a` VALUES LESS THAN (15),\n" +
" PARTITION `p1b` VALUES LESS THAN (20),\n" +
" PARTITION `pMax` VALUES LESS THAN (MAXVALUE))"))
}
12 changes: 8 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3527,10 +3527,14 @@ func getPartitionKeyColOffsets(keyColIDs []int64, pt table.PartitionedTable) []i
keyColOffsets[i] = offset
}

pe, err := pt.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
t, ok := pt.(interface {
PartitionExpr() *tables.PartitionExpr
})
if !ok {
return nil
}
pe := t.PartitionExpr()
if pe == nil {
return nil
}

Expand Down
12 changes: 2 additions & 10 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1898,12 +1898,7 @@ func getPartitionExpr(ctx sessionctx.Context, tbl *model.TableInfo) *tables.Part
}

// PartitionExpr don't need columns and names for hash partition.
partitionExpr, err := partTable.PartitionExpr()
if err != nil {
return nil
}

return partitionExpr
return partTable.PartitionExpr()
}

func getHashPartitionColumnName(ctx sessionctx.Context, tbl *model.TableInfo) *ast.ColumnName {
Expand All @@ -1920,10 +1915,7 @@ func getHashPartitionColumnName(ctx sessionctx.Context, tbl *model.TableInfo) *a
return nil
}
// PartitionExpr don't need columns and names for hash partition.
partitionExpr, err := table.(partitionTable).PartitionExpr()
if err != nil {
return nil
}
partitionExpr := table.(partitionTable).PartitionExpr()
expr := partitionExpr.OrigExpr
col, ok := expr.(*ast.ColumnNameExpr)
if !ok {
Expand Down
13 changes: 4 additions & 9 deletions planner/core/rule_partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *partitionProcessor) rewriteDataSource(lp LogicalPlan, opt *logicalOptim

// partitionTable is for those tables which implement partition.
type partitionTable interface {
PartitionExpr() (*tables.PartitionExpr, error)
PartitionExpr() *tables.PartitionExpr
}

func generateHashPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, columns []*expression.Column, names types.NameSlice) (expression.Expression, error) {
Expand Down Expand Up @@ -595,13 +595,11 @@ func (l *listPartitionPruner) findUsedListPartitions(conds []expression.Expressi
func (s *partitionProcessor) findUsedListPartitions(ctx sessionctx.Context, tbl table.Table, partitionNames []model.CIStr,
conds []expression.Expression) ([]int, error) {
pi := tbl.Meta().Partition
partExpr, err := tbl.(partitionTable).PartitionExpr()
if err != nil {
return nil, err
}
partExpr := tbl.(partitionTable).PartitionExpr()

listPruner := newListPartitionPruner(ctx, tbl, partitionNames, s, conds, partExpr.ForListPruning)
var used map[int]struct{}
var err error
if partExpr.ForListPruning.ColPrunes == nil {
used, err = listPruner.findUsedListPartitions(conds)
} else {
Expand Down Expand Up @@ -826,10 +824,7 @@ func intersectionRange(start, end, newStart, newEnd int) (int, int) {

func (s *partitionProcessor) pruneRangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, tbl table.PartitionedTable, conds []expression.Expression,
columns []*expression.Column, names types.NameSlice) (partitionRangeOR, error) {
partExpr, err := tbl.(partitionTable).PartitionExpr()
if err != nil {
return nil, err
}
partExpr := tbl.(partitionTable).PartitionExpr()

// Partition by range columns.
if len(pi.Columns) > 0 {
Expand Down
9 changes: 3 additions & 6 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2443,7 +2443,7 @@ func oldPasswordUpgrade(pass string) (string, error) {
// rebuildAllPartitionValueMapAndSorted rebuilds all value map and sorted info for list column partitions with InfoSchema.
func rebuildAllPartitionValueMapAndSorted(s *session) {
type partitionExpr interface {
PartitionExpr() (*tables.PartitionExpr, error)
PartitionExpr() *tables.PartitionExpr
}

p := parser.New()
Expand All @@ -2455,12 +2455,9 @@ func rebuildAllPartitionValueMapAndSorted(s *session) {
continue
}

pe, err := t.(partitionExpr).PartitionExpr()
if err != nil {
panic("partition table gets partition expression failed")
}
pe := t.(partitionExpr).PartitionExpr()
for _, cp := range pe.ColPrunes {
if err = cp.RebuildPartitionValueMapAndSorted(p); err != nil {
if err := cp.RebuildPartitionValueMapAndSorted(p, pi.Definitions); err != nil {
logutil.BgLogger().Warn("build list column partition value map and sorted failed")
break
}
Expand Down
Loading

0 comments on commit b259112

Please sign in to comment.