Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, executor: fix the logic for inline projection of executors #45158

Merged
merged 8 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec exec.Exec
},
}

childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.Children(0).Schema(), e.Children(1).Schema())
childrenUsedSchema := markChildrenUsedColsForTest(e.Schema(), e.Children(0).Schema(), e.Children(1).Schema())
defaultValues := make([]types.Datum, e.buildWorker.buildSideExec.Schema().Len())
lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec)
for i := uint(0); i < e.concurrency; i++ {
Expand All @@ -959,6 +959,31 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec exec.Exec
return e
}

// markChildrenUsedColsForTest reports which columns of each child schema are
// referenced by outputSchema.
//
// The returned slice holds two groups of entries, in this order:
//  1. one []bool per child, where element i is true when some output column
//     has Index equal to i;
//  2. one entry per child as returned by expression.GetUsedList.
//
// NOTE(review): the offset added to i in the first group is never advanced, so
// every child is matched against the same index range starting at 0, and the
// result ends up with 2*len(childSchemas) entries although capacity is only
// reserved for len(childSchemas). Presumably callers read just the leading
// entries — confirm before changing either behavior.
func markChildrenUsedColsForTest(outputSchema *expression.Schema, childSchemas ...*expression.Schema) (childrenUsed [][]bool) {
	// Collect the set of indices referenced by the output columns.
	outputIdx := make(map[int]struct{})
	for _, c := range outputSchema.Columns {
		outputIdx[c.Index] = struct{}{}
	}

	childrenUsed = make([][]bool, 0, len(childSchemas))

	// First group: index-based marking. The base offset stays at zero for
	// every child.
	const base = 0
	for _, schema := range childSchemas {
		flags := make([]bool, len(schema.Columns))
		for pos := range flags {
			_, hit := outputIdx[base+pos]
			flags[pos] = hit
		}
		childrenUsed = append(childrenUsed, flags)
	}

	// Second group: whatever expression.GetUsedList reports for each child.
	for _, schema := range childSchemas {
		childrenUsed = append(childrenUsed, expression.GetUsedList(outputSchema.Columns, schema))
	}
	return childrenUsed
}

func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
opt1 := mockDataSourceParameters{
rows: casTest.rows,
Expand Down
45 changes: 37 additions & 8 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) exec.Executor
end: v.Offset + v.Count,
}

childUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema())[0]
childUsedSchema := markChildrenUsedCols(v.Schema().Columns, v.Children()[0].Schema())[0]
e.columnIdxsUsedByChild = make([]int, 0, len(childUsedSchema))
for i, used := range childUsedSchema {
if used {
Expand Down Expand Up @@ -1497,6 +1497,11 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) exec.
}
}

colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}

