Skip to content

Commit

Permalink
allow limit on range partition table
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <you1474600@gmail.com>
  • Loading branch information
you06 committed Feb 14, 2023
1 parent d1ee574 commit 7f4fe22
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 23 deletions.
32 changes: 32 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,3 +698,35 @@ func TestCoprocessorBatchByStore(t *testing.T) {
}
}
}

func TestPushLimit2RangePartitionTable(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(`CREATE TABLE t(id int PRIMARY KEY, val int)
PARTITION BY RANGE (id)
(PARTITION p1 VALUES LESS THAN (100),
PARTITION p2 VALUES LESS THAN (200),
PARTITION p3 VALUES LESS THAN (300))`)
tk.MustExec("INSERT INTO t VALUES(50, 50), (150, 150), (250, 250)")
tk.MustExec("ANALYZE TABLE t")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging"))
}()

tk.MustQuery("SELECT * FROM t ORDER BY id ASC LIMIT 1").Check(testkit.Rows("50 50"))
tk.MustQuery("SELECT * FROM t ORDER BY id ASC LIMIT 2").Check(testkit.Rows("50 50", "150 150"))
tk.MustQuery("SELECT * FROM t ORDER BY id DESC LIMIT 1").Check(testkit.Rows("250 250"))
tk.MustQuery("SELECT * FROM t ORDER BY id DESC LIMIT 2").Check(testkit.Rows("250 250", "150 150"))
tk.MustQuery("SELECT * FROM t WHERE id > 100 ORDER BY id ASC LIMIT 1").Check(testkit.Rows("150 150"))
tk.MustQuery("SELECT * FROM t WHERE id > 10 ORDER BY id ASC LIMIT 2").Check(testkit.Rows("50 50", "150 150"))
tk.MustQuery("SELECT * FROM t WHERE id > 100 ORDER BY id ASC LIMIT 2").Check(testkit.Rows("150 150", "250 250"))
tk.MustQuery("SELECT * FROM t WHERE id > 100 ORDER BY id DESC LIMIT 1").Check(testkit.Rows("250 250"))
tk.MustQuery("SELECT * FROM t WHERE id > 100 ORDER BY id DESC LIMIT 2").Check(testkit.Rows("250 250", "150 150"))
tk.MustQuery("SELECT * FROM t WHERE id < 200 ORDER BY id DESC LIMIT 2").Check(testkit.Rows("150 150", "50 50"))
tk.MustQuery("SELECT * FROM t WHERE id < 250 ORDER BY id DESC LIMIT 2").Check(testkit.Rows("150 150", "50 50"))
}
14 changes: 7 additions & 7 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func TestOrderByAndLimit(t *testing.T) {
require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed
require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
require.True(t, tk.HasPlan(queryListPartition, "Limit"))
require.True(t, tk.HasPlan(queryRangePartition, "TopN")) // but not fully pushed
require.False(t, tk.HasPlan(queryRangePartition, "TopN")) // but not fully pushed
require.True(t, tk.HasPlan(queryHashPartition, "TopN"))
require.True(t, tk.HasPlan(queryListPartition, "TopN"))
regularResult = tk.MustQuery(queryRegular).Rows()
Expand All @@ -635,21 +635,21 @@ func TestOrderByAndLimit(t *testing.T) {
queryPartitionWithTiFlash := fmt.Sprintf("select /*+ read_from_storage(tiflash[trange_intpk]) */ * from trange_intpk where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
// and order is pushed
require.True(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[trange_intpk]) */ /*+ LIMIT_TO_COP() */ * from trange_intpk where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
// and order is pushed
require.True(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[trange_clustered]) */ * from trange_clustered where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[trange_clustered]) */ /*+ LIMIT_TO_COP() */ * from trange_clustered where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
// and order is pushed
require.True(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[thash_intpk]) */ * from thash_intpk where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
Expand Down
6 changes: 5 additions & 1 deletion executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,11 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
return nil, err
}
kvReq.KeyRanges.SortByFunc(func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
cmp := bytes.Compare(i.StartKey, j.StartKey) < 0
if e.desc {
return !cmp
}
return cmp
})
e.kvRanges = kvReq.KeyRanges.AppendSelfTo(e.kvRanges)

Expand Down
16 changes: 12 additions & 4 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,21 +472,29 @@ func (rr *KeyRanges) PartitionNum() int {
return len(rr.ranges)
}

