From b5823ac1042e561b630b509ccdab114a645ac127 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Fri, 14 Jul 2023 18:00:00 +0800 Subject: [PATCH] resource_control: bypass some internal urgent request (#884) --- internal/client/client_interceptor.go | 7 +++- internal/resourcecontrol/resource_control.go | 26 ++++++++++-- .../resourcecontrol/resource_control_test.go | 40 +++++++++++++++++++ util/request_source.go | 2 + 4 files changed, 71 insertions(+), 4 deletions(-) create mode 100644 internal/resourcecontrol/resource_control_test.go diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go index 74cabbb8c..64ae333ed 100644 --- a/internal/client/client_interceptor.go +++ b/internal/client/client_interceptor.go @@ -107,10 +107,15 @@ func buildResourceControlInterceptor( // Build the interceptor. interceptFn := func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + // bypass some internal requests and it's may influence user experience. For example, the + // request of `alter user password`, totally bypasses the resource control. it's not cost + // many resources, but it's may influence the user experience. // If the resource group has background jobs, we should not record consumption and wait for it. - if resourceControlInterceptor.IsBackgroundRequest(ctx, resourceGroupName, req.RequestSource) { + // Background jobs will record and report in tikv side. + if reqInfo.Bypass() || resourceControlInterceptor.IsBackgroundRequest(ctx, resourceGroupName, req.RequestSource) { return next(target, req) } + consumption, penalty, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo) if err != nil { return nil, err diff --git a/internal/resourcecontrol/resource_control.go b/internal/resourcecontrol/resource_control.go index fb55d25fd..15d631058 100644 --- a/internal/resourcecontrol/resource_control.go +++ b/internal/resourcecontrol/resource_control.go @@ -16,12 +16,14 @@ package resourcecontrol import ( "reflect" + "strings" "time" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util" "go.uber.org/zap" ) @@ -34,12 +36,22 @@ type RequestInfo struct { writeBytes int64 storeID uint64 replicaNumber int64 + // bypass indicates whether the request should be bypassed. + // some internal request should be bypassed, such as Privilege request. + bypass bool } // MakeRequestInfo extracts the relevant information from a BatchRequest. func MakeRequestInfo(req *tikvrpc.Request) *RequestInfo { + var bypass bool + requestSource := req.Context.GetRequestSource() + if len(requestSource) > 0 { + if strings.Contains(requestSource, util.InternalRequestPrefix+util.InternalTxnOthers) { + bypass = true + } + } if !req.IsTxnWriteRequest() && !req.IsRawWriteRequest() { - return &RequestInfo{writeBytes: -1} + return &RequestInfo{writeBytes: -1, storeID: req.Context.Peer.StoreId, bypass: bypass} } var writeBytes int64 @@ -57,7 +69,7 @@ func MakeRequestInfo(req *tikvrpc.Request) *RequestInfo { writeBytes += int64(len(k)) } } - return &RequestInfo{writeBytes: writeBytes, storeID: req.Context.Peer.StoreId, replicaNumber: req.ReplicaNumber} + return &RequestInfo{writeBytes: writeBytes, storeID: req.Context.Peer.StoreId, replicaNumber: req.ReplicaNumber, bypass: bypass} } // IsWrite returns whether the request is a write request. @@ -68,13 +80,21 @@ func (req *RequestInfo) IsWrite() bool { // WriteBytes returns the actual write size of the request, // -1 will be returned if it's not a write request. func (req *RequestInfo) WriteBytes() uint64 { - return uint64(req.writeBytes) + if req.writeBytes > 0 { + return uint64(req.writeBytes) + } + return 0 } func (req *RequestInfo) ReplicaNumber() int64 { return req.replicaNumber } +// Bypass returns whether the request should be bypassed. +func (req *RequestInfo) Bypass() bool { + return req.bypass +} + func (req *RequestInfo) StoreID() uint64 { return req.storeID } diff --git a/internal/resourcecontrol/resource_control_test.go b/internal/resourcecontrol/resource_control_test.go new file mode 100644 index 000000000..dba077e26 --- /dev/null +++ b/internal/resourcecontrol/resource_control_test.go @@ -0,0 +1,40 @@ +package resourcecontrol + +import ( + "testing" + + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/tikvrpc" +) + +func TestMakeRequestInfo(t *testing.T) { + // Test a non-write request. + req := &tikvrpc.Request{Req: &kvrpcpb.BatchGetRequest{}, Context: kvrpcpb.Context{Peer: &metapb.Peer{StoreId: 1}}} + info := MakeRequestInfo(req) + assert.False(t, info.IsWrite()) + assert.Equal(t, uint64(0), info.WriteBytes()) + assert.False(t, info.Bypass()) + assert.Equal(t, uint64(1), info.StoreID()) + + // Test a prewrite request. + mutation := &kvrpcpb.Mutation{Key: []byte("foo"), Value: []byte("bar")} + prewriteReq := &kvrpcpb.PrewriteRequest{Mutations: []*kvrpcpb.Mutation{mutation}, PrimaryLock: []byte("baz")} + req = &tikvrpc.Request{Type: tikvrpc.CmdPrewrite, Req: prewriteReq, ReplicaNumber: 1, Context: kvrpcpb.Context{Peer: &metapb.Peer{StoreId: 2}}} + requestSource := "xxx_internal_others" + req.Context.RequestSource = requestSource + info = MakeRequestInfo(req) + assert.True(t, info.IsWrite()) + assert.Equal(t, uint64(9), info.WriteBytes()) + assert.True(t, info.Bypass()) + assert.Equal(t, uint64(2), info.StoreID()) + // Test a commit request. + commitReq := &kvrpcpb.CommitRequest{Keys: [][]byte{[]byte("qux")}} + req = &tikvrpc.Request{Type: tikvrpc.CmdCommit, Req: commitReq, ReplicaNumber: 2, Context: kvrpcpb.Context{Peer: &metapb.Peer{StoreId: 3}}} + info = MakeRequestInfo(req) + assert.True(t, info.IsWrite()) + assert.Equal(t, uint64(3), info.WriteBytes()) + assert.False(t, info.Bypass()) + assert.Equal(t, uint64(3), info.StoreID()) +} diff --git a/util/request_source.go b/util/request_source.go index 6ebadfc7e..97d3b83fd 100644 --- a/util/request_source.go +++ b/util/request_source.go @@ -57,6 +57,8 @@ var ExplicitTypeList = []string{ExplicitTypeEmpty, ExplicitTypeLightning, Explic const ( // InternalRequest is the scope of internal queries InternalRequest = "internal" + // InternalRequestPrefix is the prefix of internal queries + InternalRequestPrefix = "internal_" // ExternalRequest is the scope of external queries ExternalRequest = "external" // SourceUnknown keeps same with the default value(empty string)