Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix cte nil pointer error when got multiple apply #44782

Merged
merged 19 commits into from
Jun 20, 2023
44 changes: 30 additions & 14 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5343,7 +5343,12 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
iterInTbl = storages.IterInTbl
producer = storages.Producer
} else {
if v.SeedPlan == nil {
b.err = errors.New("cte.seedPlan cannot be nil")
return nil
}
// Build seed part.
corCols := plannercore.ExtractOuterApplyCorrelatedCols(v.SeedPlan)
seedExec := b.build(v.SeedPlan)
if b.err != nil {
return nil
Expand All @@ -5364,10 +5369,15 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
storageMap[v.CTE.IDForStorage] = &CTEStorages{ResTbl: resTbl, IterInTbl: iterInTbl}

// Build recursive part.
recursiveExec := b.build(v.RecurPlan)
if b.err != nil {
return nil
var recursiveExec Executor
if v.RecurPlan != nil {
recursiveExec = b.build(v.RecurPlan)
if b.err != nil {
return nil
}
corCols = append(corCols, plannercore.ExtractOuterApplyCorrelatedCols(v.RecurPlan)...)
}

var sel []int
if v.CTE.IsDistinct {
sel = make([]int, chkSize)
Expand All @@ -5376,18 +5386,24 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
}
}

var corColHashCodes [][]byte
for _, corCol := range corCols {
corColHashCodes = append(corColHashCodes, corCol.HashCode(b.ctx.GetSessionVars().StmtCtx))
}

producer = &cteProducer{
ctx: b.ctx,
seedExec: seedExec,
recursiveExec: recursiveExec,
resTbl: resTbl,
iterInTbl: iterInTbl,
isDistinct: v.CTE.IsDistinct,
sel: sel,
hasLimit: v.CTE.HasLimit,
limitBeg: v.CTE.LimitBeg,
limitEnd: v.CTE.LimitEnd,
isInApply: v.CTE.IsInApply,
ctx: b.ctx,
seedExec: seedExec,
recursiveExec: recursiveExec,
resTbl: resTbl,
iterInTbl: iterInTbl,
isDistinct: v.CTE.IsDistinct,
sel: sel,
hasLimit: v.CTE.HasLimit,
limitBeg: v.CTE.LimitBeg,
limitEnd: v.CTE.LimitEnd,
corCols: corCols,
corColHashCodes: corColHashCodes,
}
storageMap[v.CTE.IDForStorage].Producer = producer
}
Expand Down
37 changes: 27 additions & 10 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package executor

import (
"bytes"
"context"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -81,8 +83,11 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()

if e.producer.isInApply {
if e.producer.checkAndUpdateCorColHashCode() {
e.producer.reset()
if err = e.producer.reopenTbls(); err != nil {
return err
}
}
if !e.producer.opened {
if err = e.producer.openProducer(ctx, e); err != nil {
Expand All @@ -108,6 +113,10 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
func (e *CTEExec) Close() (err error) {
e.producer.resTbl.Lock()
if !e.producer.closed {
// closeProducer() only closes seedExec and recursiveExec; it does not touch resTbl.
// This means resTbl can still be read after closeProducer() has been called.
// All three functions (openProducer/produce/closeProducer) could even be called from CTEExec.Next().
// They are kept as separate calls only to follow the abstraction of the volcano model.
err = e.producer.closeProducer()
}
e.producer.resTbl.Unlock()
Expand Down Expand Up @@ -155,10 +164,9 @@ type cteProducer struct {
memTracker *memory.Tracker
diskTracker *disk.Tracker

// isInApply indicates whether CTE is in inner side of Apply
// and should resTbl/iterInTbl be reset for each outer row of Apply.
// Because we reset them when SQL is finished instead of when CTEExec.Close() is called.
isInApply bool
// Correlated columns collected for this CTE in buildCTE, and the hash code last recorded for each of them (same index).
corCols []*expression.CorrelatedColumn
corColHashCodes [][]byte
}

func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
Expand Down Expand Up @@ -224,11 +232,6 @@ func (p *cteProducer) closeProducer() (err error) {
}
}
p.closed = true
if p.isInApply {
if err = p.reopenTbls(); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -657,3 +660,17 @@ func (p *cteProducer) checkHasDup(probeKey uint64,
}
return false, nil
}

// checkAndUpdateCorColHashCode recomputes the hash code of every correlated
// column in p.corCols, records each hash code that differs from the one stored
// at the same index of p.corColHashCodes, and reports whether at least one of
// them differed.
func (p *cteProducer) checkAndUpdateCorColHashCode() bool {
	changed := false
	for idx := range p.corCols {
		col := p.corCols[idx]
		// Clean the column's hash code before requesting it again, because the
		// column's Data may have changed since the previous call.
		col.CleanHashCode()
		latest := col.HashCode(p.ctx.GetSessionVars().StmtCtx)
		if bytes.Equal(latest, p.corColHashCodes[idx]) {
			continue
		}
		p.corColHashCodes[idx] = latest
		changed = true
	}
	return changed
}
5 changes: 5 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,4 +499,9 @@ func TestCTEShareCorColumn(t *testing.T) {
tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias1) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10"))
tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias2) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10"))
}

tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(a int);")
tk.MustExec("insert into t1 values(1), (2);")
tk.MustQuery("SELECT * FROM t1 dt WHERE EXISTS( WITH RECURSIVE qn AS (SELECT a AS b UNION ALL SELECT b+1 FROM qn WHERE b=0 or b = 1) SELECT * FROM qn dtqn1 where exists (select /*+ NO_DECORRELATE() */ b from qn where dtqn1.b+1));").Check(testkit.Rows("1", "2"))
}
27 changes: 26 additions & 1 deletion expression/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (
type CorrelatedColumn struct {
	Column

	// Data points to the datum that HashCode mixes into the hash code when it
	// is non-nil; it may be replaced at any time.
	Data *types.Datum
	// columnHashCode caches the Data-independent part of the hash code
	// (columnFlag followed by the encoded UniqueID); HashCode builds it lazily.
	columnHashCode []byte
}

// Clone implements Expression interface.
Expand Down Expand Up @@ -223,6 +224,30 @@ func (col *CorrelatedColumn) RemapColumn(m map[int64]*Column) (Expression, error
}, nil
}

// HashCode implements Expression interface.
func (col *CorrelatedColumn) HashCode(sc *stmtctx.StatementContext) []byte {
if len(col.columnHashCode) == 0 {
col.columnHashCode = make([]byte, 0, 9)
col.columnHashCode = append(col.columnHashCode, columnFlag)
col.columnHashCode = codec.EncodeInt(col.columnHashCode, col.UniqueID)
}

if len(col.hashcode) < len(col.columnHashCode) {
if len(col.hashcode) == 0 {
col.hashcode = make([]byte, 0, len(col.columnHashCode))
} else {
col.hashcode = col.hashcode[:0]
}
col.hashcode = append(col.hashcode, col.columnHashCode...)
}

// Because col.Data can be changed anytime, so always use newest Datum to calc hash code.
if col.Data != nil {
col.hashcode = codec.HashCode(col.hashcode, *col.Data)
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
}
return col.hashcode
}

// Column represents a column.
type Column struct {
RetType *types.FieldType
Expand Down
28 changes: 28 additions & 0 deletions expression/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package expression

import (
"bytes"
"fmt"
"testing"

Expand Down Expand Up @@ -263,3 +264,30 @@ func TestGcColumnExprIsTidbShard(t *testing.T) {
shardExpr := NewFunctionInternal(ctx, ast.TiDBShard, ft, col)
require.True(t, GcColumnExprIsTidbShard(shardExpr))
}

// TestCorColHashCode checks that a CorrelatedColumn hashes like its plain
// Column while Data is unset, that its hash code follows changes of Data, and
// that the plain Column's hash code is unaffected throughout.
func TestCorColHashCode(t *testing.T) {
	sctx := mock.NewContext()
	stmtCtx := sctx.GetSessionVars().StmtCtx
	plainCol := &Column{UniqueID: 0, ID: 0, RetType: types.NewFieldType(mysql.TypeLonglong)}
	corCol := CorrelatedColumn{Column: *plainCol}

	// With no Data set, both columns produce the same hash code.
	corHashNoData := corCol.HashCode(stmtCtx)
	plainHash := plainCol.HashCode(stmtCtx)
	require.True(t, bytes.Equal(plainHash, corHashNoData))

	// Setting Data makes the correlated column's hash code diverge from the
	// plain column's hash code.
	intDatum := types.NewDatum(1)
	corCol.Data = &intDatum
	plainNow := plainCol.HashCode(stmtCtx)
	corNow := corCol.HashCode(stmtCtx)
	require.False(t, bytes.Equal(plainNow, corNow))

	// Replacing Data with a different datum changes the hash code again.
	hashWithInt := corCol.HashCode(stmtCtx)
	floatDatum := types.NewFloat64Datum(1.1)
	corCol.Data = &floatDatum
	require.False(t, bytes.Equal(hashWithInt, corCol.HashCode(stmtCtx)))

	// The plain column's hash code stays what it was at the start.
	require.True(t, bytes.Equal(plainHash, plainCol.HashCode(stmtCtx)))
}
71 changes: 71 additions & 0 deletions planner/core/rule_decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,77 @@ func ExtractCorrelatedCols4PhysicalPlan(p PhysicalPlan) []*expression.Correlated
return corCols
}

// ExtractOuterApplyCorrelatedCols extracts only the correlated columns whose
// corresponding Apply operator is outside the plan p.
//
// For Plan-1, ExtractOuterApplyCorrelatedCols(CTE-1) returns cor_col_1.
// Plan-1:
//
//	Apply_1
//	 |_ outerSide
//	 |_ CTEExec(CTE-1)
//
//	CTE-1
//	 |_ Selection(cor_col_1)
//
// For Plan-2, ExtractOuterApplyCorrelatedCols(CTE-2) does not return
// cor_col_3, because Apply_3 is inside CTE-2.
// Plan-2:
//
//	Apply_2
//	 |_ outerSide
//	 |_ Selection(cor_col_2)
//	     |_ CTEExec(CTE-2)
//
//	CTE-2
//	 |_ Apply_3
//	     |_ outerSide
//	     |_ innerSide(cor_col_3)
func ExtractOuterApplyCorrelatedCols(p PhysicalPlan) []*expression.CorrelatedColumn {
	// The walk starts with no outer schemas: no Apply has been met inside p yet.
	noOuterSchemas := []*expression.Schema{}
	return extractOuterApplyCorrelatedColsHelper(p, noOuterSchemas)
}

// extractOuterApplyCorrelatedColsHelper walks the plan tree rooted at p and
// collects the correlated columns that are not contained in any schema of
// outerSchemas. outerSchemas holds the outer-side schema of every
// PhysicalApply met on the way from the starting plan down to p. A nil plan
// yields nil.
func extractOuterApplyCorrelatedColsHelper(p PhysicalPlan, outerSchemas []*expression.Schema) []*expression.CorrelatedColumn {
	if p == nil {
		return nil
	}

	// boundInside reports whether one of the schemas collected so far contains
	// the column, i.e. its corresponding Apply was found inside the plan.
	boundInside := func(cc *expression.CorrelatedColumn) bool {
		for _, schema := range outerSchemas {
			if schema.ColumnIndex(&cc.Column) != -1 {
				return true
			}
		}
		return false
	}

	// Keep only those correlated columns of this node that no inner Apply supplies.
	ownCorCols := p.ExtractCorrelatedCols()
	result := make([]*expression.CorrelatedColumn, 0, len(ownCorCols))
	for _, cc := range ownCorCols {
		if !boundInside(cc) {
			result = append(result, cc)
		}
	}

	switch plan := p.(type) {
	case *PhysicalApply:
		// The outer side is the child that is not the inner child.
		outerIdx := 0
		if plan.InnerChildIdx == 0 {
			outerIdx = 1
		}
		outerSchemas = append(outerSchemas, plan.Children()[outerIdx].Schema())
		// Both children are visited with the extended schema list.
		for childIdx := 0; childIdx < 2; childIdx++ {
			result = append(result, extractOuterApplyCorrelatedColsHelper(plan.Children()[childIdx], outerSchemas)...)
		}
	case *PhysicalCTE:
		// The seed and recursive plans are visited explicitly through their fields.
		result = append(result, extractOuterApplyCorrelatedColsHelper(plan.SeedPlan, outerSchemas)...)
		result = append(result, extractOuterApplyCorrelatedColsHelper(plan.RecurPlan, outerSchemas)...)
	default:
		for _, child := range p.Children() {
			result = append(result, extractOuterApplyCorrelatedColsHelper(child, outerSchemas)...)
		}
	}

	return result
}

// decorrelateSolver tries to convert apply plan to join plan.
type decorrelateSolver struct{}

Expand Down