Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: label external the request source via sessionvar #44770

Merged
merged 10 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3871,8 +3871,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:GqsAoNiOFxbCJ8U8Lnts8BvdYd6HDWDsIm/oJY1sIMM=",
version = "v2.0.8-0.20230704071705-c0cf773917d9",
sum = "h1:pLUQsFZGE3z7OlZddP+WHkb85rLoxPwRd8CknfSw804=",
version = "v2.0.8-0.20230707070242-178f6fa01aab",
)

go_repository(
Expand Down
7 changes: 7 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
}
builder.RequestSource.RequestSourceInternal = sv.InRestrictedSQL
builder.RequestSource.RequestSourceType = sv.RequestSourceType
builder.RequestSource.ExplicitRequestSourceType = sv.ExplicitRequestSourceType
builder.StoreBatchSize = sv.StoreBatchSize
builder.Request.ResourceGroupName = sv.ResourceGroupName
builder.Request.StoreBusyThreshold = sv.LoadBasedReplicaReadThreshold
Expand Down Expand Up @@ -358,6 +359,12 @@ func (builder *RequestBuilder) SetResourceGroupName(name string) *RequestBuilder
return builder
}

// SetExplicitRequestSourceType sets the explicit request source type.
func (builder *RequestBuilder) SetExplicitRequestSourceType(sourceType string) *RequestBuilder {
builder.RequestSource.ExplicitRequestSourceType = sourceType
return builder
}

func (builder *RequestBuilder) verifyTxnScope() error {
txnScope := builder.TxnScope
if txnScope == "" || txnScope == kv.GlobalReplicaScope || builder.is == nil {
Expand Down
1 change: 1 addition & 0 deletions executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
SetConcurrency(e.concurrency).
SetMemTracker(e.memTracker).
SetResourceGroupName(e.ctx.GetSessionVars().ResourceGroupName).
SetExplicitRequestSourceType(e.ctx.GetSessionVars().ExplicitRequestSourceType).
Build()
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions executor/analyze_idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
SetKeepOrder(true).
SetConcurrency(e.concurrency).
SetResourceGroupName(e.ctx.GetSessionVars().ResourceGroupName).
SetExplicitRequestSourceType(e.ctx.GetSessionVars().ExplicitRequestSourceType).
Build()
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6
SetStartTS(c.StartTs).
SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency()).
SetResourceGroupName(ctx.GetSessionVars().ResourceGroupName).
SetExplicitRequestSourceType(ctx.GetSessionVars().ExplicitRequestSourceType).
Build()
}

