diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 96ee88259da..59b2e691658 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -485,10 +485,6 @@ func (c *Cluster) collectMetrics() { c.coordinator.GetSchedulersController().CollectSchedulerMetrics() c.coordinator.CollectHotSpotMetrics() - c.collectClusterMetrics() -} - -func (c *Cluster) collectClusterMetrics() { if c.regionStats == nil { return } @@ -500,20 +496,8 @@ func (c *Cluster) collectClusterMetrics() { func (c *Cluster) resetMetrics() { statistics.Reset() - schedulers.ResetSchedulerMetrics() schedule.ResetHotSpotMetrics() - c.resetClusterMetrics() -} - -func (c *Cluster) resetClusterMetrics() { - if c.regionStats == nil { - return - } - c.regionStats.Reset() - c.labelStats.Reset() - // reset hot cache metrics - c.hotStat.ResetMetrics() } // StartBackgroundJobs starts background jobs. diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index de7189a1332..1868e323b0f 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -125,8 +125,8 @@ func (w *HotCache) CollectMetrics() { w.CheckReadAsync(newCollectMetricsTask()) } -// ResetMetrics resets the hot cache metrics. -func (w *HotCache) ResetMetrics() { +// ResetHotCacheStatusMetrics resets the hot cache metrics. +func ResetHotCacheStatusMetrics() { hotCacheStatusGauge.Reset() } diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 26cbea9ef92..21af8e152fd 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -272,8 +272,8 @@ func (r *RegionStatistics) Collect() { regionWitnessLeaderRegionCounter.Set(float64(len(r.stats[WitnessLeader]))) } -// Reset resets the metrics of the regions' status. -func (r *RegionStatistics) Reset() { +// ResetRegionStatsMetrics resets the metrics of the regions' status. +func ResetRegionStatsMetrics() { regionMissPeerRegionCounter.Set(0) regionExtraPeerRegionCounter.Set(0) regionDownPeerRegionCounter.Set(0) @@ -326,8 +326,8 @@ func (l *LabelStatistics) Collect() { } } -// Reset resets the metrics of the label status. -func (l *LabelStatistics) Reset() { +// ResetLabelStatsMetrics resets the metrics of the label status. +func ResetLabelStatsMetrics() { regionLabelLevelGauge.Reset() } diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index dcdd77d9112..aacd45338d1 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -322,4 +322,7 @@ func Reset() { storeStatusGauge.Reset() clusterStatusGauge.Reset() placementStatusGauge.Reset() + ResetRegionStatsMetrics() + ResetLabelStatsMetrics() + ResetHotCacheStatusMetrics() } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 3b826d8d33e..e9cece0faa7 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -654,7 +654,7 @@ func (c *RaftCluster) runMetricsCollectionJob() { ticker := time.NewTicker(metricsCollectionJobInterval) failpoint.Inject("highFrequencyClusterJobs", func() { ticker.Stop() - ticker = time.NewTicker(time.Microsecond) + ticker = time.NewTicker(time.Millisecond) }) defer ticker.Stop() @@ -734,10 +734,10 @@ func (c *RaftCluster) Stop() { return } c.running = false + c.cancel() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.stopSchedulingJobs() } - c.cancel() c.Unlock() c.wg.Wait() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index e9ce35dfb54..b1a3535e90f 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2485,7 +2485,10 @@ func TestCollectMetricsConcurrent(t *testing.T) { nil) }, func(co *schedule.Coordinator) { co.Run() }, re) defer cleanup() - + rc := co.GetCluster().(*RaftCluster) + rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) + rc.schedulingController.coordinator = co + controller := co.GetSchedulersController() // Make sure there are no problem when concurrent write and read var wg sync.WaitGroup count := 10 @@ -2498,15 +2501,14 @@ func TestCollectMetricsConcurrent(t *testing.T) { } }(i) } - controller := co.GetSchedulersController() for i := 0; i < 1000; i++ { co.CollectHotSpotMetrics() controller.CollectSchedulerMetrics() - co.GetCluster().(*RaftCluster).collectStatisticsMetrics() + rc.collectSchedulingMetrics() } schedule.ResetHotSpotMetrics() schedulers.ResetSchedulerMetrics() - co.GetCluster().(*RaftCluster).resetStatisticsMetrics() + rc.resetSchedulingMetrics() wg.Wait() } @@ -2520,6 +2522,11 @@ func TestCollectMetrics(t *testing.T) { nil) }, func(co *schedule.Coordinator) { co.Run() }, re) defer cleanup() + + rc := co.GetCluster().(*RaftCluster) + rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) + rc.schedulingController.coordinator = co + controller := co.GetSchedulersController() count := 10 for i := 0; i <= count; i++ { for k := 0; k < 200; k++ { @@ -2533,11 +2540,11 @@ func TestCollectMetrics(t *testing.T) { tc.hotStat.HotCache.Update(item, utils.Write) } } - controller := co.GetSchedulersController() + for i := 0; i < 1000; i++ { co.CollectHotSpotMetrics() controller.CollectSchedulerMetrics() - co.GetCluster().(*RaftCluster).collectStatisticsMetrics() + rc.collectSchedulingMetrics() } stores := co.GetCluster().GetStores() regionStats := co.GetCluster().RegionWriteStats() @@ -2552,7 +2559,7 @@ func TestCollectMetrics(t *testing.T) { re.Equal(status1, status2) schedule.ResetHotSpotMetrics() schedulers.ResetSchedulerMetrics() - co.GetCluster().(*RaftCluster).resetStatisticsMetrics() + rc.resetSchedulingMetrics() } func prepare(setCfg func(*sc.ScheduleConfig), setTc func(*testCluster), run func(*schedule.Coordinator), re *require.Assertions) (*testCluster, *schedule.Coordinator, func()) { diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index bb6470252b0..04c77498948 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -149,7 +149,7 @@ func (sc *schedulingController) runSchedulingMetricsCollectionJob() { ticker := time.NewTicker(metricsCollectionJobInterval) failpoint.Inject("highFrequencyClusterJobs", func() { ticker.Stop() - ticker = time.NewTicker(time.Microsecond) + ticker = time.NewTicker(time.Millisecond) }) defer ticker.Stop() @@ -170,7 +170,10 @@ func (sc *schedulingController) resetSchedulingMetrics() { statistics.Reset() schedulers.ResetSchedulerMetrics() schedule.ResetHotSpotMetrics() - sc.resetStatisticsMetrics() + statistics.ResetRegionStatsMetrics() + statistics.ResetLabelStatsMetrics() + // reset hot cache metrics + statistics.ResetHotCacheStatusMetrics() } func (sc *schedulingController) collectSchedulingMetrics() { @@ -183,20 +186,6 @@ func (sc *schedulingController) collectSchedulingMetrics() { statsMap.Collect() sc.coordinator.GetSchedulersController().CollectSchedulerMetrics() sc.coordinator.CollectHotSpotMetrics() - sc.collectStatisticsMetrics() -} - -func (sc *schedulingController) resetStatisticsMetrics() { - if sc.regionStats == nil { - return - } - sc.regionStats.Reset() - sc.labelStats.Reset() - // reset hot cache metrics - sc.hotStat.ResetMetrics() -} - -func (sc *schedulingController) collectStatisticsMetrics() { if sc.regionStats == nil { return } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 6d233a8c8ab..a5861f1ba43 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -518,8 +518,6 @@ func TestRaftClusterMultipleRestart(t *testing.T) { err = rc.Start(leaderServer.GetServer()) re.NoError(err) time.Sleep(time.Millisecond) - rc = leaderServer.GetRaftCluster() - re.NotNil(rc) rc.Stop() } re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))