Skip to content

Commit

Permalink
plan,executor: support IndexJoin over UnionScan (pingcap#7877)
Browse files Browse the repository at this point in the history
  • Loading branch information
eurekaka authored and alivxxx committed Jul 22, 2019
1 parent 7eaea30 commit 1c5d556
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 111 deletions.
30 changes: 26 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,18 +705,22 @@ func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
}

func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor {
src := b.build(v.Children()[0])
reader := b.build(v.Children()[0])
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src)}
return b.buildUnionScanFromReader(reader, v)
}

func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor {
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), reader)}
// Get the handle column index of the below plannercore.
// We can guarantee that there must be only one col in the map.
for _, cols := range v.Children()[0].Schema().TblID2Handle {
us.belowHandleIndex = cols[0].Index
}
switch x := src.(type) {
switch x := reader.(type) {
case *TableReaderExecutor:
us.desc = x.desc
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
Expand Down Expand Up @@ -753,7 +757,7 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E
b.err = us.buildAndSortAddedRows()
default:
// The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting.
return src
return reader
}
if b.err != nil {
b.err = errors.Trace(b.err)
Expand Down Expand Up @@ -1864,10 +1868,28 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context,
return builder.buildIndexReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
case *plannercore.PhysicalIndexLookUpReader:
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
case *plannercore.PhysicalUnionScan:
return builder.buildUnionScanForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
}
return nil, errors.New("Wrong plan type for dataReaderBuilder")
}

func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan,
values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) {
builder.Plan = v.Children()[0]
reader, err := builder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff)
if err != nil {
return nil, errors.Trace(err)
}
e := builder.buildUnionScanFromReader(reader, v)
if e == nil {
return nil, builder.err
}
us := e.(*UnionScanExec)
us.snapshotChunkBuffer = us.newFirstChunk()
return us, nil
}

