*: clean up handling metrics process #7370

Merged: 10 commits, Nov 21, 2023
16 changes: 0 additions & 16 deletions pkg/mcs/scheduling/server/cluster.go
@@ -485,10 +485,6 @@ func (c *Cluster) collectMetrics() {

c.coordinator.GetSchedulersController().CollectSchedulerMetrics()
c.coordinator.CollectHotSpotMetrics()
c.collectClusterMetrics()
}

func (c *Cluster) collectClusterMetrics() {
if c.regionStats == nil {
return
}
@@ -500,20 +496,8 @@ func (c *Cluster) collectClusterMetrics() {

func (c *Cluster) resetMetrics() {
statistics.Reset()

schedulers.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
c.resetClusterMetrics()
}

func (c *Cluster) resetClusterMetrics() {
if c.regionStats == nil {
Contributor commented on this line: Do we need to check it?
Member (author) replied: no need

return
}
c.regionStats.Reset()
c.labelStats.Reset()
// reset hot cache metrics
c.hotStat.ResetMetrics()
}

// StartBackgroundJobs starts background jobs.
4 changes: 2 additions & 2 deletions pkg/statistics/hot_cache.go
@@ -125,8 +125,8 @@ func (w *HotCache) CollectMetrics() {
w.CheckReadAsync(newCollectMetricsTask())
}

// ResetMetrics resets the hot cache metrics.
func (w *HotCache) ResetMetrics() {
// ResetHotCacheStatusMetrics resets the hot cache metrics.
func ResetHotCacheStatusMetrics() {
hotCacheStatusGauge.Reset()
}

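The rename above also drops the receiver: resetting the hot cache status gauge only touches package-level Prometheus collectors, so it can be a plain function that callers reach without holding a *HotCache. A minimal sketch of that pattern, using a stand-in gauge rather than pd's real collector definition:

package statistics

import "github.com/prometheus/client_golang/prometheus"

// Stand-in for pd's hotCacheStatusGauge; the label names here are illustrative.
var hotCacheStatusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "hotcache_status", Help: "Status of the hot cache."},
	[]string{"name", "store", "type"},
)

// ResetHotCacheStatusMetrics clears every label combination recorded so far.
// No receiver is needed because the gauge is package-level state.
func ResetHotCacheStatusMetrics() {
	hotCacheStatusGauge.Reset()
}
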
8 changes: 4 additions & 4 deletions pkg/statistics/region_collection.go
@@ -272,8 +272,8 @@ func (r *RegionStatistics) Collect() {
regionWitnessLeaderRegionCounter.Set(float64(len(r.stats[WitnessLeader])))
}

// Reset resets the metrics of the regions' status.
func (r *RegionStatistics) Reset() {
// ResetRegionStatsMetrics resets the metrics of the regions' status.
func ResetRegionStatsMetrics() {
regionMissPeerRegionCounter.Set(0)
regionExtraPeerRegionCounter.Set(0)
regionDownPeerRegionCounter.Set(0)
@@ -326,8 +326,8 @@ func (l *LabelStatistics) Collect() {
}
}

// Reset resets the metrics of the label status.
func (l *LabelStatistics) Reset() {
// ResetLabelStatsMetrics resets the metrics of the label status.
func ResetLabelStatsMetrics() {
regionLabelLevelGauge.Reset()
}

3 changes: 3 additions & 0 deletions pkg/statistics/store_collection.go
@@ -322,4 +322,7 @@ func Reset() {
storeStatusGauge.Reset()
clusterStatusGauge.Reset()
placementStatusGauge.Reset()
ResetRegionStatsMetrics()
ResetLabelStatsMetrics()
ResetHotCacheStatusMetrics()
}
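With the three helpers above, statistics.Reset becomes the single entry point for clearing statistics metrics, so callers no longer need RegionStatistics, LabelStatistics, or HotCache instances just to reset them. A purely illustrative way to sanity-check such a reset (not part of this PR) is to assert that a vector collector drops back to zero children, for example with prometheus/testutil:

package statistics_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestResetClearsGaugeVec(t *testing.T) {
	// Stand-in for a status gauge such as regionLabelLevelGauge.
	g := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "example_status"}, []string{"type"})
	g.WithLabelValues("miss-peer").Set(42)
	if n := testutil.CollectAndCount(g); n != 1 {
		t.Fatalf("expected 1 metric before reset, got %d", n)
	}
	g.Reset() // what the new package-level ResetXxxMetrics helpers boil down to
	if n := testutil.CollectAndCount(g); n != 0 {
		t.Fatalf("expected 0 metrics after reset, got %d", n)
	}
}
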
4 changes: 2 additions & 2 deletions server/cluster/cluster.go
@@ -654,7 +654,7 @@ func (c *RaftCluster) runMetricsCollectionJob() {
ticker := time.NewTicker(metricsCollectionJobInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Stop()
ticker = time.NewTicker(time.Microsecond)
ticker = time.NewTicker(time.Millisecond)
})
defer ticker.Stop()

@@ -734,10 +734,10 @@ func (c *RaftCluster) Stop() {
return
}
c.running = false
c.cancel()
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
c.stopSchedulingJobs()
}
c.cancel()
c.Unlock()

c.wg.Wait()
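The hunks above also bump the "highFrequencyClusterJobs" failpoint ticker from time.Microsecond to time.Millisecond. Presumably the motivation is that a 1µs ticker asks for a million ticks per second, which the runtime cannot sustain and which mostly wastes CPU, while 1ms is still far faster than the production metricsCollectionJobInterval and plenty for tests. A standalone illustration (not pd code) of what a millisecond ticker actually delivers:

package main

import (
	"fmt"
	"time"
)

// countTicks reports how many ticks an interval delivers within a window.
func countTicks(interval, window time.Duration) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	deadline := time.After(window)
	ticks := 0
	for {
		select {
		case <-ticker.C:
			ticks++
		case <-deadline:
			return ticks
		}
	}
}

func main() {
	// Roughly 100 ticks in 100ms is already "high frequency" for a test,
	// without the busy-loop behaviour of a microsecond interval.
	fmt.Println("1ms ticks in 100ms:", countTicks(time.Millisecond, 100*time.Millisecond))
}
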
21 changes: 14 additions & 7 deletions server/cluster/cluster_test.go
@@ -2485,7 +2485,10 @@ func TestCollectMetricsConcurrent(t *testing.T) {
nil)
}, func(co *schedule.Coordinator) { co.Run() }, re)
defer cleanup()

rc := co.GetCluster().(*RaftCluster)
rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
rc.schedulingController.coordinator = co
controller := co.GetSchedulersController()
// Make sure there are no problem when concurrent write and read
var wg sync.WaitGroup
count := 10
@@ -2498,15 +2501,14 @@ func TestCollectMetricsConcurrent(t *testing.T) {
}
}(i)
}
controller := co.GetSchedulersController()
for i := 0; i < 1000; i++ {
co.CollectHotSpotMetrics()
controller.CollectSchedulerMetrics()
co.GetCluster().(*RaftCluster).collectStatisticsMetrics()
rc.collectSchedulingMetrics()
}
schedule.ResetHotSpotMetrics()
schedulers.ResetSchedulerMetrics()
co.GetCluster().(*RaftCluster).resetStatisticsMetrics()
rc.resetSchedulingMetrics()
wg.Wait()
}

@@ -2520,6 +2522,11 @@ func TestCollectMetrics(t *testing.T) {
nil)
}, func(co *schedule.Coordinator) { co.Run() }, re)
defer cleanup()

rc := co.GetCluster().(*RaftCluster)
rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
rc.schedulingController.coordinator = co
controller := co.GetSchedulersController()
count := 10
for i := 0; i <= count; i++ {
for k := 0; k < 200; k++ {
@@ -2533,11 +2540,11 @@ func TestCollectMetrics(t *testing.T) {
tc.hotStat.HotCache.Update(item, utils.Write)
}
}
controller := co.GetSchedulersController()

for i := 0; i < 1000; i++ {
co.CollectHotSpotMetrics()
controller.CollectSchedulerMetrics()
co.GetCluster().(*RaftCluster).collectStatisticsMetrics()
rc.collectSchedulingMetrics()
}
stores := co.GetCluster().GetStores()
regionStats := co.GetCluster().RegionWriteStats()
@@ -2552,7 +2559,7 @@ func TestCollectMetrics(t *testing.T) {
re.Equal(status1, status2)
schedule.ResetHotSpotMetrics()
schedulers.ResetSchedulerMetrics()
co.GetCluster().(*RaftCluster).resetStatisticsMetrics()
rc.resetSchedulingMetrics()
}

func prepare(setCfg func(*sc.ScheduleConfig), setTc func(*testCluster), run func(*schedule.Coordinator), re *require.Assertions) (*testCluster, *schedule.Coordinator, func()) {
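The test changes above keep collecting and finally resetting metrics while other goroutines mutate cluster state. A standalone sketch of the same collect-while-writing pattern against a plain Prometheus collector (none of pd's types involved), useful to run under -race:

package main

import (
	"fmt"
	"strconv"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "demo_status"}, []string{"store"})
	reg := prometheus.NewRegistry()
	reg.MustRegister(gauge)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				gauge.WithLabelValues(strconv.Itoa(id)).Set(float64(j))
			}
		}(i)
	}

	// Gather concurrently with the writers, mirroring the loop that calls
	// CollectHotSpotMetrics / CollectSchedulerMetrics / collectSchedulingMetrics.
	for i := 0; i < 1000; i++ {
		if _, err := reg.Gather(); err != nil {
			panic(err)
		}
	}
	gauge.Reset() // mirrors the final ResetXxxMetrics calls in the test
	wg.Wait()
	fmt.Println("done; run with -race to check for data races")
}
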
21 changes: 5 additions & 16 deletions server/cluster/scheduling_controller.go
@@ -149,7 +149,7 @@ func (sc *schedulingController) runSchedulingMetricsCollectionJob() {
ticker := time.NewTicker(metricsCollectionJobInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Stop()
ticker = time.NewTicker(time.Microsecond)
ticker = time.NewTicker(time.Millisecond)
})
defer ticker.Stop()

@@ -170,7 +170,10 @@ func (sc *schedulingController) resetSchedulingMetrics() {
statistics.Reset()
schedulers.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
sc.resetStatisticsMetrics()
statistics.ResetRegionStatsMetrics()
statistics.ResetLabelStatsMetrics()
// reset hot cache metrics
statistics.ResetHotCacheStatusMetrics()
}

func (sc *schedulingController) collectSchedulingMetrics() {
@@ -183,20 +186,6 @@ func (sc *schedulingController) collectSchedulingMetrics() {
statsMap.Collect()
sc.coordinator.GetSchedulersController().CollectSchedulerMetrics()
sc.coordinator.CollectHotSpotMetrics()
sc.collectStatisticsMetrics()
}

func (sc *schedulingController) resetStatisticsMetrics() {
if sc.regionStats == nil {
return
}
sc.regionStats.Reset()
sc.labelStats.Reset()
// reset hot cache metrics
sc.hotStat.ResetMetrics()
}

func (sc *schedulingController) collectStatisticsMetrics() {
if sc.regionStats == nil {
return
}
2 changes: 0 additions & 2 deletions tests/server/cluster/cluster_test.go
@@ -518,8 +518,6 @@ func TestRaftClusterMultipleRestart(t *testing.T) {
err = rc.Start(leaderServer.GetServer())
re.NoError(err)
time.Sleep(time.Millisecond)
rc = leaderServer.GetRaftCluster()
re.NotNil(rc)
rc.Stop()
}
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))