Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plan: fix a bug when using correlated column as index #7357

Merged
merged 15 commits into from
Aug 24, 2018
19 changes: 19 additions & 0 deletions expression/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,22 @@ func IndexInfo2Cols(cols []*Column, index *model.IndexInfo) ([]*Column, []int) {
}
return retCols, lengths
}

// FindColumnsByUniqueIDs will find columns by checking the unique id.
func FindColumnsByUniqueIDs(cols []*Column, ids []int) []*Column {
retCols := make([]*Column, 0, len(ids))
for _, id := range ids {
found := false
for _, col := range cols {
if col.UniqueID == id {
retCols = append(retCols, col)
found = true
break
}
}
if !found {
break
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check can be removed. we can make this function to find all the columns in the unique id list.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we still need this check. In such case, we need to return nil or a full slice?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function caller can check whether there is any column can not be found in cols by:

len(ids) == len(retCols)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another consideration is, this function is only used in the test currently, all the column identified by ids can be found in cols. We can extend this function in the future when we meet more complicated scenarios. For now, I think we'd better to keep it simple.

}
}
return retCols
}
27 changes: 27 additions & 0 deletions expression/simple_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,33 @@ func RewriteSimpleExprWithTableInfo(ctx sessionctx.Context, tbl *model.TableInfo
return rewriter.pop(), nil
}

func ParseSimpleExprsWithSchema(ctx sessionctx.Context, exprStr string, schema *Schema) ([]Expression, error) {
exprStr = "select " + exprStr
stmts, err := parser.New().Parse(exprStr, "", "")
if err != nil {
return nil, errors.Trace(err)
}
fields := stmts[0].(*ast.SelectStmt).Fields.Fields
exprs := make([]Expression, 0, len(fields))
for _, field := range fields {
expr, err := RewriteSimpleExprWithSchema(ctx, field.Expr, schema)
if err != nil {
return nil, errors.Trace(err)
}
exprs = append(exprs, expr)
}
return exprs, nil
}

func RewriteSimpleExprWithSchema(ctx sessionctx.Context, expr ast.ExprNode, schema *Schema) (Expression, error) {
rewriter := &simpleRewriter{ctx: ctx, schema: schema}
expr.Accept(rewriter)
if rewriter.err != nil {
return nil, errors.Trace(rewriter.err)
}
return rewriter.pop(), nil
}

func (sr *simpleRewriter) rewriteColumn(nodeColName *ast.ColumnNameExpr) (*Column, error) {
col := sr.schema.FindColumnByName(nodeColName.Name.Name.L)
if col != nil {
Expand Down
14 changes: 13 additions & 1 deletion plan/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func (s *testAnalyzeSuite) TestCorrelatedEstimation(c *C) {
store.Close()
}()
tk.MustExec("use test")
tk.MustExec("create table t(a int, b int, c int)")
tk.MustExec("create table t(a int, b int, c int, index idx(c))")
tk.MustExec("insert into t values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5), (6,6,6), (7,7,7), (8,8,8), (9,9,9),(10,10,10)")
tk.MustExec("analyze table t")
tk.MustQuery("explain select t.c in (select count(*) from t s , t t1 where s.a = t.a and s.a = t1.a) from t;").
Expand All @@ -640,6 +640,18 @@ func (s *testAnalyzeSuite) TestCorrelatedEstimation(c *C) {
" └─TableReader_27 10.00 root data:TableScan_26",
" └─TableScan_26 10.00 cop table:t1, range:[-inf,+inf], keep order:false",
))
tk.MustQuery("explain select (select concat(t1.a, \",\", t1.b) from t t1 where t1.a=t.a and t1.c=t.c) from t").
Check(testkit.Rows(
"Projection_8 10.00 root concat(t1.a, \",\", t1.b)",
"└─Apply_10 10.00 root left outer join, inner:MaxOneRow_13",
" ├─TableReader_12 10.00 root data:TableScan_11",
" │ └─TableScan_11 10.00 cop table:t, range:[-inf,+inf], keep order:false",
" └─MaxOneRow_13 1.00 root ",
" └─Projection_14 0.80 root concat(cast(t1.a), \",\", cast(t1.b))",
" └─IndexLookUp_21 0.80 root ",
" ├─IndexScan_18 1.00 cop table:t1, index:c, range: decided by [eq(t1.c, test.t.c)], keep order:false",
" └─Selection_20 0.80 cop eq(t1.a, test.t.a)] [ └─TableScan_19 1.00 cop table:t, keep order:false",
))
}