Expand All @@ -266,6 +267,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int6
SetStartTS(c.StartTs).
SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency()).
SetResourceGroupName(ctx.GetSessionVars().ResourceGroupName).
SetExplicitRequestSourceType(ctx.GetSessionVars().ExplicitRequestSourceType).
Build()
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20230704071705-c0cf773917d9
github.com/tikv/client-go/v2 v2.0.8-0.20230707070242-178f6fa01aab
github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -977,8 +977,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20230704071705-c0cf773917d9 h1:GqsAoNiOFxbCJ8U8Lnts8BvdYd6HDWDsIm/oJY1sIMM=
github.com/tikv/client-go/v2 v2.0.8-0.20230704071705-c0cf773917d9/go.mod h1:4KkKqjJgKlvvWMyNqdnAlYFfV4QjEj1fEb5Hb/FoT88=
github.com/tikv/client-go/v2 v2.0.8-0.20230707070242-178f6fa01aab h1:pLUQsFZGE3z7OlZddP+WHkb85rLoxPwRd8CknfSw804=
github.com/tikv/client-go/v2 v2.0.8-0.20230707070242-178f6fa01aab/go.mod h1:4KkKqjJgKlvvWMyNqdnAlYFfV4QjEj1fEb5Hb/FoT88=
github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935 h1:a5SATBxu/0Z6qNnz4KXDN91gDA06waaYcHM6dkb6lz4=
github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935/go.mod h1:YmNkj9UT8IjwFov9k3oquH0UgIUHniUaQT3jXKgZYbM=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M=
Expand Down
4 changes: 3 additions & 1 deletion kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ const (
RequestSourceInternal
// RequestSourceType set request source type of the current statement.
RequestSourceType
// ExplicitRequestSourceType is a complement of RequestSourceType, it may specified by the client or users.
ExplicitRequestSourceType
// ReplicaReadAdjuster set the adjust function of cop requsts.
ReplicaReadAdjuster
// ScanBatchSize set the iter scan batch size.
Expand Down Expand Up @@ -188,7 +190,7 @@ const (
// Do not classify different tools by now.
InternalTxnTools = "tools"
// InternalTxnBR is the type of BR usage.
InternalTxnBR = InternalTxnTools
InternalTxnBR = "br"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add more internal tools?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here only just change what is in use.

// InternalTxnTrace handles the trace statement.
InternalTxnTrace = "Trace"
// InternalTxnTTL is the type of TTL usage
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ func (s *session) doCommit(ctx context.Context) error {
s.txn.SetOption(kv.EnableAsyncCommit, sessVars.EnableAsyncCommit)
s.txn.SetOption(kv.Enable1PC, sessVars.Enable1PC)
s.txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger())
s.txn.SetOption(kv.ExplicitRequestSourceType, sessVars.ExplicitRequestSourceType)
if sessVars.StmtCtx.KvExecCounter != nil {
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
s.txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor())
Expand Down
4 changes: 4 additions & 0 deletions session/sessiontest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ go_test(
"//util/memory",
"//util/sqlexec",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//tikvrpc/interceptor",
"@org_uber_go_goleak//:goleak",
],
)
35 changes: 35 additions & 0 deletions session/sessiontest/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
Expand All @@ -48,6 +50,8 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
)

func TestSchemaCheckerSQL(t *testing.T) {
Expand Down Expand Up @@ -2434,3 +2438,34 @@ func TestSQLModeOp(t *testing.T) {
a = mysql.SetSQLMode(s, mysql.ModeAllowInvalidDates)
require.Equal(t, mysql.ModeNoBackslashEscapes|mysql.ModeOnlyFullGroupBy|mysql.ModeAllowInvalidDates, a)
}

func TestRequestSource(t *testing.T) {
store := testkit.CreateMockStore(t, mockstore.WithStoreType(mockstore.MockTiKV))
tk := testkit.NewTestKit(t, store)
withCheckInterceptor := func(source string) interceptor.RPCInterceptor {
return interceptor.NewRPCInterceptor("kv-request-source-verify", func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
requestSource := ""
switch r := req.Req.(type) {
case *kvrpcpb.PrewriteRequest:
requestSource = r.GetContext().GetRequestSource()
case *kvrpcpb.CommitRequest:
requestSource = r.GetContext().GetRequestSource()
case *coprocessor.Request:
requestSource = r.GetContext().GetRequestSource()
}
require.Equal(t, source, requestSource)
return next(target, req)
}
})
}
ctx := context.Background()
tk.MustExecWithContext(ctx, "use test")
tk.MustExecWithContext(ctx, "create table t(a int primary key, b int)")
tk.MustExecWithContext(ctx, "set @@tidb_request_source_type = 'lightning'")
tk.MustQueryWithContext(ctx, "select @@tidb_request_source_type").Check(testkit.Rows("lightning"))
insertCtx := interceptor.WithRPCInterceptor(context.Background(), withCheckInterceptor("external_Insert_lightning"))
tk.MustExecWithContext(insertCtx, "insert into t values(1, 1)")
selectCtx := interceptor.WithRPCInterceptor(context.Background(), withCheckInterceptor("external_Select_lightning"))
tk.MustExecWithContext(selectCtx, "select count(*) from t;")
}
12 changes: 12 additions & 0 deletions sessionctx/sessionstates/session_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,18 @@ func TestSessionCtx(t *testing.T) {
`└─TableFullScan_4 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo`))
},
},
{
// check request source
setFunc: func(tk *testkit.TestKit) any {
tk.MustExec(`set @@tidb_request_source_type="lightning"`)
require.Equal(t, "lightning", tk.Session().GetSessionVars().ExplicitRequestSourceType)
return nil
},
checkFunc: func(tk *testkit.TestKit, param any) {
tk.MustExec(`select count(*) from test.t1`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need this line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to check if the query will influence the request source.

tk.MustQuery(`select @@tidb_request_source_type`).Check(testkit.Rows("lightning"))
},
},
}

