Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle more detailed statistics from TiKV #536

Merged
merged 3 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw=
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
1 change: 0 additions & 1 deletion integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,6 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
ProcessedVersions: 10,
ProcessedVersionsSize: 10,
TotalVersions: 15,
GetSnapshotNanos: 500,
RocksdbBlockReadCount: 20,
RocksdbBlockReadByte: 15,
RocksdbDeleteSkippedCount: 5,
Expand All @@ -322,6 +323,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
"scan_detail: {total_process_keys: 10, " +
"total_process_keys_size: 10, " +
"total_keys: 15, " +
"get_snapshot_time: 500ns, " +
"rocksdb: {delete_skipped_count: 5, " +
"key_skipped_count: 1, " +
"block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}"
Expand All @@ -332,6 +334,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
"scan_detail: {total_process_keys: 20, " +
"total_process_keys_size: 20, " +
"total_keys: 30, " +
"get_snapshot_time: 1µs, " +
"rocksdb: {delete_skipped_count: 10, " +
"key_skipped_count: 2, " +
"block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}"
Expand Down
51 changes: 37 additions & 14 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,9 @@ func (c *RPCClient) closeConns() {
}

var (
sendReqHistCache sync.Map
sendReqCounterCache sync.Map
sendReqHistCache sync.Map
sendReqCounterCache sync.Map
rpcNetLatencyHistCache sync.Map
)

type sendReqHistCacheKey struct {
Expand All @@ -394,42 +395,64 @@ type sendReqCounterCacheValue struct {
timeCounter prometheus.Counter
}

func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time.Time, staleRead bool) {
func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool) {
elapsed := time.Since(start)
secs := elapsed.Seconds()
storeID := req.Context.GetPeer().GetStoreId()

histKey := sendReqHistCacheKey{
req.Type,
req.Context.GetPeer().GetStoreId(),
storeID,
staleRead,
}
counterKey := sendReqCounterCacheKey{
histKey,
req.GetRequestSource(),
}

reqType := req.Type.String()
var storeIDStr string

hist, ok := sendReqHistCache.Load(histKey)
if !ok {
reqType := req.Type.String()
storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead))
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead))
sendReqHistCache.Store(histKey, hist)
}
counter, ok := sendReqCounterCache.Load(counterKey)
if !ok {
reqType := req.Type.String()
storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
counter = sendReqCounterCacheValue{
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
}
sendReqCounterCache.Store(counterKey, counter)
}

secs := time.Since(start).Seconds()
hist.(prometheus.Observer).Observe(secs)
counter.(sendReqCounterCacheValue).counter.Inc()
counter.(sendReqCounterCacheValue).timeCounter.Add(secs)

if execDetail, err := resp.GetExecDetailsV2(); err == nil &&
execDetail != nil && execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 {
latHist, ok := rpcNetLatencyHistCache.Load(storeID)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr)
sendReqHistCache.Store(storeID, latHist)
}
latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond
latHist.(prometheus.Observer).Observe(latency.Seconds())
}
}

func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context()))
defer span1.Finish()
Expand All @@ -456,7 +479,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
detail := stmtExec.(*util.ExecDetails)
atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start)))
}
c.updateTiKVSendReqHistogram(req, start, staleRead)
c.updateTiKVSendReqHistogram(req, resp, start, staleRead)
}()

// TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since
Expand Down
11 changes: 11 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
TiKVSendReqHistogram *prometheus.HistogramVec
TiKVSendReqCounter *prometheus.CounterVec
TiKVSendReqTimeCounter *prometheus.CounterVec
TiKVRPCNetLatencyHistogram *prometheus.HistogramVec
TiKVCoprocessorHistogram *prometheus.HistogramVec
TiKVLockResolverCounter *prometheus.CounterVec
TiKVRegionErrorCounter *prometheus.CounterVec
Expand Down Expand Up @@ -161,6 +162,15 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of request time with multi dimensions.",
}, []string{LblType, LblStore, LblStaleRead, LblSource})

TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "rpc_net_latency_seconds",
Help: "Bucketed histogram of time difference between TiDB and TiKV.",
Buckets: prometheus.ExponentialBuckets(5e-5, 2, 18), // 50us ~ 6.5s
}, []string{LblStore})

TiKVCoprocessorHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Expand Down Expand Up @@ -599,6 +609,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVSendReqHistogram)
prometheus.MustRegister(TiKVSendReqCounter)
prometheus.MustRegister(TiKVSendReqTimeCounter)
prometheus.MustRegister(TiKVRPCNetLatencyHistogram)
prometheus.MustRegister(TiKVCoprocessorHistogram)
prometheus.MustRegister(TiKVLockResolverCounter)
prometheus.MustRegister(TiKVRegionErrorCounter)
Expand Down
19 changes: 19 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,25 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) {
return err.GetRegionError(), nil
}

type getExecDetailsV2 interface {
GetExecDetailsV2() *kvrpcpb.ExecDetailsV2
}

// GetExecDetailsV2 returns the ExecDetailsV2 of the underlying concrete response.
func (resp *Response) GetExecDetailsV2() (*kvrpcpb.ExecDetailsV2, error) {
if resp == nil || resp.Resp == nil {
return nil, nil
}
details, ok := resp.Resp.(getExecDetailsV2)
if !ok {
if _, isEmpty := resp.Resp.(*tikvpb.BatchCommandsEmptyResponse); isEmpty {
return nil, nil
}
return nil, errors.Errorf("invalid response type %v", resp)
}
return details.GetExecDetailsV2(), nil
}

// CallRPC launches a rpc call.
// ch is needed to implement timeout for coprocessor streaming, the stream object's
// cancel function will be sent to the channel, together with a lease checked by a background goroutine.
Expand Down
5 changes: 4 additions & 1 deletion txnkv/transaction/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
for {
attempts++
if time.Since(tBegin) > slowRequestThreshold {
reqBegin := time.Now()
if reqBegin.Sub(tBegin) > slowRequestThreshold {
logutil.BgLogger().Warn("slow commit request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
tBegin = time.Now()
}
Expand Down Expand Up @@ -140,6 +141,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
// we can clean undetermined error.
if batch.isPrimary && !c.isAsyncCommit() {
c.setUndeterminedErr(nil)
reqDuration := time.Since(reqBegin)
c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), commitResp.ExecDetailsV2)
}
if keyErr := commitResp.GetError(); keyErr != nil {
if rejected := keyErr.GetCommitTsExpired(); rejected != nil {
Expand Down
8 changes: 6 additions & 2 deletions txnkv/transaction/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
time.Sleep(300 * time.Millisecond)
return errors.WithStack(&tikverr.ErrWriteConflict{WriteConflict: nil})
}
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
startTime := time.Now()
resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
reqDuration := time.Since(startTime)
if action.LockCtx.Stats != nil {
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(time.Since(startTime)))
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(reqDuration))
atomic.AddInt64(&action.LockCtx.Stats.LockRPCCount, 1)
}
if err != nil {
Expand Down Expand Up @@ -183,6 +185,8 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
keyErrs := lockResp.GetErrors()
if len(keyErrs) == 0 {
action.LockCtx.Stats.MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), lockResp.ExecDetailsV2)

if batch.isPrimary {
// After locking the primary key, we should protect the primary lock from expiring
// now in case locking the remaining keys take a long time.
Expand Down
8 changes: 7 additions & 1 deletion txnkv/transaction/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
if attempts > 1 || action.retry {
req.IsRetryRequest = true
}
if time.Since(tBegin) > slowRequestThreshold {
reqBegin := time.Now()
if reqBegin.Sub(tBegin) > slowRequestThreshold {
logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
tBegin = time.Now()
}
Expand Down Expand Up @@ -305,6 +306,10 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
// Clear the RPC Error since the request is evaluated successfully.
sender.SetRPCError(nil)

// Update CommitDetails
reqDuration := time.Since(reqBegin)
c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), prewriteResp.ExecDetailsV2)

if batch.isPrimary {
// After writing the primary key, if the size of the transaction is larger than 32M,
// start the ttlManager. The ttlManager will be closed in tikvTxn.Commit().
Expand Down Expand Up @@ -358,6 +363,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
c.mu.Unlock()
}
}

return nil
}
var locks []*txnlock.Lock
Expand Down
Loading