Skip to content

Commit e1de48f

Browse files
committed
support write details
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
1 parent 114ba40 commit e1de48f

File tree

10 files changed

+349
-43
lines changed

10 files changed

+349
-43
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ require (
1616
github.com/opentracing/opentracing-go v1.2.0
1717
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
1818
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
19-
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305
19+
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
2020
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
2121
github.com/pkg/errors v0.9.1
2222
github.com/prometheus/client_golang v1.11.0

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
161161
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
162162
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
163163
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
164-
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw=
165-
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
164+
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
165+
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
166166
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
167167
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
168168
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=

integration_tests/go.sum

-1
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,6 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
860860
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
861861
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
862862
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
863-
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
864863
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
865864
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
866865
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=

internal/client/client.go

+37-14
Original file line numberDiff line numberDiff line change
@@ -345,8 +345,9 @@ func (c *RPCClient) closeConns() {
345345
}
346346

347347
var (
348-
sendReqHistCache sync.Map
349-
sendReqCounterCache sync.Map
348+
sendReqHistCache sync.Map
349+
sendReqCounterCache sync.Map
350+
rpcNetLatencyHistmCache sync.Map
350351
)
351352

352353
type sendReqHistCacheKey struct {
@@ -365,43 +366,65 @@ type sendReqCounterCacheValue struct {
365366
timeCounter prometheus.Counter
366367
}
367368

368-
func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time.Time, staleRead bool) {
369+
func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool) {
370+
elapsed := time.Since(start)
371+
secs := elapsed.Seconds()
372+
storeID := req.Context.GetPeer().GetStoreId()
373+
369374
histKey := sendReqHistCacheKey{
370375
req.Type,
371-
req.Context.GetPeer().GetStoreId(),
376+
storeID,
372377
staleRead,
373378
}
374379
counterKey := sendReqCounterCacheKey{
375380
histKey,
376381
req.GetRequestSource(),
377382
}
378383

384+
reqType := req.Type.String()
385+
var storeIDStr string
386+
379387
hist, ok := sendReqHistCache.Load(histKey)
380388
if !ok {
381-
reqType := req.Type.String()
382-
storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
383-
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead))
389+
if len(storeIDStr) == 0 {
390+
storeIDStr = strconv.FormatUint(storeID, 10)
391+
}
392+
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead))
384393
sendReqHistCache.Store(histKey, hist)
385394
}
386395
counter, ok := sendReqCounterCache.Load(counterKey)
387396
if !ok {
388-
reqType := req.Type.String()
389-
storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
397+
if len(storeIDStr) == 0 {
398+
storeIDStr = strconv.FormatUint(storeID, 10)
399+
}
390400
counter = sendReqCounterCacheValue{
391-
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource),
392-
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource),
401+
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
402+
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
393403
}
394404
sendReqCounterCache.Store(counterKey, counter)
395405
}
396406

397-
secs := time.Since(start).Seconds()
398407
hist.(prometheus.Observer).Observe(secs)
399408
counter.(sendReqCounterCacheValue).counter.Inc()
400409
counter.(sendReqCounterCacheValue).timeCounter.Add(secs)
410+
411+
if execDetail, err := resp.GetExecDetailsV2(); err == nil &&
412+
execDetail != nil && execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 {
413+
latHist, ok := rpcNetLatencyHistmCache.Load(storeID)
414+
if !ok {
415+
if len(storeIDStr) == 0 {
416+
storeIDStr = strconv.FormatUint(storeID, 10)
417+
}
418+
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr)
419+
sendReqHistCache.Store(storeID, latHist)
420+
}
421+
latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond
422+
latHist.(prometheus.Observer).Observe(latency.Seconds())
423+
}
401424
}
402425

403426
// SendRequest sends a Request to server and receives Response.
404-
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
427+
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
405428
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
406429
span1 := span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context()))
407430
defer span1.Finish()
@@ -428,7 +451,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
428451
detail := stmtExec.(*util.ExecDetails)
429452
atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start)))
430453
}
431-
c.updateTiKVSendReqHistogram(req, start, staleRead)
454+
c.updateTiKVSendReqHistogram(req, resp, start, staleRead)
432455
}()
433456

434457
// TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since

metrics/metrics.go

+11
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ var (
4646
TiKVSendReqHistogram *prometheus.HistogramVec
4747
TiKVSendReqCounter *prometheus.CounterVec
4848
TiKVSendReqTimeCounter *prometheus.CounterVec
49+
TiKVRPCNetLatencyHistogram *prometheus.HistogramVec
4950
TiKVCoprocessorHistogram *prometheus.HistogramVec
5051
TiKVLockResolverCounter *prometheus.CounterVec
5152
TiKVRegionErrorCounter *prometheus.CounterVec
@@ -161,6 +162,15 @@ func initMetrics(namespace, subsystem string) {
161162
Help: "Counter of request time with multi dimensions.",
162163
}, []string{LblType, LblStore, LblStaleRead, LblSource})
163164