func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) {
e, err := buildNoRangeTableReader(builder.executorBuilder, v)
if err != nil {
Expand Down
76 changes: 51 additions & 25 deletions executor/index_lookup_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,56 @@ func (s *testSuite) TestIndexLookupJoinHang(c *C) {
rs.Close()
}

func (s *testSuite) TestInapplicableIndexJoinHint(c *C) {
func (s *testSuite) TestIndexJoinUnionScan(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec(`drop table if exists t1, t2;`)
tk.MustExec(`create table t1(a bigint, b bigint);`)
tk.MustExec(`create table t2(a bigint, b bigint);`)
tk.MustQuery(`select /*+ TIDB_INLJ(t1, t2) */ * from t1, t2;`).Check(testkit.Rows())
tk.MustQuery(`show warnings;`).Check(testkit.Rows(`Warning 1815 Optimizer Hint /*+ TIDB_INLJ(t1, t2) */ is inapplicable without column equal ON condition`))
tk.MustQuery(`select /*+ TIDB_INLJ(t1, t2) */ * from t1 join t2 on t1.a=t2.a;`).Check(testkit.Rows())
tk.MustQuery(`show warnings;`).Check(testkit.Rows(`Warning 1815 Optimizer Hint /*+ TIDB_INLJ(t1, t2) */ is inapplicable`))

tk.MustExec(`drop table if exists t1, t2;`)
tk.MustExec(`create table t1(a bigint, b bigint, index idx_a(a));`)
tk.MustExec(`create table t2(a bigint, b bigint);`)
tk.MustQuery(`select /*+ TIDB_INLJ(t1) */ * from t1 left join t2 on t1.a=t2.a;`).Check(testkit.Rows())
tk.MustQuery(`show warnings;`).Check(testkit.Rows(`Warning 1815 Optimizer Hint /*+ TIDB_INLJ(t1) */ is inapplicable`))
tk.MustQuery(`select /*+ TIDB_INLJ(t2) */ * from t1 right join t2 on t1.a=t2.a;`).Check(testkit.Rows())
tk.MustQuery(`show warnings;`).Check(testkit.Rows(`Warning 1815 Optimizer Hint /*+ TIDB_INLJ(t2) */ is inapplicable`))
}

func (s *testSuite) TestIndexJoinOverflow(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec(`drop table if exists t1, t2`)
tk.MustExec(`create table t1(a int)`)
tk.MustExec(`insert into t1 values (-1)`)
tk.MustExec(`create table t2(a int unsigned, index idx(a));`)
tk.MustQuery(`select /*+ TIDB_INLJ(t2) */ * from t1 join t2 on t1.a = t2.a;`).Check(testkit.Rows())
tk.MustExec("create table t1(id int primary key, a int)")
tk.MustExec("create table t2(id int primary key, a int, b int, key idx_a(a))")
tk.MustExec("insert into t2 values (1,1,1),(4,2,4)")
tk.MustExec("begin")
tk.MustExec("insert into t1 values(2,2)")
tk.MustExec("insert into t2 values(2,2,2), (3,3,3)")
// TableScan below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows(
"IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.id",
"├─UnionScan_12 10000.00 root ",
"│ └─TableReader_14 10000.00 root data:TableScan_13",
"│ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
"└─UnionScan_10 10.00 root ",
" └─TableReader_9 10.00 root data:TableScan_8",
" └─TableScan_8 10.00 cop table:t2, range: decided by [test.t1.a], keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows(
"2 2 2 2 2",
))
// IndexLookUp below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"IndexJoin_12 12500.00 root inner join, inner:UnionScan_11, outer key:test.t1.a, inner key:test.t2.a",
"├─UnionScan_13 10000.00 root ",
"│ └─TableReader_15 10000.00 root data:TableScan_14",
"│ └─TableScan_14 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
"└─UnionScan_11 10.00 root ",
" └─IndexLookUp_10 10.00 root ",
" ├─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
" └─TableScan_9 10.00 cop table:t2, keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"2 2 2 2 2",
"2 2 4 2 4",
))
// IndexScan below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"Projection_7 12500.00 root test.t1.a, test.t2.a",
"└─IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.a",
" ├─UnionScan_12 10000.00 root ",
" │ └─TableReader_14 10000.00 root data:TableScan_13",
" │ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
" └─UnionScan_10 10.00 root ",
" └─IndexReader_9 10.00 root index:IndexScan_8",
" └─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"2 2",
"2 2",
))
tk.MustExec("rollback")
}
49 changes: 30 additions & 19 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,20 +411,23 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
innerJoinKeys = p.LeftJoinKeys
outerJoinKeys = p.RightJoinKeys
}
x, ok := innerChild.(*DataSource)
if !ok {
ds, isDataSource := innerChild.(*DataSource)
us, isUnionScan := innerChild.(*LogicalUnionScan)
if !isDataSource && !isUnionScan {
return nil
}
if isUnionScan {
ds = us.Children()[0].(*DataSource)
}
var tblPath *accessPath
for _, path := range x.possibleAccessPaths {
for _, path := range ds.possibleAccessPaths {
if path.isTablePath {
tblPath = path
break
}
}
if pkCol := x.getPKIsHandleCol(); pkCol != nil && tblPath != nil {
if pkCol := ds.getPKIsHandleCol(); pkCol != nil && tblPath != nil {
keyOff2IdxOff := make([]int, len(innerJoinKeys))
pkCol := x.getPKIsHandleCol()
pkMatched := false
for i, key := range innerJoinKeys {
if !key.Equal(nil, pkCol) {
Expand All @@ -435,7 +438,7 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
keyOff2IdxOff[i] = 0
}
if pkMatched {
innerPlan := p.constructInnerTableScan(x, pkCol, outerJoinKeys)
innerPlan := p.constructInnerTableScan(ds, pkCol, outerJoinKeys, us)
// Since the primary key means one value corresponding to exact one row, this will always be a no worse one
// comparing to other index.
return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, nil, keyOff2IdxOff)
Expand All @@ -448,12 +451,12 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
remainedOfBest []expression.Expression
keyOff2IdxOff []int
)
for _, path := range x.possibleAccessPaths {
for _, path := range ds.possibleAccessPaths {
if path.isTablePath {
continue
}
indexInfo := path.index
ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, x, innerJoinKeys)
ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, ds, innerJoinKeys)
// We choose the index by the number of used columns of the range, the much the better.
// Notice that there may be the cases like `t1.a=t2.a and b > 2 and b < 1`. So ranges can be nil though the conditions are valid.
// But obviously when the range is nil, we don't need index join.
Expand All @@ -466,20 +469,15 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
}
}
if bestIndexInfo != nil {
innerPlan := p.constructInnerIndexScan(x, bestIndexInfo, remainedOfBest, outerJoinKeys)
innerPlan := p.constructInnerIndexScan(ds, bestIndexInfo, remainedOfBest, outerJoinKeys, us)
return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, rangesOfBest, keyOff2IdxOff)
}
return nil
}

