Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, store/copr: allow limit on range partitioned table #41342

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,3 +698,35 @@ func TestCoprocessorBatchByStore(t *testing.T) {
}
}
}

func TestPushLimit2RangePartitionTable(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(`CREATE TABLE t(id int PRIMARY KEY, val int)
PARTITION BY RANGE (id)
(PARTITION p1 VALUES LESS THAN (100),
PARTITION p2 VALUES LESS THAN (200),
PARTITION p3 VALUES LESS THAN (300))`)
tk.MustExec("INSERT INTO t VALUES(50, 50), (150, 150), (250, 250)")
tk.MustExec("ANALYZE TABLE t")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging"))
}()

tk.MustQuery("SELECT * FROM t ORDER BY id ASC LIMIT 1").Check(testkit.Rows("50 50"))
tk.MustQuery("SELECT * FROM t ORDER BY id ASC LIMIT 2").Check(testkit.Rows("50 50", "150 150"))
tk.MustQuery("SELECT * FROM t ORDER BY id DESC LIMIT 1").Check(testkit.Rows("250 250"))
tk.MustQuery("SELECT * FROM t ORDER BY id DESC LIMIT 2").Check(testkit.Rows("250 250", "150 150"))
tk.MustQuery("SELECT * FROM t WHERE id > 100 ORDER BY id ASC LIMIT 1").Check(testkit.Rows("150 150"))
tk.MustQuery("SELECT * FROM t WHERE id > 10 ORDER BY id ASC LIMIT 2").Check(testkit.Rows("50 50", "150 150"))
tk.MustQuery("SELECT * FROM t WHERE id > 100 ORDER BY id ASC LIMIT 2").Check(testkit.Rows("150 150", "250 250"))
tk.MustQuery("SELECT * FROM t WHERE id > 100 ORDER BY id DESC LIMIT 1").Check(testkit.Rows("250 250"))
tk.MustQuery("SELECT * FROM t WHERE id > 100 ORDER BY id DESC LIMIT 2").Check(testkit.Rows("250 250", "150 150"))
tk.MustQuery("SELECT * FROM t WHERE id < 200 ORDER BY id DESC LIMIT 2").Check(testkit.Rows("150 150", "50 50"))
tk.MustQuery("SELECT * FROM t WHERE id < 250 ORDER BY id DESC LIMIT 2").Check(testkit.Rows("150 150", "50 50"))
}
14 changes: 7 additions & 7 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func TestOrderByAndLimit(t *testing.T) {
require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed
require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
require.True(t, tk.HasPlan(queryListPartition, "Limit"))
require.True(t, tk.HasPlan(queryRangePartition, "TopN")) // but not fully pushed
require.False(t, tk.HasPlan(queryRangePartition, "TopN")) // but not fully pushed
require.True(t, tk.HasPlan(queryHashPartition, "TopN"))
require.True(t, tk.HasPlan(queryListPartition, "TopN"))
regularResult = tk.MustQuery(queryRegular).Rows()
Expand All @@ -635,21 +635,21 @@ func TestOrderByAndLimit(t *testing.T) {
queryPartitionWithTiFlash := fmt.Sprintf("select /*+ read_from_storage(tiflash[trange_intpk]) */ * from trange_intpk where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
// and order is pushed
require.True(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[trange_intpk]) */ /*+ LIMIT_TO_COP() */ * from trange_intpk where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
// and order is pushed
require.True(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[trange_clustered]) */ * from trange_clustered where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[trange_clustered]) */ /*+ LIMIT_TO_COP() */ * from trange_clustered where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
// and order is pushed
require.True(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[thash_intpk]) */ * from thash_intpk where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
Expand Down
6 changes: 5 additions & 1 deletion executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,11 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
return nil, err
}
kvReq.KeyRanges.SortByFunc(func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
cmp := bytes.Compare(i.StartKey, j.StartKey) < 0
if e.desc {
return !cmp
}
return cmp
})
e.kvRanges = kvReq.KeyRanges.AppendSelfTo(e.kvRanges)

Expand Down
16 changes: 12 additions & 4 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,21 +472,29 @@ func (rr *KeyRanges) PartitionNum() int {
return len(rr.ranges)
}

