Skip to content

Commit

Permalink
*: fix stale read ops metric (#878)
Browse files Browse the repository at this point in the history
Signed-off-by: crazycs520 <crazycs520@gmail.com>
  • Loading branch information
crazycs520 committed Jul 13, 2023
1 parent 51633ad commit f9b7bd3
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 53 deletions.
78 changes: 42 additions & 36 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ type replicaSelector struct {
region *Region
regionStore *regionStore
replicas []*replica
labels []*metapb.StoreLabel
state selectorState
// replicas[targetIdx] is the replica handling the request this time
targetIdx AccessIndex
Expand Down Expand Up @@ -747,6 +748,10 @@ func newReplicaSelector(
)
}

option := storeSelectorOp{}
for _, op := range opts {
op(&option)
}
var state selectorState
if !req.ReplicaReadType.IsFollowerRead() {
if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 {
Expand All @@ -755,10 +760,6 @@ func newReplicaSelector(
state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx}
}
} else {
option := storeSelectorOp{}
for _, op := range opts {
op(&option)
}
if req.ReplicaReadType == kv.ReplicaReadPreferLeader {
WithPerferLeader()(&option)
}
Expand All @@ -778,6 +779,7 @@ func newReplicaSelector(
cachedRegion,
regionStore,
replicas,
option.labels,
state,
-1,
-1,
Expand Down Expand Up @@ -1156,9 +1158,14 @@ func (s *RegionRequestSender) SendReqCtx(

var staleReadCollector *staleReadMetricsCollector
if req.StaleRead {
staleReadCollector = &staleReadMetricsCollector{hit: true}
staleReadCollector.onReq(req)
defer staleReadCollector.collect()
staleReadCollector = &staleReadMetricsCollector{}
defer func() {
if retryTimes == 0 {
metrics.StaleReadHitCounter.Add(1)
} else {
metrics.StaleReadMissCounter.Add(1)
}
}()
}

for {
Expand All @@ -1171,9 +1178,6 @@ func (s *RegionRequestSender) SendReqCtx(
zap.Int("times", retryTimes),
)
}
if req.StaleRead && staleReadCollector != nil {
staleReadCollector.hit = false
}
}

rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...)
Expand Down Expand Up @@ -1204,6 +1208,14 @@ func (s *RegionRequestSender) SendReqCtx(
return resp, nil, retryTimes, err
}

var isLocalTraffic bool
if staleReadCollector != nil && s.replicaSelector != nil {
if target := s.replicaSelector.targetReplica(); target != nil {
isLocalTraffic = target.store.IsLabelsMatch(s.replicaSelector.labels)
staleReadCollector.onReq(req, isLocalTraffic)
}
}

logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
s.storeAddr = rpcCtx.Addr

Expand Down Expand Up @@ -1262,7 +1274,7 @@ func (s *RegionRequestSender) SendReqCtx(
}
}
if staleReadCollector != nil {
staleReadCollector.onResp(resp)
staleReadCollector.onResp(req.Type, resp, isLocalTraffic)
}
return resp, rpcCtx, retryTimes, nil
}
Expand Down Expand Up @@ -1946,35 +1958,36 @@ func (s *RegionRequestSender) onRegionError(
}

type staleReadMetricsCollector struct {
tp tikvrpc.CmdType
hit bool
out int
in int
}

func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) {
func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request, isLocalTraffic bool) {
size := 0
switch req.Type {
case tikvrpc.CmdGet:
size += req.Get().Size()
size = req.Get().Size()
case tikvrpc.CmdBatchGet:
size += req.BatchGet().Size()
size = req.BatchGet().Size()
case tikvrpc.CmdScan:
size += req.Scan().Size()
size = req.Scan().Size()
case tikvrpc.CmdCop:
size += req.Cop().Size()
size = req.Cop().Size()
default:
// ignore non-read requests
return
}
s.tp = req.Type
size += req.Context.Size()
s.out = size
if isLocalTraffic {
metrics.StaleReadLocalOutBytes.Add(float64(size))
metrics.StaleReadReqLocalCounter.Add(1)
} else {
metrics.StaleReadRemoteOutBytes.Add(float64(size))
metrics.StaleReadReqCrossZoneCounter.Add(1)
}
}