for _, tt := range tests {
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ go_library(
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_twmb_murmur3//:murmur3",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,8 @@ type SessionVars struct {

// RequestSourceType is the type of inner request.
RequestSourceType string
// ExplicitRequestSourceType is the type of origin external request.
ExplicitRequestSourceType string

// MemoryDebugModeMinHeapInUse indicated the minimum heapInUse threshold that triggers the memoryDebugMode.
MemoryDebugModeMinHeapInUse int64
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/pingcap/tidb/util/versioninfo"
tikvcfg "github.com/tikv/client-go/v2/config"
tikvstore "github.com/tikv/client-go/v2/kv"
tikvcliutil "github.com/tikv/client-go/v2/util"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -385,6 +386,12 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeSession, Name: TiDBUseAlloc, Value: BoolToOnOff(DefTiDBUseAlloc), Type: TypeBool, ReadOnly: true, GetSession: func(s *SessionVars) (string, error) {
return BoolToOnOff(s.preUseChunkAlloc), nil
}},
{Scope: ScopeSession, Name: TiDBExplicitRequestSourceType, Value: "", Type: TypeEnum, PossibleValues: tikvcliutil.ExplicitTypeList, GetSession: func(s *SessionVars) (string, error) {
return s.ExplicitRequestSourceType, nil
}, SetSession: func(s *SessionVars, val string) error {
s.ExplicitRequestSourceType = val
return nil
}},
/* The system variables below have INSTANCE scope */
{Scope: ScopeInstance, Name: TiDBLogFileMaxDays, Value: strconv.Itoa(config.GetGlobalConfig().Log.File.MaxDays), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt32, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
maxAge, err := strconv.ParseInt(val, 10, 32)
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ const (

// TiDBUseAlloc indicates whether the last statement used chunk alloc
TiDBUseAlloc = "last_sql_use_alloc"

// TiDBExplicitRequestSourceType indicates the source of the request, it's a complement of RequestSourceType.
// The value maybe "lightning", "br", "dumpling" etc.
TiDBExplicitRequestSourceType = "tidb_request_source_type"
)

// TiDB system variable names that both in session and global scope.
Expand Down
1 change: 1 addition & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_library(
"@com_github_tikv_client_go_v2//metrics",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//tikvrpc/interceptor",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_client_go_v2//txnkv/txnsnapshot",
"@com_github_tikv_client_go_v2//util",
Expand Down
2 changes: 2 additions & 0 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -88,6 +89,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
ctx = context.WithValue(ctx, util.RequestSourceKey, req.RequestSource)
glorv marked this conversation as resolved.
Show resolved Hide resolved
ctx = interceptor.WithRPCInterceptor(ctx, interceptor.GetRPCInterceptorFromCtx(ctx))
enabledRateLimitAction := option.EnabledRateLimitAction
sessionMemTracker := option.SessionMemTracker
it, errRes := c.BuildCopIterator(ctx, req, vars, option)
Expand Down
2 changes: 2 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.KVTxn.SetRequestSourceInternal(val.(bool))
case kv.RequestSourceType:
txn.KVTxn.SetRequestSourceType(val.(string))
case kv.ExplicitRequestSourceType:
txn.KVTxn.SetExplicitRequestSourceType(val.(string))
case kv.ReplicaReadAdjuster:
txn.KVTxn.GetSnapshot().SetReplicaReadAdjuster(val.(txnkv.ReplicaReadAdjuster))
case kv.TxnSource:
Expand Down