// constructInnerTableScan is specially used to construct the inner plan for PhysicalIndexJoin.
func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column) PhysicalPlan {
var ranges []*ranger.Range
if pk != nil {
ranges = ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag))
} else {
ranges = ranger.FullIntRange(false)
}
func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan {
ranges := ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag))
ts := PhysicalTableScan{
Table: ds.tableInfo,
Columns: ds.Columns,
Expand All @@ -504,11 +502,23 @@ func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Col
selStats := ts.stats.Scale(selectionFactor)
ts.addPushedDownSelection(copTask, selStats)
t := finishCopTask(ds.ctx, copTask)
return t.plan()
reader := t.plan()
return p.constructInnerUnionScan(us, reader)
}

func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader PhysicalPlan) PhysicalPlan {
if us == nil {
return reader
}
// Use `reader.stats` instead of `us.stats` because it should be more accurate. No need to specify
// childrenReqProps now since we have got reader already.
physicalUnionScan := PhysicalUnionScan{Conditions: us.conditions}.init(us.ctx, reader.statsInfo(), nil)
physicalUnionScan.SetChildren(reader)
return physicalUnionScan
}

// constructInnerIndexScan is specially used to construct the inner plan for PhysicalIndexJoin.
func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column) PhysicalPlan {
func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan {
is := PhysicalIndexScan{
Table: ds.tableInfo,
TableAsName: ds.TableAsName,
Expand Down Expand Up @@ -550,7 +560,8 @@ func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexIn
path := &accessPath{indexFilters: indexConds, tableFilters: tblConds, countAfterIndex: math.MaxFloat64}
is.addPushedDownSelection(cop, ds, math.MaxFloat64, path)
t := finishCopTask(ds.ctx, cop)
return t.plan()
reader := t.plan()
return p.constructInnerUnionScan(us, reader)
}

// buildRangeForIndexJoin checks whether this index can be used for building index join and return the range if this index is ok.
Expand Down
11 changes: 7 additions & 4 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func (p *LogicalTableDual) findBestTask(prop *property.PhysicalProperty) (task,

// findBestTask implements LogicalPlan interface.
func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty) (bestTask task, err error) {
// If p is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself,
// and set inner child prop nil, so here we do nothing.
if prop == nil {
return nil, nil
}
// Look up the task with this prop in the task map.
// It's used to reduce double counting.
bestTask = p.getTask(prop)
Expand Down Expand Up @@ -329,10 +334,8 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida
// findBestTask implements the PhysicalPlan interface.
// It will enumerate all the available indices and choose a plan with least cost.
func (ds *DataSource) findBestTask(prop *property.PhysicalProperty) (t task, err error) {
// If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself.
// So here we do nothing.
// TODO: Add a special prop to handle IndexJoin's inner plan.
// Then we can remove forceToTableScan and forceToIndexScan.
// If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself,
// and set inner child prop nil, so here we do nothing.
if prop == nil {
return nil, nil
}
Expand Down
81 changes: 22 additions & 59 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/util/testleak"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -1305,7 +1304,7 @@ func (s *testPlanSuite) TestIndexLookupCartesianJoin(c *C) {
c.Assert(lastWarn.Err.Error(), Equals, "[planner:1815]Optimizer Hint /*+ TIDB_INLJ(t1, t2) */ is inapplicable without column equal ON condition")
}

func (s *testPlanSuite) TestDoSubquery(c *C) {
func (s *testPlanSuite) TestIndexJoinUnionScan(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
Expand All @@ -1321,71 +1320,35 @@ func (s *testPlanSuite) TestDoSubquery(c *C) {
sql string
best string
}{
// Test Index Join + UnionScan + TableScan.
{
sql: "do 1 in (select a from t)",
best: "LeftHashJoin{Dual->TableReader(Table(t))}->Projection",
},
}
for _, tt := range tests {
comment := Commentf("for %s", tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
p, err := core.Optimize(se, stmt, s.is)
c.Assert(err, IsNil)
c.Assert(core.ToString(p), Equals, tt.best, comment)
}
}

func (s *testPlanSuite) TestUnmatchedTableInHint(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
defer func() {
dom.Close()
store.Close()
}()
se, err := session.CreateSession4Test(store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "use test")
c.Assert(err, IsNil)
tests := []struct {
sql string
warning string
}{
{
sql: "SELECT /*+ TIDB_SMJ(t3, t4) */ * from t t1, t t2 where t1.a = t2.a",
warning: "[planner:1815]There are no matching table names for (t3, t4) in optimizer hint /*+ TIDB_SMJ(t3, t4) */. Maybe you can use the table alias name",
},
{
sql: "SELECT /*+ TIDB_HJ(t3, t4) */ * from t t1, t t2 where t1.a = t2.a",
warning: "[planner:1815]There are no matching table names for (t3, t4) in optimizer hint /*+ TIDB_HJ(t3, t4) */. Maybe you can use the table alias name",
},
{
sql: "SELECT /*+ TIDB_INLJ(t3, t4) */ * from t t1, t t2 where t1.a = t2.a",
warning: "[planner:1815]There are no matching table names for (t3, t4) in optimizer hint /*+ TIDB_INLJ(t3, t4) */. Maybe you can use the table alias name",
sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.a",
best: "IndexJoin{TableReader(Table(t))->UnionScan([])->TableReader(Table(t))->UnionScan([])}(t1.a,t2.a)",
},
// Test Index Join + UnionScan + DoubleRead.
{
sql: "SELECT /*+ TIDB_SMJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.a",
warning: "",
sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.c",
best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexLookUp(Index(t.c_d_e)[[NULL,+inf]], Table(t))->UnionScan([])}(t1.a,t2.c)",
},
// Test Index Join + UnionScan + IndexScan.
{
sql: "SELECT /*+ TIDB_SMJ(t3, t4) */ * from t t1, t t2, t t3 where t1.a = t2.a and t2.a = t3.a",
warning: "[planner:1815]There are no matching table names for (t4) in optimizer hint /*+ TIDB_SMJ(t3, t4) */. Maybe you can use the table alias name",
sql: "select /*+ TIDB_INLJ(t1, t2) */ t1.a , t2.c from t t1, t t2 where t1.a = t2.c",
best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexReader(Index(t.c_d_e)[[NULL,+inf]])->UnionScan([])}(t1.a,t2.c)->Projection",
},
}
for _, test := range tests {
se.GetSessionVars().StmtCtx.SetWarnings(nil)
stmt, err := s.ParseOneStmt(test.sql, "", "")
for i, tt := range tests {
comment := Commentf("case:%v sql:%s", i, tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
err = se.NewTxn()
c.Assert(err, IsNil)
_, err = core.Optimize(se, stmt, s.is)
// Make txn not read only.
txn, err := se.Txn(true)
c.Assert(err, IsNil)
warnings := se.GetSessionVars().StmtCtx.GetWarnings()
if test.warning == "" {
c.Assert(len(warnings), Equals, 0)
} else {
c.Assert(len(warnings), Equals, 1)
c.Assert(warnings[0].Level, Equals, stmtctx.WarnLevelWarning)
c.Assert(warnings[0].Err.Error(), Equals, test.warning)
}
txn.Set(kv.Key("AAA"), []byte("BBB"))
se.StmtCommit()
p, err := core.Optimize(se, stmt, s.is)
c.Assert(err, IsNil)
c.Assert(core.ToString(p), Equals, tt.best, comment)
}
}

0 comments on commit 1c5d556

Please sign in to comment.