Skip to content

Commit

Permalink
planner: cherry-pick #45824, #45831 and #46312 (#46391)
Browse files Browse the repository at this point in the history
close #45758, close #45804, close #45805
  • Loading branch information
Defined2014 authored Aug 25, 2023
1 parent cf44157 commit 81ceb31
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 36 deletions.
4 changes: 2 additions & 2 deletions planner/core/casetest/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -3850,8 +3850,8 @@
{
"SQL": "with recursive cte(a) as (select 1 union select a from cte) select * from cte; -- recursive cte cannot be inlined",
"Plan": [
"CTEFullScan 1.00 root CTE:cte data:CTE_0",
"CTE_0 1.00 root Recursive CTE",
"CTEFullScan 2.00 root CTE:cte data:CTE_0",
"CTE_0 2.00 root Recursive CTE",
"├─Projection(Seed Part) 1.00 root 1->Column#2",
"│ └─TableDual 1.00 root rows:1",
"└─CTETable(Recursive Part) 1.00 root Scan on CTE_0"
Expand Down
2 changes: 1 addition & 1 deletion planner/core/issuetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ go_test(
timeout = "short",
srcs = ["planner_issue_test.go"],
flaky = True,
shard_count = 5,
shard_count = 6,
deps = ["//testkit"],
)
10 changes: 10 additions & 0 deletions planner/core/issuetest/planner_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,13 @@ func TestIssue45036(t *testing.T) {
" └─TableReader_9 10000.00 root partition:all data:TableRangeScan_8",
" └─TableRangeScan_8 10000.00 cop[tikv] table:s range:[1,100000], keep order:false, stats:pseudo"))
}

