Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Table partition double write during Reorganize partition (part 2) #38508

56 changes: 56 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4764,3 +4764,59 @@ func TestAlterModifyColumnOnPartitionedTableFail(t *testing.T) {
tk.MustContainErrMsg(`alter table t modify column b varchar(5)`, "[ddl:8200]New column does not match partition definitions: [ddl:1654]Partition column values of incorrect type")
tk.MustExec(`set sql_mode = default`)
}

func TestReorgPartitionConcurrent(t *testing.T) {
t.Skip("Needs PR 38460 as well")
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
schemaName := "ReorgPartConcurrent"
tk.MustExec("create database " + schemaName)
tk.MustExec("use " + schemaName)
tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` +
` partition by range (a) ` +
`(partition p0 values less than (10),` +
` partition p1 values less than (20),` +
` partition pMax values less than (MAXVALUE))`)
tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`)
dom := domain.GetDomain(tk.Session())
originHook := dom.DDL().GetHook()
defer dom.DDL().SetHook(originHook)
hook := &ddl.TestDDLCallback{Do: dom}
dom.DDL().SetHook(hook)

wait := make(chan bool)
defer close(wait)

injected := false
hook.OnJobRunBeforeExported = func(job *model.Job) {
if /* TODO: uncomment!! job.Type == model.ActionReorganizePartition && */ job.SchemaState == model.StateWriteReorganization && !injected {
injected = true
<-wait
<-wait
}
}
alterErr := make(chan error, 1)
go backgroundExec(store /* TODO: uncomment!! schemaName, */, "alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))", alterErr)
wait <- true
tk.MustExec(`insert into t values (14, "14", 14),(15, "15",15)`)
wait <- true
require.NoError(t, <-alterErr)
tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+
"12 12 21",
"14 14 14",
"15 15 15"))
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
" `b` varchar(255) DEFAULT NULL,\n" +
" `c` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
" KEY `b` (`b`),\n" +
" KEY `c` (`c`,`b`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`a`)\n" +
"(PARTITION `p0` VALUES LESS THAN (10),\n" +
" PARTITION `p1a` VALUES LESS THAN (15),\n" +
" PARTITION `p1b` VALUES LESS THAN (20),\n" +
" PARTITION `pMax` VALUES LESS THAN (MAXVALUE))"))
}
14 changes: 4 additions & 10 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3538,12 +3538,9 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table

// check whether can runtime prune.
type partitionExpr interface {
PartitionExpr() (*tables.PartitionExpr, error)
}
pe, err := tbl.(partitionExpr).PartitionExpr()
if err != nil {
return nil, false, nil, err
PartitionExpr() *tables.PartitionExpr
}
pe := tbl.(partitionExpr).PartitionExpr()

// recalculate key column offsets
if len(lookUpContent) == 0 {
Expand Down Expand Up @@ -4149,12 +4146,9 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
}
tbl, _ := builder.is.TableByID(tbInfo.ID)
pt := tbl.(table.PartitionedTable)
pe, err := tbl.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
pe := tbl.(interface {
PartitionExpr() *tables.PartitionExpr
}).PartitionExpr()
if err != nil {
return nil, err
}
partitionInfo := &v.PartitionInfo
usedPartitionList, err := builder.partitionPruning(pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
if err != nil {
Expand Down
12 changes: 2 additions & 10 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1897,12 +1897,7 @@ func getPartitionExpr(ctx sessionctx.Context, tbl *model.TableInfo) *tables.Part
}

// PartitionExpr don't need columns and names for hash partition.
partitionExpr, err := partTable.PartitionExpr()
if err != nil {
return nil
}

return partitionExpr
return partTable.PartitionExpr()
}

func getHashPartitionColumnName(ctx sessionctx.Context, tbl *model.TableInfo) *ast.ColumnName {
Expand All @@ -1919,10 +1914,7 @@ func getHashPartitionColumnName(ctx sessionctx.Context, tbl *model.TableInfo) *a
return nil
}
// PartitionExpr don't need columns and names for hash partition.
partitionExpr, err := table.(partitionTable).PartitionExpr()
if err != nil {
return nil
}
partitionExpr := table.(partitionTable).PartitionExpr()
expr := partitionExpr.OrigExpr
col, ok := expr.(*ast.ColumnNameExpr)
if !ok {
Expand Down
13 changes: 4 additions & 9 deletions planner/core/rule_partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *partitionProcessor) rewriteDataSource(lp LogicalPlan, opt *logicalOptim

// partitionTable is for those tables which implement partition.
type partitionTable interface {
PartitionExpr() (*tables.PartitionExpr, error)
PartitionExpr() *tables.PartitionExpr
}

func generateHashPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, columns []*expression.Column, names types.NameSlice) (expression.Expression, error) {
Expand Down Expand Up @@ -595,13 +595,11 @@ func (l *listPartitionPruner) findUsedListPartitions(conds []expression.Expressi
func (s *partitionProcessor) findUsedListPartitions(ctx sessionctx.Context, tbl table.Table, partitionNames []model.CIStr,
conds []expression.Expression) ([]int, error) {
pi := tbl.Meta().Partition
partExpr, err := tbl.(partitionTable).PartitionExpr()
if err != nil {
return nil, err
}
partExpr := tbl.(partitionTable).PartitionExpr()

listPruner := newListPartitionPruner(ctx, tbl, partitionNames, s, conds, partExpr.ForListPruning)
var used map[int]struct{}
var err error
if partExpr.ForListPruning.ColPrunes == nil {
used, err = listPruner.findUsedListPartitions(conds)
} else {
Expand Down Expand Up @@ -826,10 +824,7 @@ func intersectionRange(start, end, newStart, newEnd int) (int, int) {

func (s *partitionProcessor) pruneRangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, tbl table.PartitionedTable, conds []expression.Expression,
columns []*expression.Column, names types.NameSlice) (partitionRangeOR, error) {
partExpr, err := tbl.(partitionTable).PartitionExpr()
if err != nil {
return nil, err
}
partExpr := tbl.(partitionTable).PartitionExpr()

// Partition by range columns.
if len(pi.Columns) > 0 {
Expand Down
9 changes: 3 additions & 6 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2432,7 +2432,7 @@ func oldPasswordUpgrade(pass string) (string, error) {
// rebuildAllPartitionValueMapAndSorted rebuilds all value map and sorted info for list column partitions with InfoSchema.
func rebuildAllPartitionValueMapAndSorted(s *session) {
type partitionExpr interface {
PartitionExpr() (*tables.PartitionExpr, error)
PartitionExpr() *tables.PartitionExpr
}

p := parser.New()
Expand All @@ -2444,12 +2444,9 @@ func rebuildAllPartitionValueMapAndSorted(s *session) {
continue
}

pe, err := t.(partitionExpr).PartitionExpr()
if err != nil {
panic("partition table gets partition expression failed")
}
pe := t.(partitionExpr).PartitionExpr()
for _, cp := range pe.ColPrunes {
if err = cp.RebuildPartitionValueMapAndSorted(p); err != nil {
if err := cp.RebuildPartitionValueMapAndSorted(p, pi.Definitions); err != nil {
logutil.BgLogger().Warn("build list column partition value map and sorted failed")
break
}
Expand Down
Loading