func (s *testAnalyzeSuite) TestInconsistentEstimation(c *C) {
Expand Down
5 changes: 3 additions & 2 deletions plan/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,14 +484,15 @@ func (path *accessPath) splitCorColAccessCondFromFilters() (access, remained []e
for i := path.eqCondCount; i < len(path.idxCols); i++ {
matched := false
for j, filter := range path.tableFilters {
if !isColEqCorColOrConstant(filter, path.idxCols[i]) {
break
if used[j] || !isColEqCorColOrConstant(filter, path.idxCols[i]) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to add a UT for function splitCorColAccessCondFromFilters though we already have integration tests, with UT we can test some corner cases that are not easy to be covered.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function is not big enough and not special enough to add test that tests this method seperately.
This is involved in how should we add test in plan package.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have UT for this function, even without the integration test you added in this PR, we can avoid this bug in the very beginning.

You can see the advantage of unit test through Wikipedia: https://en.wikipedia.org/wiki/Unit_testing#Advantages

continue
}
matched = true
access[i-path.eqCondCount] = filter
if path.idxColLens[i] == types.UnspecifiedLength {
used[j] = true
}
break
}
if !matched {
access = access[:i-path.eqCondCount]
Expand Down
197 changes: 197 additions & 0 deletions plan/logical_plans_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright 2016 PingCAP, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2018

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package plan

import (
"fmt"

"github.com/juju/errors"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testleak"
)

var _ = Suite(&testUnitTestSuit{})

type testUnitTestSuit struct {
ctx sessionctx.Context
}

func (s *testUnitTestSuit) SetUpSuite(c *C) {
s.ctx = mockContext()
}

func (s *testUnitTestSuit) newTypeWithFlen(typeByte byte, flen int) *types.FieldType {
tp := types.NewFieldType(typeByte)
tp.Flen = flen
return tp
}

func (s *testUnitTestSuit) SubstituteCol2CorCol(expr expression.Expression, colIDs map[int]struct{}) (expression.Expression, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do no need to return error?

switch x := expr.(type) {
case *expression.ScalarFunction:
newArgs := make([]expression.Expression, 0, len(x.GetArgs()))
for _, arg := range x.GetArgs() {
newArg, err := s.SubstituteCol2CorCol(arg, colIDs)
if err != nil {
return nil, errors.Trace(err)
}
newArgs = append(newArgs, newArg)
}
newSf := expression.NewFunctionInternal(x.GetCtx(), x.FuncName.L, x.GetType(), newArgs...)
return newSf, nil
case *expression.Column:
if _, ok := colIDs[x.UniqueID]; ok {
return &expression.CorrelatedColumn{Column: *x}, nil
}
default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant default? line#66 is enough.

return x, nil
}
return expr, nil
}

func (s *testUnitTestSuit) TestIndexPathSplitCorColCond(c *C) {
defer testleak.AfterTest(c)()
totalSchema := expression.NewSchema()
totalSchema.Append(&expression.Column{
ColName: model.NewCIStr("a"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a better way is to:

s/a/col1/
s/b/col2/
s/c/col3/
s/d/col4/
s/e/col5/

It helps us understanding the test case.

UniqueID: 1,
RetType: types.NewFieldType(mysql.TypeLonglong),
})
totalSchema.Append(&expression.Column{
ColName: model.NewCIStr("b"),
UniqueID: 2,
RetType: types.NewFieldType(mysql.TypeLonglong),
})
totalSchema.Append(&expression.Column{
ColName: model.NewCIStr("c"),
UniqueID: 3,
RetType: s.newTypeWithFlen(mysql.TypeVarchar, 10),
})
totalSchema.Append(&expression.Column{
ColName: model.NewCIStr("d"),
UniqueID: 4,
RetType: s.newTypeWithFlen(mysql.TypeVarchar, 10),
})
totalSchema.Append(&expression.Column{
ColName: model.NewCIStr("e"),
UniqueID: 5,
RetType: types.NewFieldType(mysql.TypeLonglong),
})
testCases := []struct {
expr string
corColIDs []int
idxColIDs []int
idxColLens []int
access string
remained string
}{
{
expr: "a = b",
corColIDs: []int{2},
idxColIDs: []int{1},
idxColLens: []int{types.UnspecifiedLength},
access: "[eq(a, b)]",
remained: "[]",
},
{
expr: "a = e and b = 1",
corColIDs: []int{5},
idxColIDs: []int{1, 2},
idxColLens: []int{types.UnspecifiedLength, types.UnspecifiedLength},
access: "[eq(a, e) eq(b, 1)]",
remained: "[]",
},
{
expr: "a = e and b = 1",
corColIDs: []int{5},
idxColIDs: []int{2, 1},
idxColLens: []int{types.UnspecifiedLength, types.UnspecifiedLength},
access: "[eq(b, 1) eq(a, e)]",
remained: "[]",
},
{
expr: "a = e and b = 1",
corColIDs: []int{5},
idxColIDs: []int{1},
idxColLens: []int{types.UnspecifiedLength},
access: "[eq(a, e)]",
remained: "[eq(b, 1)]",
},
{
expr: "b = 1 and a = e",
corColIDs: []int{5},
idxColIDs: []int{1},
idxColLens: []int{types.UnspecifiedLength},
access: "[eq(a, e)]",
remained: "[eq(b, 1)]",
},
{
expr: "a = b and c = d and e = 1",
corColIDs: []int{2, 4},
idxColIDs: []int{1, 3},
idxColLens: []int{types.UnspecifiedLength, types.UnspecifiedLength},
access: "[eq(a, b) eq(c, d)]",
remained: "[eq(e, 1)]",
},
{
expr: "a = b and c = d and e = 1",
corColIDs: []int{2, 4},
idxColIDs: []int{1, 3},
idxColLens: []int{types.UnspecifiedLength, 2},
access: "[eq(a, b) eq(c, d)]",
remained: "[eq(c, d) eq(e, 1)]",
},
{
expr: `a = e and c = "a" and b = e`,
corColIDs: []int{5},
idxColIDs: []int{1, 2, 3},
idxColLens: []int{types.UnspecifiedLength, types.UnspecifiedLength, types.UnspecifiedLength},
access: "[eq(a, e) eq(b, e) eq(c, a)]",
remained: "[]",
},
}
for _, tt := range testCases {
comment := Commentf("failed at case:\nexpr: %v\ncorColIDs: %v\nidxColIDs: %v\nidxColLens: %v\naccess: %v\nremained: %v\n", tt.expr, tt.corColIDs, tt.idxColIDs, tt.idxColLens, tt.access, tt.remained)
filters, err := expression.ParseSimpleExprsWithSchema(s.ctx, tt.expr, totalSchema)
if sf, ok := filters[0].(*expression.ScalarFunction); ok && sf.FuncName.L == ast.LogicAnd {
filters = expression.FlattenCNFConditions(sf)
}
c.Assert(err, IsNil, comment)
trueFilters := make([]expression.Expression, 0, len(filters))
idMap := make(map[int]struct{})
for _, id := range tt.corColIDs {
idMap[id] = struct{}{}
}
for _, filter := range filters {
trueFilter, err := s.SubstituteCol2CorCol(filter, idMap)
c.Assert(err, IsNil, comment)
trueFilters = append(trueFilters, trueFilter)
}
path := accessPath{
eqCondCount: 0,
tableFilters: trueFilters,
idxCols: expression.FindColumnsByUniqueIDs(totalSchema.Columns, tt.idxColIDs),
idxColLens: tt.idxColLens,
}
access, remained := path.splitCorColAccessCondFromFilters()
c.Assert(fmt.Sprintf("%s", access), Equals, tt.access, comment)
c.Assert(fmt.Sprintf("%s", remained), Equals, tt.remained, comment)
}
}