diff --git a/server/api/region_test.go b/server/api/region_test.go
index 4d4b31e93d3e..6a5240eedbb5 100644
--- a/server/api/region_test.go
+++ b/server/api/region_test.go
@@ -171,15 +171,13 @@ func (suite *regionTestSuite) TestRegion() {
 
 func (suite *regionTestSuite) TestRegionCheck() {
 	re := suite.Require()
 	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`))
-	tu.Eventually(re, func() bool {
-		return suite.svr.GetRaftCluster().GetLastRegionStats() != nil
-	})
-	r := newTestRegionInfo(2, 1, []byte("a"), []byte("b"))
+	r := newTestRegionInfo(2, 1, []byte("a"), []byte(""))
 	downPeer := &metapb.Peer{Id: 13, StoreId: 2}
 	r = r.Clone(core.WithAddPeer(downPeer), core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}), core.WithPendingPeers([]*metapb.Peer{downPeer}))
-	mustRegionHeartbeat(re, suite.svr, r)
-	time.Sleep(150 * time.Millisecond)
+	tu.Eventually(re, func() bool {
+		return !suite.svr.GetRaftCluster().IsLastRegionStatsEmpty()
+	})
 	url := fmt.Sprintf("%s/region/id/%d", suite.urlPrefix, r.GetID())
 	r1 := &RegionInfo{}
 	suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r1))
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index c48fe0cef15d..54d650e8c5af 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -270,6 +270,7 @@ func (c *RaftCluster) Start(s Server) error {
 	c.storeConfigManager = config.NewStoreConfigManager(c.httpClient)
 	c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams())
 	c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager)
+	c.lastRegionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager)
 	c.limiter = NewStoreLimiter(s.GetPersistOptions())
 
 	c.wg.Add(8)
@@ -793,14 +794,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	// Save to cache if meta or leader is updated, or contains any down/pending peer.
 	// Mark isNew if the region in cache does not have leader.
 	isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
-	if !saveKV && !saveCache && !isNew {
-		// Due to some config changes need to update the region stats as well,
-		// so we do some extra checks here.
-		if c.regionStats != nil && c.regionStats.RegionStatsNeedUpdate(region) {
-			c.regionStats.Observe(region, c.getRegionStoresLocked(region))
-		}
-		return nil
-	}
 
 	failpoint.Inject("concurrentRegionHeartbeat", func() {
 		time.Sleep(500 * time.Millisecond)
@@ -976,9 +969,9 @@ func (c *RaftCluster) GetAverageRegionSize() int64 {
 	return c.core.GetAverageRegionSize()
 }
 
-// GetLastRegionStats returns region statistics from cluster. Only for test purpose.
-func (c *RaftCluster) GetLastRegionStats() *statistics.RegionStatistics {
-	return c.lastRegionStats
+// IsLastRegionStatsEmpty returns if region statistics is empty. Only for test purpose.
+func (c *RaftCluster) IsLastRegionStatsEmpty() bool {
+	return c.lastRegionStats.IsEmpty()
 }
 
 // DropCacheRegion removes a region from the cache.
@@ -1808,16 +1801,15 @@ func (c *RaftCluster) resetMetrics() {
 }
 
 func (c *RaftCluster) collectStatsMetrics() {
-	c.Lock()
-	defer c.Unlock()
-	if c.regionStats == nil {
+	if c.regionStats == nil || c.lastRegionStats == nil {
 		return
 	}
 	c.regionStats.Collect()
 	c.labelLevelStats.Collect()
-	c.lastRegionStats = c.regionStats.Clone()
-	c.labelLevelStats = statistics.NewLabelStatistics()
-	c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager)
+	lastStats := c.regionStats.GetStats()
+	c.lastRegionStats.SetStats(lastStats)
+	c.labelLevelStats.ResetStats()
+	c.regionStats.ResetStats()
 }
 
 func (c *RaftCluster) collectHotStatsMetrics() {
@@ -1826,10 +1818,11 @@ func (c *RaftCluster) collectHotStatsMetrics() {
 }
 
 func (c *RaftCluster) resetClusterMetrics() {
-	if c.regionStats == nil {
+	if c.regionStats == nil || c.lastRegionStats == nil {
 		return
 	}
 	c.regionStats.Reset()
+	c.lastRegionStats.Reset()
 	c.labelLevelStats.Reset()
 	// reset hot cache metrics
 	c.hotStat.ResetMetrics()
@@ -1863,8 +1856,6 @@ func (c *RaftCluster) resetProgressIndicator() {
 
 // GetRegionStatsByType gets the status of the region by types.
 func (c *RaftCluster) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo {
-	c.RLock()
-	defer c.RUnlock()
 	if c.lastRegionStats == nil {
 		return nil
 	}
@@ -1872,8 +1863,6 @@ func (c *RaftCluster) GetRegionStatsByType(typ statistics.RegionStatisticType) [
 }
 
 func (c *RaftCluster) updateRegionStats(regions []*core.RegionInfo) {
-	c.Lock()
-	defer c.Unlock()
 	for _, region := range regions {
 		if c.regionStats != nil {
 			c.regionStats.Observe(region, c.getRegionStoresLocked(region))
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 9ffa0ee62ad2..43c69d67a825 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -934,25 +934,32 @@ func TestRegionSizeChanged(t *testing.T) {
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`))
 
 	_, opt, err := newTestScheduleConfig()
 	re.NoError(err)
 	cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
 	cluster.coordinator = newCoordinator(ctx, cluster, nil)
