From 8dc9abe5e7a545705e53b4c48bdb5eeecc2ccac8 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 17 Nov 2021 18:26:53 +0800 Subject: [PATCH 01/11] index_merge: enable index_merge used in transaction. Signed-off-by: guo-shaoge --- cmd/explaintest/r/index_merge.result | 89 ++++++++++++ cmd/explaintest/t/index_merge.test | 47 ++++++ executor/builder.go | 8 ++ executor/mem_reader.go | 207 +++++++++++++++++++++++++++ executor/union_scan.go | 2 + planner/core/stats.go | 11 +- 6 files changed, 354 insertions(+), 10 deletions(-) create mode 100644 cmd/explaintest/r/index_merge.result create mode 100644 cmd/explaintest/t/index_merge.test diff --git a/cmd/explaintest/r/index_merge.result b/cmd/explaintest/r/index_merge.result new file mode 100644 index 0000000000000..16b14ceac44c0 --- /dev/null +++ b/cmd/explaintest/r/index_merge.result @@ -0,0 +1,89 @@ +drop table if exists t1; +create table t1(c1 int, c2 int, c3 int, key(c1), key(c2), key(c3)); +begin; + +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +id estRows task access object operator info +IndexMerge_9 1841.86 root +├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo +├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo +└─Selection_8(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) + └─TableRowIDScan_7 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo +//// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +c1 c2 c3 + +insert into t1 values(1, 1, 1); +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +id estRows task access object operator info +Projection_5 1841.86 root test.t1.c1, test.t1.c2, test.t1.c3 +└─UnionScan_6 1841.86 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) + └─IndexMerge_11 1841.86 root + ├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo + ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo + └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) + └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo +//// expect one row +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +c1 c2 c3 +1 1 1 + +update t1 set c3 = 100 where c3 = 1; +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +id estRows task access object operator info +Projection_5 1841.86 root test.t1.c1, test.t1.c2, test.t1.c3 +└─UnionScan_6 1841.86 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) + └─IndexMerge_11 1841.86 root + ├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo + ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo + └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) + └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo +//// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +c1 c2 c3 +commit; + + +# test partialPlan is TableScan. +drop table if exists t1; +create table t1(c1 int, c2 int, c3 int, primary key(c1), key(c2), key(c3)); +begin; + +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +id estRows task access object operator info +IndexMerge_9 1106.67 root +├─TableRangeScan_5(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo +├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo +└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10) + └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo +//// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +c1 c2 c3 + +insert into t1 values(1, 1, 1); +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +id estRows task access object operator info +UnionScan_6 1106.67 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) +└─IndexMerge_11 1106.67 root + ├─TableRangeScan_7(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo + ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo + └─Selection_10(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10) + └─TableRowIDScan_9 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo +//// expect one row +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +c1 c2 c3 +1 1 1 + +update t1 set c3 = 100 where c3 = 1; +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +id estRows task access object operator info +UnionScan_6 1106.67 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) +└─IndexMerge_11 1106.67 root + ├─TableRangeScan_7(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo + ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo + └─Selection_10(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10) + └─TableRowIDScan_9 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo +//// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +c1 c2 c3 +commit; diff --git a/cmd/explaintest/t/index_merge.test b/cmd/explaintest/t/index_merge.test new file mode 100644 index 0000000000000..f3891e2ad66dd --- /dev/null +++ b/cmd/explaintest/t/index_merge.test @@ -0,0 +1,47 @@ +drop table if exists t1; +create table t1(c1 int, c2 int, c3 int, key(c1), key(c2), key(c3)); + +begin; +--echo +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +--echo //// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; + +--echo +insert into t1 values(1, 1, 1); +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +--echo //// expect one row +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; + +--echo +update t1 set c3 = 100 where c3 = 1; +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +--echo //// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +commit; + +--echo +--echo +--echo # test partialPlan is TableScan. +drop table if exists t1; +create table t1(c1 int, c2 int, c3 int, primary key(c1), key(c2), key(c3)); + +begin; +--echo +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +--echo //// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; + +--echo +insert into t1 values(1, 1, 1); +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +--echo //// expect one row +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; + +--echo +update t1 set c3 = 100 where c3 = 1; +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +--echo //// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +commit; + diff --git a/executor/builder.go b/executor/builder.go index e10a2b6b11970..9e331e5e3f9cf 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1085,6 +1085,14 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco us.columns = x.columns us.table = x.table us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns) + case *IndexMergeReaderExecutor: + // IndexMergeReader doesn't care order. + // us.desc = false + // us.usedIndex = nil + us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions) + us.columns = x.columns + us.table = x.table + us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns) default: // The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting. return originReader diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 83901d35e58b2..fb9cf61d60669 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -140,6 +140,23 @@ func (m *memIndexReader) decodeIndexKeyValue(key, value []byte, tps []*types.Fie return ds, nil } +// Used when memIndexMergeReader.partialPlans[i] is TableScan. +func (m *memTableReader) getMemRowsHandle() ([]kv.Handle, error) { + rows, err := m.getMemRows() + if err != nil { + return nil, err + } + handles := make([]kv.Handle, 0, len(rows)) + for _, row := range rows { + handle, err := m.handleCols.BuildHandleByDatums(row) + if err != nil { + return nil, err + } + handles = append(handles, handle) + } + return handles, nil +} + type memTableReader struct { ctx sessionctx.Context table *model.TableInfo @@ -152,6 +169,8 @@ type memTableReader struct { colIDs map[int64]int buffer allocBuf pkColIDs []int64 + // Used when extracting handles from row in memTableReader.getMemRowsHandle. + handleCols plannercore.HandleCols } type allocBuf struct { @@ -521,3 +540,191 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) { return memTblReader.getMemRows() } + +type memIndexMergeReader struct { + ctx sessionctx.Context + index *model.IndexInfo + columns []*model.ColumnInfo + table table.Table + desc bool + conditions []expression.Expression + retFieldTypes []*types.FieldType + indexMergeReader *IndexMergeReaderExecutor + + memIndexReaders []*memIndexReader + memTableReaders []*memTableReader + + // partition mode + partitionMode bool // if it is accessing a partition table + partitionTables []table.PhysicalTable // partition tables to access + partitionKVRanges [][][]kv.KeyRange // kv ranges for these partition tables +} + +func buildMemIndexMergeReader(us *UnionScanExec, indexMergeReader *IndexMergeReaderExecutor) *memIndexMergeReader { + indexCount := len(indexMergeReader.indexes) + indexReaders := make([]*memIndexReader, 0, indexCount) + tableReaders := make([]*memTableReader, 0, indexCount) + for i := 0; i < indexCount; i++ { + if indexMergeReader.indexes[i] == nil { + colIDs, pkColIDs, rd := getColIDAndPkColIDs(indexMergeReader.table, indexMergeReader.columns) + tableReaders = append(tableReaders, &memTableReader{ + ctx: us.ctx, + table: indexMergeReader.table.Meta(), + columns: indexMergeReader.columns, + kvRanges: nil, + conditions: us.conditions, + addedRows: make([][]types.Datum, 0), + retFieldTypes: retTypes(us), + colIDs: colIDs, + pkColIDs: pkColIDs, + buffer: allocBuf{ + handleBytes: make([]byte, 0, 16), + rd: rd, + }, + handleCols: indexMergeReader.handleCols, + }) + indexReaders = append(indexReaders, nil) + } else { + outputOffset := []int{len(indexMergeReader.indexes[i].Columns)} + indexReaders = append(indexReaders, &memIndexReader{ + ctx: us.ctx, + index: indexMergeReader.indexes[i], + table: indexMergeReader.table.Meta(), + kvRanges: nil, + desc: indexMergeReader.descs[i], + retFieldTypes: retTypes(us), + outputOffset: outputOffset, + belowHandleCols: us.belowHandleCols, + }) + tableReaders = append(tableReaders, nil) + } + } + + return &memIndexMergeReader{ + ctx: us.ctx, + table: indexMergeReader.table, + columns: indexMergeReader.columns, + conditions: us.conditions, + retFieldTypes: retTypes(us), + indexMergeReader: indexMergeReader, + memIndexReaders: indexReaders, + memTableReaders: tableReaders, + + partitionMode: indexMergeReader.partitionTableMode, + partitionTables: indexMergeReader.prunedPartitions, + partitionKVRanges: indexMergeReader.partitionKeyRanges, + } +} + +func (m *memIndexMergeReader) getMemRows() ([][]types.Datum, error) { + tbls := []table.Table{m.table} + // [partNum][indexNum][rangeNum] + var kvRanges [][][]kv.KeyRange + if m.partitionMode { + tbls = tbls[:0] + for _, p := range m.partitionTables { + tbls = append(tbls, p) + } + kvRanges = m.partitionKVRanges + } else { + kvRanges = append(kvRanges, m.indexMergeReader.keyRanges) + } + + tblKVRanges := make([]kv.KeyRange, 0, 16) + numHandles := 0 + for i, tbl := range tbls { + handles, err := unionHandles(kvRanges[i], m.memIndexReaders, m.memTableReaders) + if err != nil { + return nil, err + } + if len(handles) == 0 { + continue + } + numHandles += len(handles) + tblKVRanges = append(tblKVRanges, distsql.TableHandlesToKVRanges(getPhysicalTableID(tbl), handles)...) + } + + if numHandles == 0 { + return nil, nil + } + colIDs, pkColIDs, rd := getColIDAndPkColIDs(m.table, m.columns) + + memTblReader := &memTableReader{ + ctx: m.ctx, + table: m.table.Meta(), + columns: m.columns, + kvRanges: tblKVRanges, + conditions: m.conditions, + addedRows: make([][]types.Datum, 0, numHandles), + retFieldTypes: m.retFieldTypes, + colIDs: colIDs, + pkColIDs: pkColIDs, + buffer: allocBuf{ + handleBytes: make([]byte, 0, 16), + rd: rd, + }, + } + + return memTblReader.getMemRows() +} + +func getColIDAndPkColIDs(table table.Table, columns []*model.ColumnInfo) (colIDs map[int64]int, pkColIDs []int64, rd *rowcodec.BytesDecoder) { + colIDs = make(map[int64]int, len(columns)) + for i, col := range columns { + colIDs[col.ID] = i + } + + tblInfo := table.Meta() + colInfos := make([]rowcodec.ColInfo, 0, len(columns)) + for i := range columns { + col := columns[i] + colInfos = append(colInfos, rowcodec.ColInfo{ + ID: col.ID, + IsPKHandle: tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.Flag), + Ft: rowcodec.FieldTypeFromModelColumn(col), + }) + } + pkColIDs = tables.TryGetCommonPkColumnIds(tblInfo) + if len(pkColIDs) == 0 { + pkColIDs = []int64{-1} + } + rd = rowcodec.NewByteDecoder(colInfos, pkColIDs, nil, nil) + return +} + +// Union all handles of different Indexes. +func unionHandles(kvRanges [][]kv.KeyRange, memIndexReaders []*memIndexReader, memTableReaders []*memTableReader) (finalHandles []kv.Handle, err error) { + if len(memIndexReaders) != len(kvRanges) { + return nil, errors.Errorf("len(kvRanges) should be equal to len(memIndexReaders)") + } + if len(memTableReaders) != len(memIndexReaders) { + return nil, errors.Errorf("len(kvRanges) should be equal to len(memIndexReaders)") + } + hMap := kv.NewHandleMap() + + var handles []kv.Handle + for i, reader := range memIndexReaders { + if reader == nil { + reader := memTableReaders[i] + reader.kvRanges = kvRanges[i] + handles, err = reader.getMemRowsHandle() + } else { + reader.kvRanges = kvRanges[i] + handles, err = reader.getMemRowsHandle() + } + if err != nil { + return nil, err + } + if len(handles) == 0 { + continue + } + // Filter same row. + for _, h := range handles { + if _, ok := hMap.Get(h); !ok { + finalHandles = append(finalHandles, h) + hMap.Set(h, true) + } + } + } + return finalHandles, nil +} diff --git a/executor/union_scan.go b/executor/union_scan.go index e08e90249375c..92bcd714073f3 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -98,6 +98,8 @@ func (us *UnionScanExec) open(ctx context.Context) error { us.addedRows, err = buildMemIndexReader(us, x).getMemRows() case *IndexLookUpExecutor: us.addedRows, err = buildMemIndexLookUpReader(us, x).getMemRows() + case *IndexMergeReaderExecutor: + us.addedRows, err = buildMemIndexMergeReader(us, x).getMemRows() default: err = fmt.Errorf("unexpected union scan children:%T", reader) } diff --git a/planner/core/stats.go b/planner/core/stats.go index ef2d8c258feae..c735edf5835a2 100644 --- a/planner/core/stats.go +++ b/planner/core/stats.go @@ -409,15 +409,6 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema * return nil, err } - // TODO: implement UnionScan + IndexMerge - isReadOnlyTxn := true - txn, err := ds.ctx.Txn(false) - if err != nil { - return nil, err - } - if txn.Valid() && !txn.IsReadOnly() { - isReadOnlyTxn = false - } // Consider the IndexMergePath. Now, we just generate `IndexMergePath` in DNF case. isPossibleIdxMerge := len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 sessionAndStmtPermission := (ds.ctx.GetSessionVars().GetEnableIndexMerge() || len(ds.indexMergeHints) > 0) && !ds.ctx.GetSessionVars().StmtCtx.NoIndexMergeHint @@ -432,7 +423,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema * } } cond, _ := ds.ctx.GetSessionVars().StmtCtx.GetCacheTable(ds.tableInfo.ID) - if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType != model.TempTableLocal && !cond { + if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && ds.tableInfo.TempTableType != model.TempTableLocal && !cond { err := ds.generateAndPruneIndexMergePath(ds.indexMergeHints != nil) if err != nil { return nil, err From 63a0545928fcb80b1fb59febef3a2b4acc980f21 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 17 Nov 2021 18:40:06 +0800 Subject: [PATCH 02/11] trivial change Signed-off-by: guo-shaoge --- cmd/explaintest/r/index_merge.result | 27 +++++++++++++++ cmd/explaintest/t/index_merge.test | 12 +++++++ executor/mem_reader.go | 49 +++++++++++++--------------- 3 files changed, 62 insertions(+), 26 deletions(-) diff --git a/cmd/explaintest/r/index_merge.result b/cmd/explaintest/r/index_merge.result index 16b14ceac44c0..6233aba59afab 100644 --- a/cmd/explaintest/r/index_merge.result +++ b/cmd/explaintest/r/index_merge.result @@ -41,6 +41,20 @@ Projection_5 1841.86 root test.t1.c1, test.t1.c2, test.t1.c3 //// expect empty select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; c1 c2 c3 + +delete from t1; +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +id estRows task access object operator info +Projection_5 1841.86 root test.t1.c1, test.t1.c2, test.t1.c3 +└─UnionScan_6 1841.86 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) + └─IndexMerge_11 1841.86 root + ├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo + ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo + └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) + └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo +//// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +c1 c2 c3 commit; @@ -86,4 +100,17 @@ UnionScan_6 1106.67 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1. //// expect empty select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; c1 c2 c3 + +delete from t1; +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +id estRows task access object operator info +UnionScan_6 1106.67 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) +└─IndexMerge_11 1106.67 root + ├─TableRangeScan_7(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo + ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo + └─Selection_10(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10) + └─TableRowIDScan_9 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo +//// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +c1 c2 c3 commit; diff --git a/cmd/explaintest/t/index_merge.test b/cmd/explaintest/t/index_merge.test index f3891e2ad66dd..784eabe96f696 100644 --- a/cmd/explaintest/t/index_merge.test +++ b/cmd/explaintest/t/index_merge.test @@ -18,6 +18,12 @@ update t1 set c3 = 100 where c3 = 1; explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; --echo //// expect empty select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; + +--echo +delete from t1; +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +--echo //// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; commit; --echo @@ -43,5 +49,11 @@ update t1 set c3 = 100 where c3 = 1; explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; --echo //// expect empty select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; + +--echo +delete from t1; +explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; +--echo //// expect empty +select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; commit; diff --git a/executor/mem_reader.go b/executor/mem_reader.go index fb9cf61d60669..69f3753ca9d58 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -140,23 +140,6 @@ func (m *memIndexReader) decodeIndexKeyValue(key, value []byte, tps []*types.Fie return ds, nil } -// Used when memIndexMergeReader.partialPlans[i] is TableScan. -func (m *memTableReader) getMemRowsHandle() ([]kv.Handle, error) { - rows, err := m.getMemRows() - if err != nil { - return nil, err - } - handles := make([]kv.Handle, 0, len(rows)) - for _, row := range rows { - handle, err := m.handleCols.BuildHandleByDatums(row) - if err != nil { - return nil, err - } - handles = append(handles, handle) - } - return handles, nil -} - type memTableReader struct { ctx sessionctx.Context table *model.TableInfo @@ -328,6 +311,23 @@ func (m *memTableReader) getRowData(handle kv.Handle, value []byte) ([][]byte, e return values, nil } +// Used when memIndexMergeReader.partialPlans[i] is TableScan. +func (m *memTableReader) getMemRowsHandle() ([]kv.Handle, error) { + rows, err := m.getMemRows() + if err != nil { + return nil, err + } + handles := make([]kv.Handle, 0, len(rows)) + for _, row := range rows { + handle, err := m.handleCols.BuildHandleByDatums(row) + if err != nil { + return nil, err + } + handles = append(handles, handle) + } + return handles, nil +} + func hasColVal(data [][]byte, colIDs map[int64]int, id int64) bool { offset, ok := colIDs[id] if ok && data[offset] != nil { @@ -668,8 +668,8 @@ func (m *memIndexMergeReader) getMemRows() ([][]types.Datum, error) { return memTblReader.getMemRows() } -func getColIDAndPkColIDs(table table.Table, columns []*model.ColumnInfo) (colIDs map[int64]int, pkColIDs []int64, rd *rowcodec.BytesDecoder) { - colIDs = make(map[int64]int, len(columns)) +func getColIDAndPkColIDs(table table.Table, columns []*model.ColumnInfo) (map[int64]int, []int64, *rowcodec.BytesDecoder) { + colIDs := make(map[int64]int, len(columns)) for i, col := range columns { colIDs[col.ID] = i } @@ -684,12 +684,12 @@ func getColIDAndPkColIDs(table table.Table, columns []*model.ColumnInfo) (colIDs Ft: rowcodec.FieldTypeFromModelColumn(col), }) } - pkColIDs = tables.TryGetCommonPkColumnIds(tblInfo) + pkColIDs := tables.TryGetCommonPkColumnIds(tblInfo) if len(pkColIDs) == 0 { pkColIDs = []int64{-1} } - rd = rowcodec.NewByteDecoder(colInfos, pkColIDs, nil, nil) - return + rd := rowcodec.NewByteDecoder(colInfos, pkColIDs, nil, nil) + return colIDs, pkColIDs, rd } // Union all handles of different Indexes. @@ -700,8 +700,8 @@ func unionHandles(kvRanges [][]kv.KeyRange, memIndexReaders []*memIndexReader, m if len(memTableReaders) != len(memIndexReaders) { return nil, errors.Errorf("len(kvRanges) should be equal to len(memIndexReaders)") } - hMap := kv.NewHandleMap() + hMap := kv.NewHandleMap() var handles []kv.Handle for i, reader := range memIndexReaders { if reader == nil { @@ -715,9 +715,6 @@ func unionHandles(kvRanges [][]kv.KeyRange, memIndexReaders []*memIndexReader, m if err != nil { return nil, err } - if len(handles) == 0 { - continue - } // Filter same row. for _, h := range handles { if _, ok := hMap.Get(h); !ok { From 5e356153676fd8020e576336616c4ae991c2a3a1 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 17 Nov 2021 19:36:45 +0800 Subject: [PATCH 03/11] trivial change Signed-off-by: guo-shaoge --- executor/mem_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 06df274415d91..11a98b433aa16 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -157,7 +157,7 @@ type memTableReader struct { // Used when extracting handles from row in memTableReader.getMemRowsHandle. handleCols plannercore.HandleCols - cacheTable kv.MemBuffer + cacheTable kv.MemBuffer } type allocBuf struct { From 6077031dfa69221432fd818daac6912304db3d93 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 17 Nov 2021 20:14:41 +0800 Subject: [PATCH 04/11] trivial change Signed-off-by: guo-shaoge --- executor/mem_reader.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 11a98b433aa16..d9eea0402173b 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -548,10 +548,8 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) { type memIndexMergeReader struct { ctx sessionctx.Context - index *model.IndexInfo columns []*model.ColumnInfo table table.Table - desc bool conditions []expression.Expression retFieldTypes []*types.FieldType indexMergeReader *IndexMergeReaderExecutor From 68b5e0658c103e524b9879f6ed62fbd8afa8f417 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 18 Nov 2021 14:48:37 +0800 Subject: [PATCH 05/11] fix by comments Signed-off-by: guo-shaoge --- cmd/explaintest/r/index_merge.result | 116 --------------------------- cmd/explaintest/t/index_merge.test | 59 -------------- executor/index_merge_reader_test.go | 72 +++++++++++++++++ executor/mem_reader.go | 7 +- 4 files changed, 75 insertions(+), 179 deletions(-) delete mode 100644 cmd/explaintest/r/index_merge.result delete mode 100644 cmd/explaintest/t/index_merge.test diff --git a/cmd/explaintest/r/index_merge.result b/cmd/explaintest/r/index_merge.result deleted file mode 100644 index 6233aba59afab..0000000000000 --- a/cmd/explaintest/r/index_merge.result +++ /dev/null @@ -1,116 +0,0 @@ -drop table if exists t1; -create table t1(c1 int, c2 int, c3 int, key(c1), key(c2), key(c3)); -begin; - -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -id estRows task access object operator info -IndexMerge_9 1841.86 root -├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo -├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo -└─Selection_8(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) - └─TableRowIDScan_7 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo -//// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -c1 c2 c3 - -insert into t1 values(1, 1, 1); -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -id estRows task access object operator info -Projection_5 1841.86 root test.t1.c1, test.t1.c2, test.t1.c3 -└─UnionScan_6 1841.86 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) - └─IndexMerge_11 1841.86 root - ├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo - ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo - └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) - └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo -//// expect one row -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -c1 c2 c3 -1 1 1 - -update t1 set c3 = 100 where c3 = 1; -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -id estRows task access object operator info -Projection_5 1841.86 root test.t1.c1, test.t1.c2, test.t1.c3 -└─UnionScan_6 1841.86 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) - └─IndexMerge_11 1841.86 root - ├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo - ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo - └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) - └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo -//// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -c1 c2 c3 - -delete from t1; -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -id estRows task access object operator info -Projection_5 1841.86 root test.t1.c1, test.t1.c2, test.t1.c3 -└─UnionScan_6 1841.86 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) - └─IndexMerge_11 1841.86 root - ├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo - ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo - └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10) - └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo -//// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -c1 c2 c3 -commit; - - -# test partialPlan is TableScan. -drop table if exists t1; -create table t1(c1 int, c2 int, c3 int, primary key(c1), key(c2), key(c3)); -begin; - -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -id estRows task access object operator info -IndexMerge_9 1106.67 root -├─TableRangeScan_5(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo -├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo -└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10) - └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo -//// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -c1 c2 c3 - -insert into t1 values(1, 1, 1); -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -id estRows task access object operator info -UnionScan_6 1106.67 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) -└─IndexMerge_11 1106.67 root - ├─TableRangeScan_7(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo - ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo - └─Selection_10(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10) - └─TableRowIDScan_9 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo -//// expect one row -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -c1 c2 c3 -1 1 1 - -update t1 set c3 = 100 where c3 = 1; -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -id estRows task access object operator info -UnionScan_6 1106.67 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) -└─IndexMerge_11 1106.67 root - ├─TableRangeScan_7(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo - ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo - └─Selection_10(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10) - └─TableRowIDScan_9 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo -//// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -c1 c2 c3 - -delete from t1; -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -id estRows task access object operator info -UnionScan_6 1106.67 root lt(test.t1.c3, 10), or(lt(test.t1.c1, 10), lt(test.t1.c2, 10)) -└─IndexMerge_11 1106.67 root - ├─TableRangeScan_7(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo - ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo - └─Selection_10(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10) - └─TableRowIDScan_9 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo -//// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -c1 c2 c3 -commit; diff --git a/cmd/explaintest/t/index_merge.test b/cmd/explaintest/t/index_merge.test deleted file mode 100644 index 784eabe96f696..0000000000000 --- a/cmd/explaintest/t/index_merge.test +++ /dev/null @@ -1,59 +0,0 @@ -drop table if exists t1; -create table t1(c1 int, c2 int, c3 int, key(c1), key(c2), key(c3)); - -begin; ---echo -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; ---echo //// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; - ---echo -insert into t1 values(1, 1, 1); -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; ---echo //// expect one row -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; - ---echo -update t1 set c3 = 100 where c3 = 1; -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; ---echo //// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; - ---echo -delete from t1; -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; ---echo //// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -commit; - ---echo ---echo ---echo # test partialPlan is TableScan. -drop table if exists t1; -create table t1(c1 int, c2 int, c3 int, primary key(c1), key(c2), key(c3)); - -begin; ---echo -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; ---echo //// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; - ---echo -insert into t1 values(1, 1, 1); -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; ---echo //// expect one row -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; - ---echo -update t1 set c3 = 100 where c3 = 1; -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; ---echo //// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; - ---echo -delete from t1; -explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; ---echo //// expect empty -select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10; -commit; - diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 51620b861a94a..b3bcdfe605701 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -170,3 +170,75 @@ func (s *testSuite1) TestPartitionTableRandomIndexMerge(c *C) { tk.MustQuery("select /*+ USE_INDEX_MERGE(tpk, a, b) */ * from tpk where " + cond).Sort().Check(result) } } + +func (s *testSuite1) TestIndexMergeInTransaction(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(c1 int, c2 int, c3 int, pk int, key(c1), key(c2), key(c3), primary key(pk));") + tk.MustExec("begin;") + // Expect two IndexScan(c1, c2). + tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows( + "IndexMerge_9 1841.86 root ", + "├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", + "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", + "└─Selection_8(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)", + " └─TableRowIDScan_7 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo")) + // Expect one IndexScan(c2) and one TableScan(pk). + tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows( + "IndexMerge_9 1106.67 root ", + "├─TableRangeScan_5(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo", + "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", + "└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)", + " └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo")) + + // Test with normal key. + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("insert into t1 values(1, 1, 1, 1);") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("delete from t1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + + // Test with primary key, so the partialPlan is TableScan. + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("insert into t1 values(1, 1, 1, 1);") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("delete from t1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("commit;") + + // Test partition table. + tk.MustExec("drop table if exists t1;") + tk.MustExec(`create table t1(c1 int, c2 int, c3 int, pk int, part int, key(c1), key(c2), key(c3), primary key(pk, part)) + partition by range(part) ( + partition p0 values less than (10), + partition p1 values less than (20), + partition p2 values less than (maxvalue))`) + tk.MustExec("begin;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;").Check(testkit.Rows()) + + tk.MustExec("insert into t1 values(1, 1, 1, 1, 1);") + tk.MustExec("insert into t1 values(11, 11, 11, 11, 11);") + tk.MustExec("insert into t1 values(21, 21, 21, 21, 21);") + tk.MustExec("insert into t1 values(31, 31, 31, 31, 31);") + res := tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;").Sort() + res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11")) + res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;").Sort() + res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11")) + + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") + res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;") + res.Check(testkit.Rows("11 11 11 11 11")) + res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;") + res.Check(testkit.Rows("11 11 11 11 11")) + + tk.MustExec("delete from t1;") + res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;") + res.Check(testkit.Rows()) + res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;") + res.Check(testkit.Rows()) + tk.MustExec("commit;") +} diff --git a/executor/mem_reader.go b/executor/mem_reader.go index d9eea0402173b..d43a07238c002 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -154,10 +154,9 @@ type memTableReader struct { colIDs map[int64]int buffer allocBuf pkColIDs []int64 - + cacheTable kv.MemBuffer // Used when extracting handles from row in memTableReader.getMemRowsHandle. handleCols plannercore.HandleCols - cacheTable kv.MemBuffer } type allocBuf struct { @@ -316,7 +315,7 @@ func (m *memTableReader) getRowData(handle kv.Handle, value []byte) ([][]byte, e return values, nil } -// Used when memIndexMergeReader.partialPlans[i] is TableScan. +// getMemRowsHandle is called when memIndexMergeReader.partialPlans[i] is TableScan. func (m *memTableReader) getMemRowsHandle() ([]kv.Handle, error) { rows, err := m.getMemRows() if err != nil { @@ -701,7 +700,7 @@ func unionHandles(kvRanges [][]kv.KeyRange, memIndexReaders []*memIndexReader, m return nil, errors.Errorf("len(kvRanges) should be equal to len(memIndexReaders)") } if len(memTableReaders) != len(memIndexReaders) { - return nil, errors.Errorf("len(kvRanges) should be equal to len(memIndexReaders)") + return nil, errors.Errorf("len(memTableReaders) should be equal to len(memIndexReaders)") } hMap := kv.NewHandleMap() From 0acbb4464a11b7f65afcbb69aff1709b289072f7 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 29 Nov 2021 18:30:59 +0800 Subject: [PATCH 06/11] fix by comment Signed-off-by: guo-shaoge --- executor/builder.go | 4 +-- executor/mem_reader.go | 65 ++++++++++++++++++++++++------------------ 2 files changed, 39 insertions(+), 30 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 9ef4917d0d818..65ac0126ab798 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1089,9 +1089,7 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco us.table = x.table us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns) case *IndexMergeReaderExecutor: - // IndexMergeReader doesn't care order. - // us.desc = false - // us.usedIndex = nil + // IndexMergeReader doesn't care order for now. So we will not set desc and useIndex. us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions) us.columns = x.columns us.table = x.table diff --git a/executor/mem_reader.go b/executor/mem_reader.go index d43a07238c002..99fc4e52fee36 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -33,6 +33,18 @@ import ( "github.com/pingcap/tidb/util/rowcodec" ) +type memReader interface { + getMemRows() ([][]types.Datum, error) + getMemRowsHandle() ([]kv.Handle, error) +} + +var ( + _ memReader = &memIndexReader{} + _ memReader = &memTableReader{} + _ memReader = &memIndexLookUpReader{} + _ memReader = &memIndexMergeReader{} +) + type memIndexReader struct { ctx sessionctx.Context index *model.IndexInfo @@ -545,6 +557,10 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) { return memTblReader.getMemRows() } +func (m *memIndexLookUpReader) getMemRowsHandle() ([]kv.Handle, error) { + return nil, errors.New("getMemRowsHandle has not been implemented for memIndexLookUpReader") +} + type memIndexMergeReader struct { ctx sessionctx.Context columns []*model.ColumnInfo @@ -552,9 +568,7 @@ type memIndexMergeReader struct { conditions []expression.Expression retFieldTypes []*types.FieldType indexMergeReader *IndexMergeReaderExecutor - - memIndexReaders []*memIndexReader - memTableReaders []*memTableReader + memReaders []memReader // partition mode partitionMode bool // if it is accessing a partition table @@ -564,12 +578,11 @@ type memIndexMergeReader struct { func buildMemIndexMergeReader(us *UnionScanExec, indexMergeReader *IndexMergeReaderExecutor) *memIndexMergeReader { indexCount := len(indexMergeReader.indexes) - indexReaders := make([]*memIndexReader, 0, indexCount) - tableReaders := make([]*memTableReader, 0, indexCount) + memReaders := make([]memReader, 0, indexCount) for i := 0; i < indexCount; i++ { if indexMergeReader.indexes[i] == nil { colIDs, pkColIDs, rd := getColIDAndPkColIDs(indexMergeReader.table, indexMergeReader.columns) - tableReaders = append(tableReaders, &memTableReader{ + memReaders = append(memReaders, &memTableReader{ ctx: us.ctx, table: indexMergeReader.table.Meta(), columns: indexMergeReader.columns, @@ -585,10 +598,9 @@ func buildMemIndexMergeReader(us *UnionScanExec, indexMergeReader *IndexMergeRea }, handleCols: indexMergeReader.handleCols, }) - indexReaders = append(indexReaders, nil) } else { outputOffset := []int{len(indexMergeReader.indexes[i].Columns)} - indexReaders = append(indexReaders, &memIndexReader{ + memReaders = append(memReaders, &memIndexReader{ ctx: us.ctx, index: indexMergeReader.indexes[i], table: indexMergeReader.table.Meta(), @@ -598,7 +610,6 @@ func buildMemIndexMergeReader(us *UnionScanExec, indexMergeReader *IndexMergeRea outputOffset: outputOffset, belowHandleCols: us.belowHandleCols, }) - tableReaders = append(tableReaders, nil) } } @@ -609,8 +620,7 @@ func buildMemIndexMergeReader(us *UnionScanExec, indexMergeReader *IndexMergeRea conditions: us.conditions, retFieldTypes: retTypes(us), indexMergeReader: indexMergeReader, - memIndexReaders: indexReaders, - memTableReaders: tableReaders, + memReaders: memReaders, partitionMode: indexMergeReader.partitionTableMode, partitionTables: indexMergeReader.prunedPartitions, @@ -635,7 +645,7 @@ func (m *memIndexMergeReader) getMemRows() ([][]types.Datum, error) { tblKVRanges := make([]kv.KeyRange, 0, 16) numHandles := 0 for i, tbl := range tbls { - handles, err := unionHandles(kvRanges[i], m.memIndexReaders, m.memTableReaders) + handles, err := unionHandles(kvRanges[i], m.memReaders) if err != nil { return nil, err } @@ -695,26 +705,23 @@ func getColIDAndPkColIDs(table table.Table, columns []*model.ColumnInfo) (map[in } // Union all handles of different Indexes. -func unionHandles(kvRanges [][]kv.KeyRange, memIndexReaders []*memIndexReader, memTableReaders []*memTableReader) (finalHandles []kv.Handle, err error) { - if len(memIndexReaders) != len(kvRanges) { - return nil, errors.Errorf("len(kvRanges) should be equal to len(memIndexReaders)") - } - if len(memTableReaders) != len(memIndexReaders) { - return nil, errors.Errorf("len(memTableReaders) should be equal to len(memIndexReaders)") +func unionHandles(kvRanges [][]kv.KeyRange, memReaders []memReader) (finalHandles []kv.Handle, err error) { + if len(memReaders) != len(kvRanges) { + return nil, errors.Errorf("len(kvRanges) should be equal to len(memReaders)") } hMap := kv.NewHandleMap() var handles []kv.Handle - for i, reader := range memIndexReaders { - if reader == nil { - reader := memTableReaders[i] - reader.kvRanges = kvRanges[i] - handles, err = reader.getMemRowsHandle() - } else { - reader.kvRanges = kvRanges[i] - handles, err = reader.getMemRowsHandle() + for i, reader := range memReaders { + switch r := reader.(type) { + case *memTableReader: + r.kvRanges = kvRanges[i] + case *memIndexReader: + r.kvRanges = kvRanges[i] + default: + return nil, errors.New("memReader have to be memTableReader or memIndexReader") } - if err != nil { + if handles, err = reader.getMemRowsHandle(); err != nil { return nil, err } // Filter same row. @@ -727,3 +734,7 @@ func unionHandles(kvRanges [][]kv.KeyRange, memIndexReaders []*memIndexReader, m } return finalHandles, nil } + +func (m *memIndexMergeReader) getMemRowsHandle() ([]kv.Handle, error) { + return nil, errors.New("getMemRowsHandle has not been implemented for memIndexMergeReader") +} From 9ac2683da2c308ece06af07888432884d3bbee8c Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 7 Dec 2021 06:45:07 +0800 Subject: [PATCH 07/11] fix comment Signed-off-by: guo-shaoge --- executor/index_merge_reader_test.go | 89 ++++++++++++++++++++++------- executor/mem_reader.go | 73 +++++++++-------------- 2 files changed, 94 insertions(+), 68 deletions(-) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index b3bcdfe605701..586419113a0ee 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -173,41 +173,85 @@ func (s *testSuite1) TestPartitionTableRandomIndexMerge(c *C) { func (s *testSuite1) TestIndexMergeInTransaction(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) + + for i := 0; i < 2; i++ { + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(c1 int, c2 int, c3 int, pk int, key(c1), key(c2), key(c3), primary key(pk));") + if i == 1 { + tk.MustExec("set transaction isolation level read committed;") + } + tk.MustExec("begin;") + // Expect two IndexScan(c1, c2). + tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows( + "IndexMerge_9 1841.86 root ", + "├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", + "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", + "└─Selection_8(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)", + " └─TableRowIDScan_7 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo")) + // Expect one IndexScan(c2) and one TableScan(pk). + tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows( + "IndexMerge_9 1106.67 root ", + "├─TableRangeScan_5(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo", + "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", + "└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)", + " └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo")) + + // Test with normal key. + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("insert into t1 values(1, 1, 1, 1);") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("delete from t1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + + // Test with primary key, so the partialPlan is TableScan. + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("insert into t1 values(1, 1, 1, 1);") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("delete from t1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("commit;") + } + + // Same with above, but select ... for update. tk.MustExec("drop table if exists t1;") tk.MustExec("create table t1(c1 int, c2 int, c3 int, pk int, key(c1), key(c2), key(c3), primary key(pk));") tk.MustExec("begin;") - // Expect two IndexScan(c1, c2). - tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows( - "IndexMerge_9 1841.86 root ", - "├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", - "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", - "└─Selection_8(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)", - " └─TableRowIDScan_7 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo")) - // Expect one IndexScan(c2) and one TableScan(pk). - tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows( - "IndexMerge_9 1106.67 root ", - "├─TableRangeScan_5(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo", - "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", - "└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)", - " └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo")) + tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows( + "SelectLock_6 1841.86 root for update 0", + "└─IndexMerge_11 1841.86 root ", + " ├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", + " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", + " └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)", + " └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo")) + tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows( + "SelectLock_6 1106.67 root for update 0", + "└─IndexMerge_11 1106.67 root ", + " ├─TableRangeScan_7(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo", + " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", + " └─Selection_10(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)", + " └─TableRowIDScan_9 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo")) // Test with normal key. - tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) tk.MustExec("insert into t1 values(1, 1, 1, 1);") - tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows("1 1 1 1")) tk.MustExec("update t1 set c3 = 100 where c3 = 1;") - tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) tk.MustExec("delete from t1;") - tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) // Test with primary key, so the partialPlan is TableScan. - tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) tk.MustExec("insert into t1 values(1, 1, 1, 1);") - tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows("1 1 1 1")) tk.MustExec("update t1 set c3 = 100 where c3 = 1;") - tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) tk.MustExec("delete from t1;") - tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) tk.MustExec("commit;") // Test partition table. @@ -241,4 +285,5 @@ func (s *testSuite1) TestIndexMergeInTransaction(c *C) { res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;") res.Check(testkit.Rows()) tk.MustExec("commit;") + } diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 99fc4e52fee36..51139e87bc6bd 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -517,26 +517,7 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) { return nil, nil } - colIDs := make(map[int64]int, len(m.columns)) - for i, col := range m.columns { - colIDs[col.ID] = i - } - - tblInfo := m.table.Meta() - colInfos := make([]rowcodec.ColInfo, 0, len(m.columns)) - for i := range m.columns { - col := m.columns[i] - colInfos = append(colInfos, rowcodec.ColInfo{ - ID: col.ID, - IsPKHandle: tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.Flag), - Ft: rowcodec.FieldTypeFromModelColumn(col), - }) - } - pkColIDs := tables.TryGetCommonPkColumnIds(tblInfo) - if len(pkColIDs) == 0 { - pkColIDs = []int64{-1} - } - rd := rowcodec.NewByteDecoder(colInfos, pkColIDs, nil, nil) + colIDs, pkColIDs, rd := getColIDAndPkColIDs(m.table, m.columns) memTblReader := &memTableReader{ ctx: m.ctx, table: m.table.Meta(), @@ -645,7 +626,7 @@ func (m *memIndexMergeReader) getMemRows() ([][]types.Datum, error) { tblKVRanges := make([]kv.KeyRange, 0, 16) numHandles := 0 for i, tbl := range tbls { - handles, err := unionHandles(kvRanges[i], m.memReaders) + handles, err := m.unionHandles(kvRanges[i], m.memReaders) if err != nil { return nil, err } @@ -680,32 +661,8 @@ func (m *memIndexMergeReader) getMemRows() ([][]types.Datum, error) { return memTblReader.getMemRows() } -func getColIDAndPkColIDs(table table.Table, columns []*model.ColumnInfo) (map[int64]int, []int64, *rowcodec.BytesDecoder) { - colIDs := make(map[int64]int, len(columns)) - for i, col := range columns { - colIDs[col.ID] = i - } - - tblInfo := table.Meta() - colInfos := make([]rowcodec.ColInfo, 0, len(columns)) - for i := range columns { - col := columns[i] - colInfos = append(colInfos, rowcodec.ColInfo{ - ID: col.ID, - IsPKHandle: tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.Flag), - Ft: rowcodec.FieldTypeFromModelColumn(col), - }) - } - pkColIDs := tables.TryGetCommonPkColumnIds(tblInfo) - if len(pkColIDs) == 0 { - pkColIDs = []int64{-1} - } - rd := rowcodec.NewByteDecoder(colInfos, pkColIDs, nil, nil) - return colIDs, pkColIDs, rd -} - // Union all handles of different Indexes. -func unionHandles(kvRanges [][]kv.KeyRange, memReaders []memReader) (finalHandles []kv.Handle, err error) { +func (m *memIndexMergeReader) unionHandles(kvRanges [][]kv.KeyRange, memReaders []memReader) (finalHandles []kv.Handle, err error) { if len(memReaders) != len(kvRanges) { return nil, errors.Errorf("len(kvRanges) should be equal to len(memReaders)") } @@ -738,3 +695,27 @@ func unionHandles(kvRanges [][]kv.KeyRange, memReaders []memReader) (finalHandle func (m *memIndexMergeReader) getMemRowsHandle() ([]kv.Handle, error) { return nil, errors.New("getMemRowsHandle has not been implemented for memIndexMergeReader") } + +func getColIDAndPkColIDs(table table.Table, columns []*model.ColumnInfo) (map[int64]int, []int64, *rowcodec.BytesDecoder) { + colIDs := make(map[int64]int, len(columns)) + for i, col := range columns { + colIDs[col.ID] = i + } + + tblInfo := table.Meta() + colInfos := make([]rowcodec.ColInfo, 0, len(columns)) + for i := range columns { + col := columns[i] + colInfos = append(colInfos, rowcodec.ColInfo{ + ID: col.ID, + IsPKHandle: tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.Flag), + Ft: rowcodec.FieldTypeFromModelColumn(col), + }) + } + pkColIDs := tables.TryGetCommonPkColumnIds(tblInfo) + if len(pkColIDs) == 0 { + pkColIDs = []int64{-1} + } + rd := rowcodec.NewByteDecoder(colInfos, pkColIDs, nil, nil) + return colIDs, pkColIDs, rd +} From 1b6df1b47580f276a3aad72e19550a75365c6835 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 7 Dec 2021 07:01:48 +0800 Subject: [PATCH 08/11] fix case Signed-off-by: guo-shaoge --- executor/index_merge_reader_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 18896cf41f038..3278380097409 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -216,6 +216,9 @@ func (s *testSuite1) TestIndexMergeInTransaction(c *C) { tk.MustExec("delete from t1;") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) tk.MustExec("commit;") + if i == 1 { + tk.MustExec("set transaction isolation level repeatable read;") + } } // Same with above, but select ... for update. From 0f480cb7dfdee8da6f37e31c274d374f914a9953 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 7 Dec 2021 07:33:08 +0800 Subject: [PATCH 09/11] fix comment Signed-off-by: guo-shaoge --- executor/mem_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 51139e87bc6bd..1bee0c3a79fb1 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -662,7 +662,7 @@ func (m *memIndexMergeReader) getMemRows() ([][]types.Datum, error) { } // Union all handles of different Indexes. -func (m *memIndexMergeReader) unionHandles(kvRanges [][]kv.KeyRange, memReaders []memReader) (finalHandles []kv.Handle, err error) { +func (_ *memIndexMergeReader) unionHandles(kvRanges [][]kv.KeyRange, memReaders []memReader) (finalHandles []kv.Handle, err error) { if len(memReaders) != len(kvRanges) { return nil, errors.Errorf("len(kvRanges) should be equal to len(memReaders)") } From 120c369348d1e27aff6da2de7108456d97f70c5a Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 7 Dec 2021 07:49:40 +0800 Subject: [PATCH 10/11] fix comment Signed-off-by: guo-shaoge --- executor/mem_reader.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 1bee0c3a79fb1..d345d11670c6e 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -626,7 +626,7 @@ func (m *memIndexMergeReader) getMemRows() ([][]types.Datum, error) { tblKVRanges := make([]kv.KeyRange, 0, 16) numHandles := 0 for i, tbl := range tbls { - handles, err := m.unionHandles(kvRanges[i], m.memReaders) + handles, err := m.unionHandles(kvRanges[i]) if err != nil { return nil, err } @@ -662,14 +662,14 @@ func (m *memIndexMergeReader) getMemRows() ([][]types.Datum, error) { } // Union all handles of different Indexes. -func (_ *memIndexMergeReader) unionHandles(kvRanges [][]kv.KeyRange, memReaders []memReader) (finalHandles []kv.Handle, err error) { - if len(memReaders) != len(kvRanges) { +func (m *memIndexMergeReader) unionHandles(kvRanges [][]kv.KeyRange) (finalHandles []kv.Handle, err error) { + if len(m.memReaders) != len(kvRanges) { return nil, errors.Errorf("len(kvRanges) should be equal to len(memReaders)") } hMap := kv.NewHandleMap() var handles []kv.Handle - for i, reader := range memReaders { + for i, reader := range m.memReaders { switch r := reader.(type) { case *memTableReader: r.kvRanges = kvRanges[i] From 6bded2d2fa8474a1670f542ccb832cd35ec437aa Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 7 Dec 2021 08:03:49 +0800 Subject: [PATCH 11/11] fix case Signed-off-by: guo-shaoge --- executor/index_merge_reader_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 3278380097409..7fc2ac15e9473 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -180,7 +180,7 @@ func (s *testSuite1) TestIndexMergeInTransaction(c *C) { tk.MustExec("drop table if exists t1;") tk.MustExec("create table t1(c1 int, c2 int, c3 int, pk int, key(c1), key(c2), key(c3), primary key(pk));") if i == 1 { - tk.MustExec("set transaction isolation level read committed;") + tk.MustExec("set tx_isolation = 'READ-COMMITTED';") } tk.MustExec("begin;") // Expect two IndexScan(c1, c2). @@ -217,7 +217,7 @@ func (s *testSuite1) TestIndexMergeInTransaction(c *C) { tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) tk.MustExec("commit;") if i == 1 { - tk.MustExec("set transaction isolation level repeatable read;") + tk.MustExec("set tx_isolation = 'REPEATABLE-READ';") } }