Skip to content

Commit

Permalink
Implement RequestInfo and ResponseInfo for RU calculation
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Dec 26, 2022
1 parent 9051b71 commit 3174a4d
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 80 deletions.
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ module github.com/tikv/client-go/v2

go 1.18

replace (
github.com/pingcap/kvproto => github.com/nolouch/kvproto v0.0.0-20221220074114-39f3378735d2
github.com/tikv/pd => github.com/tidblabs/pd v0.0.0-20221226085939-3b2f71ab2ba0
github.com/tikv/pd/client => github.com/tidblabs/pd/client v0.0.0-20221226085939-3b2f71ab2ba0
)

require (
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
Expand All @@ -20,7 +26,7 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/stathat/consistent v1.0.0
github.com/stretchr/testify v1.8.0
github.com/tikv/pd v0.0.0-20221031025758-80f0d8ca4d07
github.com/tikv/pd v0.0.0-00010101000000-000000000000
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/twmb/murmur3 v1.1.3
go.etcd.io/etcd/api/v3 v3.5.2
Expand Down Expand Up @@ -59,9 +65,3 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
stathat.com/c/consistent v1.0.0 // indirect
)

replace github.com/tikv/pd => github.com/tidblabs/pd v0.0.0-20221222021041-db3be941c254

replace github.com/tikv/pd/client => github.com/tidblabs/pd/client v0.0.0-20221222021041-db3be941c254

replace github.com/pingcap/kvproto => github.com/nolouch/kvproto v0.0.0-20221220074114-39f3378735d2
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,10 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/tidblabs/pd v0.0.0-20221222021041-db3be941c254 h1:wc3p2J1SiYeyGS5ClZ67ZDg2ouQiBrU+mjtkzErxi7s=
github.com/tidblabs/pd v0.0.0-20221222021041-db3be941c254/go.mod h1:PpgqnYUA6dnuHKmol7XVxkb6oKxzJ+YjsOamWCwrATw=
github.com/tidblabs/pd/client v0.0.0-20221222021041-db3be941c254 h1:ChBFxejvM8yhi6E7R6EDq9sGXlqWyIwJcWy0ri5Pd4A=
github.com/tidblabs/pd/client v0.0.0-20221222021041-db3be941c254/go.mod h1:+YPCXGf03hN+LJV4ZvYjgnj49KnCdyffAuog6iFjdfI=
github.com/tidblabs/pd v0.0.0-20221226085939-3b2f71ab2ba0 h1:A3LJXELZ260/EDoB3dhnteCN8CYB3dUISF2Hu0v/G/E=
github.com/tidblabs/pd v0.0.0-20221226085939-3b2f71ab2ba0/go.mod h1:PpgqnYUA6dnuHKmol7XVxkb6oKxzJ+YjsOamWCwrATw=
github.com/tidblabs/pd/client v0.0.0-20221226085939-3b2f71ab2ba0 h1:H2UTFrRRHf+gtBstNf1w8arSi4+RrATE7ufWWVCn1KQ=
github.com/tidblabs/pd/client v0.0.0-20221226085939-3b2f71ab2ba0/go.mod h1:+YPCXGf03hN+LJV4ZvYjgnj49KnCdyffAuog6iFjdfI=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
14 changes: 14 additions & 0 deletions internal/client/client_interceptor.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
Expand Down
84 changes: 57 additions & 27 deletions keyspace/resourcegroup/resourcemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import (
"go.uber.org/zap"
)

// RequestInfo contains information about a request that is able to calculate the RU cost
// before the request is sent. Specifically, the write bytes RU cost of a write request
// could be calculated by its key size to write.
type RequestInfo struct {

// writeBytes is the write size if the request is a write, or -1 if it is a read.
// writeBytes is the actual write size if the request is a write request,
// or -1 if it's a read request.
writeBytes int64
}

Expand All @@ -34,58 +37,85 @@ func MakeRequestInfo(req *tikvrpc.Request) *RequestInfo {
return &RequestInfo{writeBytes: writeBytes}
}

// IsWrite returns whether the request is a write, and if so the write size in
// bytes.
// IsWrite returns whether the request is a write request.
func (req *RequestInfo) IsWrite() bool {
return req.writeBytes > -1
}

// WriteBytes returns the actual write size of the request,
// -1 will be returned if it's not a write request.
func (req *RequestInfo) WriteBytes() uint64 {
return uint64(req.writeBytes)
}

// ResponseInfo contains information about a response that is able to calculate the RU cost
// after the response is received. Specifically, the read bytes RU cost of a read request
// could be calculated by its response size, and the KV CPU time RU cost of a request could
// be calculated by its execution details info.
type ResponseInfo struct {
readBytes int64
readBytes uint64
kvCPUMs uint64
}

