support more detailed statistics returned from TiKV
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
sticnarf committed Jul 11, 2022
1 parent 1c198aa commit a333aa5
Showing 10 changed files with 357 additions and 43 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -16,7 +16,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
4 changes: 2 additions & 2 deletions go.sum
@@ -161,8 +161,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw=
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
1 change: 0 additions & 1 deletion integration_tests/go.sum
@@ -861,7 +861,6 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
51 changes: 37 additions & 14 deletions internal/client/client.go
@@ -374,8 +374,9 @@ func (c *RPCClient) closeConns() {
}

var (
sendReqHistCache sync.Map
sendReqCounterCache sync.Map
sendReqHistCache sync.Map
sendReqCounterCache sync.Map
rpcNetLatencyHistCache sync.Map
)

type sendReqHistCacheKey struct {
@@ -394,42 +395,64 @@ type sendReqCounterCacheValue struct {
timeCounter prometheus.Counter
}

func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time.Time, staleRead bool) {
func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool) {
elapsed := time.Since(start)
secs := elapsed.Seconds()
storeID := req.Context.GetPeer().GetStoreId()

histKey := sendReqHistCacheKey{
req.Type,
req.Context.GetPeer().GetStoreId(),
storeID,
staleRead,
}
counterKey := sendReqCounterCacheKey{
histKey,
req.GetRequestSource(),
}

reqType := req.Type.String()
var storeIDStr string

hist, ok := sendReqHistCache.Load(histKey)
if !ok {
reqType := req.Type.String()
storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead))
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead))
sendReqHistCache.Store(histKey, hist)
}
counter, ok := sendReqCounterCache.Load(counterKey)
if !ok {
reqType := req.Type.String()
storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
counter = sendReqCounterCacheValue{
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
}
sendReqCounterCache.Store(counterKey, counter)
}

secs := time.Since(start).Seconds()
hist.(prometheus.Observer).Observe(secs)
counter.(sendReqCounterCacheValue).counter.Inc()
counter.(sendReqCounterCacheValue).timeCounter.Add(secs)

if execDetail, err := resp.GetExecDetailsV2(); err == nil &&
execDetail != nil && execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 {
latHist, ok := rpcNetLatencyHistCache.Load(storeID)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr)
rpcNetLatencyHistCache.Store(storeID, latHist)
}
latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond
latHist.(prometheus.Observer).Observe(latency.Seconds())
}
}

func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -456,7 +479,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
detail := stmtExec.(*util.ExecDetails)
atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start)))
}
c.updateTiKVSendReqHistogram(req, start, staleRead)
c.updateTiKVSendReqHistogram(req, resp, start, staleRead)
}()

// TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since
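The hunk above derives per-store RPC network latency by subtracting the wall time TiKV reports for handling the request (TimeDetail.TotalRpcWallTimeNs) from the elapsed time measured on the client. Below is a minimal, self-contained sketch of that derivation; timeDetail here is a stand-in for the kvrpcpb.TimeDetail proto, not the real type.

```go
// Sketch of the latency derivation: client-observed elapsed time minus the
// server-reported RPC wall time approximates network latency plus queuing
// outside TiKV. timeDetail is a stand-in for kvrpcpb.TimeDetail.
package main

import (
	"fmt"
	"time"
)

type timeDetail struct {
	TotalRpcWallTimeNs uint64 // wall time TiKV spent handling the RPC, in nanoseconds
}

// netLatency returns the portion of the round trip not accounted for by TiKV.
func netLatency(elapsed time.Duration, td *timeDetail) (time.Duration, bool) {
	if td == nil || td.TotalRpcWallTimeNs == 0 {
		return 0, false // no server-side timing available
	}
	return elapsed - time.Duration(td.TotalRpcWallTimeNs)*time.Nanosecond, true
}

func main() {
	elapsed := 3 * time.Millisecond
	td := &timeDetail{TotalRpcWallTimeNs: uint64((2 * time.Millisecond).Nanoseconds())}
	if lat, ok := netLatency(elapsed, td); ok {
		fmt.Println(lat) // 1ms attributed to the network and client-side queuing
	}
}
```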
11 changes: 11 additions & 0 deletions metrics/metrics.go
@@ -46,6 +46,7 @@ var (
TiKVSendReqHistogram *prometheus.HistogramVec
TiKVSendReqCounter *prometheus.CounterVec
TiKVSendReqTimeCounter *prometheus.CounterVec
TiKVRPCNetLatencyHistogram *prometheus.HistogramVec
TiKVCoprocessorHistogram *prometheus.HistogramVec
TiKVLockResolverCounter *prometheus.CounterVec
TiKVRegionErrorCounter *prometheus.CounterVec
@@ -161,6 +162,15 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of request time with multi dimensions.",
}, []string{LblType, LblStore, LblStaleRead, LblSource})

TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "rpc_net_latency_seconds",
Help: "Bucketed histogram of time difference between TiDB and TiKV.",
Buckets: prometheus.ExponentialBuckets(5e-5, 2, 18), // 50us ~ 6.5s
}, []string{LblStore})

TiKVCoprocessorHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
@@ -599,6 +609,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVSendReqHistogram)
prometheus.MustRegister(TiKVSendReqCounter)
prometheus.MustRegister(TiKVSendReqTimeCounter)
prometheus.MustRegister(TiKVRPCNetLatencyHistogram)
prometheus.MustRegister(TiKVCoprocessorHistogram)
prometheus.MustRegister(TiKVLockResolverCounter)
prometheus.MustRegister(TiKVRegionErrorCounter)
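For reference (not part of the change), the bucket layout chosen for rpc_net_latency_seconds can be checked with a tiny standalone program: prometheus.ExponentialBuckets(5e-5, 2, 18) produces 18 buckets starting at 50µs and doubling at each step, so the largest bucket bound is 5e-5 * 2^17 ≈ 6.55s.

```go
// Verify the bucket layout used by TiKVRPCNetLatencyHistogram.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	buckets := prometheus.ExponentialBuckets(5e-5, 2, 18)
	// First bucket ≈ 5e-05 (50µs), last bucket ≈ 6.55s.
	fmt.Printf("first=%v last=%v count=%d\n", buckets[0], buckets[len(buckets)-1], len(buckets))
}
```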
19 changes: 19 additions & 0 deletions tikvrpc/tikvrpc.go
@@ -926,6 +926,25 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) {
return err.GetRegionError(), nil
}

type getExecDetailsV2 interface {
GetExecDetailsV2() *kvrpcpb.ExecDetailsV2
}

// GetExecDetailsV2 returns the ExecDetailsV2 of the underlying concrete response.
func (resp *Response) GetExecDetailsV2() (*kvrpcpb.ExecDetailsV2, error) {
if resp == nil || resp.Resp == nil {
return nil, nil
}
details, ok := resp.Resp.(getExecDetailsV2)
if !ok {
if _, isEmpty := resp.Resp.(*tikvpb.BatchCommandsEmptyResponse); isEmpty {
return nil, nil
}
return nil, errors.Errorf("invalid response type %v", resp)
}
return details.GetExecDetailsV2(), nil
}

// CallRPC launches a rpc call.
// ch is needed to implement timeout for coprocessor streaming, the stream object's
// cancel function will be sent to the channel, together with a lease checked by a background goroutine.
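Response.GetExecDetailsV2 relies on duck typing: it type-asserts the wrapped concrete response against a private interface, so any generated kvproto response that happens to have a GetExecDetailsV2 method is covered without enumerating request types. The sketch below illustrates the pattern with stand-in types; execDetails and getResponse are hypothetical, not kvproto types.

```go
// Self-contained illustration of the interface-assertion pattern used above.
package main

import "fmt"

type execDetails struct{ TotalRpcWallTimeNs uint64 }

// Any response type with this method satisfies the private interface.
type getExecDetailsV2 interface {
	GetExecDetailsV2() *execDetails
}

type getResponse struct{ details *execDetails }

func (r *getResponse) GetExecDetailsV2() *execDetails { return r.details }

// extract mirrors the accessor's behavior: responses without execution
// details simply yield nil instead of an error for the common case.
func extract(resp interface{}) *execDetails {
	if d, ok := resp.(getExecDetailsV2); ok {
		return d.GetExecDetailsV2()
	}
	return nil
}

func main() {
	fmt.Println(extract(&getResponse{details: &execDetails{TotalRpcWallTimeNs: 42}})) // &{42}
	fmt.Println(extract("a response type without exec details"))                      // <nil>
}
```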
5 changes: 4 additions & 1 deletion txnkv/transaction/commit.go
@@ -88,7 +88,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
for {
attempts++
if time.Since(tBegin) > slowRequestThreshold {
reqBegin := time.Now()
if reqBegin.Sub(tBegin) > slowRequestThreshold {
logutil.BgLogger().Warn("slow commit request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
tBegin = time.Now()
}
@@ -140,6 +141,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
// we can clean undetermined error.
if batch.isPrimary && !c.isAsyncCommit() {
c.setUndeterminedErr(nil)
reqDuration := time.Since(reqBegin)
c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), commitResp.ExecDetailsV2)
}
if keyErr := commitResp.GetError(); keyErr != nil {
if rejected := keyErr.GetCommitTsExpired(); rejected != nil {
8 changes: 6 additions & 2 deletions txnkv/transaction/pessimistic.go
@@ -144,10 +144,12 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
time.Sleep(300 * time.Millisecond)
return errors.WithStack(&tikverr.ErrWriteConflict{WriteConflict: nil})
}
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
startTime := time.Now()
resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
reqDuration := time.Since(startTime)
if action.LockCtx.Stats != nil {
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(time.Since(startTime)))
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(reqDuration))
atomic.AddInt64(&action.LockCtx.Stats.LockRPCCount, 1)
}
if err != nil {
@@ -183,6 +185,8 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
keyErrs := lockResp.GetErrors()
if len(keyErrs) == 0 {
action.LockCtx.Stats.MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), lockResp.ExecDetailsV2)

if batch.isPrimary {
// After locking the primary key, we should protect the primary lock from expiring
// now in case locking the remaining keys take a long time.
8 changes: 7 additions & 1 deletion txnkv/transaction/prewrite.go
@@ -247,7 +247,8 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
if attempts > 1 || action.retry {
req.IsRetryRequest = true
}
if time.Since(tBegin) > slowRequestThreshold {
reqBegin := time.Now()
if reqBegin.Sub(tBegin) > slowRequestThreshold {
logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
tBegin = time.Now()
}
@@ -305,6 +306,10 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
// Clear the RPC Error since the request is evaluated successfully.
sender.SetRPCError(nil)

// Update CommitDetails
reqDuration := time.Since(reqBegin)
c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), prewriteResp.ExecDetailsV2)

if batch.isPrimary {
// After writing the primary key, if the size of the transaction is larger than 32M,
// start the ttlManager. The ttlManager will be closed in tikvTxn.Commit().
@@ -358,6 +363,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
c.mu.Unlock()
}
}

return nil
}
var locks []*txnlock.Lock
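Commit, pessimistic-lock, and prewrite batches now all feed the per-request duration, region ID, store address, and returned ExecDetailsV2 into MergeReqDetails. The real MergeReqDetails lives in the util package and is not shown in this diff; the sketch below is only a guess at the kind of per-store aggregation such a merge might perform, with all types and fields invented for illustration.

```go
// Hypothetical illustration only: one way per-request details could be folded
// into aggregated, per-store statistics. These types do not mirror the real
// util structures in client-go.
package main

import (
	"fmt"
	"time"
)

type storeStat struct {
	Requests      int
	TotalDuration time.Duration
}

type reqDetails struct {
	ByStore map[string]*storeStat // keyed by store address
}

// mergeReqDetails records one RPC's duration against the store that served it.
// A real implementation would also fold in fields from ExecDetailsV2.
func (d *reqDetails) mergeReqDetails(dur time.Duration, storeAddr string) {
	if d.ByStore == nil {
		d.ByStore = make(map[string]*storeStat)
	}
	s := d.ByStore[storeAddr]
	if s == nil {
		s = &storeStat{}
		d.ByStore[storeAddr] = s
	}
	s.Requests++
	s.TotalDuration += dur
}

func main() {
	var d reqDetails
	d.mergeReqDetails(2*time.Millisecond, "tikv-0:20160")
	d.mergeReqDetails(3*time.Millisecond, "tikv-0:20160")
	fmt.Println(d.ByStore["tikv-0:20160"].Requests, d.ByStore["tikv-0:20160"].TotalDuration) // 2 5ms
}
```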