func TestIssue45758(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("CREATE TABLE tb1 (cid INT, code INT, class VARCHAR(10))")
tk.MustExec("CREATE TABLE tb2 (cid INT, code INT, class VARCHAR(10))")
// result ok
tk.MustExec("UPDATE tb1, (SELECT code AS cid, code, MAX(class) AS class FROM tb2 GROUP BY code) tb3 SET tb1.cid = tb3.cid, tb1.code = tb3.code, tb1.class = tb3.class")
}
3 changes: 1 addition & 2 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,7 @@ func (b *PlanBuilder) buildResultSetNode(ctx context.Context, node ast.ResultSet
}
}
// `TableName` is not a select block, so we do not need to handle it.
if !isTableName && b.ctx.GetSessionVars().PlannerSelectBlockAsName.Load() != nil {
plannerSelectBlockAsName := *(b.ctx.GetSessionVars().PlannerSelectBlockAsName.Load())
if plannerSelectBlockAsName := *(b.ctx.GetSessionVars().PlannerSelectBlockAsName.Load()); len(plannerSelectBlockAsName) > 0 && !isTableName {
plannerSelectBlockAsName[p.SelectBlockOffset()] = ast.HintTable{DBName: p.OutputNames()[0].DBName, TableName: p.OutputNames()[0].TblName}
}
// Duplicate column name in one table is not allowed.
Expand Down
2 changes: 1 addition & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ func NewPlanBuilder(opts ...PlanBuilderOpt) *PlanBuilder {
func (b *PlanBuilder) Init(sctx sessionctx.Context, is infoschema.InfoSchema, processor *hint.BlockHintProcessor) (*PlanBuilder, []ast.HintTable) {
savedBlockNames := sctx.GetSessionVars().PlannerSelectBlockAsName.Load()
if processor == nil {
sctx.GetSessionVars().PlannerSelectBlockAsName.Store(nil)
sctx.GetSessionVars().PlannerSelectBlockAsName.Store(&[]ast.HintTable{})
} else {
newPlannerSelectBlockAsName := make([]ast.HintTable, processor.MaxSelectStmtOffset()+1)
sctx.GetSessionVars().PlannerSelectBlockAsName.Store(&newPlannerSelectBlockAsName)
Expand Down
92 changes: 72 additions & 20 deletions planner/core/resolve_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,25 @@ func (p *PhysicalHashJoin) ResolveIndicesItself() (err error) {
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
foundCnt := 0
// The two column sets are all ordered. And the colsNeedResolving is the subset of the mergedSchema.
// So we can just move forward j if there's no matching is found.
// We don't use the normal ResolvIndices here since there might be duplicate columns in the schema.
// e.g. The schema of child_0 is [col0, col0, col1]
// ResolveIndices will only resolve all col0 reference of the current plan to the first col0.
for i, j := 0, 0; i < colsNeedResolving && j < len(mergedSchema.Columns); {
if !p.schema.Columns[i].Equal(nil, mergedSchema.Columns[j]) {
j++
continue
}
p.schema.Columns[i] = newCol.(*expression.Column)
p.schema.Columns[i] = p.schema.Columns[i].Clone().(*expression.Column)
p.schema.Columns[i].Index = j
i++
j++
foundCnt++
}
if foundCnt < colsNeedResolving {
return errors.Errorf("Some columns of %v cannot find the reference from its child(ren)", p.ExplainID().String())
}

return
Expand Down Expand Up @@ -213,12 +226,25 @@ func (p *PhysicalMergeJoin) ResolveIndices() (err error) {
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
foundCnt := 0
// The two column sets are all ordered. And the colsNeedResolving is the subset of the mergedSchema.
// So we can just move forward j if there's no matching is found.
// We don't use the normal ResolvIndices here since there might be duplicate columns in the schema.
// e.g. The schema of child_0 is [col0, col0, col1]
// ResolveIndices will only resolve all col0 reference of the current plan to the first col0.
for i, j := 0, 0; i < colsNeedResolving && j < len(mergedSchema.Columns); {
if !p.schema.Columns[i].Equal(nil, mergedSchema.Columns[j]) {
j++
continue
}
p.schema.Columns[i] = newCol.(*expression.Column)
p.schema.Columns[i] = p.schema.Columns[i].Clone().(*expression.Column)
p.schema.Columns[i].Index = j
i++
j++
foundCnt++
}
if foundCnt < colsNeedResolving {
return errors.Errorf("Some columns of %v cannot find the reference from its child(ren)", p.ExplainID().String())
}
return
}
Expand Down Expand Up @@ -296,12 +322,25 @@ func (p *PhysicalIndexJoin) ResolveIndices() (err error) {
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
foundCnt := 0
// The two column sets are all ordered. And the colsNeedResolving is the subset of the mergedSchema.
// So we can just move forward j if there's no matching is found.
// We don't use the normal ResolvIndices here since there might be duplicate columns in the schema.
// e.g. The schema of child_0 is [col0, col0, col1]
// ResolveIndices will only resolve all col0 reference of the current plan to the first col0.
for i, j := 0, 0; i < colsNeedResolving && j < len(mergedSchema.Columns); {
if !p.schema.Columns[i].Equal(nil, mergedSchema.Columns[j]) {
j++
continue
}
p.schema.Columns[i] = newCol.(*expression.Column)
p.schema.Columns[i] = p.schema.Columns[i].Clone().(*expression.Column)
p.schema.Columns[i].Index = j
i++
j++
foundCnt++
}
if foundCnt < colsNeedResolving {
return errors.Errorf("Some columns of %v cannot find the reference from its child(ren)", p.ExplainID().String())
}

return
Expand Down Expand Up @@ -670,12 +709,25 @@ func (p *PhysicalLimit) ResolveIndices() (err error) {
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i, col := range p.schema.Columns {
newCol, err := col.ResolveIndices(p.children[0].Schema())
if err != nil {
return err
foundCnt := 0
// The two column sets are all ordered. And the colsNeedResolving is the subset of the mergedSchema.
// So we can just move forward j if there's no matching is found.
// We don't use the normal ResolvIndices here since there might be duplicate columns in the schema.
// e.g. The schema of child_0 is [col0, col0, col1]
// ResolveIndices will only resolve all col0 reference of the current plan to the first col0.
for i, j := 0, 0; i < p.schema.Len() && j < p.children[0].Schema().Len(); {
if !p.schema.Columns[i].Equal(nil, p.children[0].Schema().Columns[j]) {
j++
continue
}
p.schema.Columns[i] = newCol.(*expression.Column)
p.schema.Columns[i] = p.schema.Columns[i].Clone().(*expression.Column)
p.schema.Columns[i].Index = j
i++
j++
foundCnt++
}
if foundCnt < p.schema.Len() {
return errors.Errorf("Some columns of %v cannot find the reference from its child(ren)", p.ExplainID().String())
}
return
}
Expand Down
9 changes: 0 additions & 9 deletions planner/core/rule_eliminate_projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,7 @@ func eliminatePhysicalProjection(p PhysicalPlan) PhysicalPlan {
}
})

oldSchema := p.Schema()
newRoot := doPhysicalProjectionElimination(p)
newCols := newRoot.Schema().Columns
for i, oldCol := range oldSchema.Columns {
oldCol.Index = newCols[i].Index
oldCol.ID = newCols[i].ID
oldCol.UniqueID = newCols[i].UniqueID
oldCol.VirtualExpr = newCols[i].VirtualExpr
newRoot.Schema().Columns[i] = oldCol
}
return newRoot
}

Expand Down
2 changes: 1 addition & 1 deletion planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ func (p *LogicalCTE) DeriveStats(_ []*property.StatsInfo, selfSchema *expression
return nil, err
}
}
recurStat := p.cte.recursivePartPhysicalPlan.Stats()
recurStat := p.cte.recursivePartLogicalPlan.statsInfo()
for i, col := range selfSchema.Columns {
p.stats.ColNDVs[col.UniqueID] += recurStat.ColNDVs[p.cte.recursivePartLogicalPlan.Schema().Columns[i].UniqueID]
}
Expand Down

0 comments on commit 81ceb31

Please sign in to comment.