e := &MergeJoinExec{
stmtCtx: b.ctx.GetSessionVars().StmtCtx,
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), leftExec, rightExec),
Expand All @@ -1509,7 +1514,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) exec.
v.OtherConditions,
retTypes(leftExec),
retTypes(rightExec),
markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()),
markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema()),
false,
),
isOuterJoin: v.JoinType.IsOuterJoin(),
Expand Down Expand Up @@ -1638,7 +1643,11 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) exec.Ex
probeNAKeColIdx[i] = probeNAKeys[i].Index
}
isNAJoin := len(v.LeftNAJoinKeys) > 0
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
for i := uint(0); i < e.concurrency; i++ {
e.probeWorkers[i] = &probeWorker{
hashJoinCtx: e.hashJoinCtx,
Expand Down Expand Up @@ -3069,10 +3078,22 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {

// markChildrenUsedCols compares each child with the output columns and marks
// whether each column of the child is used by the output.
func markChildrenUsedCols(outputSchema *expression.Schema, childSchema ...*expression.Schema) (childrenUsed [][]bool) {
for _, child := range childSchema {
used := expression.GetUsedList(outputSchema.Columns, child)
// markChildrenUsedCols reports, for each child schema, which of its columns
// appear in outputCols. Children are treated as laid out back to back: column
// i of a child counts as used when some output column's Index equals the sum
// of Len() over all preceding children plus i.
func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expression.Schema) (childrenUsed [][]bool) {
	// Collect the set of indices referenced by the output columns.
	wanted := make(map[int]struct{})
	for _, c := range outputCols {
		wanted[c.Index] = struct{}{}
	}

	childrenUsed = make([][]bool, 0, len(childSchemas))
	base := 0 // offset of the current child's first column
	for _, schema := range childSchemas {
		flags := make([]bool, len(schema.Columns))
		for pos := range flags {
			_, hit := wanted[base+pos]
			flags[pos] = hit
		}
		childrenUsed = append(childrenUsed, flags)
		base += schema.Len()
	}
	return childrenUsed
}
Expand Down Expand Up @@ -3224,7 +3245,11 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
lastColHelper: v.CompareFilters,
finished: &atomic.Value{},
}
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
e.joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, leftTypes, rightTypes, childrenUsedSchema, false)
outerKeyCols := make([]int, len(v.OuterJoinKeys))
for i := 0; i < len(v.OuterJoinKeys); i++ {
Expand Down Expand Up @@ -3346,7 +3371,11 @@ func (b *executorBuilder) buildIndexLookUpMergeJoin(v *plannercore.PhysicalIndex
keyOff2IdxOff: v.KeyOff2IdxOff,
lastColHelper: v.CompareFilters,
}
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
joiners := make([]joiner, e.Ctx().GetSessionVars().IndexLookupJoinConcurrency())
for i := 0; i < len(joiners); i++ {
joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, leftTypes, rightTypes, childrenUsedSchema, false)
Expand Down
2 changes: 2 additions & 0 deletions planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func (*ImplLimit) OnImplement(expr *memo.GroupExpr, _ *property.PhysicalProperty
Offset: logicalLimit.Offset,
Count: logicalLimit.Count,
}.Init(logicalLimit.SCtx(), expr.Group.Prop.Stats, logicalLimit.SelectBlockOffset(), newProp)
physicalLimit.SetSchema(expr.Group.Prop.Schema.Clone())
return []memo.Implementation{impl.NewLimitImpl(physicalLimit)}, nil
}

Expand Down Expand Up @@ -420,6 +421,7 @@ func (*ImplTopNAsLimit) OnImplement(expr *memo.GroupExpr, _ *property.PhysicalPr
Offset: lt.Offset,
Count: lt.Count,
}.Init(lt.SCtx(), expr.Group.Prop.Stats, lt.SelectBlockOffset(), newProp)
physicalLimit.SetSchema(expr.Group.Prop.Schema.Clone())
return []memo.Implementation{impl.NewLimitImpl(physicalLimit)}, nil
}

Expand Down
18 changes: 13 additions & 5 deletions planner/core/pb_to_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,17 @@ func NewPBPlanBuilder(sctx sessionctx.Context, is infoschema.InfoSchema, ranges
func (b *PBPlanBuilder) Build(executors []*tipb.Executor) (p PhysicalPlan, err error) {
var src PhysicalPlan
for i := 0; i < len(executors); i++ {
curr, err := b.pbToPhysicalPlan(executors[i])
curr, err := b.pbToPhysicalPlan(executors[i], src)
if err != nil {
return nil, errors.Trace(err)
}
if src != nil {
curr.SetChildren(src)
}
src = curr
}
_, src = b.predicatePushDown(src, nil)
return src, nil
}

func (b *PBPlanBuilder) pbToPhysicalPlan(e *tipb.Executor) (p PhysicalPlan, err error) {
func (b *PBPlanBuilder) pbToPhysicalPlan(e *tipb.Executor, subPlan PhysicalPlan) (p PhysicalPlan, err error) {
switch e.Tp {
case tipb.ExecType_TypeTableScan:
p, err = b.pbToTableScan(e)
Expand All @@ -81,6 +78,17 @@ func (b *PBPlanBuilder) pbToPhysicalPlan(e *tipb.Executor) (p PhysicalPlan, err
// TODO: Support other types.
err = errors.Errorf("this exec type %v doesn't support yet", e.GetTp())
}
if subPlan != nil {
p.SetChildren(subPlan)
}
// The limit loses its output columns when it is passed through protobuf.
// We need to add them back and run ResolveIndices for the later inline projection.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit's schema will not always match its child's schema; for example, the child may be a projection, and it may have been eliminated.

if limit, ok := p.(*PhysicalLimit); ok {
limit.SetSchema(p.Children()[0].Schema().Clone())
for i, col := range limit.Schema().Columns {
col.Index = i
}
}
return p, err
}

Expand Down
74 changes: 72 additions & 2 deletions planner/core/resolve_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,33 @@ func (p *PhysicalHashJoin) ResolveIndicesItself() (err error) {
return err
}
}

mergedSchema := expression.MergeSchema(lSchema, rSchema)

for i, expr := range p.OtherConditions {
p.OtherConditions[i], err = expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema))
p.OtherConditions[i], err = expr.ResolveIndices(mergedSchema)
if err != nil {
return err
}
}

colsNeedResolving := p.schema.Len()
// The last output column of these two join types is the generated column that indicates whether the row is matched or not.
if p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
colsNeedResolving--
}
// Copy the column slice so that two plans do not share the same one.
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
}
p.schema.Columns[i] = newCol.(*expression.Column)
}

return
}

Expand Down Expand Up @@ -173,12 +194,32 @@ func (p *PhysicalMergeJoin) ResolveIndices() (err error) {
return err
}
}

mergedSchema := expression.MergeSchema(lSchema, rSchema)

for i, expr := range p.OtherConditions {
p.OtherConditions[i], err = expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema))
p.OtherConditions[i], err = expr.ResolveIndices(mergedSchema)
if err != nil {
return err
}
}

colsNeedResolving := p.schema.Len()
// The last output column of these two join types is the generated column that indicates whether the row is matched or not.
if p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
colsNeedResolving--
}
// Copy the column slice so that two plans do not share the same one.
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing we can trust is column.Index,
although there will be problems when several identical columns refer to a single one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #45831

newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
}
p.schema.Columns[i] = newCol.(*expression.Column)
}
return
}

Expand Down Expand Up @@ -245,6 +286,24 @@ func (p *PhysicalIndexJoin) ResolveIndices() (err error) {
}
p.OuterHashKeys[i], p.InnerHashKeys[i] = outerKey.(*expression.Column), innerKey.(*expression.Column)
}

colsNeedResolving := p.schema.Len()
// The last output column of these two join types is the generated column that indicates whether the row is matched or not.
if p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
colsNeedResolving--
}
// Copy the column slice so that two plans do not share the same one.
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
}
p.schema.Columns[i] = newCol.(*expression.Column)
}

return
}

Expand Down Expand Up @@ -618,6 +677,17 @@ func (p *PhysicalLimit) ResolveIndices() (err error) {
}
p.PartitionBy[i].Col = newCol.(*expression.Column)
}
// Copy the column slice so that two plans do not share the same one.
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i, col := range p.schema.Columns {
newCol, err := col.ResolveIndices(p.children[0].Schema())
if err != nil {
return err
}
p.schema.Columns[i] = newCol.(*expression.Column)
}
return
}

Expand Down