diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 24468a1026e..51408770f32 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -796,13 +796,6 @@ "intervalFactor": 2, "legendFormat": "{{type}}", "refId": "B" - }, - { - "expr": "pd_regions_offline_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"offline-peer-region-count\", instance=\"$instance\"}", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "{{type}}", - "refId": "C" } ], "thresholds": [ diff --git a/server/api/region.go b/server/api/region.go index 244b02677c2..df67a8792e3 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -490,7 +490,7 @@ func (h *regionsHandler) GetLearnerPeerRegions(w http.ResponseWriter, r *http.Re // @Router /regions/check/offline-peer [get] func (h *regionsHandler) GetOfflinePeerRegions(w http.ResponseWriter, r *http.Request) { handler := h.svr.GetHandler() - regions, err := handler.GetOfflinePeer(statistics.OfflinePeer) + regions, err := handler.GetRegionsByType(statistics.OfflinePeer) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return diff --git a/server/api/region_test.go b/server/api/region_test.go index 84ce1ad0983..8d75c1aced8 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -24,6 +24,7 @@ import ( "net/url" "sort" "testing" + "time" "github.com/docker/go-units" "github.com/pingcap/failpoint" @@ -168,11 +169,15 @@ func (suite *regionTestSuite) TestRegion() { } func (suite *regionTestSuite) TestRegionCheck() { - r := newTestRegionInfo(2, 1, []byte("a"), []byte("b")) + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`)) + r := newTestRegionInfo(2, 1, []byte("a"), []byte("")) downPeer := &metapb.Peer{Id: 13, StoreId: 2} r = r.Clone(core.WithAddPeer(downPeer), core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}), core.WithPendingPeers([]*metapb.Peer{downPeer})) - re := suite.Require() mustRegionHeartbeat(re, suite.svr, r) + tu.Eventually(re, func() bool { + return !suite.svr.GetRaftCluster().IsLastRegionStatsEmpty() + }) url := fmt.Sprintf("%s/region/id/%d", suite.urlPrefix, r.GetID()) r1 := &RegionInfo{} suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r1)) @@ -199,6 +204,7 @@ func (suite *regionTestSuite) TestRegionCheck() { r = r.Clone(core.SetApproximateSize(1)) mustRegionHeartbeat(re, suite.svr, r) + time.Sleep(150 * time.Millisecond) url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "empty-region") r5 := &RegionsInfo{} suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r5)) @@ -207,6 +213,7 @@ func (suite *regionTestSuite) TestRegionCheck() { r = r.Clone(core.SetApproximateSize(1)) mustRegionHeartbeat(re, suite.svr, r) + time.Sleep(150 * time.Millisecond) url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "hist-size") r6 := make([]*histItem, 1) suite.NoError(tu.ReadGetJSON(re, testDialClient, url, &r6)) @@ -215,11 +222,13 @@ func (suite *regionTestSuite) TestRegionCheck() { r = r.Clone(core.SetApproximateKeys(1000)) mustRegionHeartbeat(re, suite.svr, r) + time.Sleep(150 * time.Millisecond) url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "hist-keys") r7 := make([]*histItem, 1) suite.NoError(tu.ReadGetJSON(re, testDialClient, url, &r7)) histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}} suite.Equal(histKeys, r7) + suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipSleep")) } func (suite *regionTestSuite) TestRegions() { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 41ff2fe3ac5..25c6f750312 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -138,6 +138,7 @@ type RaftCluster struct { coordinator *coordinator labelLevelStats *statistics.LabelStatistics regionStats *statistics.RegionStatistics + lastRegionStats *statistics.RegionStatistics hotStat *statistics.HotStat hotBuckets *buckets.HotBucketCache ruleManager *placement.RuleManager @@ -275,6 +276,7 @@ func (c *RaftCluster) Start(s Server) error { c.storeConfigManager = config.NewStoreConfigManager(c.httpClient) c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams()) c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager) + c.lastRegionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager) c.limiter = NewStoreLimiter(s.GetPersistOptions()) c.externalTS, err = c.storage.LoadExternalTS() if err != nil { @@ -845,19 +847,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } c.coordinator.CheckTransferWitnessLeader(region) - hasRegionStats := c.regionStats != nil // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. isNew, saveKV, saveCache, needSync := regionGuide(region, origin) - if !saveKV && !saveCache && !isNew { - // Due to some config changes need to update the region stats as well, - // so we do some extra checks here. - if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - c.regionStats.Observe(region, c.getRegionStoresLocked(region)) - } - return nil - } failpoint.Inject("concurrentRegionHeartbeat", func() { time.Sleep(500 * time.Millisecond) @@ -875,20 +868,9 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil { return err } - - for _, item := range overlaps { - if c.regionStats != nil { - c.regionStats.ClearDefunctRegion(item.GetID()) - } - c.labelLevelStats.ClearDefunctRegion(item.GetID()) - } regionUpdateCacheEventCounter.Inc() } - if hasRegionStats { - c.regionStats.Observe(region, c.getRegionStoresLocked(region)) - } - if !c.IsPrepared() && isNew { c.coordinator.prepareChecker.collect(region) } @@ -1033,6 +1015,11 @@ func (c *RaftCluster) GetAverageRegionSize() int64 { return c.core.GetAverageRegionSize() } +// IsLastRegionStatsEmpty returns if region statistics is empty. Only for test purpose. +func (c *RaftCluster) IsLastRegionStatsEmpty() bool { + return c.lastRegionStats.IsEmpty() +} + // DropCacheRegion removes a region from the cache. func (c *RaftCluster) DropCacheRegion(id uint64) { c.core.RemoveRegionIfExist(id) @@ -1880,7 +1867,7 @@ func (c *RaftCluster) collectMetrics() { c.coordinator.collectSchedulerMetrics() c.coordinator.collectHotSpotMetrics() - c.collectClusterMetrics() + c.collectHotStatsMetrics() c.collectHealthStatus() } @@ -1895,21 +1882,29 @@ func (c *RaftCluster) resetMetrics() { c.resetProgressIndicator() } -func (c *RaftCluster) collectClusterMetrics() { - if c.regionStats == nil { +func (c *RaftCluster) collectStatsMetrics() { + if c.regionStats == nil || c.lastRegionStats == nil { return } c.regionStats.Collect() c.labelLevelStats.Collect() + lastStats := c.regionStats.GetStats() + c.lastRegionStats.SetStats(lastStats) + c.labelLevelStats.ResetStats() + c.regionStats.ResetStats() +} + +func (c *RaftCluster) collectHotStatsMetrics() { // collect hot cache metrics c.hotStat.CollectMetrics() } func (c *RaftCluster) resetClusterMetrics() { - if c.regionStats == nil { + if c.regionStats == nil || c.lastRegionStats == nil { return } c.regionStats.Reset() + c.lastRegionStats.Reset() c.labelLevelStats.Reset() // reset hot cache metrics c.hotStat.ResetMetrics() @@ -1943,22 +1938,17 @@ func (c *RaftCluster) resetProgressIndicator() { // GetRegionStatsByType gets the status of the region by types. func (c *RaftCluster) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { - if c.regionStats == nil { - return nil - } - return c.regionStats.GetRegionStatsByType(typ) -} - -// GetOfflineRegionStatsByType gets the status of the offline region by types. -func (c *RaftCluster) GetOfflineRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { - if c.regionStats == nil { + if c.lastRegionStats == nil { return nil } - return c.regionStats.GetOfflineRegionStatsByType(typ) + return c.lastRegionStats.GetRegionStatsByType(typ) } -func (c *RaftCluster) updateRegionsLabelLevelStats(regions []*core.RegionInfo) { +func (c *RaftCluster) updateRegionStats(regions []*core.RegionInfo) { for _, region := range regions { + if c.regionStats != nil { + c.regionStats.Observe(region, c.getRegionStoresLocked(region)) + } c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.opt.GetLocationLabels()) } } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index b08c259d6c2..3120337c89c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -950,25 +950,32 @@ func TestRegionSizeChanged(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`)) _, opt, err := newTestScheduleConfig() re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) cluster.coordinator = newCoordinator(ctx, cluster, nil) + cluster.regionLabeler, _ = labeler.NewRegionLabeler(ctx, cluster.storage, time.Second*5) cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager) + cluster.lastRegionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager) + cluster.coordinator.wg.Add(1) + go cluster.coordinator.collectRegionStats() region := newTestRegions(1, 3, 3)[0] cluster.opt.GetMaxMergeRegionKeys() curMaxMergeSize := int64(cluster.opt.GetMaxMergeRegionSize()) curMaxMergeKeys := int64(cluster.opt.GetMaxMergeRegionKeys()) region = region.Clone( core.WithLeader(region.GetPeers()[2]), + core.WithEndKey([]byte{}), core.SetApproximateSize(curMaxMergeSize-1), core.SetApproximateKeys(curMaxMergeKeys-1), core.SetFromHeartbeat(true), ) cluster.processRegionHeartbeat(region) regionID := region.GetID() - re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) + time.Sleep(150 * time.Millisecond) + re.True(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) // Test ApproximateSize and ApproximateKeys change. region = region.Clone( core.WithLeader(region.GetPeers()[2]), @@ -977,16 +984,20 @@ func TestRegionSizeChanged(t *testing.T) { core.SetFromHeartbeat(true), ) cluster.processRegionHeartbeat(region) - re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) + time.Sleep(150 * time.Millisecond) + re.False(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) // Test MaxMergeRegionSize and MaxMergeRegionKeys change. cluster.opt.SetMaxMergeRegionSize((uint64(curMaxMergeSize + 2))) cluster.opt.SetMaxMergeRegionKeys((uint64(curMaxMergeKeys + 2))) cluster.processRegionHeartbeat(region) - re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) + time.Sleep(150 * time.Millisecond) + re.True(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) cluster.opt.SetMaxMergeRegionSize((uint64(curMaxMergeSize))) cluster.opt.SetMaxMergeRegionKeys((uint64(curMaxMergeKeys))) cluster.processRegionHeartbeat(region) - re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) + time.Sleep(150 * time.Millisecond) + re.False(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipSleep")) } func TestConcurrentReportBucket(t *testing.T) { @@ -1105,7 +1116,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) { r := core.NewRegionInfo(region, peers[0]) re.NoError(cluster.putRegion(r)) - cluster.updateRegionsLabelLevelStats([]*core.RegionInfo{r}) + cluster.labelLevelStats.Observe(r, cluster.getStoresWithoutLabelLocked(r, core.EngineKey, core.EngineTiFlash), cluster.opt.GetLocationLabels()) counter := cluster.labelLevelStats.GetLabelCounter() re.Equal(0, counter["none"]) re.Equal(1, counter["zone"]) @@ -1225,6 +1236,7 @@ func TestOfflineAndMerge(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`)) _, opt, err := newTestScheduleConfig() re.NoError(err) @@ -1238,7 +1250,10 @@ func TestOfflineAndMerge(t *testing.T) { } } cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager) + cluster.lastRegionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager) cluster.coordinator = newCoordinator(ctx, cluster, nil) + cluster.coordinator.wg.Add(1) + go cluster.coordinator.collectRegionStats() // Put 4 stores. for _, store := range newTestStores(4, "5.0.0") { @@ -1277,14 +1292,17 @@ func TestOfflineAndMerge(t *testing.T) { regions = core.SplitRegions(regions) } heartbeatRegions(re, cluster, regions) - re.Len(cluster.GetOfflineRegionStatsByType(statistics.OfflinePeer), len(regions)) + time.Sleep(150 * time.Millisecond) + re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions)) // Merge. for i := 0; i < n; i++ { regions = core.MergeRegions(regions) heartbeatRegions(re, cluster, regions) - re.Len(cluster.GetOfflineRegionStatsByType(statistics.OfflinePeer), len(regions)) + time.Sleep(150 * time.Millisecond) + re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions)) } + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipSleep")) } func TestSyncConfig(t *testing.T) { diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index 03f0182f2ba..8a64c28586c 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -46,12 +46,15 @@ import ( const ( runSchedulerCheckInterval = 3 * time.Second checkSuspectRangesInterval = 100 * time.Millisecond + collectRegionStatsInterval = 100 * time.Millisecond collectFactor = 0.9 collectTimeout = 5 * time.Minute maxScheduleRetries = 10 maxLoadConfigRetries = 10 - patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions. + patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions. + statsScanRegionLimit = 1000 // It takes about 3.5 minutes to iterate 2 million regions. + defaultScrapInterval = 15 * time.Second // PluginLoad means action for load plugin PluginLoad = "PluginLoad" // PluginUnload means action for unload plugin @@ -145,13 +148,11 @@ func (c *coordinator) patrolRegions() { if len(regions) == 0 { continue } - // Updates the label level isolation statistics. - c.cluster.updateRegionsLabelLevelStats(regions) if len(key) == 0 { patrolCheckRegionsGauge.Set(time.Since(start).Seconds()) start = time.Now() } - failpoint.Inject("break-patrol", func() { + failpoint.Inject("breakPatrol", func() { failpoint.Break() }) } @@ -188,6 +189,54 @@ func (c *coordinator) checkWaitingRegions() { } } +func (c *coordinator) collectRegionStats() { + defer logutil.LogPanic() + + defer c.wg.Done() + ticker := time.NewTicker(collectRegionStatsInterval) + defer ticker.Stop() + timer := time.NewTimer(defaultScrapInterval) + defer timer.Stop() + log.Info("coordinator starts collect region stats") + var key []byte + start := time.Now() + for { + select { + case <-ticker.C: + case <-c.ctx.Done(): + log.Info("collect region stats has been stopped") + return + } + + regions := c.cluster.ScanRegions(key, nil, statsScanRegionLimit) + length := len(regions) + if length == 0 { + c.cluster.collectStatsMetrics() + // Resets the scan key. + key = nil + continue + } + key = regions[length-1].GetEndKey() + c.cluster.updateRegionStats(regions) + if len(key) == 0 { + c.cluster.collectStatsMetrics() + failpoint.Inject("skipSleep", func() { + start = time.Time{} + }) + if time.Since(start) < defaultScrapInterval { + timer.Reset(defaultScrapInterval - time.Since(start)) + select { + case <-c.ctx.Done(): + return + case <-timer.C: + } + } + collectRegionStatsGauge.Set(time.Since(start).Seconds()) + start = time.Now() + } + } +} + // checkPriorityRegions checks priority regions func (c *coordinator) checkPriorityRegions() { items := c.checkers.GetPriorityRegions() @@ -410,9 +459,10 @@ func (c *coordinator) run() { log.Error("cannot persist schedule config", errs.ZapError(err)) } - c.wg.Add(3) + c.wg.Add(4) // Starts to patrol regions. go c.patrolRegions() + go c.collectRegionStats() // Checks suspect key ranges go c.checkSuspectRanges() go c.drivePushOperator() diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index 2fa61faa14e..89cb234db4b 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -243,6 +243,7 @@ func TestCollectMetricsConcurrent(t *testing.T) { tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.regionStats = statistics.NewRegionStatistics(tc.GetOpts(), nil, tc.storeConfigManager) + tc.lastRegionStats = statistics.NewRegionStatistics(tc.GetOpts(), nil, tc.storeConfigManager) }, func(co *coordinator) { co.run() }, re) defer cleanup() @@ -261,7 +262,8 @@ func TestCollectMetricsConcurrent(t *testing.T) { for i := 0; i < 1000; i++ { co.collectHotSpotMetrics() co.collectSchedulerMetrics() - co.cluster.collectClusterMetrics() + co.cluster.collectHotStatsMetrics() + co.cluster.collectStatsMetrics() } co.resetHotSpotMetrics() co.resetSchedulerMetrics() @@ -292,7 +294,7 @@ func TestCollectMetrics(t *testing.T) { for i := 0; i < 1000; i++ { co.collectHotSpotMetrics() co.collectSchedulerMetrics() - co.cluster.collectClusterMetrics() + co.cluster.collectStatsMetrics() } stores := co.cluster.GetStores() regionStats := co.cluster.RegionWriteStats() @@ -319,10 +321,11 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run } tc := newTestCluster(ctx, opt) hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */) + co := newCoordinator(ctx, tc.RaftCluster, hbStreams) + tc.coordinator = co if setTc != nil { setTc(tc) } - co := newCoordinator(ctx, tc.RaftCluster, hbStreams) if run != nil { run(co) } @@ -528,7 +531,7 @@ func TestCheckCache(t *testing.T) { // Add a peer with two replicas. re.NoError(tc.addLeaderRegion(1, 2, 3)) - re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/break-patrol", `return`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/breakPatrol", `return`)) // case 1: operator cannot be created due to replica-schedule-limit restriction co.wg.Add(1) @@ -562,7 +565,7 @@ func TestCheckCache(t *testing.T) { re.Empty(co.checkers.GetWaitingRegions()) co.wg.Wait() - re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/break-patrol")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/breakPatrol")) } func TestPeerState(t *testing.T) { diff --git a/server/cluster/metrics.go b/server/cluster/metrics.go index 8c0bceb94ca..19f09ce53f0 100644 --- a/server/cluster/metrics.go +++ b/server/cluster/metrics.go @@ -73,6 +73,14 @@ var ( Help: "Time spent of patrol checks region.", }) + collectRegionStatsGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "statistics", + Name: "collect_region_stats_time", + Help: "Time spent of collecting region stats.", + }) + updateStoreStatsGauge = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: "pd", @@ -152,4 +160,5 @@ func init() { prometheus.MustRegister(storesETAGauge) prometheus.MustRegister(storeSyncConfigEvent) prometheus.MustRegister(updateStoreStatsGauge) + prometheus.MustRegister(collectRegionStatsGauge) } diff --git a/server/handler.go b/server/handler.go index 96654069550..f79a1a0d9ca 100644 --- a/server/handler.go +++ b/server/handler.go @@ -903,15 +903,6 @@ func (h *Handler) GetSchedulerConfigHandler() (http.Handler, error) { return mux, nil } -// GetOfflinePeer gets the region with offline peer. -func (h *Handler) GetOfflinePeer(typ statistics.RegionStatisticType) ([]*core.RegionInfo, error) { - c := h.s.GetRaftCluster() - if c == nil { - return nil, errs.ErrNotBootstrapped.FastGenByArgs() - } - return c.GetOfflineRegionStatsByType(typ), nil -} - // ResetTS resets the ts with specified tso. func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error { log.Info("reset-ts", diff --git a/server/statistics/metrics.go b/server/statistics/metrics.go index bd4c897e258..a5ea07f4f55 100644 --- a/server/statistics/metrics.go +++ b/server/statistics/metrics.go @@ -41,14 +41,6 @@ var ( Help: "Status of the regions.", }, []string{"type"}) - offlineRegionStatusGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "pd", - Subsystem: "regions", - Name: "offline_status", - Help: "Status of the offline regions.", - }, []string{"type"}) - clusterStatusGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", @@ -190,7 +182,6 @@ func init() { prometheus.MustRegister(hotCacheStatusGauge) prometheus.MustRegister(storeStatusGauge) prometheus.MustRegister(regionStatusGauge) - prometheus.MustRegister(offlineRegionStatusGauge) prometheus.MustRegister(clusterStatusGauge) prometheus.MustRegister(placementStatusGauge) prometheus.MustRegister(configStatusGauge) diff --git a/server/statistics/region_collection.go b/server/statistics/region_collection.go index ab5e6b22d9c..13f2d3b0c17 100644 --- a/server/statistics/region_collection.go +++ b/server/statistics/region_collection.go @@ -40,6 +40,18 @@ const ( UndersizedRegion ) +var statsTypes = []RegionStatisticType{ + MissPeer, + ExtraPeer, + DownPeer, + PendingPeer, + OfflinePeer, + LearnerPeer, + EmptyRegion, + OversizedRegion, + UndersizedRegion, +} + const nonIsolation = "none" // RegionInfo is used to record the status of region. @@ -54,9 +66,7 @@ type RegionStatistics struct { sync.RWMutex opt *config.PersistOptions stats map[RegionStatisticType]map[uint64]*RegionInfo - offlineStats map[RegionStatisticType]map[uint64]*core.RegionInfo index map[uint64]RegionStatisticType - offlineIndex map[uint64]RegionStatisticType ruleManager *placement.RuleManager storeConfigManager *config.StoreConfigManager } @@ -68,25 +78,18 @@ func NewRegionStatistics(opt *config.PersistOptions, ruleManager *placement.Rule ruleManager: ruleManager, storeConfigManager: storeConfigManager, stats: make(map[RegionStatisticType]map[uint64]*RegionInfo), - offlineStats: make(map[RegionStatisticType]map[uint64]*core.RegionInfo), index: make(map[uint64]RegionStatisticType), - offlineIndex: make(map[uint64]RegionStatisticType), } r.stats[MissPeer] = make(map[uint64]*RegionInfo) r.stats[ExtraPeer] = make(map[uint64]*RegionInfo) r.stats[DownPeer] = make(map[uint64]*RegionInfo) r.stats[PendingPeer] = make(map[uint64]*RegionInfo) + r.stats[OfflinePeer] = make(map[uint64]*RegionInfo) r.stats[LearnerPeer] = make(map[uint64]*RegionInfo) r.stats[EmptyRegion] = make(map[uint64]*RegionInfo) r.stats[OversizedRegion] = make(map[uint64]*RegionInfo) r.stats[UndersizedRegion] = make(map[uint64]*RegionInfo) - r.offlineStats[MissPeer] = make(map[uint64]*core.RegionInfo) - r.offlineStats[ExtraPeer] = make(map[uint64]*core.RegionInfo) - r.offlineStats[DownPeer] = make(map[uint64]*core.RegionInfo) - r.offlineStats[PendingPeer] = make(map[uint64]*core.RegionInfo) - r.offlineStats[LearnerPeer] = make(map[uint64]*core.RegionInfo) - r.offlineStats[OfflinePeer] = make(map[uint64]*core.RegionInfo) return r } @@ -109,17 +112,6 @@ func (r *RegionStatistics) IsRegionStatsType(regionID uint64, typ RegionStatisti return exist } -// GetOfflineRegionStatsByType gets the status of the offline region by types. The regions here need to be cloned, otherwise, it may cause data race problems. -func (r *RegionStatistics) GetOfflineRegionStatsByType(typ RegionStatisticType) []*core.RegionInfo { - r.RLock() - defer r.RUnlock() - res := make([]*core.RegionInfo, 0, len(r.stats[typ])) - for _, r := range r.offlineStats[typ] { - res = append(res, r.Clone()) - } - return res -} - func (r *RegionStatistics) deleteEntry(deleteIndex RegionStatisticType, regionID uint64) { for typ := RegionStatisticType(1); typ <= deleteIndex; typ <<= 1 { if deleteIndex&typ != 0 { @@ -128,14 +120,6 @@ func (r *RegionStatistics) deleteEntry(deleteIndex RegionStatisticType, regionID } } -func (r *RegionStatistics) deleteOfflineEntry(deleteIndex RegionStatisticType, regionID uint64) { - for typ := RegionStatisticType(1); typ <= deleteIndex; typ <<= 1 { - if deleteIndex&typ != 0 { - delete(r.offlineStats[typ], regionID) - } - } -} - // RegionStatsNeedUpdate checks whether the region's status need to be updated // due to some special state types. func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool { @@ -155,9 +139,8 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store // Region state. regionID := region.GetID() var ( - peerTypeIndex RegionStatisticType - offlinePeerTypeIndex RegionStatisticType - deleteIndex RegionStatisticType + peerTypeIndex RegionStatisticType + deleteIndex RegionStatisticType ) desiredReplicas := r.opt.GetMaxReplicas() desiredVoters := desiredReplicas @@ -211,10 +194,6 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store for typ, c := range conditions { if c { - if isRemoving && typ < EmptyRegion { - r.offlineStats[typ][regionID] = region - offlinePeerTypeIndex |= typ - } info := r.stats[typ][regionID] if info == nil { info = &RegionInfo{ @@ -241,15 +220,11 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store } if isRemoving { - r.offlineStats[OfflinePeer][regionID] = region - offlinePeerTypeIndex |= OfflinePeer - } - - if oldIndex, ok := r.offlineIndex[regionID]; ok { - deleteIndex = oldIndex &^ offlinePeerTypeIndex + r.stats[OfflinePeer][regionID] = &RegionInfo{ + RegionInfo: region, + } + peerTypeIndex |= OfflinePeer } - r.deleteOfflineEntry(deleteIndex, regionID) - r.offlineIndex[regionID] = offlinePeerTypeIndex if oldIndex, ok := r.index[regionID]; ok { deleteIndex = oldIndex &^ peerTypeIndex @@ -258,18 +233,6 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store r.index[regionID] = peerTypeIndex } -// ClearDefunctRegion is used to handle the overlap region. -func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) { - r.Lock() - defer r.Unlock() - if oldIndex, ok := r.index[regionID]; ok { - r.deleteEntry(oldIndex, regionID) - } - if oldIndex, ok := r.offlineIndex[regionID]; ok { - r.deleteOfflineEntry(oldIndex, regionID) - } -} - // Collect collects the metrics of the regions' status. func (r *RegionStatistics) Collect() { r.RLock() @@ -280,21 +243,65 @@ func (r *RegionStatistics) Collect() { regionStatusGauge.WithLabelValues("pending-peer-region-count").Set(float64(len(r.stats[PendingPeer]))) regionStatusGauge.WithLabelValues("learner-peer-region-count").Set(float64(len(r.stats[LearnerPeer]))) regionStatusGauge.WithLabelValues("empty-region-count").Set(float64(len(r.stats[EmptyRegion]))) + regionStatusGauge.WithLabelValues("offline-region-count").Set(float64(len(r.stats[OfflinePeer]))) regionStatusGauge.WithLabelValues("oversized-region-count").Set(float64(len(r.stats[OversizedRegion]))) regionStatusGauge.WithLabelValues("undersized-region-count").Set(float64(len(r.stats[UndersizedRegion]))) +} + +// SetStats sets the statistics to the given one. +func (r *RegionStatistics) SetStats(s map[RegionStatisticType]map[uint64]*RegionInfo) { + r.Lock() + defer r.Unlock() + r.stats = s +} + +// ResetStats resets the statistics. +func (r *RegionStatistics) ResetStats() { + r.Lock() + defer r.Unlock() + r.stats = make(map[RegionStatisticType]map[uint64]*RegionInfo) + r.index = make(map[uint64]RegionStatisticType) - offlineRegionStatusGauge.WithLabelValues("miss-peer-region-count").Set(float64(len(r.offlineStats[MissPeer]))) - offlineRegionStatusGauge.WithLabelValues("extra-peer-region-count").Set(float64(len(r.offlineStats[ExtraPeer]))) - offlineRegionStatusGauge.WithLabelValues("down-peer-region-count").Set(float64(len(r.offlineStats[DownPeer]))) - offlineRegionStatusGauge.WithLabelValues("pending-peer-region-count").Set(float64(len(r.offlineStats[PendingPeer]))) - offlineRegionStatusGauge.WithLabelValues("learner-peer-region-count").Set(float64(len(r.offlineStats[LearnerPeer]))) - offlineRegionStatusGauge.WithLabelValues("offline-peer-region-count").Set(float64(len(r.offlineStats[OfflinePeer]))) + r.stats[MissPeer] = make(map[uint64]*RegionInfo) + r.stats[ExtraPeer] = make(map[uint64]*RegionInfo) + r.stats[DownPeer] = make(map[uint64]*RegionInfo) + r.stats[PendingPeer] = make(map[uint64]*RegionInfo) + r.stats[OfflinePeer] = make(map[uint64]*RegionInfo) + r.stats[LearnerPeer] = make(map[uint64]*RegionInfo) + r.stats[EmptyRegion] = make(map[uint64]*RegionInfo) + r.stats[OversizedRegion] = make(map[uint64]*RegionInfo) + r.stats[UndersizedRegion] = make(map[uint64]*RegionInfo) +} + +// IsEmpty returns if the region statistics is empty. Only for test purpose. +func (r *RegionStatistics) IsEmpty() bool { + r.RLock() + defer r.RUnlock() + + return len(r.stats[MissPeer]) == 0 && len(r.stats[ExtraPeer]) == 0 && len(r.stats[DownPeer]) == 0 && + len(r.stats[PendingPeer]) == 0 && len(r.stats[OfflinePeer]) == 0 && len(r.stats[LearnerPeer]) == 0 && + len(r.stats[EmptyRegion]) == 0 && len(r.stats[OversizedRegion]) == 0 && len(r.stats[UndersizedRegion]) == 0 } // Reset resets the metrics of the regions' status. func (r *RegionStatistics) Reset() { regionStatusGauge.Reset() - offlineRegionStatusGauge.Reset() +} + +// GetStats returns the regions' status. +func (r *RegionStatistics) GetStats() map[RegionStatisticType]map[uint64]*RegionInfo { + r.RLock() + defer r.RUnlock() + stats := make(map[RegionStatisticType]map[uint64]*RegionInfo) + + for _, typ := range statsTypes { + stats[typ] = make(map[uint64]*RegionInfo) + for k, v := range r.stats[typ] { + stats[typ][k] = v + } + } + + return stats } // LabelStatistics is the statistics of the level of labels. @@ -337,21 +344,19 @@ func (l *LabelStatistics) Collect() { } } +// ResetStats resets the statistics. +func (l *LabelStatistics) ResetStats() { + l.Lock() + defer l.Unlock() + l.regionLabelStats = make(map[uint64]string) + l.labelCounter = make(map[string]int) +} + // Reset resets the metrics of the label status. func (l *LabelStatistics) Reset() { regionLabelLevelGauge.Reset() } -// ClearDefunctRegion is used to handle the overlap region. -func (l *LabelStatistics) ClearDefunctRegion(regionID uint64) { - l.Lock() - defer l.Unlock() - if label, ok := l.regionLabelStats[regionID]; ok { - l.labelCounter[label]-- - delete(l.regionLabelStats, regionID) - } -} - // GetLabelCounter is only used for tests. func (l *LabelStatistics) GetLabelCounter() map[string]int { l.RLock() diff --git a/server/statistics/region_collection_test.go b/server/statistics/region_collection_test.go index 7c686f1c9ce..1b1f139378d 100644 --- a/server/statistics/region_collection_test.go +++ b/server/statistics/region_collection_test.go @@ -70,9 +70,8 @@ func TestRegionStatistics(t *testing.T) { re.Len(regionStats.stats[ExtraPeer], 1) re.Len(regionStats.stats[LearnerPeer], 1) re.Len(regionStats.stats[EmptyRegion], 1) + re.Len(regionStats.stats[OfflinePeer], 1) re.Len(regionStats.stats[UndersizedRegion], 1) - re.Len(regionStats.offlineStats[ExtraPeer], 1) - re.Len(regionStats.offlineStats[LearnerPeer], 1) region1 = region1.Clone( core.WithDownPeers(downPeers), @@ -88,12 +87,7 @@ func TestRegionStatistics(t *testing.T) { re.Empty(regionStats.stats[EmptyRegion]) re.Len(regionStats.stats[OversizedRegion], 1) re.Empty(regionStats.stats[UndersizedRegion]) - re.Len(regionStats.offlineStats[ExtraPeer], 1) - re.Empty(regionStats.offlineStats[MissPeer]) - re.Len(regionStats.offlineStats[DownPeer], 1) - re.Len(regionStats.offlineStats[PendingPeer], 1) - re.Len(regionStats.offlineStats[LearnerPeer], 1) - re.Len(regionStats.offlineStats[OfflinePeer], 1) + re.Len(regionStats.stats[OfflinePeer], 1) region2 = region2.Clone(core.WithDownPeers(downPeers[0:1])) regionStats.Observe(region2, stores[0:2]) @@ -104,12 +98,7 @@ func TestRegionStatistics(t *testing.T) { re.Len(regionStats.stats[LearnerPeer], 1) re.Len(regionStats.stats[OversizedRegion], 1) re.Len(regionStats.stats[UndersizedRegion], 1) - re.Len(regionStats.offlineStats[ExtraPeer], 1) - re.Empty(regionStats.offlineStats[MissPeer]) - re.Len(regionStats.offlineStats[DownPeer], 1) - re.Len(regionStats.offlineStats[PendingPeer], 1) - re.Len(regionStats.offlineStats[LearnerPeer], 1) - re.Len(regionStats.offlineStats[OfflinePeer], 1) + re.Len(regionStats.stats[OfflinePeer], 1) region1 = region1.Clone(core.WithRemoveStorePeer(7)) regionStats.Observe(region1, stores[0:3]) @@ -118,12 +107,7 @@ func TestRegionStatistics(t *testing.T) { re.Len(regionStats.stats[DownPeer], 2) re.Len(regionStats.stats[PendingPeer], 1) re.Empty(regionStats.stats[LearnerPeer]) - re.Empty(regionStats.offlineStats[ExtraPeer]) - re.Empty(regionStats.offlineStats[MissPeer]) - re.Empty(regionStats.offlineStats[DownPeer]) - re.Empty(regionStats.offlineStats[PendingPeer]) - re.Empty(regionStats.offlineStats[LearnerPeer]) - re.Empty(regionStats.offlineStats[OfflinePeer]) + re.Empty(regionStats.stats[OfflinePeer]) store3 = stores[3].Clone(core.UpStore()) stores[3] = store3 diff --git a/tests/pdctl/region/region_test.go b/tests/pdctl/region/region_test.go index 951433bd432..61e69a3adf8 100644 --- a/tests/pdctl/region/region_test.go +++ b/tests/pdctl/region/region_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" + tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/api" "github.com/tikv/pd/server/core" "github.com/tikv/pd/tests" @@ -100,7 +101,9 @@ func TestRegion(t *testing.T) { core.SetRegionVersion(1), core.SetApproximateSize(10), core.SetApproximateKeys(1000), ) defer cluster.Destroy() - + tu.Eventually(re, func() bool { + return !leaderServer.GetRaftCluster().IsLastRegionStatsEmpty() + }) var testRegionsCases = []struct { args []string expect []*core.RegionInfo @@ -198,6 +201,7 @@ func TestRegion(t *testing.T) { // Test region range-holes. r5 := pdctl.MustPutRegion(re, cluster, 5, 1, []byte("x"), []byte("z")) + time.Sleep(150 * time.Millisecond) output, err := pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "region", "range-holes"}...) re.NoError(err) rangeHoles := new([][]string)