Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: move stats update into a better place #3901

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -796,13 +796,6 @@
"intervalFactor": 2,
"legendFormat": "{{type}}",
"refId": "B"
},
{
"expr": "pd_regions_offline_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"offline-peer-region-count\", instance=\"$instance\"}",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{type}}",
"refId": "C"
}
],
"thresholds": [
Expand Down
2 changes: 1 addition & 1 deletion server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func (h *regionsHandler) GetLearnerPeerRegions(w http.ResponseWriter, r *http.Re
// @Router /regions/check/offline-peer [get]
func (h *regionsHandler) GetOfflinePeerRegions(w http.ResponseWriter, r *http.Request) {
handler := h.svr.GetHandler()
regions, err := handler.GetOfflinePeer(statistics.OfflinePeer)
regions, err := handler.GetRegionsByType(statistics.OfflinePeer)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down
13 changes: 11 additions & 2 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"sort"
"testing"
"time"

"github.com/docker/go-units"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -168,11 +169,15 @@ func (suite *regionTestSuite) TestRegion() {
}

func (suite *regionTestSuite) TestRegionCheck() {
r := newTestRegionInfo(2, 1, []byte("a"), []byte("b"))
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`))
r := newTestRegionInfo(2, 1, []byte("a"), []byte(""))
downPeer := &metapb.Peer{Id: 13, StoreId: 2}
r = r.Clone(core.WithAddPeer(downPeer), core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}), core.WithPendingPeers([]*metapb.Peer{downPeer}))
re := suite.Require()
mustRegionHeartbeat(re, suite.svr, r)
tu.Eventually(re, func() bool {
return !suite.svr.GetRaftCluster().IsLastRegionStatsEmpty()
})
url := fmt.Sprintf("%s/region/id/%d", suite.urlPrefix, r.GetID())
r1 := &RegionInfo{}
suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r1))
Expand All @@ -199,6 +204,7 @@ func (suite *regionTestSuite) TestRegionCheck() {

r = r.Clone(core.SetApproximateSize(1))
mustRegionHeartbeat(re, suite.svr, r)
time.Sleep(150 * time.Millisecond)
url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "empty-region")
r5 := &RegionsInfo{}
suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r5))
Expand All @@ -207,6 +213,7 @@ func (suite *regionTestSuite) TestRegionCheck() {

r = r.Clone(core.SetApproximateSize(1))
mustRegionHeartbeat(re, suite.svr, r)
time.Sleep(150 * time.Millisecond)
url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "hist-size")
r6 := make([]*histItem, 1)
suite.NoError(tu.ReadGetJSON(re, testDialClient, url, &r6))
Expand All @@ -215,11 +222,13 @@ func (suite *regionTestSuite) TestRegionCheck() {

r = r.Clone(core.SetApproximateKeys(1000))
mustRegionHeartbeat(re, suite.svr, r)
time.Sleep(150 * time.Millisecond)
url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "hist-keys")
r7 := make([]*histItem, 1)
suite.NoError(tu.ReadGetJSON(re, testDialClient, url, &r7))
histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}}
suite.Equal(histKeys, r7)
suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipSleep"))
}

func (suite *regionTestSuite) TestRegions() {
Expand Down
60 changes: 25 additions & 35 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type RaftCluster struct {
coordinator *coordinator
labelLevelStats *statistics.LabelStatistics
regionStats *statistics.RegionStatistics
lastRegionStats *statistics.RegionStatistics
hotStat *statistics.HotStat
hotBuckets *buckets.HotBucketCache
ruleManager *placement.RuleManager
Expand Down Expand Up @@ -275,6 +276,7 @@ func (c *RaftCluster) Start(s Server) error {
c.storeConfigManager = config.NewStoreConfigManager(c.httpClient)
c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams())
c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager)
c.lastRegionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager)
c.limiter = NewStoreLimiter(s.GetPersistOptions())
c.externalTS, err = c.storage.LoadExternalTS()
if err != nil {
Expand Down Expand Up @@ -845,19 +847,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
c.coordinator.CheckTransferWitnessLeader(region)

hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !saveKV && !saveCache && !isNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}
return nil
}

failpoint.Inject("concurrentRegionHeartbeat", func() {
time.Sleep(500 * time.Millisecond)
Expand All @@ -875,20 +868,9 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}

for _, item := range overlaps {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
}
c.labelLevelStats.ClearDefunctRegion(item.GetID())
}
regionUpdateCacheEventCounter.Inc()
}

if hasRegionStats {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
}
Expand Down Expand Up @@ -1033,6 +1015,11 @@ func (c *RaftCluster) GetAverageRegionSize() int64 {
return c.core.GetAverageRegionSize()
}

// IsLastRegionStatsEmpty returns whether the last collected region
// statistics snapshot is empty. For test purposes only.
func (c *RaftCluster) IsLastRegionStatsEmpty() bool {
	return c.lastRegionStats.IsEmpty()
}

// DropCacheRegion removes a region from the cache.
func (c *RaftCluster) DropCacheRegion(id uint64) {
c.core.RemoveRegionIfExist(id)
Expand Down Expand Up @@ -1880,7 +1867,7 @@ func (c *RaftCluster) collectMetrics() {

c.coordinator.collectSchedulerMetrics()
c.coordinator.collectHotSpotMetrics()
c.collectClusterMetrics()
c.collectHotStatsMetrics()
c.collectHealthStatus()
}

Expand All @@ -1895,21 +1882,29 @@ func (c *RaftCluster) resetMetrics() {
c.resetProgressIndicator()
}

func (c *RaftCluster) collectClusterMetrics() {
if c.regionStats == nil {
func (c *RaftCluster) collectStatsMetrics() {
if c.regionStats == nil || c.lastRegionStats == nil {
return
}
c.regionStats.Collect()
c.labelLevelStats.Collect()
lastStats := c.regionStats.GetStats()
c.lastRegionStats.SetStats(lastStats)
c.labelLevelStats.ResetStats()
c.regionStats.ResetStats()
}

// collectHotStatsMetrics collects metrics from the hot region cache.
func (c *RaftCluster) collectHotStatsMetrics() {
	// collect hot cache metrics
	c.hotStat.CollectMetrics()
}

func (c *RaftCluster) resetClusterMetrics() {
if c.regionStats == nil {
if c.regionStats == nil || c.lastRegionStats == nil {
return
}
c.regionStats.Reset()
c.lastRegionStats.Reset()
c.labelLevelStats.Reset()
// reset hot cache metrics
c.hotStat.ResetMetrics()
Expand Down Expand Up @@ -1943,22 +1938,17 @@ func (c *RaftCluster) resetProgressIndicator() {

// GetRegionStatsByType gets the status of the region by types.
// It returns nil when region statistics have not been initialized.
func (c *RaftCluster) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo {
	// Guard against uninitialized statistics.
	if c.regionStats == nil {
		return nil
	}
	return c.regionStats.GetRegionStatsByType(typ)
}

// GetOfflineRegionStatsByType gets the status of the offline region by types.
func (c *RaftCluster) GetOfflineRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo {
if c.regionStats == nil {
if c.lastRegionStats == nil {
return nil
}
return c.regionStats.GetOfflineRegionStatsByType(typ)
return c.lastRegionStats.GetRegionStatsByType(typ)
}

func (c *RaftCluster) updateRegionsLabelLevelStats(regions []*core.RegionInfo) {
func (c *RaftCluster) updateRegionStats(regions []*core.RegionInfo) {
for _, region := range regions {
if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}
c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.opt.GetLocationLabels())
}
}
Expand Down
32 changes: 25 additions & 7 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,25 +950,32 @@ func TestRegionSizeChanged(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`))

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(ctx, cluster, nil)
cluster.regionLabeler, _ = labeler.NewRegionLabeler(ctx, cluster.storage, time.Second*5)
cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
cluster.lastRegionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
cluster.coordinator.wg.Add(1)
go cluster.coordinator.collectRegionStats()
region := newTestRegions(1, 3, 3)[0]
cluster.opt.GetMaxMergeRegionKeys()
curMaxMergeSize := int64(cluster.opt.GetMaxMergeRegionSize())
curMaxMergeKeys := int64(cluster.opt.GetMaxMergeRegionKeys())
region = region.Clone(
core.WithLeader(region.GetPeers()[2]),
core.WithEndKey([]byte{}),
core.SetApproximateSize(curMaxMergeSize-1),
core.SetApproximateKeys(curMaxMergeKeys-1),
core.SetFromHeartbeat(true),
)
cluster.processRegionHeartbeat(region)
regionID := region.GetID()
re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
time.Sleep(150 * time.Millisecond)
re.True(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
// Test ApproximateSize and ApproximateKeys change.
region = region.Clone(
core.WithLeader(region.GetPeers()[2]),
Expand All @@ -977,16 +984,20 @@ func TestRegionSizeChanged(t *testing.T) {
core.SetFromHeartbeat(true),
)
cluster.processRegionHeartbeat(region)
re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
time.Sleep(150 * time.Millisecond)
re.False(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
// Test MaxMergeRegionSize and MaxMergeRegionKeys change.
cluster.opt.SetMaxMergeRegionSize((uint64(curMaxMergeSize + 2)))
cluster.opt.SetMaxMergeRegionKeys((uint64(curMaxMergeKeys + 2)))
cluster.processRegionHeartbeat(region)
re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
time.Sleep(150 * time.Millisecond)
re.True(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
cluster.opt.SetMaxMergeRegionSize((uint64(curMaxMergeSize)))
cluster.opt.SetMaxMergeRegionKeys((uint64(curMaxMergeKeys)))
cluster.processRegionHeartbeat(region)
re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
time.Sleep(150 * time.Millisecond)
re.False(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipSleep"))
}

func TestConcurrentReportBucket(t *testing.T) {
Expand Down Expand Up @@ -1105,7 +1116,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) {
r := core.NewRegionInfo(region, peers[0])
re.NoError(cluster.putRegion(r))

cluster.updateRegionsLabelLevelStats([]*core.RegionInfo{r})
cluster.labelLevelStats.Observe(r, cluster.getStoresWithoutLabelLocked(r, core.EngineKey, core.EngineTiFlash), cluster.opt.GetLocationLabels())
counter := cluster.labelLevelStats.GetLabelCounter()
re.Equal(0, counter["none"])
re.Equal(1, counter["zone"])
Expand Down Expand Up @@ -1225,6 +1236,7 @@ func TestOfflineAndMerge(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`))

_, opt, err := newTestScheduleConfig()
re.NoError(err)
Expand All @@ -1238,7 +1250,10 @@ func TestOfflineAndMerge(t *testing.T) {
}
}
cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
cluster.lastRegionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
cluster.coordinator = newCoordinator(ctx, cluster, nil)
cluster.coordinator.wg.Add(1)
go cluster.coordinator.collectRegionStats()

// Put 4 stores.
for _, store := range newTestStores(4, "5.0.0") {
Expand Down Expand Up @@ -1277,14 +1292,17 @@ func TestOfflineAndMerge(t *testing.T) {
regions = core.SplitRegions(regions)
}
heartbeatRegions(re, cluster, regions)
re.Len(cluster.GetOfflineRegionStatsByType(statistics.OfflinePeer), len(regions))
time.Sleep(150 * time.Millisecond)
re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions))

// Merge.
for i := 0; i < n; i++ {
regions = core.MergeRegions(regions)
heartbeatRegions(re, cluster, regions)
re.Len(cluster.GetOfflineRegionStatsByType(statistics.OfflinePeer), len(regions))
time.Sleep(150 * time.Millisecond)
re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions))
}
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipSleep"))
}

func TestSyncConfig(t *testing.T) {
Expand Down
Loading