*: reduce WithLabelValues of prometheus (#5802)
close #5801

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
lhy1024 and ti-chi-bot authored Jan 10, 2023
1 parent ba81088 commit 275c43e
Showing 36 changed files with 609 additions and 233 deletions.
18 changes: 14 additions & 4 deletions pkg/storage/kv/etcd_kv.go
@@ -34,6 +34,14 @@ const (
     slowRequestTime = time.Second
 )
 
+var (
+    // WithLabelValues is a heavy operation, define variable to avoid call it every time.
+    txnFailedCounter       = txnCounter.WithLabelValues("failed")
+    txnSuccessCounter      = txnCounter.WithLabelValues("success")
+    txnFailedDurationHist  = txnDuration.WithLabelValues("failed")
+    txnSuccessDurationHist = txnDuration.WithLabelValues("success")
+)
+
 type etcdKVBase struct {
     client   *clientv3.Client
     rootPath string
@@ -162,12 +170,14 @@ func (t *SlowLogTxn) Commit() (*clientv3.TxnResponse, error) {
         zap.Duration("cost", cost),
         errs.ZapError(err))
     }
-    label := "success"
+
     if err != nil {
-        label = "failed"
+        txnFailedCounter.Inc()
+        txnFailedDurationHist.Observe(cost.Seconds())
+    } else {
+        txnSuccessCounter.Inc()
+        txnSuccessDurationHist.Observe(cost.Seconds())
     }
-    txnCounter.WithLabelValues(label).Inc()
-    txnDuration.WithLabelValues(label).Observe(cost.Seconds())
 
     return resp, errors.WithStack(err)
 }
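
The change above is the pattern the whole commit applies: CounterVec.WithLabelValues has to hash the label values and look up (or create) the child series on every call, so resolving each child once into a package-level variable leaves only an atomic add on the hot path. A minimal standalone sketch of the same idea, using the real client_golang API but made-up names (demo_requests_total, handle) that are not part of the PD code:

package main

import (
    "errors"

    "github.com/prometheus/client_golang/prometheus"
)

var (
    requestCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "demo_requests_total",
        Help: "Requests by result.",
    }, []string{"result"})

    // Resolve the children once; WithLabelValues would otherwise hash and
    // look up the label set on every call.
    requestFailedCounter  = requestCounter.WithLabelValues("failed")
    requestSuccessCounter = requestCounter.WithLabelValues("success")
)

// handle is a stand-in for a hot path such as SlowLogTxn.Commit above.
func handle(err error) {
    if err != nil {
        requestFailedCounter.Inc() // no label lookup here
        return
    }
    requestSuccessCounter.Inc()
}

func main() {
    prometheus.MustRegister(requestCounter)
    handle(nil)
    handle(errors.New("boom"))
}

The trade-off is that every child series is created and exported up front, so the approach only fits labels whose values are known when the package initializes; that is presumably why the per-store gauges in collectHotMetrics below only get a "todo: pre-allocate gauge metrics" note rather than being hoisted.
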
14 changes: 9 additions & 5 deletions server/cluster/cluster.go
@@ -64,8 +64,12 @@ var (
     // DefaultMinResolvedTSPersistenceInterval is the default value of min resolved ts persistence interval.
     // If interval in config is zero, it means not to persist resolved ts and check config with this DefaultMinResolvedTSPersistenceInterval
     DefaultMinResolvedTSPersistenceInterval = config.DefaultMinResolvedTSPersistenceInterval
-    regionUpdateCacheEventCounter = regionEventCounter.WithLabelValues("update_cache")
-    regionUpdateKVEventCounter    = regionEventCounter.WithLabelValues("update_kv")
+    // WithLabelValues is a heavy operation, define variable to avoid call it every time.
+    regionUpdateCacheEventCounter = regionEventCounter.WithLabelValues("update_cache")
+    regionUpdateKVEventCounter    = regionEventCounter.WithLabelValues("update_kv")
+    regionCacheMissCounter        = bucketEventCounter.WithLabelValues("region_cache_miss")
+    versionNotMatchCounter        = bucketEventCounter.WithLabelValues("version_not_match")
+    updateFailedCounter           = bucketEventCounter.WithLabelValues("update_failed")
 )
 
 // regionLabelGCInterval is the interval to run region-label's GC work.
@@ -795,7 +799,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
 func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error {
     region := c.core.GetRegion(buckets.GetRegionId())
     if region == nil {
-        bucketEventCounter.WithLabelValues("region_cache_miss").Inc()
+        regionCacheMissCounter.Inc()
         return errors.Errorf("region %v not found", buckets.GetRegionId())
     }
     // use CAS to update the bucket information.
@@ -806,7 +810,7 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error {
         old := region.GetBuckets()
         // region should not update if the version of the buckets is less than the old one.
         if old != nil && buckets.GetVersion() <= old.GetVersion() {
-            bucketEventCounter.WithLabelValues("version_not_match").Inc()
+            versionNotMatchCounter.Inc()
             return nil
         }
         failpoint.Inject("concurrentBucketHeartbeat", func() {
@@ -816,7 +820,7 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error {
            return nil
        }
    }
-   bucketEventCounter.WithLabelValues("update_failed").Inc()
+   updateFailedCounter.Inc()
    return nil
 }

11 changes: 9 additions & 2 deletions server/cluster/coordinator.go
@@ -58,6 +58,12 @@ const (
     PluginUnload = "PluginUnload"
 )
 
+var (
+    // WithLabelValues is a heavy operation, define variable to avoid call it every time.
+    waitingListGauge  = regionListGauge.WithLabelValues("waiting_list")
+    priorityListGauge = regionListGauge.WithLabelValues("priority_list")
+)
+
 // coordinator is used to manage all schedulers and checkers to decide if the region needs to be scheduled.
 type coordinator struct {
     syncutil.RWMutex
@@ -181,7 +187,7 @@ func (c *coordinator) checkSuspectRegions() {
 
 func (c *coordinator) checkWaitingRegions() {
     items := c.checkers.GetWaitingRegions()
-    regionListGauge.WithLabelValues("waiting_list").Set(float64(len(items)))
+    waitingListGauge.Set(float64(len(items)))
     for _, item := range items {
         region := c.cluster.GetRegion(item.Key)
         c.tryAddOperators(region)
@@ -192,7 +198,7 @@ func (c *coordinator) checkPriorityRegions() {
 func (c *coordinator) checkPriorityRegions() {
     items := c.checkers.GetPriorityRegions()
     removes := make([]uint64, 0)
-    regionListGauge.WithLabelValues("priority_list").Set(float64(len(items)))
+    priorityListGauge.Set(float64(len(items)))
     for _, id := range items {
         region := c.cluster.GetRegion(id)
         if region == nil {
@@ -569,6 +575,7 @@ func collectHotMetrics(cluster *RaftCluster, stores []*core.StoreInfo, typ stati
     status := statistics.CollectHotPeerInfos(stores, regionStats) // only returns TotalBytesRate,TotalKeysRate,TotalQueryRate,Count
 
     for _, s := range stores {
+        // todo: pre-allocate gauge metrics
         storeAddress := s.GetAddress()
         storeID := s.GetID()
         storeLabel := strconv.FormatUint(storeID, 10)
1 change: 1 addition & 0 deletions server/metrics.go
@@ -118,6 +118,7 @@ var (
             Buckets: prometheus.ExponentialBuckets(0.0001, 2, 29), // 0.1ms ~ 7hours
         }, []string{"address", "store"})
 
+    // todo: pre-allocate gauge metrics
     storeHeartbeatHandleDuration = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
             Namespace: "pd",
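
Both "todo: pre-allocate gauge metrics" comments mark metrics whose label values (store address and ID) are only known at runtime, so their children cannot be hoisted into package-level variables like the counters above. One possible follow-up, which is not what this commit or PD implements, is to memoize the children per store; the names below (demo_store_status, storeGauge) are illustrative only:

package main

import (
    "strconv"

    "github.com/prometheus/client_golang/prometheus"
)

var (
    storeStatusGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Name: "demo_store_status",
        Help: "Per-store status values.",
    }, []string{"address", "store"})

    // Children keyed by store label, so WithLabelValues runs once per store
    // instead of once per collection cycle. Not synchronized, for brevity.
    storeGauges = map[string]prometheus.Gauge{}
)

func storeGauge(addr string, storeID uint64) prometheus.Gauge {
    label := strconv.FormatUint(storeID, 10)
    if g, ok := storeGauges[label]; ok {
        return g
    }
    g := storeStatusGauge.WithLabelValues(addr, label)
    storeGauges[label] = g
    return g
}

func main() {
    prometheus.MustRegister(storeStatusGauge)
    storeGauge("127.0.0.1:20160", 1).Set(42)
}

A concurrent collector would need a mutex or sync.Map around the cache, and children for removed stores would have to be dropped with DeleteLabelValues to avoid exporting stale series.
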
13 changes: 10 additions & 3 deletions server/region_syncer/history_buffer.go
@@ -30,6 +30,13 @@ const (
     defaultFlushCount = 100
 )
 
+var (
+    // WithLabelValues is a heavy operation, define variable to avoid call it every time.
+    syncIndexGauge  = regionSyncerStatus.WithLabelValues("sync_index")
+    firstIndexGauge = regionSyncerStatus.WithLabelValues("first_index")
+    lastIndexGauge  = regionSyncerStatus.WithLabelValues("last_index")
+)
+
 type historyBuffer struct {
     syncutil.RWMutex
     index uint64
@@ -80,7 +87,7 @@ func (h *historyBuffer) firstIndex() uint64 {
 func (h *historyBuffer) Record(r *core.RegionInfo) {
     h.Lock()
     defer h.Unlock()
-    regionSyncerStatus.WithLabelValues("sync_index").Set(float64(h.index))
+    syncIndexGauge.Set(float64(h.index))
     h.records[h.tail] = r
     h.tail = (h.tail + 1) % h.size
     if h.tail == h.head {
@@ -148,8 +155,8 @@ func (h *historyBuffer) reload() {
 }
 
 func (h *historyBuffer) persist() {
-    regionSyncerStatus.WithLabelValues("first_index").Set(float64(h.firstIndex()))
-    regionSyncerStatus.WithLabelValues("last_index").Set(float64(h.nextIndex()))
+    firstIndexGauge.Set(float64(h.firstIndex()))
+    lastIndexGauge.Set(float64(h.nextIndex()))
     err := h.kv.Save(historyKey, strconv.FormatUint(h.nextIndex(), 10))
     if err != nil {
         log.Warn("persist history index failed", zap.Uint64("persist-index", h.nextIndex()), errs.ZapError(err))
21 changes: 16 additions & 5 deletions server/schedule/checker/joint_state_checker.go
@@ -28,6 +28,17 @@ type JointStateChecker struct {
     cluster schedule.Cluster
 }
 
+const jointStateCheckerName = "joint_state_checker"
+
+var (
+    // WithLabelValues is a heavy operation, define variable to avoid call it every time.
+    jointCheckCounter                 = checkerCounter.WithLabelValues(jointStateCheckerName, "check")
+    jointCheckerPausedCounter         = checkerCounter.WithLabelValues(jointStateCheckerName, "paused")
+    jointCheckerFailedCounter         = checkerCounter.WithLabelValues(jointStateCheckerName, "create-operator-fail")
+    jointCheckerNewOpCounter          = checkerCounter.WithLabelValues(jointStateCheckerName, "new-operator")
+    jointCheckerTransferLeaderCounter = checkerCounter.WithLabelValues(jointStateCheckerName, "transfer-leader")
+)
+
 // NewJointStateChecker creates a joint state checker.
 func NewJointStateChecker(cluster schedule.Cluster) *JointStateChecker {
     return &JointStateChecker{
@@ -37,23 +48,23 @@ func NewJointStateChecker(cluster schedule.Cluster) *JointStateChecker {
 
 // Check verifies a region's role, creating an Operator if need.
 func (c *JointStateChecker) Check(region *core.RegionInfo) *operator.Operator {
-    checkerCounter.WithLabelValues("joint_state_checker", "check").Inc()
+    jointCheckCounter.Inc()
     if c.IsPaused() {
-        checkerCounter.WithLabelValues("joint_state_checker", "paused").Inc()
+        jointCheckerPausedCounter.Inc()
         return nil
     }
     if !core.IsInJointState(region.GetPeers()...) {
         return nil
     }
     op, err := operator.CreateLeaveJointStateOperator(operator.OpDescLeaveJointState, c.cluster, region)
     if err != nil {
-        checkerCounter.WithLabelValues("joint_state_checker", "create-operator-fail").Inc()
+        jointCheckerFailedCounter.Inc()
         log.Debug("fail to create leave joint state operator", errs.ZapError(err))
         return nil
     } else if op != nil {
-        checkerCounter.WithLabelValues("joint_state_checker", "new-operator").Inc()
+        jointCheckerNewOpCounter.Inc()
         if op.Len() > 1 {
-            checkerCounter.WithLabelValues("joint_state_checker", "transfer-leader").Inc()
+            jointCheckerTransferLeaderCounter.Inc()
         }
         op.SetPriorityLevel(core.High)
     }
7 changes: 6 additions & 1 deletion server/schedule/checker/learner_checker.go
@@ -28,6 +28,11 @@ type LearnerChecker struct {
     cluster schedule.Cluster
 }
 
+var (
+    // WithLabelValues is a heavy operation, define variable to avoid call it every time.
+    learnerCheckerPausedCounter = checkerCounter.WithLabelValues("learner_checker", "paused")
+)
+
 // NewLearnerChecker creates a learner checker.
 func NewLearnerChecker(cluster schedule.Cluster) *LearnerChecker {
     return &LearnerChecker{
@@ -38,7 +43,7 @@ func NewLearnerChecker(cluster schedule.Cluster) *LearnerChecker {
 // Check verifies a region's role, creating an Operator if need.
 func (l *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator {
     if l.IsPaused() {
-        checkerCounter.WithLabelValues("learner_checker", "paused").Inc()
+        learnerCheckerPausedCounter.Inc()
         return nil
     }
     for _, p := range region.GetLearners() {