Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: avoid using columnEvaluator for the Projectin build by buildProjtion4Union (#8142) #8168

Merged
merged 5 commits into from
Nov 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ func (b *executorBuilder) buildProjection(v *plan.PhysicalProjection) Executor {
}
e := &ProjectionExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
evaluatorSuit: expression.NewEvaluatorSuit(v.Exprs),
evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator),
calculateNoDelay: v.CalculateNoDelay,
}
return e
Expand Down Expand Up @@ -1035,6 +1035,9 @@ func (b *executorBuilder) buildMaxOneRow(v *plan.PhysicalMaxOneRow) Executor {
func (b *executorBuilder) buildUnionAll(v *plan.PhysicalUnionAll) Executor {
childExecs := make([]Executor, len(v.Children()))
for i, child := range v.Children() {
if proj, isProj := child.(*plan.PhysicalProjection); isProj {
proj.AvoidColumnEvaluator = true
}
childExecs[i] = b.build(child)
if b.err != nil {
b.err = errors.Trace(b.err)
Expand Down
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func init() {
type ProjectionExec struct {
baseExecutor

evaluatorSuit *expression.EvaluatorSuit
evaluatorSuit *expression.EvaluatorSuite
calculateNoDelay bool
}

Expand Down
7 changes: 7 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,13 @@ func (s *testSuite) TestUnion(c *C) {
tk.MustExec(`set @@tidb_max_chunk_size=2;`)
tk.MustQuery(`select count(*) from (select t1.a, t1.b from t1 left join t2 on t1.a=t2.a union all select t1.a, t1.a from t1 left join t2 on t1.a=t2.a) tmp;`).Check(testkit.Rows("128"))
tk.MustQuery(`select tmp.a, count(*) from (select t1.a, t1.b from t1 left join t2 on t1.a=t2.a union all select t1.a, t1.a from t1 left join t2 on t1.a=t2.a) tmp;`).Check(testkit.Rows("1 128"))

// #issue 8141
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int, b int)")
tk.MustExec("insert into t1 value(1,2),(1,1),(2,2),(2,2),(3,2),(3,2)")
tk.MustExec("set @@tidb_max_chunk_size=2;")
tk.MustQuery("select count(*) from (select a as c, a as d from t1 union all select a, b from t1) t;").Check(testkit.Rows("12"))
}

func (s *testSuite) TestNeighbouringProj(c *C) {
Expand Down
37 changes: 18 additions & 19 deletions expression/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,36 +64,35 @@ func (e *defaultEvaluator) run(ctx sessionctx.Context, input, output *chunk.Chun
return nil
}

// EvaluatorSuit is responsible for the evaluation of a list of expressions.
// EvaluatorSuite is responsible for the evaluation of a list of expressions.
// It separates them to "column" and "other" expressions and evaluates "other"
// expressions before "column" expressions.
type EvaluatorSuit struct {
type EvaluatorSuite struct {
*columnEvaluator // Evaluator for column expressions.
*defaultEvaluator // Evaluator for other expressions.
}

// NewEvaluatorSuit creates an EvaluatorSuit to evaluate all the exprs.
func NewEvaluatorSuit(exprs []Expression) *EvaluatorSuit {
e := &EvaluatorSuit{}
// NewEvaluatorSuite creates an EvaluatorSuite to evaluate all the exprs.
func NewEvaluatorSuite(exprs []Expression, avoidColumnEvaluator bool) *EvaluatorSuite {
e := &EvaluatorSuite{}

for i, expr := range exprs {
switch x := expr.(type) {
case *Column:
for i := 0; i < len(exprs); i++ {
if col, isCol := exprs[i].(*Column); isCol && !avoidColumnEvaluator {
if e.columnEvaluator == nil {
e.columnEvaluator = &columnEvaluator{inputIdxToOutputIdxes: make(map[int][]int)}
}
inputIdx, outputIdx := x.Index, i
inputIdx, outputIdx := col.Index, i
e.columnEvaluator.inputIdxToOutputIdxes[inputIdx] = append(e.columnEvaluator.inputIdxToOutputIdxes[inputIdx], outputIdx)
default:
if e.defaultEvaluator == nil {
e.defaultEvaluator = &defaultEvaluator{
outputIdxes: make([]int, 0, len(exprs)),
exprs: make([]Expression, 0, len(exprs)),
}
continue
}
if e.defaultEvaluator == nil {
e.defaultEvaluator = &defaultEvaluator{
outputIdxes: make([]int, 0, len(exprs)),
exprs: make([]Expression, 0, len(exprs)),
}
e.defaultEvaluator.exprs = append(e.defaultEvaluator.exprs, x)
e.defaultEvaluator.outputIdxes = append(e.defaultEvaluator.outputIdxes, i)
}
e.defaultEvaluator.exprs = append(e.defaultEvaluator.exprs, exprs[i])
e.defaultEvaluator.outputIdxes = append(e.defaultEvaluator.outputIdxes, i)
}

if e.defaultEvaluator != nil {
Expand All @@ -102,9 +101,9 @@ func NewEvaluatorSuit(exprs []Expression) *EvaluatorSuit {
return e
}

// Run evaluates all the expressions hold by this EvaluatorSuit.
// Run evaluates all the expressions hold by this EvaluatorSuite.
// NOTE: "defaultEvaluator" must be evaluated before "columnEvaluator".
func (e *EvaluatorSuit) Run(ctx sessionctx.Context, input, output *chunk.Chunk) error {
func (e *EvaluatorSuite) Run(ctx sessionctx.Context, input, output *chunk.Chunk) error {
if e.defaultEvaluator != nil {
err := e.defaultEvaluator.run(ctx, input, output)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions plan/gen_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,9 @@ func (p *LogicalProjection) exhaustPhysicalPlans(prop *requiredProp) []PhysicalP
return nil
}
proj := PhysicalProjection{
Exprs: p.Exprs,
CalculateNoDelay: p.calculateNoDelay,
Exprs: p.Exprs,
CalculateNoDelay: p.calculateNoDelay,
AvoidColumnEvaluator: p.avoidColumnEvaluator,
}.init(p.ctx, p.stats.scaleByExpectCnt(prop.expectedCnt), newProp)
proj.SetSchema(p.schema)
return []PhysicalPlan{proj}
Expand Down
2 changes: 1 addition & 1 deletion plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ func (b *planBuilder) buildProjection4Union(u *LogicalUnionAll) {
}
if _, isProj := child.(*LogicalProjection); needProjection || !isProj {
b.optFlag |= flagEliminateProjection
proj := LogicalProjection{Exprs: exprs}.init(b.ctx)
proj := LogicalProjection{Exprs: exprs, avoidColumnEvaluator: true}.init(b.ctx)
if childID == 0 {
for _, col := range unionSchema.Columns {
col.FromID = proj.ID()
Expand Down
7 changes: 7 additions & 0 deletions plan/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ type LogicalProjection struct {
// Currently it is "true" only when the current sql query is a "DO" statement.
// See "https://dev.mysql.com/doc/refman/5.7/en/do.html" for more detail.
calculateNoDelay bool

// avoidColumnRef is a temporary variable which is ONLY used to avoid
// building columnEvaluator for the expressions of Projection which is
// built by buildProjection4Union.
// This can be removed after column pool being supported.
// Related issue: TiDB#8141(https://github.com/pingcap/tidb/issues/8141)
avoidColumnEvaluator bool
}

func (p *LogicalProjection) extractCorrelatedCols() []*expression.CorrelatedColumn {
Expand Down
5 changes: 3 additions & 2 deletions plan/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ type PhysicalTableScan struct {
type PhysicalProjection struct {
physicalSchemaProducer

Exprs []expression.Expression
CalculateNoDelay bool
Exprs []expression.Expression
CalculateNoDelay bool
AvoidColumnEvaluator bool
}

// PhysicalTopN is the physical operator of topN.
Expand Down