diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index aac428b48..8e4cb1cab 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -259,6 +259,7 @@ type replicaSelector struct { region *Region regionStore *regionStore replicas []*replica + labels []*metapb.StoreLabel state selectorState // replicas[targetIdx] is the replica handling the request this time targetIdx AccessIndex @@ -747,6 +748,10 @@ func newReplicaSelector( ) } + option := storeSelectorOp{} + for _, op := range opts { + op(&option) + } var state selectorState if !req.ReplicaReadType.IsFollowerRead() { if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 { @@ -755,10 +760,6 @@ func newReplicaSelector( state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx} } } else { - option := storeSelectorOp{} - for _, op := range opts { - op(&option) - } if req.ReplicaReadType == kv.ReplicaReadPreferLeader { WithPerferLeader()(&option) } @@ -778,6 +779,7 @@ func newReplicaSelector( cachedRegion, regionStore, replicas, + option.labels, state, -1, -1, @@ -1156,9 +1158,14 @@ func (s *RegionRequestSender) SendReqCtx( var staleReadCollector *staleReadMetricsCollector if req.StaleRead { - staleReadCollector = &staleReadMetricsCollector{hit: true} - staleReadCollector.onReq(req) - defer staleReadCollector.collect() + staleReadCollector = &staleReadMetricsCollector{} + defer func() { + if retryTimes == 0 { + metrics.StaleReadHitCounter.Add(1) + } else { + metrics.StaleReadMissCounter.Add(1) + } + }() } for { @@ -1171,9 +1178,6 @@ func (s *RegionRequestSender) SendReqCtx( zap.Int("times", retryTimes), ) } - if req.StaleRead && staleReadCollector != nil { - staleReadCollector.hit = false - } } rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...) @@ -1204,6 +1208,14 @@ func (s *RegionRequestSender) SendReqCtx( return resp, nil, retryTimes, err } + var isLocalTraffic bool + if staleReadCollector != nil && s.replicaSelector != nil { + if target := s.replicaSelector.targetReplica(); target != nil { + isLocalTraffic = target.store.IsLabelsMatch(s.replicaSelector.labels) + staleReadCollector.onReq(req, isLocalTraffic) + } + } + logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr) s.storeAddr = rpcCtx.Addr @@ -1262,7 +1274,7 @@ func (s *RegionRequestSender) SendReqCtx( } } if staleReadCollector != nil { - staleReadCollector.onResp(resp) + staleReadCollector.onResp(req.Type, resp, isLocalTraffic) } return resp, rpcCtx, retryTimes, nil } @@ -1946,35 +1958,36 @@ func (s *RegionRequestSender) onRegionError( } type staleReadMetricsCollector struct { - tp tikvrpc.CmdType - hit bool - out int - in int } -func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) { +func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request, isLocalTraffic bool) { size := 0 switch req.Type { case tikvrpc.CmdGet: - size += req.Get().Size() + size = req.Get().Size() case tikvrpc.CmdBatchGet: - size += req.BatchGet().Size() + size = req.BatchGet().Size() case tikvrpc.CmdScan: - size += req.Scan().Size() + size = req.Scan().Size() case tikvrpc.CmdCop: - size += req.Cop().Size() + size = req.Cop().Size() default: // ignore non-read requests return } - s.tp = req.Type size += req.Context.Size() - s.out = size + if isLocalTraffic { + metrics.StaleReadLocalOutBytes.Add(float64(size)) + metrics.StaleReadReqLocalCounter.Add(1) + } else { + metrics.StaleReadRemoteOutBytes.Add(float64(size)) + metrics.StaleReadReqCrossZoneCounter.Add(1) + } } -func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) { +func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, isLocalTraffic bool) { size := 0 - switch s.tp { + switch tp { case tikvrpc.CmdGet: size += resp.Resp.(*kvrpcpb.GetResponse).Size() case tikvrpc.CmdBatchGet: @@ -1984,19 +1997,12 @@ func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) { case tikvrpc.CmdCop: size += resp.Resp.(*coprocessor.Response).Size() default: - // unreachable + // ignore non-read requests return } - s.in = size -} - -func (s *staleReadMetricsCollector) collect() { - in, out := metrics.StaleReadHitInTraffic, metrics.StaleReadHitOutTraffic - if !s.hit { - in, out = metrics.StaleReadMissInTraffic, metrics.StaleReadMissOutTraffic - } - if s.in > 0 && s.out > 0 { - in.Observe(float64(s.in)) - out.Observe(float64(s.out)) + if isLocalTraffic { + metrics.StaleReadLocalInBytes.Add(float64(size)) + } else { + metrics.StaleReadRemoteInBytes.Add(float64(size)) } } diff --git a/metrics/metrics.go b/metrics/metrics.go index 4f5abfd8a..5c72f05b2 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -101,7 +101,9 @@ var ( TiKVAggressiveLockedKeysCounter *prometheus.CounterVec TiKVStoreSlowScoreGauge *prometheus.GaugeVec TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec - TiKVStaleReadSizeSummary *prometheus.SummaryVec + TiKVStaleReadCounter *prometheus.CounterVec + TiKVStaleReadReqCounter *prometheus.CounterVec + TiKVStaleReadBytes *prometheus.CounterVec ) // Label constants. @@ -700,13 +702,28 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{LblType, LblStore}) - TiKVStaleReadSizeSummary = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "stale_read_bytes", - Help: "Size of stale read.", - ConstLabels: constLabels, + TiKVStaleReadCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "stale_read_counter", + Help: "Counter of stale read hit/miss", + }, []string{LblResult}) + + TiKVStaleReadReqCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "stale_read_req_counter", + Help: "Counter of stale read requests", + }, []string{LblType}) + + TiKVStaleReadBytes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "stale_read_bytes", + Help: "Counter of stale read requests bytes", }, []string{LblResult, LblDirection}) initShortcuts() @@ -789,7 +806,9 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVAggressiveLockedKeysCounter) prometheus.MustRegister(TiKVStoreSlowScoreGauge) prometheus.MustRegister(TiKVPreferLeaderFlowsGauge) - prometheus.MustRegister(TiKVStaleReadSizeSummary) + prometheus.MustRegister(TiKVStaleReadCounter) + prometheus.MustRegister(TiKVStaleReadReqCounter) + prometheus.MustRegister(TiKVStaleReadBytes) } // readCounter reads the value of a prometheus.Counter. diff --git a/metrics/shortcuts.go b/metrics/shortcuts.go index 0acd7aba6..c791cb987 100644 --- a/metrics/shortcuts.go +++ b/metrics/shortcuts.go @@ -160,10 +160,16 @@ var ( AggressiveLockedKeysLockedWithConflict prometheus.Counter AggressiveLockedKeysNonForceLock prometheus.Counter - StaleReadHitInTraffic prometheus.Observer - StaleReadHitOutTraffic prometheus.Observer - StaleReadMissInTraffic prometheus.Observer - StaleReadMissOutTraffic prometheus.Observer + StaleReadHitCounter prometheus.Counter + StaleReadMissCounter prometheus.Counter + + StaleReadReqLocalCounter prometheus.Counter + StaleReadReqCrossZoneCounter prometheus.Counter + + StaleReadLocalInBytes prometheus.Counter + StaleReadLocalOutBytes prometheus.Counter + StaleReadRemoteInBytes prometheus.Counter + StaleReadRemoteOutBytes prometheus.Counter ) func initShortcuts() { @@ -296,8 +302,14 @@ func initShortcuts() { // TiKV). AggressiveLockedKeysNonForceLock = TiKVAggressiveLockedKeysCounter.WithLabelValues("non_force_lock") - StaleReadHitInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "in") - StaleReadHitOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "out") - StaleReadMissInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "in") - StaleReadMissOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "out") + StaleReadHitCounter = TiKVStaleReadCounter.WithLabelValues("hit") + StaleReadMissCounter = TiKVStaleReadCounter.WithLabelValues("miss") + + StaleReadReqLocalCounter = TiKVStaleReadReqCounter.WithLabelValues("local") + StaleReadReqCrossZoneCounter = TiKVStaleReadReqCounter.WithLabelValues("cross-zone") + + StaleReadLocalInBytes = TiKVStaleReadBytes.WithLabelValues("local", "in") + StaleReadLocalOutBytes = TiKVStaleReadBytes.WithLabelValues("local", "out") + StaleReadRemoteInBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "in") + StaleReadRemoteOutBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "out") }