Skip to content

Commit

Permalink
store/copr: support batch coprocessor requests by store (#39525)
Browse files Browse the repository at this point in the history
ref #39361
  • Loading branch information
you06 authored Dec 1, 2022
1 parent 94ffc8e commit 9d9eaca
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 36 deletions.
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
}
builder.RequestSource.RequestSourceInternal = sv.InRestrictedSQL
builder.RequestSource.RequestSourceType = sv.RequestSourceType
builder.StoreBatchSize = sv.StoreBatchSize
return builder
}

Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ type Request struct {
RequestSource util.RequestSource
// FixedRowCountHint is the optimization hint for copr request for task scheduling.
FixedRowCountHint []int
// StoreBatchSize indicates the batch size of coprocessor in the same store.
StoreBatchSize int
}

// CoprRequestAdjuster is used to check and adjust a copr request according to specific rules.
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,9 @@ type SessionVars struct {

// EnablePlanReplayerCapture indicates whether enabled plan replayer capture
EnablePlanReplayerCapture bool

// StoreBatchSize indicates the batch size limit of store batch, set this field to 0 to disable store batch.
StoreBatchSize int
}

// GetNewChunkWithCapacity Attempt to request memory from the chunk pool
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,13 @@ var defaultSysVars = []*SysVar{
s.EnableReuseCheck = TiDBOptOn(val)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession, Name: TiDBStoreBatchSize, Value: strconv.FormatInt(DefTiDBStoreBatchSize, 10),
Type: TypeInt, MinValue: 0, MaxValue: 25000, SetSession: func(s *SessionVars, val string) error {
s.StoreBatchSize = TidbOptInt(val, DefTiDBStoreBatchSize)
return nil
},
},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,9 @@ const (
TiDBEnablePlanReplayerCapture = "tidb_enable_plan_replayer_capture"
// TiDBEnableReusechunk indicates whether to enable chunk alloc
TiDBEnableReusechunk = "tidb_enable_reuse_chunk"

// TiDBStoreBatchSize indicates the batch size of coprocessor in the same store.
TiDBStoreBatchSize = "tidb_store_batch_size"
)

// TiDB vars that have only global scope
Expand Down Expand Up @@ -1108,6 +1111,7 @@ const (
DefTiDBUseAlloc = false
DefTiDBEnablePlanReplayerCapture = false
DefTiDBIndexMergeIntersectionConcurrency = ConcurrencyUnset
DefTiDBStoreBatchSize = 0
)

// Process global variables.
Expand Down
47 changes: 47 additions & 0 deletions store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,50 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
require.Equal(t, smallConc, 0)
require.Equal(t, rateLimit.GetCapacity(), 4)
}

func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
// nil --- 'g' --- 'n' --- 't' --- nil
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
store, err := mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c testutils.Cluster) {
mockstore.BootstrapWithMultiRegions(c, []byte("g"), []byte("n"), []byte("t"))
}),
)
require.NoError(t, err)
defer require.NoError(t, store.Close())
copClient := store.GetClient().(*copr.CopClient)
ctx := context.Background()
killed := uint32(0)
vars := kv.NewVariables(&killed)
opt := &kv.ClientSendOption{}

req := &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
StoreBatchSize: 1,
}
it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
tasks := it.GetTasks()
require.Equal(t, len(tasks), 2)
require.Equal(t, len(tasks[0].ToPBBatchTasks()), 1)
require.Equal(t, tasks[0].RowCountHint, 5)
require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1)
require.Equal(t, tasks[1].RowCountHint, 9)

req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
StoreBatchSize: 3,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
tasks = it.GetTasks()
require.Equal(t, len(tasks), 1)
require.Equal(t, len(tasks[0].ToPBBatchTasks()), 3)
require.Equal(t, tasks[0].RowCountHint, 14)
}
Loading

0 comments on commit 9d9eaca

Please sign in to comment.