diff --git a/executor/builder.go b/executor/builder.go index 47663e20e5125..9224dfdd1dd03 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -38,7 +38,6 @@ import ( "github.com/pingcap/tidb/metrics" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" @@ -1086,8 +1085,12 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor e.FinalAggFuncs = append(e.FinalAggFuncs, finalAggFunc) if partialAggDesc.Name == ast.AggFuncGroupConcat { // For group_concat, finalAggFunc and partialAggFunc need shared `truncate` flag to do duplicate. - finalAggFunc.(interface{ SetTruncated(t *int32) }).SetTruncated( - partialAggFunc.(interface{ GetTruncated() *int32 }).GetTruncated(), + finalAggFunc.(interface { + SetTruncated(t *int32) + }).SetTruncated( + partialAggFunc.(interface { + GetTruncated() *int32 + }).GetTruncated(), ) } } @@ -1848,25 +1851,25 @@ type dataReaderBuilder struct { *executorBuilder } -func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, datums [][]types.Datum, - IndexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { +func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, lookUpContents []*indexJoinLookUpContent, + IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCompareOps) (Executor, error) { switch v := builder.Plan.(type) { case *plannercore.PhysicalTableReader: - return builder.buildTableReaderForIndexJoin(ctx, v, datums) + return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents) case *plannercore.PhysicalIndexReader: - return builder.buildIndexReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) + return builder.buildIndexReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc) case 
*plannercore.PhysicalIndexLookUpReader: - return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) + return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc) case *plannercore.PhysicalUnionScan: - return builder.buildUnionScanForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) + return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc) } return nil, errors.New("Wrong plan type for dataReaderBuilder") } func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan, - values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { + values []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCompareOps) (Executor, error) { childBuilder := &dataReaderBuilder{v.Children()[0], builder.executorBuilder} - reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff) + reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } @@ -1879,14 +1882,14 @@ func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context return us, nil } -func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) { +func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, lookUpContents []*indexJoinLookUpContent) (Executor, error) { e, err := buildNoRangeTableReader(builder.executorBuilder, v) if err != nil { return nil, errors.Trace(err) } - handles := make([]int64, 0, len(datums)) - for _, datum := range datums { - handles = append(handles, datum[0].GetInt64()) + handles := make([]int64, 0, len(lookUpContents)) + for _, content := range lookUpContents 
{ + handles = append(handles, content.keys[0].GetInt64()) } return builder.buildTableReaderFromHandles(ctx, e, handles) } @@ -1915,12 +1918,12 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex } func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexReader, - values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { + lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCompareOps) (Executor, error) { e, err := buildNoRangeIndexReader(builder.executorBuilder, v) if err != nil { return nil, errors.Trace(err) } - kvRanges, err := buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff) + kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, errors.Trace(err) } @@ -1929,12 +1932,12 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte } func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexLookUpReader, - values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { + lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCompareOps) (Executor, error) { e, err := buildNoRangeIndexLookUpReader(builder.executorBuilder, v) if err != nil { return nil, errors.Trace(err) } - kvRanges, err := buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff) + kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, errors.Trace(err) } @@ -1943,17 +1946,40 @@ func (builder 
*dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context } // buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan. -func buildKvRangesForIndexJoin(sc *stmtctx.StatementContext, tableID, indexID int64, keyDatums [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) ([]kv.KeyRange, error) { - kvRanges := make([]kv.KeyRange, 0, len(indexRanges)*len(keyDatums)) - for _, val := range keyDatums { - for _, ran := range indexRanges { +func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent, + ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCompareOps) ([]kv.KeyRange, error) { + kvRanges := make([]kv.KeyRange, 0, len(ranges)*len(lookUpContents)) + lastPos := len(ranges[0].LowVal) - 1 + sc := ctx.GetSessionVars().StmtCtx + for _, content := range lookUpContents { + for _, ran := range ranges { for keyOff, idxOff := range keyOff2IdxOff { - ran.LowVal[idxOff] = val[keyOff] - ran.HighVal[idxOff] = val[keyOff] + ran.LowVal[idxOff] = content.keys[keyOff] + ran.HighVal[idxOff] = content.keys[keyOff] } } + if cwc != nil { + nextColRanges, err := cwc.BuildRangesByRow(ctx, content.row) + if err != nil { + return nil, err + } + for _, nextColRan := range nextColRanges { + for _, ran := range ranges { + ran.LowVal[lastPos] = nextColRan.LowVal[0] + ran.HighVal[lastPos] = nextColRan.HighVal[0] + ran.LowExclude = nextColRan.LowExclude + ran.HighExclude = nextColRan.HighExclude + } + tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil) + if err != nil { + return nil, errors.Trace(err) + } + kvRanges = append(kvRanges, tmpKvRanges...)
+ } + continue + } - tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, indexRanges, nil) + tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index b699cee269ca3..1520bb30feff8 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -14,15 +14,12 @@ package executor import ( - "time" - . "github.com/pingcap/check" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/auth" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" @@ -128,7 +125,7 @@ func buildSchema(names []string, ftypes []byte) *expression.Schema { return schema } -func (s *testExecSuite) TestBuildKvRangesForIndexJoin(c *C) { +func (s *testExecSuite) TestBuildKvRangesForIndexJoinWithoutCwc(c *C) { indexRanges := make([]*ranger.Range, 0, 6) indexRanges = append(indexRanges, generateIndexRange(1, 1, 1, 1, 1)) indexRanges = append(indexRanges, generateIndexRange(1, 1, 2, 1, 1)) @@ -137,16 +134,16 @@ func (s *testExecSuite) TestBuildKvRangesForIndexJoin(c *C) { indexRanges = append(indexRanges, generateIndexRange(2, 1, 1, 1, 1)) indexRanges = append(indexRanges, generateIndexRange(2, 1, 2, 1, 1)) - joinKeyRows := make([][]types.Datum, 0, 5) - joinKeyRows = append(joinKeyRows, generateDatumSlice(1, 1)) - joinKeyRows = append(joinKeyRows, generateDatumSlice(1, 2)) - joinKeyRows = append(joinKeyRows, generateDatumSlice(2, 1)) - joinKeyRows = append(joinKeyRows, generateDatumSlice(2, 2)) - joinKeyRows = append(joinKeyRows, generateDatumSlice(2, 3)) + joinKeyRows := make([]*indexJoinLookUpContent, 0, 5) + joinKeyRows = append(joinKeyRows, &indexJoinLookUpContent{keys: generateDatumSlice(1, 1)}) + joinKeyRows 
= append(joinKeyRows, &indexJoinLookUpContent{keys: generateDatumSlice(1, 2)}) + joinKeyRows = append(joinKeyRows, &indexJoinLookUpContent{keys: generateDatumSlice(2, 1)}) + joinKeyRows = append(joinKeyRows, &indexJoinLookUpContent{keys: generateDatumSlice(2, 2)}) + joinKeyRows = append(joinKeyRows, &indexJoinLookUpContent{keys: generateDatumSlice(2, 3)}) keyOff2IdxOff := []int{1, 3} - sc := &stmtctx.StatementContext{TimeZone: time.Local} - kvRanges, err := buildKvRangesForIndexJoin(sc, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff) + ctx := mock.NewContext() + kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil) c.Assert(err, IsNil) // Check the kvRanges is in order. for i, kvRange := range kvRanges { diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 0a13aff72fe06..7ff3a926ad1ed 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/expression" + plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types" @@ -67,6 +68,8 @@ type IndexLookUpJoin struct { keyOff2IdxOff []int innerPtrBytes [][]byte + nextColCompareFilters *plannercore.ColWithCompareOps + memTracker *memory.Tracker // track memory usage. } @@ -123,8 +126,9 @@ type innerWorker struct { ctx sessionctx.Context executorChk *chunk.Chunk - indexRanges []*ranger.Range - keyOff2IdxOff []int + indexRanges []*ranger.Range + nextColCompareFilters *plannercore.ColWithCompareOps + keyOff2IdxOff []int } // Open implements the Executor interface. 
@@ -423,13 +427,18 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) { } } +type indexJoinLookUpContent struct { + keys []types.Datum + row chunk.Row +} + func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) error { - dLookUpKeys, err := iw.constructDatumLookupKeys(task) + lookUpContents, err := iw.constructDatumLookupKeys(task) if err != nil { return errors.Trace(err) } - dLookUpKeys = iw.sortAndDedupDatumLookUpKeys(dLookUpKeys) - err = iw.fetchInnerResults(ctx, task, dLookUpKeys) + lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents) + err = iw.fetchInnerResults(ctx, task, lookUpContents) if err != nil { return errors.Trace(err) } @@ -440,8 +449,8 @@ func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) err return nil } -func (iw *innerWorker) constructDatumLookupKeys(task *lookUpJoinTask) ([][]types.Datum, error) { - dLookUpKeys := make([][]types.Datum, 0, task.outerResult.NumRows()) +func (iw *innerWorker) constructDatumLookupKeys(task *lookUpJoinTask) ([]*indexJoinLookUpContent, error) { + lookUpContents := make([]*indexJoinLookUpContent, 0, task.outerResult.NumRows()) keyBuf := make([]byte, 0, 64) for i := 0; i < task.outerResult.NumRows(); i++ { dLookUpKey, err := iw.constructDatumLookupKey(task, i) @@ -460,11 +469,11 @@ func (iw *innerWorker) constructDatumLookupKeys(task *lookUpJoinTask) ([][]types } // Store the encoded lookup key in chunk, so we can use it to lookup the matched inners directly. 
task.encodedLookUpKeys.AppendBytes(0, keyBuf) - dLookUpKeys = append(dLookUpKeys, dLookUpKey) + lookUpContents = append(lookUpContents, &indexJoinLookUpContent{keys: dLookUpKey, row: task.outerResult.GetRow(i)}) } task.memTracker.Consume(task.encodedLookUpKeys.MemoryUsage()) - return dLookUpKeys, nil + return lookUpContents, nil } func (iw *innerWorker) constructDatumLookupKey(task *lookUpJoinTask, rowIdx int) ([]types.Datum, error) { @@ -496,20 +505,23 @@ func (iw *innerWorker) constructDatumLookupKey(task *lookUpJoinTask, rowIdx int) return dLookupKey, nil } -func (iw *innerWorker) sortAndDedupDatumLookUpKeys(dLookUpKeys [][]types.Datum) [][]types.Datum { - if len(dLookUpKeys) < 2 { - return dLookUpKeys +func (iw *innerWorker) sortAndDedupLookUpContents(lookUpContents []*indexJoinLookUpContent) []*indexJoinLookUpContent { + if len(lookUpContents) < 2 { + return lookUpContents } sc := iw.ctx.GetSessionVars().StmtCtx - sort.Slice(dLookUpKeys, func(i, j int) bool { - cmp := compareRow(sc, dLookUpKeys[i], dLookUpKeys[j]) - return cmp < 0 + sort.Slice(lookUpContents, func(i, j int) bool { + cmp := compareRow(sc, lookUpContents[i].keys, lookUpContents[j].keys) + if cmp != 0 || iw.nextColCompareFilters == nil { + return cmp < 0 + } + return iw.nextColCompareFilters.CompareRow(lookUpContents[i].row, lookUpContents[j].row) < 0 }) - deDupedLookupKeys := dLookUpKeys[:1] - for i := 1; i < len(dLookUpKeys); i++ { - cmp := compareRow(sc, dLookUpKeys[i], dLookUpKeys[i-1]) - if cmp != 0 { - deDupedLookupKeys = append(deDupedLookupKeys, dLookUpKeys[i]) + deDupedLookupKeys := lookUpContents[:1] + for i := 1; i < len(lookUpContents); i++ { + cmp := compareRow(sc, lookUpContents[i].keys, lookUpContents[i-1].keys) + if cmp != 0 || (iw.nextColCompareFilters != nil && iw.nextColCompareFilters.CompareRow(lookUpContents[i].row, lookUpContents[i-1].row) != 0) { + deDupedLookupKeys = append(deDupedLookupKeys, lookUpContents[i]) } } return deDupedLookupKeys @@ -529,8 +541,8 @@ func 
compareRow(sc *stmtctx.StatementContext, left, right []types.Datum) int { return 0 } -func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTask, dLookUpKeys [][]types.Datum) error { - innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, iw.indexRanges, iw.keyOff2IdxOff) +func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTask, lookUpContent []*indexJoinLookUpContent) error { + innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters) if err != nil { return errors.Trace(err) } diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 67afd825f923b..b0bd9861d5eb1 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -16,16 +16,17 @@ package core import ( "math" - "github.com/pingcap/errors" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" - "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/planner/property" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/ranger" + log "github.com/sirupsen/logrus" ) func (p *LogicalUnionScan) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan { @@ -310,7 +311,7 @@ func joinKeysMatchIndex(keys, indexCols []*expression.Column, colLengths []int) // When inner plan is TableReader, the parameter `ranges` will be nil. Because pk only have one column. So all of its range // is generated during execution time. 
func (p *LogicalJoin) constructIndexJoin(prop *property.PhysicalProperty, innerJoinKeys, outerJoinKeys []*expression.Column, outerIdx int, - innerPlan PhysicalPlan, ranges []*ranger.Range, keyOff2IdxOff []int) []PhysicalPlan { + innerPlan PhysicalPlan, ranges []*ranger.Range, keyOff2IdxOff []int, compareFilters *ColWithCompareOps) []PhysicalPlan { joinType := p.JoinType outerSchema := p.children[outerIdx].Schema() // If the order by columns are not all from outer child, index join cannot promise the order. @@ -345,6 +346,7 @@ func (p *LogicalJoin) constructIndexJoin(prop *property.PhysicalProperty, innerJ innerPlan: innerPlan, KeyOff2IdxOff: newKeyOff, Ranges: ranges, + compareFilters: compareFilters, }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), chReqProps...) join.SetSchema(p.schema) return []PhysicalPlan{join} @@ -397,7 +399,7 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou innerPlan := p.constructInnerTableScan(ds, pkCol, outerJoinKeys, us) // Since the primary key means one value corresponding to exact one row, this will always be a no worse one // comparing to other index. 
- return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, nil, keyOff2IdxOff) + return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, nil, keyOff2IdxOff, nil) } } var ( @@ -405,14 +407,19 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou rangesOfBest []*ranger.Range maxUsedCols int remainedOfBest []expression.Expression - keyOff2IdxOff []int + idxOff2KeyOff []int + comparesOfBest *ColWithCompareOps ) for _, path := range ds.possibleAccessPaths { if path.isTablePath { continue } indexInfo := path.index - ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, ds, innerJoinKeys) + ranges, tmpIdxOff2KeyOff, remained, compareFilters, err := p.analyzeLookUpFilters(indexInfo, ds, innerJoinKeys) + if err != nil { + log.Warnf("[planner]: error happened when build index join: %v", err) + continue + } // We choose the index by the number of used columns of the range, the much the better. // Notice that there may be the cases like `t1.a=t2.a and b > 2 and b < 1`. So ranges can be nil though the conditions are valid. // But obviously when the range is nil, we don't need index join. 
@@ -421,12 +428,22 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou maxUsedCols = len(ranges[0].LowVal) rangesOfBest = ranges remainedOfBest = remained - keyOff2IdxOff = tmpKeyOff2IdxOff + idxOff2KeyOff = tmpIdxOff2KeyOff + comparesOfBest = compareFilters } } if bestIndexInfo != nil { + keyOff2IdxOff := make([]int, len(innerJoinKeys)) + for i := range keyOff2IdxOff { + keyOff2IdxOff[i] = -1 + } + for idxOff, keyOff := range idxOff2KeyOff { + if keyOff != -1 { + keyOff2IdxOff[keyOff] = idxOff + } + } innerPlan := p.constructInnerIndexScan(ds, bestIndexInfo, remainedOfBest, outerJoinKeys, us) - return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, rangesOfBest, keyOff2IdxOff) + return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, rangesOfBest, keyOff2IdxOff, comparesOfBest) } return nil } @@ -521,78 +538,252 @@ func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexIn return p.constructInnerUnionScan(us, reader) } -// buildRangeForIndexJoin checks whether this index can be used for building index join and return the range if this index is ok. -// If this index is invalid, just return nil range. -func (p *LogicalJoin) buildRangeForIndexJoin(indexInfo *model.IndexInfo, innerPlan *DataSource, innerJoinKeys []*expression.Column) ( - []*ranger.Range, []expression.Expression, []int) { - idxCols, colLengths := expression.IndexInfo2Cols(innerPlan.Schema().Columns, indexInfo) - if len(idxCols) == 0 { - return nil, nil, nil - } +var symmetricOp = map[string]string{ + ast.LT: ast.GT, + ast.GE: ast.LE, + ast.GT: ast.LT, + ast.LE: ast.GE, +} - // Extract the filter to calculate access and the filters that must be remained ones. 
- access, eqConds, remained, keyOff2IdxOff := p.buildFakeEqCondsForIndexJoin(innerJoinKeys, idxCols, colLengths, innerPlan.pushedDownConds) +// ColWithCompareOps records the comparison filters collected for targetCol: OpType[i] is the operator name and opArg[i] is the expression the column is compared with. +// affectedColSchema holds the distinct columns referenced by the arguments and compareFuncs[i] is the compare function of its i-th column. +type ColWithCompareOps struct { + targetCol *expression.Column + OpType []string + opArg []expression.Expression + tmpConstant []*expression.Constant + affectedColSchema *expression.Schema + compareFuncs []chunk.CompareFunc +} - if len(keyOff2IdxOff) == 0 { - return nil, nil, nil +// appendNewExpr appends one comparison (operator name and argument), allocates its scratch constant with the target column's type, +// and registers every column of affectedCols that is not yet in affectedColSchema together with its compare function. +func (cwc *ColWithCompareOps) appendNewExpr(opName string, arg expression.Expression, affectedCols []*expression.Column) { + cwc.OpType = append(cwc.OpType, opName) + cwc.opArg = append(cwc.opArg, arg) + cwc.tmpConstant = append(cwc.tmpConstant, &expression.Constant{RetType: cwc.targetCol.RetType}) + for _, col := range affectedCols { + if cwc.affectedColSchema.Contains(col) { + continue + } + cwc.compareFuncs = append(cwc.compareFuncs, chunk.GetCompareFunc(col.RetType)) + cwc.affectedColSchema.Append(col) } +} - // In `buildFakeEqCondsForIndexJoin`, we construct the equal conditions for join keys and remove filters that contain the join keys' column. - // When t1.a = t2.a and t1.a > 1, we can also guarantee that t1.a > 1 won't be chosen as the access condition. - // So the equal conditions we built can be successfully used to build a range if they can be used. They won't be affected by the existing filters. - ranges, accesses, moreRemained, _, err := ranger.DetachCondAndBuildRangeForIndex(p.ctx, access, idxCols, colLengths) - if err != nil { - terror.Log(errors.Trace(err)) - return nil, nil, nil +// CompareRow compares lhs and rhs column by column over affectedColSchema and returns the first non-zero compare result, or 0 when every column compares equal. +func (cwc *ColWithCompareOps) CompareRow(lhs, rhs chunk.Row) int { + for i, col := range cwc.affectedColSchema.Columns { + ret := cwc.compareFuncs[i](lhs, col.Index, rhs, col.Index) + if ret != 0 { + return ret + } } + return 0 +} - // We should guarantee that all the join's equal condition is used.
- for _, eqCond := range eqConds { - if !expression.Contains(accesses, eqCond) { - return nil, nil, nil +// BuildRangesByRow evaluates every compare argument against row, rebuilds the comparisons on the target column with the resulting constants, and returns the column ranges built from them. +func (cwc *ColWithCompareOps) BuildRangesByRow(ctx sessionctx.Context, row chunk.Row) ([]*ranger.Range, error) { + exprs := make([]expression.Expression, 0, len(cwc.OpType)) + for i, opType := range cwc.OpType { + constantArg, err := cwc.opArg[i].Eval(row) + if err != nil { + return nil, err } + cwc.tmpConstant[i].Value = constantArg + newExpr, err := expression.NewFunction(ctx, opType, types.NewFieldType(mysql.TypeTiny), cwc.targetCol, cwc.tmpConstant[i]) + if err != nil { + return nil, err + } + exprs = append(exprs, newExpr) } - - return ranges, append(remained, moreRemained...), keyOff2IdxOff + ranges, err := ranger.BuildColumnRange(exprs, ctx.GetSessionVars().StmtCtx, cwc.targetCol.RetType) + if err != nil { + return nil, err + } + return ranges, nil } -func (p *LogicalJoin) buildFakeEqCondsForIndexJoin(keys, idxCols []*expression.Column, colLengths []int, - innerFilters []expression.Expression) (accesses, eqConds, remained []expression.Expression, keyOff2IdxOff []int) { - // Check whether all join keys match one column from index. - keyOff2IdxOff = joinKeysMatchIndex(keys, idxCols, colLengths) - if keyOff2IdxOff == nil { - return nil, nil, nil, nil +func (cwc *ColWithCompareOps) resolveIndices(schema *expression.Schema) { + for i := range cwc.opArg { + cwc.opArg[i] = cwc.opArg[i].ResolveIndices(schema) } +} - usableKeys := make([]*expression.Column, 0, len(keys)) - - conds := make([]expression.Expression, 0, len(keys)+len(innerFilters)) - eqConds = make([]expression.Expression, 0, len(keys)) - // Construct a fake equal expression for every join key for calculating the range.
- for i, key := range keys { - if keyOff2IdxOff[i] < 0 { +func (p *LogicalJoin) analyzeLookUpFilters(indexInfo *model.IndexInfo, innerPlan *DataSource, innerJoinKeys []*expression.Column) ([]*ranger.Range, []int, []expression.Expression, *ColWithCompareOps, error) { + idxCols, colLengths := expression.IndexInfo2Cols(innerPlan.schema.Columns, indexInfo) + if len(idxCols) == 0 { + return nil, nil, nil, nil, nil + } + tmpSchema := expression.NewSchema(innerJoinKeys...) + idxOff2keyOff := make([]int, len(idxCols)) + possibleUsedKeys := make([]*expression.Column, 0, len(idxCols)) + notKeyIdxCols := make([]*expression.Column, 0, len(idxCols)) + notKeyIdxColsLen := make([]int, 0, len(idxCols)) + matchedKeyCnt := 0 + for i, idxCol := range idxCols { + idxOff2keyOff[i] = tmpSchema.ColumnIndex(idxCol) + if idxOff2keyOff[i] >= 0 { + matchedKeyCnt++ + possibleUsedKeys = append(possibleUsedKeys, idxCol) continue } - usableKeys = append(usableKeys, key) - // Int datum 1 can convert to all column's type(numeric type, string type, json, time type, enum, set) safely. - fakeConstant := &expression.Constant{Value: types.NewIntDatum(1), RetType: key.GetType()} - eqFunc := expression.NewFunctionInternal(p.ctx, ast.EQ, types.NewFieldType(mysql.TypeTiny), key, fakeConstant) - conds = append(conds, eqFunc) - eqConds = append(eqConds, eqFunc) + notKeyIdxCols = append(notKeyIdxCols, idxCol) + notKeyIdxColsLen = append(notKeyIdxColsLen, colLengths[i]) } - - // Look into every `innerFilter`, if it contains join keys' column, put this filter into `remained` part directly. 
- remained = make([]expression.Expression, 0, len(innerFilters)) - for _, filter := range innerFilters { - affectedCols := expression.ExtractColumns(filter) - if expression.ColumnSliceIsIntersect(affectedCols, usableKeys) { - remained = append(remained, filter) + if matchedKeyCnt <= 0 { + return nil, nil, nil, nil, nil + } + // Walk backwards to the last index column that is matched by a join key. + keyMatchedLen := len(idxCols) - 1 + for ; keyMatchedLen > 0; keyMatchedLen-- { + if idxOff2keyOff[keyMatchedLen] != -1 { + break + } + } + remained := make([]expression.Expression, 0, len(innerPlan.pushedDownConds)) + rangeFilterCandidates := make([]expression.Expression, 0, len(innerPlan.pushedDownConds)) + for _, innerFilter := range innerPlan.pushedDownConds { + affectedCols := expression.ExtractColumns(innerFilter) + if expression.ColumnSliceIsIntersect(affectedCols, possibleUsedKeys) { + remained = append(remained, innerFilter) continue } - conds = append(conds, filter) + rangeFilterCandidates = append(rangeFilterCandidates, innerFilter) + } + notKeyEqAndIn, remainedEqAndIn, rangeFilterCandidates, _ := ranger.ExtractEqAndInCondition(p.ctx, rangeFilterCandidates, notKeyIdxCols, notKeyIdxColsLen) + // We hope that the index cols appeared in the join keys can all be used to build range. If it cannot be satisfied, + // we'll mark this index as cannot be used for index join. + if len(notKeyEqAndIn) < keyMatchedLen-matchedKeyCnt { + return nil, nil, nil, nil, nil + } + remained = append(remained, remainedEqAndIn...) + nextColPos := matchedKeyCnt + len(notKeyEqAndIn) + // If all cols have been considered, we can return the current result.
+ if nextColPos == len(idxCols) { + ranges, err := p.buildTemplateRange(idxOff2keyOff, matchedKeyCnt, notKeyEqAndIn, nil, false) + if err != nil { + return nil, nil, nil, nil, err + } + return ranges, idxOff2keyOff, remained, nil, nil + } + nextCol := idxCols[nextColPos] + nextColCmpFilterManager := &ColWithCompareOps{ + targetCol: nextCol, + affectedColSchema: expression.NewSchema(), + } +loopCandidates: + for _, filter := range rangeFilterCandidates { + sf, ok := filter.(*expression.ScalarFunction) + if !ok || !(sf.FuncName.L == ast.LE || sf.FuncName.L == ast.LT || sf.FuncName.L == ast.GE || sf.FuncName.L == ast.GT) { + continue + } + if lCol, ok := sf.GetArgs()[0].(*expression.Column); ok && lCol.Equal(nil, nextCol) { + affectedCols := expression.ExtractColumns(sf.GetArgs()[1]) + if len(affectedCols) == 0 { + continue + } + for _, col := range affectedCols { + if innerPlan.schema.Contains(col) { + continue loopCandidates + } + } + nextColCmpFilterManager.appendNewExpr(sf.FuncName.L, sf.GetArgs()[1], affectedCols) + } else if rCol, ok := sf.GetArgs()[1].(*expression.Column); ok && rCol.Equal(nil, nextCol) { + affectedCols := expression.ExtractColumns(sf.GetArgs()[0]) + if len(affectedCols) == 0 { + continue + } + for _, col := range affectedCols { + if innerPlan.schema.Contains(col) { + continue loopCandidates + } + } + nextColCmpFilterManager.appendNewExpr(symmetricOp[sf.FuncName.L], sf.GetArgs()[0], affectedCols) + } + } + if len(nextColCmpFilterManager.OpType) == 0 { + colAccesses, colRemained := ranger.DetachCondsForTableRange(p.ctx, rangeFilterCandidates, nextCol) + remained = append(remained, colRemained...) + if colLengths[nextColPos] != types.UnspecifiedLength { + remained = append(remained, colAccesses...) 
+ } + nextColRange, err := ranger.BuildColumnRange(colAccesses, p.ctx.GetSessionVars().StmtCtx, nextCol.RetType) + if err != nil { + return nil, nil, nil, nil, err + } + ranges, err := p.buildTemplateRange(idxOff2keyOff, matchedKeyCnt, notKeyEqAndIn, nextColRange, false) + if err != nil { + return nil, nil, nil, nil, err + } + return ranges, idxOff2keyOff, remained, nil, nil + } + ranges, err := p.buildTemplateRange(idxOff2keyOff, matchedKeyCnt, notKeyEqAndIn, nil, true) + if err != nil { + return nil, nil, nil, nil, err } - return conds, eqConds, remained, keyOff2IdxOff + return ranges, idxOff2keyOff, remained, nextColCmpFilterManager, nil +} + +func (p *LogicalJoin) buildTemplateRange(idxOff2KeyOff []int, matchedKeyCnt int, eqAndInFuncs []expression.Expression, nextColRange []*ranger.Range, haveExtraCol bool) (ranges []*ranger.Range, err error) { + pointLength := matchedKeyCnt + len(eqAndInFuncs) + if nextColRange != nil { + for _, colRan := range nextColRange { + // The range's exclude status is the same with last col's. + ran := &ranger.Range{ + LowVal: make([]types.Datum, pointLength, pointLength+1), + HighVal: make([]types.Datum, pointLength, pointLength+1), + LowExclude: colRan.LowExclude, + HighExclude: colRan.HighExclude, + } + ran.LowVal = append(ran.LowVal, colRan.LowVal[0]) + ran.HighVal = append(ran.HighVal, colRan.HighVal[0]) + ranges = append(ranges, ran) + } + } else if haveExtraCol { + ranges = append(ranges, &ranger.Range{ + LowVal: make([]types.Datum, pointLength+1, pointLength+1), + HighVal: make([]types.Datum, pointLength+1, pointLength+1), + }) + } else { + ranges = append(ranges, &ranger.Range{ + LowVal: make([]types.Datum, pointLength, pointLength), + HighVal: make([]types.Datum, pointLength, pointLength), + }) + } + emptyRow := chunk.Row{} + for i, j := 0, 0; j < len(eqAndInFuncs); i++ { + // This position is occupied by join key. + if idxOff2KeyOff[i] != -1 { + continue + } + sf := eqAndInFuncs[j].(*expression.ScalarFunction) + // Deal with the first two args.
+ if _, ok := sf.GetArgs()[0].(*expression.Column); ok { + for _, ran := range ranges { + ran.LowVal[i], err = sf.GetArgs()[1].Eval(emptyRow) + if err != nil { + return nil, err + } + ran.HighVal[i] = ran.LowVal[i] + } + } else { + for _, ran := range ranges { + ran.LowVal[i], err = sf.GetArgs()[0].Eval(emptyRow) + if err != nil { + return nil, err + } + ran.HighVal[i] = ran.LowVal[i] + } + } + // If the length of in function's constant list is more than one, we will expand ranges. + curRangeLen := len(ranges) + for argIdx := 2; argIdx < len(sf.GetArgs()); argIdx++ { + newRanges := make([]*ranger.Range, 0, curRangeLen) + for oldRangeIdx := 0; oldRangeIdx < curRangeLen; oldRangeIdx++ { + newRange := ranges[oldRangeIdx].Clone() + newRange.LowVal[i], err = sf.GetArgs()[argIdx].Eval(emptyRow) + if err != nil { + return nil, err + } + newRange.HighVal[i] = newRange.LowVal[i] + newRanges = append(newRanges, newRange) + } + ranges = append(ranges, newRanges...) + } + j++ + } + return ranges, nil } // tryToGetIndexJoin will get index join by hints. If we can generate a valid index join by hint, the second return value diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 4c66330fabdfe..a87c7153facae 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -232,7 +232,8 @@ type PhysicalIndexJoin struct { // Ranges stores the IndexRanges when the inner plan is index scan. Ranges []*ranger.Range // KeyOff2IdxOff maps the offsets in join key to the offsets in the index. - KeyOff2IdxOff []int + KeyOff2IdxOff []int + compareFilters *ColWithCompareOps } // PhysicalMergeJoin represents merge join for inner/ outer join.
diff --git a/planner/core/resolve_indices.go b/planner/core/resolve_indices.go index 27562bc21838b..92e4558940abc 100644 --- a/planner/core/resolve_indices.go +++ b/planner/core/resolve_indices.go @@ -125,6 +125,12 @@ func (p *PhysicalIndexJoin) ResolveIndices() { for i, expr := range p.OtherConditions { p.OtherConditions[i] = expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema)) } + if p.compareFilters != nil { + p.compareFilters.resolveIndices(p.children[p.OuterIndex].Schema()) + for i := range p.compareFilters.affectedColSchema.Columns { + p.compareFilters.affectedColSchema.Columns[i] = p.compareFilters.affectedColSchema.Columns[i].ResolveIndices(p.children[p.OuterIndex].Schema()).(*expression.Column) + } + } } // ResolveIndices implements Plan interface. diff --git a/util/ranger/detacher.go b/util/ranger/detacher.go index f020a24b0e446..dbc0557b1e42a 100644 --- a/util/ranger/detacher.go +++ b/util/ranger/detacher.go @@ -145,7 +145,7 @@ func detachCNFCondAndBuildRangeForIndex(sctx sessionctx.Context, conditions []ex err error ) - accessConds, filterConds, newConditions, emptyRange := extractEqAndInCondition(sctx, conditions, cols, lengths) + accessConds, filterConds, newConditions, emptyRange := ExtractEqAndInCondition(sctx, conditions, cols, lengths) if emptyRange { return ranges, nil, nil, 0, nil } @@ -155,8 +155,6 @@ func detachCNFCondAndBuildRangeForIndex(sctx sessionctx.Context, conditions []ex break } } - // We should remove all accessConds, so that they will not be added to filter conditions. - newConditions = removeAccessConditions(newConditions, accessConds) eqOrInCount := len(accessConds) if eqOrInCount == len(cols) { // If curIndex equals to len of index columns, it means the rest conditions haven't been appended to filter conditions. 
@@ -189,7 +187,13 @@ func detachCNFCondAndBuildRangeForIndex(sctx sessionctx.Context, conditions []ex return ranges, accessConds, filterConds, eqCount, errors.Trace(err) } -func extractEqAndInCondition(sctx sessionctx.Context, conditions []expression.Expression, +// ExtractEqAndInCondition splits the given conditions into three parts based on the index columns and their lengths. +// accesses: The conditions that will be used to build ranges. +// filters: The access conditions that need to be evaluated again, since they only cover the prefix part of a char column. +// newConditions: The given conditions, simplified when there are multiple in conditions or eq conditions on the same column. +// e.g. if there are a in (1, 2, 3) and a in (2, 3, 4), these two will be combined into a in (2, 3) and pushed to newConditions. +// bool: indicates whether there is a nil range when merging eq and in conditions. +func ExtractEqAndInCondition(sctx sessionctx.Context, conditions []expression.Expression, cols []*expression.Column, lengths []int) ([]expression.Expression, []expression.Expression, []expression.Expression, bool) { var filters []expression.Expression rb := builder{sc: sctx.GetSessionVars().StmtCtx} @@ -238,6 +242,8 @@ func extractEqAndInCondition(sctx sessionctx.Context, conditions []expression.Ex filters = append(filters, cond) } } + // We should remove all accessConds, so that they will not be added to filter conditions. + newConditions = removeAccessConditions(newConditions, accesses) return accesses, filters, newConditions, false } diff --git a/util/ranger/ranger.go b/util/ranger/ranger.go index 0402209e25bb7..a797a5b0db182 100644 --- a/util/ranger/ranger.go +++ b/util/ranger/ranger.go @@ -474,7 +474,7 @@ func newFieldType(tp *types.FieldType) *types.FieldType { // 1. 'expr' must be either 'EQUAL' or 'IN' function. // 2. 'points' should not be empty. 
func points2EqOrInCond(ctx sessionctx.Context, points []point, expr expression.Expression) expression.Expression { - // len(points) cannot be 0 here, since we impose early termination in extractEqAndInCondition + // len(points) cannot be 0 here, since we impose early termination in ExtractEqAndInCondition sf, _ := expr.(*expression.ScalarFunction) // Constant and Column args should have same RetType, simply get from first arg retType := sf.GetArgs()[0].GetType()