Skip to content

Commit dbaf491

Browse files
committed
move row hint into ranges
Signed-off-by: you06 <you1474600@gmail.com>
1 parent 5c04d78 commit dbaf491

10 files changed

+221
-129
lines changed

distsql/request_builder.go

+19-11
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,16 @@ func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementCon
131131
// SetTableHandles sets "KeyRanges" for "kv.Request" by converting table handles
132132
// "handles" to "KeyRanges" firstly.
133133
func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) *RequestBuilder {
134-
var keyRanges []kv.KeyRange
135-
keyRanges, builder.FixedRowCountHint = TableHandlesToKVRanges(tid, handles)
136-
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
134+
keyRanges, hints := TableHandlesToKVRanges(tid, handles)
135+
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
137136
return builder
138137
}
139138

140139
// SetPartitionsAndHandles sets "KeyRanges" for "kv.Request" by converting ParitionHandles to KeyRanges.
141140
// handles in slice must be kv.PartitionHandle.
142141
func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *RequestBuilder {
143-
keyRanges := PartitionHandlesToKVRanges(handles)
144-
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
142+
keyRanges, hints := PartitionHandlesToKVRanges(handles)
143+
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
145144
return builder
146145
}
147146

@@ -194,6 +193,12 @@ func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBui
194193
return builder
195194
}
196195

196+
// SetKeyRangesWithHints sets "KeyRanges" for "kv.Request" with row count hints.
197+
func (builder *RequestBuilder) SetKeyRangesWithHints(keyRanges []kv.KeyRange, hints []int) *RequestBuilder {
198+
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
199+
return builder
200+
}
201+
197202
// SetWrappedKeyRanges sets "KeyRanges" for "kv.Request".
198203
func (builder *RequestBuilder) SetWrappedKeyRanges(keyRanges *kv.KeyRanges) *RequestBuilder {
199204
builder.Request.KeyRanges = keyRanges
@@ -543,7 +548,7 @@ func SplitRangesAcrossInt64Boundary(ranges []*ranger.Range, keepOrder bool, desc
543548
// For continuous handles, we should merge them to a single key range.
544549
func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []int) {
545550
krs := make([]kv.KeyRange, 0, len(handles))
546-
hint := make([]int, 0, len(handles))
551+
hints := make([]int, 0, len(handles))
547552
i := 0
548553
for i < len(handles) {
549554
if commonHandle, ok := handles[i].(*kv.CommonHandle); ok {
@@ -552,7 +557,7 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
552557
EndKey: tablecodec.EncodeRowKey(tid, kv.Key(commonHandle.Encoded()).Next()),
553558
}
554559
krs = append(krs, ran)
555-
hint = append(hint, 1)
560+
hints = append(hints, 1)
556561
i++
557562
continue
558563
}
@@ -568,16 +573,17 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
568573
startKey := tablecodec.EncodeRowKey(tid, low)
569574
endKey := tablecodec.EncodeRowKey(tid, high)
570575
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
571-
hint = append(hint, j-i)
576+
hints = append(hints, j-i)
572577
i = j
573578
}
574-
return krs, hint
579+
return krs, hints
575580
}
576581

577582
// PartitionHandlesToKVRanges convert ParitionHandles to kv ranges.
578583
// Handle in slices must be kv.PartitionHandle
579-
func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
584+
func PartitionHandlesToKVRanges(handles []kv.Handle) ([]kv.KeyRange, []int) {
580585
krs := make([]kv.KeyRange, 0, len(handles))
586+
hints := make([]int, 0, len(handles))
581587
i := 0
582588
for i < len(handles) {
583589
ph := handles[i].(kv.PartitionHandle)
@@ -589,6 +595,7 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
589595
EndKey: tablecodec.EncodeRowKey(pid, append(commonHandle.Encoded(), 0)),
590596
}
591597
krs = append(krs, ran)
598+
hints = append(hints, 1)
592599
i++
593600
continue
594601
}
@@ -607,9 +614,10 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
607614
startKey := tablecodec.EncodeRowKey(pid, low)
608615
endKey := tablecodec.EncodeRowKey(pid, high)
609616
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
617+
hints = append(hints, j-i)
610618
i = j
611619
}
612-
return krs
620+
return krs, hints
613621
}
614622

615623
// IndexRangesToKVRanges converts index ranges to "KeyRange".

distsql/request_builder_test.go