// IsFullySorted checks whether the ranges are sorted inside partition and each partition is also sorated.
func (rr *KeyRanges) IsFullySorted() bool {
// IsFullySorted checks whether the ranges are sorted inside partition and each partition is also sorted.
func (rr *KeyRanges) IsFullySorted(desc bool) bool {
sortedByPartition := slices.IsSortedFunc(rr.ranges, func(i, j []KeyRange) bool {
// A simple short-circuit since the empty range actually won't make anything wrong.
if len(i) == 0 || len(j) == 0 {
return true
}
return bytes.Compare(i[0].StartKey, j[0].StartKey) < 0
cmp := bytes.Compare(i[0].StartKey, j[0].StartKey) < 0
if desc {
return !cmp
}
return cmp
})
if !sortedByPartition {
return false
}
for _, ranges := range rr.ranges {
if !slices.IsSortedFunc(ranges, func(i, j KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
cmp := bytes.Compare(i.StartKey, j.StartKey) < 0
if desc {
return !cmp
}
return cmp
}) {
return false
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2080,8 +2080,8 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
task = copTask
if candidate.isMatchProp {
copTask.keepOrder = true
// TableScan on partition table can't keep order.
if ds.tableInfo.GetPartitionInfo() != nil {
// TableScan on partition table can't keep order, except range partition.
if pi := ds.tableInfo.GetPartitionInfo(); pi != nil && pi.Type != model.PartitionTypeRange {
return invalidTask, nil
}
}
Expand Down
1 change: 1 addition & 0 deletions planner/core/testdata/integration_partition_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@
"explain format='brief' select a from thash use index (ia) where a > 10 and c = 10 order by a limit 10",
"explain format='brief' select a from t use index () where b > 10 order by b limit 10",
"explain format='brief' select a from trange use index () where b > 10 order by b limit 10",
"explain format='brief' select a from trange use index () where b > 100 order by b limit 10",
"explain format='brief' select a from tlist use index () where b > 10 order by b limit 10",
"explain format='brief' select a from thash use index () where b > 10 order by b limit 10",
"explain format='brief' select a from t use index () where a > 10 order by b limit 10",
Expand Down
14 changes: 12 additions & 2 deletions planner/core/testdata/integration_partition_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1341,12 +1341,22 @@
"SQL": "explain format='brief' select a from trange use index () where b > 10 order by b limit 10",
"Plan": [
"Projection 10.00 root test.trange.a",
"└─TopN 10.00 root test.trange.b, offset:0, count:10",
"└─Limit 10.00 root offset:0, count:10",
" └─TableReader 10.00 root partition:all data:Limit",
" └─Limit 10.00 cop[tikv] offset:0, count:10",
" └─TableRangeScan 10.00 cop[tikv] table:trange range:(10,+inf], keep order:true, stats:pseudo"
]
},
{
"SQL": "explain format='brief' select a from trange use index () where b > 100 order by b limit 10",
"Plan": [
"Projection 10.00 root test.trange.a",
"└─Limit 10.00 root offset:0, count:10",
" └─TableReader 10.00 root partition:p2,p3 data:Limit",
" └─Limit 10.00 cop[tikv] offset:0, count:10",
" └─TableRangeScan 10.00 cop[tikv] table:trange range:(100,+inf], keep order:true, stats:pseudo"
]
},
{
"SQL": "explain format='brief' select a from tlist use index () where b > 10 order by b limit 10",
"Plan": [
Expand Down Expand Up @@ -1382,7 +1392,7 @@
"SQL": "explain format='brief' select a from trange use index () where a > 10 order by b limit 10",
"Plan": [
"Projection 10.00 root test.trange.a",
"└─TopN 10.00 root test.trange.b, offset:0, count:10",
"└─Limit 10.00 root offset:0, count:10",
" └─TableReader 10.00 root partition:all data:Limit",
" └─Limit 10.00 cop[tikv] offset:0, count:10",
" └─Selection 10.00 cop[tikv] gt(test.trange.a, 10)",
Expand Down
10 changes: 4 additions & 6 deletions planner/core/testdata/ordered_result_mode_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -454,16 +454,14 @@
},
{
"Plan": [
"Sort_6 2.00 root test.trange.a",
"└─TableReader_9 2.00 root partition:p0,p2 data:TableRangeScan_8",
" └─TableRangeScan_8 2.00 cop[tikv] table:trange range:[1,1], [200,200], keep order:false, stats:pseudo"
"TableReader_11 2.00 root partition:p0,p2 data:TableRangeScan_10",
"└─TableRangeScan_10 2.00 cop[tikv] table:trange range:[1,1], [200,200], keep order:true, stats:pseudo"
]
},
{
"Plan": [
"Sort_6 100.00 root test.trange.a",
"└─TableReader_9 100.00 root partition:p0,p1 data:TableRangeScan_8",
" └─TableRangeScan_8 100.00 cop[tikv] table:trange range:[50,150], keep order:false, stats:pseudo"
"TableReader_11 100.00 root partition:p0,p1 data:TableRangeScan_10",
"└─TableRangeScan_10 100.00 cop[tikv] table:trange range:[50,150], keep order:true, stats:pseudo"
]
}
]
Expand Down
2 changes: 1 addition & 1 deletion store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
}
failpoint.Inject("checkKeyRangeSortedForPaging", func(_ failpoint.Value) {
if req.Paging.Enable {
if !req.KeyRanges.IsFullySorted() {
if !req.KeyRanges.IsFullySorted(req.Desc) {
logutil.BgLogger().Fatal("distsql request key range not sorted!")
}
}
Expand Down

0 comments on commit 7f4fe22

Please sign in to comment.