Skip to content

Commit

Permalink
resource_control: bypass some internal urgent request (#884)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored Jul 14, 2023
1 parent f04046a commit 19ef437
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 11 deletions.
6 changes: 6 additions & 0 deletions internal/client/client_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ func buildResourceControlInterceptor(
// Build the interceptor.
interceptFn := func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// bypass some internal requests and it's may influence user experience. For example, the
// request of `alter user password`, totally bypasses the resource control. it's not cost
// many resources, but it's may influence the user experience.
if reqInfo.Bypass() {
return next(target, req)
}
consumption, penalty, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
if err != nil {
return nil, err
Expand Down
26 changes: 23 additions & 3 deletions internal/resourcecontrol/resource_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package resourcecontrol

import (
"reflect"
"strings"
"time"

"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

Expand All @@ -34,12 +36,22 @@ type RequestInfo struct {
writeBytes int64
storeID uint64
replicaNumber int64
// bypass indicates whether the request should be bypassed.
// some internal request should be bypassed, such as Privilege request.
bypass bool
}

// MakeRequestInfo extracts the relevant information from a BatchRequest.
func MakeRequestInfo(req *tikvrpc.Request) *RequestInfo {
var bypass bool
requestSource := req.Context.GetRequestSource()
if len(requestSource) > 0 {
if strings.Contains(requestSource, util.InternalRequestPrefix+util.InternalTxnOthers) {
bypass = true
}
}
if !req.IsTxnWriteRequest() && !req.IsRawWriteRequest() {
return &RequestInfo{writeBytes: -1}
return &RequestInfo{writeBytes: -1, storeID: req.Context.Peer.StoreId, bypass: bypass}
}

var writeBytes int64
Expand All @@ -57,7 +69,7 @@ func MakeRequestInfo(req *tikvrpc.Request) *RequestInfo {
writeBytes += int64(len(k))
}
}
return &RequestInfo{writeBytes: writeBytes, storeID: req.Context.Peer.StoreId, replicaNumber: req.ReplicaNumber}
return &RequestInfo{writeBytes: writeBytes, storeID: req.Context.Peer.StoreId, replicaNumber: req.ReplicaNumber, bypass: bypass}
}

// IsWrite returns whether the request is a write request.
Expand All @@ -68,13 +80,21 @@ func (req *RequestInfo) IsWrite() bool {
// WriteBytes returns the actual write size of the request,
// -1 will be returned if it's not a write request.
func (req *RequestInfo) WriteBytes() uint64 {
return uint64(req.writeBytes)
if req.writeBytes > 0 {
return uint64(req.writeBytes)
}
return 0
}

func (req *RequestInfo) ReplicaNumber() int64 {
return req.replicaNumber
}

// Bypass returns whether the request should be bypassed.
func (req *RequestInfo) Bypass() bool {
return req.bypass
}

func (req *RequestInfo) StoreID() uint64 {
return req.storeID
}
Expand Down
40 changes: 40 additions & 0 deletions internal/resourcecontrol/resource_control_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package resourcecontrol

import (
"testing"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/assert"
"github.com/tikv/client-go/v2/tikvrpc"
)

func TestMakeRequestInfo(t *testing.T) {
// Test a non-write request.
req := &tikvrpc.Request{Req: &kvrpcpb.BatchGetRequest{}, Context: kvrpcpb.Context{Peer: &metapb.Peer{StoreId: 1}}}
info := MakeRequestInfo(req)
assert.False(t, info.IsWrite())
assert.Equal(t, uint64(0), info.WriteBytes())
assert.False(t, info.Bypass())
assert.Equal(t, uint64(1), info.StoreID())

// Test a prewrite request.
mutation := &kvrpcpb.Mutation{Key: []byte("foo"), Value: []byte("bar")}
prewriteReq := &kvrpcpb.PrewriteRequest{Mutations: []*kvrpcpb.Mutation{mutation}, PrimaryLock: []byte("baz")}
req = &tikvrpc.Request{Type: tikvrpc.CmdPrewrite, Req: prewriteReq, ReplicaNumber: 1, Context: kvrpcpb.Context{Peer: &metapb.Peer{StoreId: 2}}}
requestSource := "xxx_internal_others"
req.Context.RequestSource = requestSource
info = MakeRequestInfo(req)
assert.True(t, info.IsWrite())
assert.Equal(t, uint64(9), info.WriteBytes())
assert.True(t, info.Bypass())
assert.Equal(t, uint64(2), info.StoreID())
// Test a commit request.
commitReq := &kvrpcpb.CommitRequest{Keys: [][]byte{[]byte("qux")}}
req = &tikvrpc.Request{Type: tikvrpc.CmdCommit, Req: commitReq, ReplicaNumber: 2, Context: kvrpcpb.Context{Peer: &metapb.Peer{StoreId: 3}}}
info = MakeRequestInfo(req)
assert.True(t, info.IsWrite())
assert.Equal(t, uint64(3), info.WriteBytes())
assert.False(t, info.Bypass())
assert.Equal(t, uint64(3), info.StoreID())
}
3 changes: 3 additions & 0 deletions tikvrpc/interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ var interceptorCtxKey = interceptorCtxKeyType{}

// WithRPCInterceptor is a helper function used to bind RPCInterceptor with ctx.
func WithRPCInterceptor(ctx context.Context, interceptor RPCInterceptor) context.Context {
if v := ctx.Value(interceptorCtxKey); v != nil {
interceptor = ChainRPCInterceptors(v.(RPCInterceptor), interceptor)
}
return context.WithValue(ctx, interceptorCtxKey, interceptor)
}

Expand Down
5 changes: 5 additions & 0 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1450,3 +1450,8 @@ func (txn *KVTxn) SetRequestSourceInternal(internal bool) {
func (txn *KVTxn) SetRequestSourceType(tp string) {
txn.RequestSource.SetRequestSourceType(tp)
}

// SetExplicitRequestSourceType sets the explicit type of the request source.
func (txn *KVTxn) SetExplicitRequestSourceType(tp string) {
txn.RequestSource.SetExplicitRequestSourceType(tp)
}
49 changes: 41 additions & 8 deletions util/request_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,26 @@ const (
InternalTxnMeta = InternalTxnOthers
)

// explicit source types.
const (
ExplicitTypeEmpty = ""
ExplicitTypeDefault = "default"
ExplicitTypeLightning = "lightning"
ExplicitTypeBR = "br"
ExplicitTypeDumpling = "dumpling"
ExplicitTypeBackground = "background"
)

// ExplicitTypeList is the list of all explicit source types.
var ExplicitTypeList = []string{ExplicitTypeEmpty, ExplicitTypeDefault, ExplicitTypeLightning, ExplicitTypeBR, ExplicitTypeDumpling, ExplicitTypeBackground}

const (
// InternalRequest is the scope of internal queries
InternalRequest = "internal_"
InternalRequest = "internal"
// InternalRequestPrefix is the prefix of internal queries
InternalRequestPrefix = "internal_"
// ExternalRequest is the scope of external queries
ExternalRequest = "external_"
ExternalRequest = "external"
// SourceUnknown keeps same with the default value(empty string)
SourceUnknown = "unknown"
)
Expand All @@ -40,6 +55,9 @@ const (
type RequestSource struct {
RequestSourceInternal bool
RequestSourceType string
// ExplicitRequestSourceType is a type that is set from the session variable and may be specified by the client or users.
// It is a complement to the RequestSourceType and provides additional information about how a request was initiated.
ExplicitRequestSourceType string
}

// SetRequestSourceInternal sets the scope of the request source.
Expand All @@ -52,6 +70,11 @@ func (r *RequestSource) SetRequestSourceType(tp string) {
r.RequestSourceType = tp
}

// SetExplicitRequestSourceType sets the type of the request source.
func (r *RequestSource) SetExplicitRequestSourceType(tp string) {
r.ExplicitRequestSourceType = tp
}

// WithInternalSourceType create context with internal source.
func WithInternalSourceType(ctx context.Context, source string) context.Context {
return context.WithValue(ctx, RequestSourceKey, RequestSource{
Expand All @@ -71,15 +94,25 @@ func IsRequestSourceInternal(reqSrc *RequestSource) bool {

// GetRequestSource gets the request_source field of the request.
func (r *RequestSource) GetRequestSource() string {
// if r.RequestSourceType is not set, it's mostly possible that r.RequestSourceInternal is not set
// to avoid internal requests be marked as external(default value), return unknown source here.
if r == nil || r.RequestSourceType == "" {
return SourceUnknown
source := SourceUnknown
explicitSourceType := ExplicitTypeDefault
origin := ExternalRequest
if r == nil || (len(r.RequestSourceType) == 0 && len(r.ExplicitRequestSourceType) == 0) {
// if r.RequestSourceType and r.ExplicitRequestSourceType are not set, it's mostly possible that r.RequestSourceInternal is not set
// to avoid internal requests be marked as external(default value), return unknown source here.
return strings.Join([]string{source, explicitSourceType}, "_")
}

if len(r.RequestSourceType) > 0 {
source = r.RequestSourceType
}
if len(r.ExplicitRequestSourceType) > 0 {
explicitSourceType = r.ExplicitRequestSourceType
}
if r.RequestSourceInternal {
return InternalRequest + r.RequestSourceType
origin = InternalRequest
}
return ExternalRequest + r.RequestSourceType
return strings.Join([]string{origin, source, explicitSourceType}, "_")
}

// RequestSourceFromCtx extract source from passed context.
Expand Down
68 changes: 68 additions & 0 deletions util/request_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetRequestSource(t *testing.T) {
rsi := true
rst := "test"
ers := "lightning"
rs := &RequestSource{
RequestSourceInternal: rsi,
RequestSourceType: rst,
ExplicitRequestSourceType: ers,
}

// Test internal request
expected := "internal_test_lightning"
actual := rs.GetRequestSource()
assert.Equal(t, expected, actual)

// Test external request
rs.RequestSourceInternal = false
expected = "external_test_lightning"
actual = rs.GetRequestSource()
assert.Equal(t, expected, actual)

// Test nil pointer
rs = nil
expected = "unknown_default"
actual = rs.GetRequestSource()
assert.Equal(t, expected, actual)

// Test empty RequestSourceType and ExplicitRequestSourceType
rs = &RequestSource{}
expected = "unknown_default"
actual = rs.GetRequestSource()
assert.Equal(t, expected, actual)

// Test empty ExplicitRequestSourceType
rs.RequestSourceType = "test"
expected = "external_test_default"
actual = rs.GetRequestSource()
assert.Equal(t, expected, actual)

// Test empty RequestSourceType
rs.RequestSourceType = ""
rs.ExplicitRequestSourceType = "lightning"
expected = "external_unknown_lightning"
actual = rs.GetRequestSource()
assert.Equal(t, expected, actual)
}

0 comments on commit 19ef437

Please sign in to comment.