165+
TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec(
166+
prometheus.HistogramOpts{
167+
Namespace: namespace,
168+
Subsystem: subsystem,
169+
Name: "rpc_net_latency_seconds",
170+
Help: "Bucketed histogram of time difference between TiDB and TiKV.",
171+
Buckets: prometheus.ExponentialBuckets(5e-5, 2, 18), // 50us ~ 6.5s
172+
}, []string{LblStore})
173+
164174
TiKVCoprocessorHistogram = prometheus.NewHistogramVec(
165175
prometheus.HistogramOpts{
166176
Namespace: namespace,
@@ -599,6 +609,7 @@ func RegisterMetrics() {
599609
prometheus.MustRegister(TiKVSendReqHistogram)
600610
prometheus.MustRegister(TiKVSendReqCounter)
601611
prometheus.MustRegister(TiKVSendReqTimeCounter)
612+
prometheus.MustRegister(TiKVRPCNetLatencyHistogram)
602613
prometheus.MustRegister(TiKVCoprocessorHistogram)
603614
prometheus.MustRegister(TiKVLockResolverCounter)
604615
prometheus.MustRegister(TiKVRegionErrorCounter)

tikvrpc/tikvrpc.go

+19
Original file line numberDiff line numberDiff line change
@@ -926,6 +926,25 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) {
926926
return err.GetRegionError(), nil
927927
}
928928

929+
type getExecDetailsV2 interface {
930+
GetExecDetailsV2() *kvrpcpb.ExecDetailsV2
931+
}
932+
933+
// GetExecDetailsV2 returns the ExecDetailsV2 of the underlying concrete response.
934+
func (resp *Response) GetExecDetailsV2() (*kvrpcpb.ExecDetailsV2, error) {
935+
if resp == nil || resp.Resp == nil {
936+
return nil, nil
937+
}
938+
details, ok := resp.Resp.(getExecDetailsV2)
939+
if !ok {
940+
if _, isEmpty := resp.Resp.(*tikvpb.BatchCommandsEmptyResponse); isEmpty {
941+
return nil, nil
942+
}
943+
return nil, errors.Errorf("invalid response type %v", resp)
944+
}
945+
return details.GetExecDetailsV2(), nil
946+
}
947+
929948
// CallRPC launches a rpc call.
930949
// ch is needed to implement timeout for coprocessor streaming, the stream object's
931950
// cancel function will be sent to the channel, together with a lease checked by a background goroutine.

txnkv/transaction/commit.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
8888
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
8989
for {
9090
attempts++
91-
if time.Since(tBegin) > slowRequestThreshold {
91+
reqBegin := time.Now()
92+
if reqBegin.Sub(tBegin) > slowRequestThreshold {
9293
logutil.BgLogger().Warn("slow commit request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
9394
tBegin = time.Now()
9495
}
@@ -140,6 +141,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
140141
// we can clean undetermined error.
141142
if batch.isPrimary && !c.isAsyncCommit() {
142143
c.setUndeterminedErr(nil)
144+
reqDuration := time.Since(reqBegin)
145+
c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), commitResp.ExecDetailsV2)
143146
}
144147
if keyErr := commitResp.GetError(); keyErr != nil {
145148
if rejected := keyErr.GetCommitTsExpired(); rejected != nil {

txnkv/transaction/pessimistic.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,12 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
144144
time.Sleep(300 * time.Millisecond)
145145
return errors.WithStack(&tikverr.ErrWriteConflict{WriteConflict: nil})
146146
}
147+
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
147148
startTime := time.Now()
148-
resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
149+
resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
150+
reqDuration := time.Since(startTime)
149151
if action.LockCtx.Stats != nil {
150-
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(time.Since(startTime)))
152+
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(reqDuration))
151153
atomic.AddInt64(&action.LockCtx.Stats.LockRPCCount, 1)
152154
}
153155
if err != nil {
@@ -183,6 +185,8 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
183185
lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
184186
keyErrs := lockResp.GetErrors()
185187
if len(keyErrs) == 0 {
188+
action.LockCtx.Stats.MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), lockResp.ExecDetailsV2)
189+
186190
if batch.isPrimary {
187191
// After locking the primary key, we should protect the primary lock from expiring
188192
// now in case locking the remaining keys take a long time.

txnkv/transaction/prewrite.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,8 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
247247
if attempts > 1 || action.retry {
248248
req.IsRetryRequest = true
249249
}
250-
if time.Since(tBegin) > slowRequestThreshold {
250+
reqBegin := time.Now()
251+
if reqBegin.Sub(tBegin) > slowRequestThreshold {
251252
logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
252253
tBegin = time.Now()
253254
}
@@ -305,6 +306,10 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
305306
// Clear the RPC Error since the request is evaluated successfully.
306307
sender.SetRPCError(nil)
307308

309+
// Update CommitDetails
310+
reqDuration := time.Since(reqBegin)
311+
c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), prewriteResp.ExecDetailsV2)
312+
308313
if batch.isPrimary {
309314
// After writing the primary key, if the size of the transaction is larger than 32M,
310315
// start the ttlManager. The ttlManager will be closed in tikvTxn.Commit().
@@ -358,6 +363,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
358363
c.mu.Unlock()
359364
}
360365
}
366+
361367
return nil
362368
}
363369
var locks []*txnlock.Lock

0 commit comments

Comments
 (0)