Skip to content

Commit

Permalink
executor: enable index_merge used in transaction. (#29875)
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Dec 8, 2021
1 parent d83ee8c commit 27f7b59
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 27 deletions.
6 changes: 6 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,12 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)
case *IndexMergeReaderExecutor:
// IndexMergeReader doesn't care order for now. So we will not set desc and useIndex.
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)
default:
// The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting.
return originReader
Expand Down
119 changes: 119 additions & 0 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,125 @@ func (s *testSuite1) TestPartitionTableRandomIndexMerge(c *C) {
}
}

func (s *testSuite1) TestIndexMergeInTransaction(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

for i := 0; i < 2; i++ {
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int, c2 int, c3 int, pk int, key(c1), key(c2), key(c3), primary key(pk));")
if i == 1 {
tk.MustExec("set tx_isolation = 'READ-COMMITTED';")
}
tk.MustExec("begin;")
// Expect two IndexScan(c1, c2).
tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows(
"IndexMerge_9 1841.86 root ",
"├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo",
"├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
"└─Selection_8(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)",
" └─TableRowIDScan_7 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo"))
// Expect one IndexScan(c2) and one TableScan(pk).
tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows(
"IndexMerge_9 1106.67 root ",
"├─TableRangeScan_5(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo",
"├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
"└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)",
" └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo"))

// Test with normal key.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())

// Test with primary key, so the partialPlan is TableScan.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("commit;")
if i == 1 {
tk.MustExec("set tx_isolation = 'REPEATABLE-READ';")
}
}

// Same with above, but select ... for update.
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int, c2 int, c3 int, pk int, key(c1), key(c2), key(c3), primary key(pk));")
tk.MustExec("begin;")
tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows(
"SelectLock_6 1841.86 root for update 0",
"└─IndexMerge_11 1841.86 root ",
" ├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo",
" ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
" └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)",
" └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo"))
tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows(
"SelectLock_6 1106.67 root for update 0",
"└─IndexMerge_11 1106.67 root ",
" ├─TableRangeScan_7(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo",
" ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
" └─Selection_10(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)",
" └─TableRowIDScan_9 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo"))

// Test with normal key.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows())
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows())

// Test with primary key, so the partialPlan is TableScan.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows())
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows())
tk.MustExec("commit;")

// Test partition table.
tk.MustExec("drop table if exists t1;")
tk.MustExec(`create table t1(c1 int, c2 int, c3 int, pk int, part int, key(c1), key(c2), key(c3), primary key(pk, part))
partition by range(part) (
partition p0 values less than (10),
partition p1 values less than (20),
partition p2 values less than (maxvalue))`)
tk.MustExec("begin;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;").Check(testkit.Rows())

tk.MustExec("insert into t1 values(1, 1, 1, 1, 1);")
tk.MustExec("insert into t1 values(11, 11, 11, 11, 11);")
tk.MustExec("insert into t1 values(21, 21, 21, 21, 21);")
tk.MustExec("insert into t1 values(31, 31, 31, 31, 31);")
res := tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))

tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))

tk.MustExec("delete from t1;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows())
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows())
tk.MustExec("commit;")
}

func (test *testSerialSuite2) TestIndexMergeReaderMemTracker(c *C) {
tk := testkit.NewTestKit(c, test.store)
tk.MustExec("use test;")
Expand Down
Loading

0 comments on commit 27f7b59

Please sign in to comment.