Skip to content

Commit

Permalink
executor: fix bug when using IndexMerge in transaction (#30719)
Browse files Browse the repository at this point in the history
close #30685
  • Loading branch information
guo-shaoge authored Dec 21, 2021
1 parent 87ab28e commit 66d7673
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 33 deletions.
21 changes: 12 additions & 9 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,24 @@ func (e *IndexMergeReaderExecutor) Open(ctx context.Context) (err error) {
}

func (e *IndexMergeReaderExecutor) buildKeyRangesForTable(tbl table.Table) (ranges [][]kv.KeyRange, err error) {
sc := e.ctx.GetSessionVars().StmtCtx
for i, plan := range e.partialPlans {
_, ok := plan[0].(*plannercore.PhysicalIndexScan)
if !ok {
if tbl.Meta().IsCommonHandle {
keyRanges, err := distsql.CommonHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{getPhysicalTableID(tbl)}, e.ranges[i])
if err != nil {
return nil, err
}
ranges = append(ranges, keyRanges)
} else {
ranges = append(ranges, nil)
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges[i], false, e.descs[i], tbl.Meta().IsCommonHandle)
firstKeyRanges, err := distsql.TableHandleRangesToKVRanges(sc, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, firstPartRanges, nil)
if err != nil {
return nil, err
}
secondKeyRanges, err := distsql.TableHandleRangesToKVRanges(sc, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, secondPartRanges, nil)
if err != nil {
return nil, err
}
keyRanges := append(firstKeyRanges, secondKeyRanges...)
ranges = append(ranges, keyRanges)
continue
}
keyRange, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(tbl), e.indexes[i].ID, e.ranges[i], e.feedbacks[i])
keyRange, err := distsql.IndexRangesToKVRanges(sc, getPhysicalTableID(tbl), e.indexes[i].ID, e.ranges[i], e.feedbacks[i])
if err != nil {
return nil, err
}
Expand Down
123 changes: 109 additions & 14 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,22 +232,31 @@ func (s *testSuite1) TestIndexMergeInTransaction(c *C) {
" └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo"))

// Test with normal key.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())

// Test with primary key, so the partialPlan is TableScan.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())

tk.MustExec("commit;")
if i == 1 {
tk.MustExec("set tx_isolation = 'REPEATABLE-READ';")
Expand Down Expand Up @@ -306,22 +315,108 @@ func (s *testSuite1) TestIndexMergeInTransaction(c *C) {
tk.MustExec("insert into t1 values(11, 11, 11, 11, 11);")
tk.MustExec("insert into t1 values(21, 21, 21, 21, 21);")
tk.MustExec("insert into t1 values(31, 31, 31, 31, 31);")
res := tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;").Sort()

res := tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 20) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < -1) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))

res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 20) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;").Sort()
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < -1) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))

tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < -1) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))

res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < -1) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))

tk.MustExec("delete from t1;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows())
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < -1) and c3 < 20;")
res.Check(testkit.Rows())

res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows())
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < -1) and c3 < 20;")
res.Check(testkit.Rows())
tk.MustExec("commit;")
}

func (s *testSuite1) TestIndexMergeReaderInTransIssue30685(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

// This is a case generated by sqlgen to test if clustered index is ok.
// Detect the bugs in memIndexMergeReader.getMemRowsHandle().
tk.MustExec("drop table if exists t1;")
tk.MustExec(`create table t1 (col_30 decimal default 0 ,
col_31 char(99) collate utf8_bin default 'sVgzHblmYYtEjVg' not null ,
col_37 int unsigned default 377206828 ,
primary key idx_16 ( col_37 ) , key idx_19 ( col_31) ) collate utf8mb4_general_ci ;`)
tk.MustExec("begin;")
tk.MustExec("insert ignore into t1 values (388021, '', 416235653);")
tk.MustQuery("select /*+ use_index_merge( t1 ) */ 1 from t1 where ( t1.col_31 in ( 'OiOXzpCs' , 'oaVv' ) or t1.col_37 <= 4059907010 ) and t1.col_30 ;").Check(testkit.Rows("1"))
tk.MustExec("commit;")

tk.MustExec("drop table if exists tbl_3;")
tk.MustExec(`create table tbl_3 ( col_30 decimal , col_31 char(99) , col_32 smallint ,
col_33 tinyint unsigned not null , col_34 char(209) ,
col_35 char(110) , col_36 int unsigned , col_37 int unsigned ,
col_38 decimal(50,15) not null , col_39 char(104),
primary key ( col_37 ) , unique key ( col_33,col_30,col_36,col_39 ) ,
unique key ( col_32,col_35 ) , key ( col_31,col_38 ) ,
key ( col_31,col_33,col_32,col_35,col_36 ) ,
unique key ( col_38,col_34,col_33,col_31,col_30,col_36,col_35,col_37,col_39 ) ,
unique key ( col_39,col_32 ) , unique key ( col_30,col_35,col_31,col_38 ) ,
key ( col_38,col_32,col_33 ) )`)
tk.MustExec("begin;")
tk.MustExec("insert ignore into tbl_3 values ( 71,'Fipc',-6676,30,'','FgfK',2464927398,4084082400,5602.5868,'' );")
tk.MustQuery("select /*+ use_index_merge( tbl_3 ) */ 1 from tbl_3 where ( tbl_3.col_37 not in ( 1626615245 , 2433569159 ) or tbl_3.col_38 = 0.06 ) ;").Check(testkit.Rows("1"))
tk.MustExec("commit;")

// int + int compound type as clustered index pk.
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int, c2 int, c3 int, c4 int, primary key(c1, c2) /*T![clustered_index] CLUSTERED */, key(c3));")