+13-13
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ func TestTableHandlesToKVRanges(t *testing.T) {
6161

6262
// Build key ranges.
6363
expect := getExpectedRanges(1, hrs)
64-
actual, _ := TableHandlesToKVRanges(1, handles)
64+
actual, hints := TableHandlesToKVRanges(1, handles)
6565

6666
// Compare key ranges and expected key ranges.
6767
require.Equal(t, len(expect), len(actual))
68+
require.Equal(t, hints, []int{1, 4, 2, 1, 2})
6869
for i := range actual {
6970
require.Equal(t, expect[i].StartKey, actual[i].StartKey)
7071
require.Equal(t, expect[i].EndKey, actual[i].EndKey)
@@ -378,7 +379,7 @@ func TestRequestBuilder3(t *testing.T) {
378379
Tp: 103,
379380
StartTs: 0x0,
380381
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
381-
KeyRanges: kv.NewNonParitionedKeyRanges([]kv.KeyRange{
382+
KeyRanges: kv.NewNonParitionedKeyRangesWithHint([]kv.KeyRange{
382383
{
383384
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
384385
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
@@ -395,17 +396,16 @@ func TestRequestBuilder3(t *testing.T) {
395396
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64},
396397
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x65},
397398
},
398-
}),
399-
Cacheable: true,
400-
KeepOrder: false,
401-
Desc: false,
402-
Concurrency: variable.DefDistSQLScanConcurrency,
403-
IsolationLevel: 0,
404-
Priority: 0,
405-
NotFillCache: false,
406-
ReplicaRead: kv.ReplicaReadLeader,
407-
ReadReplicaScope: kv.GlobalReplicaScope,
408-
FixedRowCountHint: []int{1, 4, 2, 1},
399+
}, []int{1, 4, 2, 1}),
400+
Cacheable: true,
401+
KeepOrder: false,
402+
Desc: false,
403+
Concurrency: variable.DefDistSQLScanConcurrency,
404+
IsolationLevel: 0,
405+
Priority: 0,
406+
NotFillCache: false,
407+
ReplicaRead: kv.ReplicaReadLeader,
408+
ReadReplicaScope: kv.GlobalReplicaScope,
409409
}
410410
expect.Paging.MinPagingSize = paging.MinPagingSize
411411
expect.Paging.MaxPagingSize = paging.MaxPagingSize

executor/builder.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -4219,13 +4219,13 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
42194219
continue
42204220
}
42214221
handle := kv.IntHandle(content.keys[0].GetInt64())
4222-
tmp, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
4223-
kvRanges = append(kvRanges, tmp...)
4222+
ranges, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
4223+
kvRanges = append(kvRanges, ranges...)
42244224
}
42254225
} else {
42264226
for _, p := range usedPartitionList {
4227-
tmp, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
4228-
kvRanges = append(kvRanges, tmp...)
4227+
ranges, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
4228+
kvRanges = append(kvRanges, ranges...)
42294229
}
42304230
}
42314231

kv/kv.go

+26-8
Original file line numberDiff line numberDiff line change
@@ -340,25 +340,41 @@ func (t StoreType) Name() string {
340340
// KeyRanges wrap the ranges for partitioned table cases.
341341
// We might send ranges from different in the one request.
342342
type KeyRanges struct {
343-
ranges [][]KeyRange
343+
ranges [][]KeyRange
344+
rowCountHints [][]int
344345

345346
isPartitioned bool
346347
}
347348

348349
// NewPartitionedKeyRanges constructs a new RequestRange for partitioned table.
349350
func NewPartitionedKeyRanges(ranges [][]KeyRange) *KeyRanges {
351+
return NewPartitionedKeyRangesWithHints(ranges, nil)
352+
}
353+
354+
// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table.
355+
func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges {
356+
return NewNonParitionedKeyRangesWithHint(ranges, nil)
357+
}
358+
359+
// NewPartitionedKeyRangesWithHints constructs a new RequestRange for partitioned table with row count hint.
360+
func NewPartitionedKeyRangesWithHints(ranges [][]KeyRange, hints [][]int) *KeyRanges {
350361
return &KeyRanges{
351362
ranges: ranges,
363+
rowCountHints: hints,
352364
isPartitioned: true,
353365
}
354366
}
355367

356-
// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table.
357-
func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges {
358-
return &KeyRanges{
368+
// NewNonParitionedKeyRangesWithHint constructs a new RequestRange for a non partitioned table with rou count hint.
369+
func NewNonParitionedKeyRangesWithHint(ranges []KeyRange, hints []int) *KeyRanges {
370+
rr := &KeyRanges{
359371
ranges: [][]KeyRange{ranges},
360372
isPartitioned: false,
361373
}
374+
if hints != nil {
375+
rr.rowCountHints = [][]int{hints}
376+
}
377+
return rr
362378
}
363379

364380
// FirstPartitionRange returns the the result of first range.
@@ -416,9 +432,13 @@ func (rr *KeyRanges) SortByFunc(sortFunc func(i, j KeyRange) bool) {
416432
}
417433

418434
// ForEachPartitionWithErr runs the func for each partition with an error check.
419-
func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange) error) (err error) {
435+
func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange, []int) error) (err error) {
420436
for i := range rr.ranges {
421-
err = theFunc(rr.ranges[i])
437+
var hints []int
438+
if len(rr.rowCountHints) > i {
439+
hints = rr.rowCountHints[i]
440+
}
441+
err = theFunc(rr.ranges[i], hints)
422442
if err != nil {
423443
return err
424444
}
@@ -535,8 +555,6 @@ type Request struct {
535555
}
536556
// RequestSource indicates whether the request is an internal request.
537557
RequestSource util.RequestSource
538-
// FixedRowCountHint is the optimization hint for copr request for task scheduling.
539-
FixedRowCountHint []int
540558
// StoreBatchSize indicates the batch size of coprocessor in the same store.
541559
StoreBatchSize int
542560
}

store/copr/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ go_test(
7777
"//store/driver/backoff",
7878
"//testkit/testsetup",
7979
"//util/paging",
80+
"//util/trxevents",
8081
"@com_github_pingcap_errors//:errors",
8182
"@com_github_pingcap_kvproto//pkg/coprocessor",
8283
"@com_github_pingcap_kvproto//pkg/mpp",

store/copr/copr_test/coprocessor_test.go

+31-31
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
4141
vars := kv.NewVariables(&killed)
4242
opt := &kv.ClientSendOption{}
4343

44+
ranges := copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
4445
req := &kv.Request{
45-
Tp: kv.ReqTypeDAG,
46-
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
47-
FixedRowCountHint: []int{1, 1, 3, copr.CopSmallTaskRow},
48-
Concurrency: 15,
46+
Tp: kv.ReqTypeDAG,
47+
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, copr.CopSmallTaskRow}),
48+
Concurrency: 15,
4949
}
5050
it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt)
5151
require.Nil(t, errRes)
@@ -55,11 +55,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
5555
require.Equal(t, smallConc, 1)
5656
require.Equal(t, rateLimit.GetCapacity(), 2)
5757