// IsFullySorted checks whether the ranges are sorted inside partition and each partition is also sorated.
func (rr *KeyRanges) IsFullySorted() bool {
// IsFullySorted checks whether the ranges are sorted inside partition and each partition is also sorted.
func (rr *KeyRanges) IsFullySorted(desc bool) bool {
sortedByPartition := slices.IsSortedFunc(rr.ranges, func(i, j []KeyRange) bool {
// A simple short-circuit since the empty range actually won't make anything wrong.
if len(i) == 0 || len(j) == 0 {
return true
}
return bytes.Compare(i[0].StartKey, j[0].StartKey) < 0
cmp := bytes.Compare(i[0].StartKey, j[0].StartKey) < 0
if desc {
return !cmp
}
return cmp
})
if !sortedByPartition {
return false
}
for _, ranges := range rr.ranges {
if !slices.IsSortedFunc(ranges, func(i, j KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
cmp := bytes.Compare(i.StartKey, j.StartKey) < 0
if desc {
return !cmp
}
return cmp
}) {
return false
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2080,8 +2080,8 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
task = copTask
if candidate.isMatchProp {
copTask.keepOrder = true
// TableScan on partition table can't keep order.
if ds.tableInfo.GetPartitionInfo() != nil {
// TableScan on partition table can't keep order, except range partition.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the partition key is not the row id key?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

table schema:
primary key a
partition by b

query:
order by b limit
order by a limit

if pi := ds.tableInfo.GetPartitionInfo(); pi != nil && pi.Type != model.PartitionTypeRange {
return invalidTask, nil
}
}
Expand Down
1 change: 1 addition & 0 deletions planner/core/testdata/integration_partition_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@
"explain format='brief' select a from thash use index (ia) where a > 10 and c = 10 order by a limit 10",
"explain format='brief' select a from t use index () where b > 10 order by b limit 10",
"explain format='brief' select a from trange use index () where b > 10 order by b limit 10",
"explain format='brief' select a from trange use index () where b > 100 order by b limit 10",
"explain format='brief' select a from tlist use index () where b > 10 order by b limit 10",
"explain format='brief' select a from thash use index () where b > 10 order by b limit 10",
"explain format='brief' select a from t use index () where a > 10 order by b limit 10",
Expand Down
14 changes: 12 additions & 2 deletions planner/core/testdata/integration_partition_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1341,12 +1341,22 @@
"SQL": "explain format='brief' select a from trange use index () where b > 10 order by b limit 10",
"Plan": [
"Projection 10.00 root test.trange.a",
"└─TopN 10.00 root test.trange.b, offset:0, count:10",
"└─Limit 10.00 root offset:0, count:10",
" └─TableReader 10.00 root partition:all data:Limit",
" └─Limit 10.00 cop[tikv] offset:0, count:10",
" └─TableRangeScan 10.00 cop[tikv] table:trange range:(10,+inf], keep order:true, stats:pseudo"
]
},
{
"SQL": "explain format='brief' select a from trange use index () where b > 100 order by b limit 10",
"Plan": [
"Projection 10.00 root test.trange.a",
"└─Limit 10.00 root offset:0, count:10",
" └─TableReader 10.00 root partition:p2,p3 data:Limit",
" └─Limit 10.00 cop[tikv] offset:0, count:10",
" └─TableRangeScan 10.00 cop[tikv] table:trange range:(100,+inf], keep order:true, stats:pseudo"
]
},
{
"SQL": "explain format='brief' select a from tlist use index () where b > 10 order by b limit 10",
"Plan": [
Expand Down Expand Up @@ -1382,7 +1392,7 @@
"SQL": "explain format='brief' select a from trange use index () where a > 10 order by b limit 10",
"Plan": [
"Projection 10.00 root test.trange.a",
"└─TopN 10.00 root test.trange.b, offset:0, count:10",
"└─Limit 10.00 root offset:0, count:10",
" └─TableReader 10.00 root partition:all data:Limit",
" └─Limit 10.00 cop[tikv] offset:0, count:10",
" └─Selection 10.00 cop[tikv] gt(test.trange.a, 10)",
Expand Down
10 changes: 4 additions & 6 deletions planner/core/testdata/ordered_result_mode_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -454,16 +454,14 @@
},
{
"Plan": [
"Sort_6 2.00 root test.trange.a",
"└─TableReader_9 2.00 root partition:p0,p2 data:TableRangeScan_8",
" └─TableRangeScan_8 2.00 cop[tikv] table:trange range:[1,1], [200,200], keep order:false, stats:pseudo"
"TableReader_11 2.00 root partition:p0,p2 data:TableRangeScan_10",
"└─TableRangeScan_10 2.00 cop[tikv] table:trange range:[1,1], [200,200], keep order:true, stats:pseudo"
]
},
{
"Plan": [
"Sort_6 100.00 root test.trange.a",
"└─TableReader_9 100.00 root partition:p0,p1 data:TableRangeScan_8",
" └─TableRangeScan_8 100.00 cop[tikv] table:trange range:[50,150], keep order:false, stats:pseudo"
"TableReader_11 100.00 root partition:p0,p1 data:TableRangeScan_10",
"└─TableRangeScan_10 100.00 cop[tikv] table:trange range:[50,150], keep order:true, stats:pseudo"
]
}
]
Expand Down
2 changes: 1 addition & 1 deletion store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
}
failpoint.Inject("checkKeyRangeSortedForPaging", func(_ failpoint.Value) {
if req.Paging.Enable {
if !req.KeyRanges.IsFullySorted() {
if !req.KeyRanges.IsFullySorted(req.Desc) {
logutil.BgLogger().Fatal("distsql request key range not sorted!")
}
}
Expand Down