Skip to content

Commit

Permalink
planner: fix topn wrongly pushed to index scan side when it's a prefi…
Browse files Browse the repository at this point in the history
…x index (#29778) (#29798)

close #29711
  • Loading branch information
ti-srebot authored Feb 7, 2022
1 parent 6a867f7 commit bf823e1
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 11 deletions.
36 changes: 36 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4087,3 +4087,39 @@ func (s *testIntegrationSuite) TestIssues27130(c *C) {
" └─IndexRangeScan 10.00 cop[tikv] table:t3, index:a(a, b, c) range:[1,1], keep order:false, stats:pseudo",
))
}

func (s *testIntegrationSuite) TestIssues29711(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

tk.MustExec("drop table if exists tbl_29711")
tk.MustExec("CREATE TABLE `tbl_29711` (" +
"`col_250` text COLLATE utf8_unicode_ci NOT NULL," +
"`col_251` enum('Alice','Bob','Charlie','David') COLLATE utf8_unicode_ci NOT NULL DEFAULT 'Charlie'," +
"PRIMARY KEY (`col_251`,`col_250`(1)) NONCLUSTERED);")
tk.MustQuery("explain " +
"select col_250,col_251 from tbl_29711 where col_251 between 'Bob' and 'David' order by col_250,col_251 limit 6;").
Check(testkit.Rows(
"TopN_9 6.00 root test.tbl_29711.col_250, test.tbl_29711.col_251, offset:0, count:6",
"└─IndexLookUp_22 6.00 root ",
" ├─IndexRangeScan_19(Build) 30.00 cop[tikv] table:tbl_29711, index:PRIMARY(col_251, col_250) range:[\"Bob\",\"Bob\"], [\"Charlie\",\"Charlie\"], [\"David\",\"David\"], keep order:false, stats:pseudo",
" └─TopN_21(Probe) 6.00 cop[tikv] test.tbl_29711.col_250, test.tbl_29711.col_251, offset:0, count:6",
" └─TableRowIDScan_20 30.00 cop[tikv] table:tbl_29711 keep order:false, stats:pseudo",
))

tk.MustExec("drop table if exists t29711")
tk.MustExec("CREATE TABLE `t29711` (" +
"`a` varchar(10) DEFAULT NULL," +
"`b` int(11) DEFAULT NULL," +
"`c` int(11) DEFAULT NULL," +
"KEY `ia` (`a`(2)))")
tk.MustQuery("explain select * from t29711 use index (ia) order by a limit 10;").
Check(testkit.Rows(
"TopN_8 10.00 root test.t29711.a, offset:0, count:10",
"└─IndexLookUp_17 10.00 root ",
" ├─IndexFullScan_14(Build) 10000.00 cop[tikv] table:t29711, index:ia(a) keep order:false, stats:pseudo",
" └─TopN_16(Probe) 10.00 cop[tikv] test.t29711.a, offset:0, count:10",
" └─TableRowIDScan_15 10000.00 cop[tikv] table:t29711 keep order:false, stats:pseudo",
))

}
36 changes: 25 additions & 11 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,14 +1215,6 @@ func (p *PhysicalTopN) canPushDown(storeTp kv.StoreType) bool {
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

func (p *PhysicalTopN) allColsFromSchema(schema *expression.Schema) bool {
cols := make([]*expression.Column, 0, len(p.ByItems))
for _, item := range p.ByItems {
cols = append(cols, expression.ExtractColumns(item.Expr)...)
}
return len(schema.ColumnsIndices(cols)) > 0
}

// GetCost computes the cost of in memory sort.
func (p *PhysicalSort) GetCost(count float64, schema *expression.Schema) float64 {
if count < 2.0 {
Expand Down Expand Up @@ -1280,14 +1272,36 @@ func (p *PhysicalTopN) getPushedDownTopN(childPlan PhysicalPlan) *PhysicalTopN {
return topN
}

// canPushToIndexPlan checks if this TopN can be pushed to the index side of copTask.
// It can be pushed to the index side when all columns used by ByItems are available from the index side and
// there's no prefix index column.
func (p *PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []*expression.Column) bool {
schema := indexPlan.Schema()
for _, col := range byItemCols {
pos := schema.ColumnIndex(col)
if pos == -1 {
return false
}
if schema.Columns[pos].IsPrefix {
return false
}
}
return true
}

func (p *PhysicalTopN) attach2Task(tasks ...task) task {
t := tasks[0].copy()
inputCount := t.count()
if copTask, ok := t.(*copTask); ok && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 {
cols := make([]*expression.Column, 0, len(p.ByItems))
for _, item := range p.ByItems {
cols = append(cols, expression.ExtractColumns(item.Expr)...)
}
needPushDown := len(cols) > 0
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 {
// If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and
// push it to table plan.
var pushedDownTopN *PhysicalTopN
if !copTask.indexPlanFinished && p.allColsFromSchema(copTask.indexPlan.Schema()) {
if !copTask.indexPlanFinished && p.canPushToIndexPlan(copTask.indexPlan, cols) {
pushedDownTopN = p.getPushedDownTopN(copTask.indexPlan)
copTask.indexPlan = pushedDownTopN
} else {
Expand All @@ -1296,7 +1310,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task {
copTask.tablePlan = pushedDownTopN
}
copTask.addCost(pushedDownTopN.GetCost(inputCount, false))
} else if mppTask, ok := t.(*mppTask); ok && p.canPushDown(kv.TiFlash) {
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDown(kv.TiFlash) {
pushedDownTopN := p.getPushedDownTopN(mppTask.p)
mppTask.p = pushedDownTopN
}
Expand Down

0 comments on commit bf823e1

Please sign in to comment.