func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, isLocalTraffic bool) {
size := 0
switch s.tp {
switch tp {
case tikvrpc.CmdGet:
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
case tikvrpc.CmdBatchGet:
Expand All @@ -1984,19 +1997,12 @@ func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
case tikvrpc.CmdCop:
size += resp.Resp.(*coprocessor.Response).Size()
default:
// unreachable
// ignore non-read requests
return
}
s.in = size
}

func (s *staleReadMetricsCollector) collect() {
in, out := metrics.StaleReadHitInTraffic, metrics.StaleReadHitOutTraffic
if !s.hit {
in, out = metrics.StaleReadMissInTraffic, metrics.StaleReadMissOutTraffic
}
if s.in > 0 && s.out > 0 {
in.Observe(float64(s.in))
out.Observe(float64(s.out))
if isLocalTraffic {
metrics.StaleReadLocalInBytes.Add(float64(size))
} else {
metrics.StaleReadRemoteInBytes.Add(float64(size))
}
}
37 changes: 28 additions & 9 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ var (
TiKVAggressiveLockedKeysCounter *prometheus.CounterVec
TiKVStoreSlowScoreGauge *prometheus.GaugeVec
TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec
TiKVStaleReadSizeSummary *prometheus.SummaryVec
TiKVStaleReadCounter *prometheus.CounterVec
TiKVStaleReadReqCounter *prometheus.CounterVec
TiKVStaleReadBytes *prometheus.CounterVec
)

// Label constants.
Expand Down Expand Up @@ -700,13 +702,28 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
ConstLabels: constLabels,
}, []string{LblType, LblStore})

TiKVStaleReadSizeSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_bytes",
Help: "Size of stale read.",
ConstLabels: constLabels,
TiKVStaleReadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_counter",
Help: "Counter of stale read hit/miss",
}, []string{LblResult})

TiKVStaleReadReqCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_req_counter",
Help: "Counter of stale read requests",
}, []string{LblType})

TiKVStaleReadBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_bytes",
Help: "Counter of stale read requests bytes",
}, []string{LblResult, LblDirection})

initShortcuts()
Expand Down Expand Up @@ -789,7 +806,9 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVAggressiveLockedKeysCounter)
prometheus.MustRegister(TiKVStoreSlowScoreGauge)
prometheus.MustRegister(TiKVPreferLeaderFlowsGauge)
prometheus.MustRegister(TiKVStaleReadSizeSummary)
prometheus.MustRegister(TiKVStaleReadCounter)
prometheus.MustRegister(TiKVStaleReadReqCounter)
prometheus.MustRegister(TiKVStaleReadBytes)
}

// readCounter reads the value of a prometheus.Counter.
Expand Down
28 changes: 20 additions & 8 deletions metrics/shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,16 @@ var (
AggressiveLockedKeysLockedWithConflict prometheus.Counter
AggressiveLockedKeysNonForceLock prometheus.Counter

StaleReadHitInTraffic prometheus.Observer
StaleReadHitOutTraffic prometheus.Observer
StaleReadMissInTraffic prometheus.Observer
StaleReadMissOutTraffic prometheus.Observer
StaleReadHitCounter prometheus.Counter
StaleReadMissCounter prometheus.Counter

StaleReadReqLocalCounter prometheus.Counter
StaleReadReqCrossZoneCounter prometheus.Counter

StaleReadLocalInBytes prometheus.Counter
StaleReadLocalOutBytes prometheus.Counter
StaleReadRemoteInBytes prometheus.Counter
StaleReadRemoteOutBytes prometheus.Counter
)

func initShortcuts() {
Expand Down Expand Up @@ -296,8 +302,14 @@ func initShortcuts() {
// TiKV).
AggressiveLockedKeysNonForceLock = TiKVAggressiveLockedKeysCounter.WithLabelValues("non_force_lock")

StaleReadHitInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "in")
StaleReadHitOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "out")
StaleReadMissInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "in")
StaleReadMissOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "out")
StaleReadHitCounter = TiKVStaleReadCounter.WithLabelValues("hit")
StaleReadMissCounter = TiKVStaleReadCounter.WithLabelValues("miss")

StaleReadReqLocalCounter = TiKVStaleReadReqCounter.WithLabelValues("local")
StaleReadReqCrossZoneCounter = TiKVStaleReadReqCounter.WithLabelValues("cross-zone")

StaleReadLocalInBytes = TiKVStaleReadBytes.WithLabelValues("local", "in")
StaleReadLocalOutBytes = TiKVStaleReadBytes.WithLabelValues("local", "out")
StaleReadRemoteInBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "in")
StaleReadRemoteOutBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "out")
}

0 comments on commit f9b7bd3

Please sign in to comment.