// MakeResponseInfo extracts the relevant information from a BatchResponse.
func MakeResponseInfo(resp *tikvrpc.Response) *ResponseInfo {
var (
readBytes int64
detailV2 *kvrpcpb.ExecDetailsV2
detail *kvrpcpb.ExecDetails
)
if resp.Resp == nil {
return &ResponseInfo{readBytes}
return &ResponseInfo{}
}
// Parse the response to extract the info.
var (
readBytes uint64
detailsV2 *kvrpcpb.ExecDetailsV2
details *kvrpcpb.ExecDetails
)
switch r := resp.Resp.(type) {
case *coprocessor.Response:
detailV2 = r.ExecDetailsV2
detail = r.ExecDetails
detailsV2 = r.GetExecDetailsV2()
details = r.GetExecDetails()
readBytes = uint64(r.Data.Size())
case *tikvrpc.CopStreamResponse:
// streaming request returns io.EOF, so the first CopStreamResponse.Response maybe nil.
// Streaming request returns `io.EOF``, so the first `CopStreamResponse.Response`` may be nil.
if r.Response != nil {
detailV2 = r.Response.ExecDetailsV2
detail = r.Response.ExecDetails
detailsV2 = r.Response.GetExecDetailsV2()
details = r.Response.GetExecDetails()
}
readBytes = uint64(r.Data.Size())
case *kvrpcpb.GetResponse:
detailsV2 = r.GetExecDetailsV2()
case *kvrpcpb.BatchGetResponse:
detailsV2 = r.GetExecDetailsV2()
case *kvrpcpb.ScanResponse:
// TODO: using a more accurate size rather than using the whole response size as the read bytes.
readBytes = uint64(r.Size())
default:
log.Warn("[kv resource]unreachable resp type", zap.Any("type", reflect.TypeOf(r)))
return &ResponseInfo{readBytes}
// TODO: support the raw kv response maybe.
log.Warn("[kv resource] unknown response type to collect the info", zap.Any("type", reflect.TypeOf(r)))
return &ResponseInfo{}
}

if detailV2 != nil && detailV2.TimeDetail != nil {
readBytes = int64(detailV2.ScanDetailV2.RocksdbBlockReadByte)
} else if detail != nil && detail.TimeDetail != nil {
readBytes = detail.ScanDetail.Lock.ReadBytes + detail.ScanDetail.Write.ReadBytes + detail.ScanDetail.Write.ReadBytes
// Try to get read bytes from the `detailsV2`.
// TODO: clarify whether we should count the underlying storage engine read bytes or not.
if scanDetail := detailsV2.GetScanDetailV2(); scanDetail != nil {
readBytes = scanDetail.GetProcessedVersionsSize()
}
// Get the KV CPU time in milliseconds from the execution time details.
kvCPUMs := getKVCPUMs(detailsV2, details)
return &ResponseInfo{readBytes: readBytes, kvCPUMs: kvCPUMs}
}

return &ResponseInfo{readBytes: readBytes}
func getKVCPUMs(detailsV2 *kvrpcpb.ExecDetailsV2, details *kvrpcpb.ExecDetails) uint64 {
if timeDetail := detailsV2.GetTimeDetail(); timeDetail != nil {
return timeDetail.GetProcessWallTimeMs()
}
if timeDetail := details.GetTimeDetail(); timeDetail != nil {
return timeDetail.GetProcessWallTimeMs()
}
return 0
}

func (res *ResponseInfo) ReadBytes() uint64 {
return uint64(res.readBytes)
return res.readBytes
}

func (res *ResponseInfo) KVCPUms() uint64 {
return 10
func (res *ResponseInfo) KVCPUMs() uint64 {
return res.kvCPUMs
}
76 changes: 34 additions & 42 deletions tikvrpc/interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,43 +32,38 @@ import (
//
// We can implement an RPCInterceptor like this:
// ```
//
// func LogInterceptor(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// log.Println("before")
// resp, err := next(target, req)
// log.Println("after")
// return resp, err
// }
// }
//
// func LogInterceptor(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// log.Println("before")
// resp, err := next(target, req)
// log.Println("after")
// return resp, err
// }
// }
// txn.SetRPCInterceptor(LogInterceptor)
// ```
//
// Or you want to inject some dependent modules:
// ```
//
// func GetLogInterceptor(lg *log.Logger) RPCInterceptor {
// return func(next RPCInterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// lg.Println("before")
// resp, err := next(target, req)
// lg.Println("after")
// return resp, err
// }
// }
// }
//
// func GetLogInterceptor(lg *log.Logger) RPCInterceptor {
// return func(next RPCInterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// lg.Println("before")
// resp, err := next(target, req)
// lg.Println("after")
// return resp, err
// }
// }
// }
// txn.SetRPCInterceptor(GetLogInterceptor())
// ```
//
// NOTE: Interceptor calls may not correspond one-to-one with the underlying gRPC requests.
// This is because there may be some exceptions, such as: request batched, no
// valid connection etc. If you have questions about the execution location of
// RPCInterceptor, please refer to:
//
// tikv/kv.go#NewKVStore()
// internal/client/client_interceptor.go#SendRequest.
// tikv/kv.go#NewKVStore()
// internal/client/client_interceptor.go#SendRequest.
type RPCInterceptor func(next RPCInterceptorFunc) RPCInterceptorFunc

// RPCInterceptorFunc is a callable function used to initiate a request to TiKV.
Expand All @@ -82,23 +77,20 @@ type RPCInterceptorFunc func(target string, req *tikvrpc.Request) (*tikvrpc.Resp
//
// We can use RPCInterceptorChain like this:
// ```
//
// func Interceptor1(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// fmt.Println("begin-interceptor-1")
// defer fmt.Println("end-interceptor-1")
// return next(target, req)
// }
// }
//
// func Interceptor2(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// fmt.Println("begin-interceptor-2")
// defer fmt.Println("end-interceptor-2")
// return next(target, req)
// }
// }
//
// func Interceptor1(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// fmt.Println("begin-interceptor-1")
// defer fmt.Println("end-interceptor-1")
// return next(target, req)
// }
// }
// func Interceptor2(next InterceptorFunc) RPCInterceptorFunc {
// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
// fmt.Println("begin-interceptor-2")
// defer fmt.Println("end-interceptor-2")
// return next(target, req)
// }
// }
// txn.SetRPCInterceptor(NewRPCInterceptorChain().Link(Interceptor1).Link(Interceptor2).Build())
// ```
//
Expand Down

0 comments on commit 3174a4d

Please sign in to comment.