+	cluster.regionLabeler, _ = labeler.NewRegionLabeler(ctx, cluster.storage, time.Second*5)
 	cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
+	cluster.lastRegionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
+	cluster.coordinator.wg.Add(1)
+	go cluster.coordinator.collectRegionStats()
 	region := newTestRegions(1, 3, 3)[0]
 	cluster.opt.GetMaxMergeRegionKeys()
 	curMaxMergeSize := int64(cluster.opt.GetMaxMergeRegionSize())
 	curMaxMergeKeys := int64(cluster.opt.GetMaxMergeRegionKeys())
 	region = region.Clone(
 		core.WithLeader(region.GetPeers()[2]),
+		core.WithEndKey([]byte{}),
 		core.SetApproximateSize(curMaxMergeSize-1),
 		core.SetApproximateKeys(curMaxMergeKeys-1),
 		core.SetFromHeartbeat(true),
 	)
 	cluster.processRegionHeartbeat(region)
 	regionID := region.GetID()
-	re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
+	time.Sleep(150 * time.Millisecond)
+	re.True(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
 	// Test ApproximateSize and ApproximateKeys change.
 	region = region.Clone(
 		core.WithLeader(region.GetPeers()[2]),
@@ -961,16 +968,20 @@ func TestRegionSizeChanged(t *testing.T) {
 		core.SetApproximateSize(curMaxMergeSize+1),
 		core.SetApproximateKeys(curMaxMergeKeys+1),
 		core.SetFromHeartbeat(true),
 	)
 	cluster.processRegionHeartbeat(region)
-	re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
+	time.Sleep(150 * time.Millisecond)
+	re.False(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
 	// Test MaxMergeRegionSize and MaxMergeRegionKeys change.
 	cluster.opt.SetMaxMergeRegionSize((uint64(curMaxMergeSize + 2)))
 	cluster.opt.SetMaxMergeRegionKeys((uint64(curMaxMergeKeys + 2)))
 	cluster.processRegionHeartbeat(region)
-	re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
+	time.Sleep(150 * time.Millisecond)
+	re.True(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
 	cluster.opt.SetMaxMergeRegionSize((uint64(curMaxMergeSize)))
 	cluster.opt.SetMaxMergeRegionKeys((uint64(curMaxMergeKeys)))
 	cluster.processRegionHeartbeat(region)
-	re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
+	time.Sleep(150 * time.Millisecond)
+	re.False(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipSleep"))
 }
 
 func TestConcurrentReportBucket(t *testing.T) {
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -1209,6 +1220,7 @@ func TestOfflineAndMerge(t *testing.T) {
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`))
 
 	_, opt, err := newTestScheduleConfig()
 	re.NoError(err)
@@ -1222,7 +1234,10 @@ func TestOfflineAndMerge(t *testing.T) {
 		}
 	}
 	cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
+	cluster.lastRegionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
 	cluster.coordinator = newCoordinator(ctx, cluster, nil)
+	cluster.coordinator.wg.Add(1)
+	go cluster.coordinator.collectRegionStats()
 
 	// Put 4 stores.
 	for _, store := range newTestStores(4, "5.0.0") {
@@ -1261,14 +1276,17 @@ func TestOfflineAndMerge(t *testing.T) {
 		regions = core.SplitRegions(regions)
 	}
 	heartbeatRegions(re, cluster, regions)
+	time.Sleep(150 * time.Millisecond)
 	re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions))
 
 	// Merge.
 	for i := 0; i < n; i++ {
 		regions = core.MergeRegions(regions)
 		heartbeatRegions(re, cluster, regions)
+		time.Sleep(150 * time.Millisecond)
 		re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions))
 	}
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipSleep"))
 }
 
 func TestSyncConfig(t *testing.T) {
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index c3fdebf902d9..8dc9b23eb813 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -215,7 +215,6 @@ func (c *coordinator) collectRegionStats() {
 			key = nil
 			continue
 		}
-		key = regions[length-1].GetEndKey()
 		c.cluster.updateRegionStats(regions)
 
 		if len(key) == 0 {
diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go
index 01f400a3e802..4875d1f8fdbb 100644
--- a/server/cluster/coordinator_test.go
+++ b/server/cluster/coordinator_test.go
@@ -243,6 +243,7 @@ func TestCollectMetrics(t *testing.T) {
 
 	tc, co, cleanup := prepare(nil, func(tc *testCluster) {
 		tc.regionStats = statistics.NewRegionStatistics(tc.GetOpts(), nil, tc.storeConfigManager)
+		tc.lastRegionStats = statistics.NewRegionStatistics(tc.GetOpts(), nil, tc.storeConfigManager)
 	}, func(co *coordinator) { co.run() }, re)
 	defer cleanup()
diff --git a/server/statistics/region_collection.go b/server/statistics/region_collection.go
index 9a58ab537cac..13f2d3b0c174 100644
--- a/server/statistics/region_collection.go
+++ b/server/statistics/region_collection.go
@@ -48,6 +48,8 @@ var statsTypes = []RegionStatisticType{
 	OfflinePeer,
 	LearnerPeer,
 	EmptyRegion,
+	OversizedRegion,
+	UndersizedRegion,
 }
 
 const nonIsolation = "none"
@@ -246,25 +248,60 @@ func (r *RegionStatistics) Collect() {
 	regionStatusGauge.WithLabelValues("undersized-region-count").Set(float64(len(r.stats[UndersizedRegion])))
 }
 
+// SetStats sets the statistics to the given one.
+func (r *RegionStatistics) SetStats(s map[RegionStatisticType]map[uint64]*RegionInfo) {
+	r.Lock()
+	defer r.Unlock()
+	r.stats = s
+}
+
+// ResetStats resets the statistics.
+func (r *RegionStatistics) ResetStats() {
+	r.Lock()
+	defer r.Unlock()
+	r.stats = make(map[RegionStatisticType]map[uint64]*RegionInfo)
+	r.index = make(map[uint64]RegionStatisticType)
+
+	r.stats[MissPeer] = make(map[uint64]*RegionInfo)
+	r.stats[ExtraPeer] = make(map[uint64]*RegionInfo)
+	r.stats[DownPeer] = make(map[uint64]*RegionInfo)
+	r.stats[PendingPeer] = make(map[uint64]*RegionInfo)
+	r.stats[OfflinePeer] = make(map[uint64]*RegionInfo)
+	r.stats[LearnerPeer] = make(map[uint64]*RegionInfo)
+	r.stats[EmptyRegion] = make(map[uint64]*RegionInfo)
+	r.stats[OversizedRegion] = make(map[uint64]*RegionInfo)
+	r.stats[UndersizedRegion] = make(map[uint64]*RegionInfo)
+}
+
+// IsEmpty returns if the region statistics is empty. Only for test purpose.
+func (r *RegionStatistics) IsEmpty() bool {
+	r.RLock()
+	defer r.RUnlock()
+
+	return len(r.stats[MissPeer]) == 0 && len(r.stats[ExtraPeer]) == 0 && len(r.stats[DownPeer]) == 0 &&
+		len(r.stats[PendingPeer]) == 0 && len(r.stats[OfflinePeer]) == 0 && len(r.stats[LearnerPeer]) == 0 &&
+		len(r.stats[EmptyRegion]) == 0 && len(r.stats[OversizedRegion]) == 0 && len(r.stats[UndersizedRegion]) == 0
+}
+
 // Reset resets the metrics of the regions' status.
 func (r *RegionStatistics) Reset() {
 	regionStatusGauge.Reset()
 }
 
-// Clone clones the regions' status.
-func (r *RegionStatistics) Clone() *RegionStatistics {
-	r1 := &RegionStatistics{
-		stats: make(map[RegionStatisticType]map[uint64]*RegionInfo),
-	}
+// GetStats returns the regions' status.
+func (r *RegionStatistics) GetStats() map[RegionStatisticType]map[uint64]*RegionInfo {
+	r.RLock()
+	defer r.RUnlock()
+	stats := make(map[RegionStatisticType]map[uint64]*RegionInfo)
 	for _, typ := range statsTypes {
-		r1.stats[typ] = make(map[uint64]*RegionInfo)
+		stats[typ] = make(map[uint64]*RegionInfo)
 		for k, v := range r.stats[typ] {
-			r1.stats[typ][k] = v
+			stats[typ][k] = v
 		}
 	}
-	return r1
+	return stats
 }
 
 // LabelStatistics is the statistics of the level of labels.
@@ -307,6 +344,14 @@ func (l *LabelStatistics) Collect() {
 	}
 }
 
+// ResetStats resets the statistics.
+func (l *LabelStatistics) ResetStats() {
+	l.Lock()
+	defer l.Unlock()
+	l.regionLabelStats = make(map[uint64]string)
+	l.labelCounter = make(map[string]int)
+}
+
 // Reset resets the metrics of the label status.
 func (l *LabelStatistics) Reset() {
 	regionLabelLevelGauge.Reset()
diff --git a/tests/pdctl/region/region_test.go b/tests/pdctl/region/region_test.go
index 00146b9e1a15..dda8505c74a0 100644
--- a/tests/pdctl/region/region_test.go
+++ b/tests/pdctl/region/region_test.go
@@ -102,7 +102,7 @@ func TestRegion(t *testing.T) {
 	)
 	defer cluster.Destroy()
 	tu.Eventually(re, func() bool {
-		return leaderServer.GetRaftCluster().GetLastRegionStats() != nil
+		return !leaderServer.GetRaftCluster().IsLastRegionStatsEmpty()
 	})
 	var testRegionsCases = []struct {
 		args []string