58+
ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
5859
req = &kv.Request{
59-
Tp: kv.ReqTypeDAG,
60-
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
61-
FixedRowCountHint: []int{1, 1, 3, 3},
62-
Concurrency: 15,
60+
Tp: kv.ReqTypeDAG,
61+
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
62+
Concurrency: 15,
6363
}
6464
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
6565
require.Nil(t, errRes)
@@ -70,11 +70,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
7070
require.Equal(t, rateLimit.GetCapacity(), 3)
7171

7272
// cross-region long range
73+
ranges = copr.BuildKeyRanges("a", "z")
7374
req = &kv.Request{
74-
Tp: kv.ReqTypeDAG,
75-
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "z")),
76-
FixedRowCountHint: []int{10},
77-
Concurrency: 15,
75+
Tp: kv.ReqTypeDAG,
76+
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{10}),
77+
Concurrency: 15,
7878
}
7979
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
8080
require.Nil(t, errRes)
@@ -84,11 +84,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
8484
require.Equal(t, smallConc, 2)
8585
require.Equal(t, rateLimit.GetCapacity(), 3)
8686

87+
ranges = copr.BuildKeyRanges("a", "z")
8788
req = &kv.Request{
88-
Tp: kv.ReqTypeDAG,
89-
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "z")),
90-
FixedRowCountHint: []int{copr.CopSmallTaskRow + 1},
91-
Concurrency: 15,
89+
Tp: kv.ReqTypeDAG,
90+
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{copr.CopSmallTaskRow + 1}),
91+
Concurrency: 15,
9292
}
9393
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
9494
require.Nil(t, errRes)
@@ -115,12 +115,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
115115
vars := kv.NewVariables(&killed)
116116
opt := &kv.ClientSendOption{}
117117

118+
ranges := copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
118119
req := &kv.Request{
119-
Tp: kv.ReqTypeDAG,
120-
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
121-
FixedRowCountHint: []int{1, 1, 3, 3},
122-
Concurrency: 15,
123-
StoreBatchSize: 1,
120+
Tp: kv.ReqTypeDAG,
121+
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
122+
Concurrency: 15,
123+
StoreBatchSize: 1,
124124
}
125125
it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt)
126126
require.Nil(t, errRes)
@@ -131,12 +131,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
131131
require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1)
132132
require.Equal(t, tasks[1].RowCountHint, 9)
133133

134+
ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
134135
req = &kv.Request{
135-
Tp: kv.ReqTypeDAG,
136-
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
137-
FixedRowCountHint: []int{1, 1, 3, 3},
138-
Concurrency: 15,
139-
StoreBatchSize: 3,
136+
Tp: kv.ReqTypeDAG,
137+
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
138+
Concurrency: 15,
139+
StoreBatchSize: 3,
140140
}
141141
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
142142
require.Nil(t, errRes)
@@ -146,12 +146,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
146146
require.Equal(t, tasks[0].RowCountHint, 14)
147147

148148
// paging will disable store batch.
149+
ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
149150
req = &kv.Request{
150-
Tp: kv.ReqTypeDAG,
151-
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
152-
FixedRowCountHint: []int{1, 1, 3, 3},
153-
Concurrency: 15,
154-
StoreBatchSize: 3,
151+
Tp: kv.ReqTypeDAG,
152+
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
153+
Concurrency: 15,
154+
StoreBatchSize: 3,
155155
Paging: struct {
156156
Enable bool
157157
MinPagingSize uint64

0 commit comments

Comments
 (0)