tk.MustExec("begin;")
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c3 < 10) and c4 < 10;").Check(testkit.Rows(
"UnionScan_6 1841.86 root lt(test.t1.c4, 10), or(lt(test.t1.c1, -1), lt(test.t1.c3, 10))",
"└─IndexMerge_11 1841.86 root ",
" ├─TableRangeScan_7(Build) 3323.33 cop[tikv] table:t1 range:[-inf,-1), keep order:false, stats:pseudo",
" ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo",
" └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c4, 10)",
" └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo"))

tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c3 < 10) and c4 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c3 < -1) and c4 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c3 < -1) and c4 < 10;").Check(testkit.Rows())
tk.MustExec("commit;")

// Single int type as clustered index pk.
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 varchar(100), c2 int, c3 int, c4 int, primary key(c1) /*T![clustered_index] CLUSTERED */, key(c3));")

tk.MustExec("begin;")
tk.MustExec("insert into t1 values('b', 1, 1, 1);")
tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 'a' or c3 < 10) and c4 < 10;").Check(testkit.Rows(
"UnionScan_6 1841.86 root lt(test.t1.c4, 10), or(lt(test.t1.c1, \"a\"), lt(test.t1.c3, 10))",
"└─IndexMerge_11 1841.86 root ",
" ├─TableRangeScan_7(Build) 3323.33 cop[tikv] table:t1 range:[-inf,\"a\"), keep order:false, stats:pseudo",
" ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo",
" └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c4, 10)",
" └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo"))

tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 'a' or c3 < 10) and c4 < 10;").Check(testkit.Rows("b 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 <= 'b' or c3 < -1) and c4 < 10;").Check(testkit.Rows("b 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 'a' or c3 < -1) and c4 < 10;").Check(testkit.Rows())
tk.MustExec("commit;")
}

Expand Down
23 changes: 13 additions & 10 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ type memTableReader struct {
buffer allocBuf
pkColIDs []int64
cacheTable kv.MemBuffer
// Used when extracting handles from row in memTableReader.getMemRowsHandle.
handleCols plannercore.HandleCols
}

type allocBuf struct {
Expand Down Expand Up @@ -329,17 +327,23 @@ func (m *memTableReader) getRowData(handle kv.Handle, value []byte) ([][]byte, e

// getMemRowsHandle is called when memIndexMergeReader.partialPlans[i] is TableScan.
func (m *memTableReader) getMemRowsHandle() ([]kv.Handle, error) {
rows, err := m.getMemRows()
handles := make([]kv.Handle, 0, 16)
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error {
handle, err := tablecodec.DecodeRowKey(key)
if err != nil {
return err
}
handles = append(handles, handle)
return nil
})
if err != nil {
return nil, err
}
handles := make([]kv.Handle, 0, len(rows))
for _, row := range rows {
handle, err := m.handleCols.BuildHandleByDatums(row)
if err != nil {
return nil, err

if m.desc {
for i, j := 0, len(handles)-1; i < j; i, j = i+1, j-1 {
handles[i], handles[j] = handles[j], handles[i]
}
handles = append(handles, handle)
}
return handles, nil
}
Expand Down Expand Up @@ -577,7 +581,6 @@ func buildMemIndexMergeReader(us *UnionScanExec, indexMergeReader *IndexMergeRea
handleBytes: make([]byte, 0, 16),
rd: rd,
},
handleCols: indexMergeReader.handleCols,
})
} else {
outputOffset := []int{len(indexMergeReader.indexes[i].Columns)}
Expand Down

0 comments on commit 66d7673

Please sign in to comment.