Commit a8ae999
resolve the conflicts
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Oct 28, 2022
1 parent 63cfb9d commit a8ae999
Showing 7 changed files with 92 additions and 42 deletions.
10 changes: 4 additions & 6 deletions server/api/region_test.go
@@ -171,15 +171,13 @@ func (suite *regionTestSuite) TestRegion() {
func (suite *regionTestSuite) TestRegionCheck() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`))
tu.Eventually(re, func() bool {
return suite.svr.GetRaftCluster().GetLastRegionStats() != nil
})
r := newTestRegionInfo(2, 1, []byte("a"), []byte("b"))
r := newTestRegionInfo(2, 1, []byte("a"), []byte(""))
downPeer := &metapb.Peer{Id: 13, StoreId: 2}
r = r.Clone(core.WithAddPeer(downPeer), core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}), core.WithPendingPeers([]*metapb.Peer{downPeer}))

mustRegionHeartbeat(re, suite.svr, r)
time.Sleep(150 * time.Millisecond)
tu.Eventually(re, func() bool {
return !suite.svr.GetRaftCluster().IsLastRegionStatsEmpty()
})
url := fmt.Sprintf("%s/region/id/%d", suite.urlPrefix, r.GetID())
r1 := &RegionInfo{}
suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r1))
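
The test above now waits with tu.Eventually instead of asserting right after the heartbeat, since region stats are published by a background collection pass. Below is a minimal, self-contained sketch of that polling pattern; tu.Eventually is PD's own testutil helper, and the eventually function here is an illustrative stand-in, not its real implementation.

```go
package main

import (
	"fmt"
	"time"
)

// eventually re-checks cond on an interval until it holds or the timeout
// elapses. A stand-in for PD's tu.Eventually, shown only to illustrate the
// wait-for-condition pattern the test uses.
func eventually(cond func() bool, timeout, interval time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(interval)
	}
	return cond() // one last check at the deadline
}

func main() {
	start := time.Now()
	ok := eventually(func() bool {
		// Stand-in for "the last collected region stats are non-empty".
		return time.Since(start) > 50*time.Millisecond
	}, time.Second, 10*time.Millisecond)
	fmt.Println("stats ready:", ok)
}
```
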
33 changes: 11 additions & 22 deletions server/cluster/cluster.go
@@ -270,6 +270,7 @@ func (c *RaftCluster) Start(s Server) error {
c.storeConfigManager = config.NewStoreConfigManager(c.httpClient)
c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams())
c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager)
c.lastRegionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager)
c.limiter = NewStoreLimiter(s.GetPersistOptions())

c.wg.Add(8)
@@ -793,14 +794,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !saveKV && !saveCache && !isNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if c.regionStats != nil && c.regionStats.RegionStatsNeedUpdate(region) {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}
return nil
}

failpoint.Inject("concurrentRegionHeartbeat", func() {
time.Sleep(500 * time.Millisecond)
@@ -976,9 +969,9 @@ func (c *RaftCluster) GetAverageRegionSize() int64 {
return c.core.GetAverageRegionSize()
}

// GetLastRegionStats returns region statistics from cluster. Only for test purpose.
func (c *RaftCluster) GetLastRegionStats() *statistics.RegionStatistics {
return c.lastRegionStats
// IsLastRegionStatsEmpty returns if region statistics is empty. Only for test purpose.
func (c *RaftCluster) IsLastRegionStatsEmpty() bool {
return c.lastRegionStats.IsEmpty()
}

// DropCacheRegion removes a region from the cache.
@@ -1808,16 +1801,15 @@ func (c *RaftCluster) resetMetrics() {
}

func (c *RaftCluster) collectStatsMetrics() {
c.Lock()
defer c.Unlock()
if c.regionStats == nil {
if c.regionStats == nil || c.lastRegionStats == nil {
return
}
c.regionStats.Collect()
c.labelLevelStats.Collect()
c.lastRegionStats = c.regionStats.Clone()
c.labelLevelStats = statistics.NewLabelStatistics()
c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager)
lastStats := c.regionStats.GetStats()
c.lastRegionStats.SetStats(lastStats)
c.labelLevelStats.ResetStats()
c.regionStats.ResetStats()
}

func (c *RaftCluster) collectHotStatsMetrics() {
@@ -1826,10 +1818,11 @@ func (c *RaftCluster) collectHotStatsMetrics() {
}

func (c *RaftCluster) resetClusterMetrics() {
if c.regionStats == nil {
if c.regionStats == nil || c.lastRegionStats == nil {
return
}
c.regionStats.Reset()
c.lastRegionStats.Reset()
c.labelLevelStats.Reset()
// reset hot cache metrics
c.hotStat.ResetMetrics()
@@ -1863,17 +1856,13 @@ func (c *RaftCluster) resetProgressIndicator() {

// GetRegionStatsByType gets the status of the region by types.
func (c *RaftCluster) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo {
c.RLock()
defer c.RUnlock()
if c.lastRegionStats == nil {
return nil
}
return c.lastRegionStats.GetRegionStatsByType(typ)
}

func (c *RaftCluster) updateRegionStats(regions []*core.RegionInfo) {
c.Lock()
defer c.Unlock()
for _, region := range regions {
if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
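
In the new collectStatsMetrics above, each collection round copies the working regionStats into lastRegionStats via GetStats/SetStats and then clears the working counters with ResetStats, so readers always see the most recently completed round. A minimal sketch of that snapshot-and-reset pattern follows; the stats type is a simplified stand-in for PD's RegionStatistics, not its actual definition.

```go
package main

import (
	"fmt"
	"sync"
)

// stats is a simplified stand-in for PD's RegionStatistics: region IDs
// bucketed per statistic type, guarded by a mutex.
type stats struct {
	mu     sync.RWMutex
	byType map[string]map[uint64]struct{}
}

func newStats() *stats {
	return &stats{byType: make(map[string]map[uint64]struct{})}
}

// getStats returns an independent copy of the buckets (like GetStats).
func (s *stats) getStats() map[string]map[uint64]struct{} {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string]map[uint64]struct{}, len(s.byType))
	for typ, bucket := range s.byType {
		cp := make(map[uint64]struct{}, len(bucket))
		for id := range bucket {
			cp[id] = struct{}{}
		}
		out[typ] = cp
	}
	return out
}

// setStats replaces the buckets wholesale (like SetStats).
func (s *stats) setStats(b map[string]map[uint64]struct{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.byType = b
}

// resetStats starts a fresh collection window (like ResetStats).
func (s *stats) resetStats() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.byType = make(map[string]map[uint64]struct{})
}

func main() {
	working, last := newStats(), newStats()
	working.byType["undersized-region"] = map[uint64]struct{}{2: {}}

	// One collection round: publish a snapshot, then clear the working set.
	last.setStats(working.getStats())
	working.resetStats()

	fmt.Println(len(last.byType["undersized-region"]), len(working.byType)) // 1 0
}
```
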
26 changes: 22 additions & 4 deletions server/cluster/cluster_test.go
@@ -934,25 +934,32 @@ func TestRegionSizeChanged(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`))

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(ctx, cluster, nil)
cluster.regionLabeler, _ = labeler.NewRegionLabeler(ctx, cluster.storage, time.Second*5)
cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
cluster.lastRegionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
cluster.coordinator.wg.Add(1)
go cluster.coordinator.collectRegionStats()
region := newTestRegions(1, 3, 3)[0]
cluster.opt.GetMaxMergeRegionKeys()
curMaxMergeSize := int64(cluster.opt.GetMaxMergeRegionSize())
curMaxMergeKeys := int64(cluster.opt.GetMaxMergeRegionKeys())
region = region.Clone(
core.WithLeader(region.GetPeers()[2]),
core.WithEndKey([]byte{}),
core.SetApproximateSize(curMaxMergeSize-1),
core.SetApproximateKeys(curMaxMergeKeys-1),
core.SetFromHeartbeat(true),
)
cluster.processRegionHeartbeat(region)
regionID := region.GetID()
re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
time.Sleep(150 * time.Millisecond)
re.True(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
// Test ApproximateSize and ApproximateKeys change.
region = region.Clone(
core.WithLeader(region.GetPeers()[2]),
@@ -961,16 +968,20 @@
core.SetFromHeartbeat(true),
)
cluster.processRegionHeartbeat(region)
re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
time.Sleep(150 * time.Millisecond)
re.False(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
// Test MaxMergeRegionSize and MaxMergeRegionKeys change.
cluster.opt.SetMaxMergeRegionSize((uint64(curMaxMergeSize + 2)))
cluster.opt.SetMaxMergeRegionKeys((uint64(curMaxMergeKeys + 2)))
cluster.processRegionHeartbeat(region)
re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
time.Sleep(150 * time.Millisecond)
re.True(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
cluster.opt.SetMaxMergeRegionSize((uint64(curMaxMergeSize)))
cluster.opt.SetMaxMergeRegionKeys((uint64(curMaxMergeKeys)))
cluster.processRegionHeartbeat(region)
re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
time.Sleep(150 * time.Millisecond)
re.False(cluster.lastRegionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipSleep"))
}

func TestConcurrentReportBucket(t *testing.T) {
@@ -1209,6 +1220,7 @@ func TestOfflineAndMerge(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`))

_, opt, err := newTestScheduleConfig()
re.NoError(err)
@@ -1222,7 +1234,10 @@
}
}
cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
cluster.lastRegionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager)
cluster.coordinator = newCoordinator(ctx, cluster, nil)
cluster.coordinator.wg.Add(1)
go cluster.coordinator.collectRegionStats()

// Put 4 stores.
for _, store := range newTestStores(4, "5.0.0") {
@@ -1261,14 +1276,17 @@
regions = core.SplitRegions(regions)
}
heartbeatRegions(re, cluster, regions)
time.Sleep(150 * time.Millisecond)
re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions))

// Merge.
for i := 0; i < n; i++ {
regions = core.MergeRegions(regions)
heartbeatRegions(re, cluster, regions)
time.Sleep(150 * time.Millisecond)
re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions))
}
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipSleep"))
}

func TestSyncConfig(t *testing.T) {
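
TestRegionSizeChanged and TestOfflineAndMerge now enable a skipSleep failpoint so the stats-collection loop runs without its normal pacing, then sleep ~150ms to let a round complete. PD uses github.com/pingcap/failpoint for this; the map-based registry below is only an illustrative stand-in for how a named switch can gate a loop's sleep, not PD's actual wiring.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// A tiny stand-in for a failpoint registry: tests flip a named switch so the
// collector skips its inter-round sleep. PD's real mechanism is
// github.com/pingcap/failpoint; this version is an assumption made for
// illustration.
var (
	mu         sync.RWMutex
	failpoints = map[string]bool{}
)

func enable(name string)  { mu.Lock(); failpoints[name] = true; mu.Unlock() }
func disable(name string) { mu.Lock(); delete(failpoints, name); mu.Unlock() }

func enabled(name string) bool {
	mu.RLock()
	defer mu.RUnlock()
	return failpoints[name]
}

func collectLoop(rounds int) {
	for i := 0; i < rounds; i++ {
		fmt.Println("collect round", i)
		if !enabled("skipSleep") {
			time.Sleep(time.Second) // production pacing
		}
	}
}

func main() {
	enable("skipSleep") // as the tests do before sending heartbeats
	defer disable("skipSleep")
	collectLoop(3) // completes immediately instead of taking ~3s
}
```
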
1 change: 0 additions & 1 deletion server/cluster/coordinator.go
@@ -215,7 +215,6 @@ func (c *coordinator) collectRegionStats() {
key = nil
continue
}

key = regions[length-1].GetEndKey()
c.cluster.updateRegionStats(regions)
if len(key) == 0 {
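
collectRegionStats walks the keyspace in batches: each pass scans from the last end key, feeds the batch to updateRegionStats, and treats an empty end key as the end of the keyspace. Here is a minimal sketch of that pagination over a sorted region list; the scan helper and batch size are assumptions, not PD's actual region scan.

```go
package main

import "fmt"

type region struct{ startKey, endKey string } // endKey "" means +inf

// scanFrom returns up to limit regions whose start key is >= key, assuming
// `all` is sorted by start key. A stand-in for the cluster's region scan.
func scanFrom(all []region, key string, limit int) []region {
	i := 0
	for i < len(all) && all[i].startKey < key {
		i++
	}
	if i+limit < len(all) {
		return all[i : i+limit]
	}
	return all[i:]
}

func main() {
	all := []region{{"", "b"}, {"b", "m"}, {"m", "t"}, {"t", ""}}
	key := ""
	for round := 0; ; round++ {
		batch := scanFrom(all, key, 2)
		if len(batch) == 0 {
			break // nothing left at this key
		}
		key = batch[len(batch)-1].endKey
		fmt.Printf("round %d: updated stats for %d regions\n", round, len(batch))
		if len(key) == 0 {
			break // reached the region with the infinite end key: full pass done
		}
	}
}
```
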
1 change: 1 addition & 0 deletions server/cluster/coordinator_test.go
@@ -243,6 +243,7 @@ func TestCollectMetrics(t *testing.T) {

tc, co, cleanup := prepare(nil, func(tc *testCluster) {
tc.regionStats = statistics.NewRegionStatistics(tc.GetOpts(), nil, tc.storeConfigManager)
tc.lastRegionStats = statistics.NewRegionStatistics(tc.GetOpts(), nil, tc.storeConfigManager)
}, func(co *coordinator) { co.run() }, re)
defer cleanup()

61 changes: 53 additions & 8 deletions server/statistics/region_collection.go
@@ -48,6 +48,8 @@ var statsTypes = []RegionStatisticType{
OfflinePeer,
LearnerPeer,
EmptyRegion,
OversizedRegion,
UndersizedRegion,
}

const nonIsolation = "none"
@@ -246,25 +248,60 @@ func (r *RegionStatistics) Collect() {
regionStatusGauge.WithLabelValues("undersized-region-count").Set(float64(len(r.stats[UndersizedRegion])))
}

// SetStats sets the statistics to the given one.
func (r *RegionStatistics) SetStats(s map[RegionStatisticType]map[uint64]*RegionInfo) {
r.Lock()
defer r.Unlock()
r.stats = s
}

// ResetStats resets the statistics.
func (r *RegionStatistics) ResetStats() {
r.Lock()
defer r.Unlock()
r.stats = make(map[RegionStatisticType]map[uint64]*RegionInfo)
r.index = make(map[uint64]RegionStatisticType)

r.stats[MissPeer] = make(map[uint64]*RegionInfo)
r.stats[ExtraPeer] = make(map[uint64]*RegionInfo)
r.stats[DownPeer] = make(map[uint64]*RegionInfo)
r.stats[PendingPeer] = make(map[uint64]*RegionInfo)
r.stats[OfflinePeer] = make(map[uint64]*RegionInfo)
r.stats[LearnerPeer] = make(map[uint64]*RegionInfo)
r.stats[EmptyRegion] = make(map[uint64]*RegionInfo)
r.stats[OversizedRegion] = make(map[uint64]*RegionInfo)
r.stats[UndersizedRegion] = make(map[uint64]*RegionInfo)
}

// IsEmpty returns if the region statistics is empty. Only for test purpose.
func (r *RegionStatistics) IsEmpty() bool {
r.RLock()
defer r.RUnlock()

return len(r.stats[MissPeer]) == 0 && len(r.stats[ExtraPeer]) == 0 && len(r.stats[DownPeer]) == 0 &&
len(r.stats[PendingPeer]) == 0 && len(r.stats[OfflinePeer]) == 0 && len(r.stats[LearnerPeer]) == 0 &&
len(r.stats[EmptyRegion]) == 0 && len(r.stats[OversizedRegion]) == 0 && len(r.stats[UndersizedRegion]) == 0
}

// Reset resets the metrics of the regions' status.
func (r *RegionStatistics) Reset() {
regionStatusGauge.Reset()
}

// Clone clones the regions' status.
func (r *RegionStatistics) Clone() *RegionStatistics {
r1 := &RegionStatistics{
stats: make(map[RegionStatisticType]map[uint64]*RegionInfo),
}
// GetStats returns the regions' status.
func (r *RegionStatistics) GetStats() map[RegionStatisticType]map[uint64]*RegionInfo {
r.RLock()
defer r.RUnlock()
stats := make(map[RegionStatisticType]map[uint64]*RegionInfo)

for _, typ := range statsTypes {
r1.stats[typ] = make(map[uint64]*RegionInfo)
stats[typ] = make(map[uint64]*RegionInfo)
for k, v := range r.stats[typ] {
r1.stats[typ][k] = v
stats[typ][k] = v
}
}

return r1
return stats
}

// LabelStatistics is the statistics of the level of labels.
@@ -307,6 +344,14 @@ func (l *LabelStatistics) Collect() {
}
}

// ResetStats resets the statistics.
func (l *LabelStatistics) ResetStats() {
l.Lock()
defer l.Unlock()
l.regionLabelStats = make(map[uint64]string)
l.labelCounter = make(map[string]int)
}

// Reset resets the metrics of the label status.
func (l *LabelStatistics) Reset() {
regionLabelLevelGauge.Reset()
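
GetStats above allocates fresh outer and per-type maps but shares the *RegionInfo values, so a snapshot taken by collectStatsMetrics is isolated from later inserts and deletes without paying for a deep copy. Below is a self-contained demo of that copy depth; regionInfo is a stand-in type, not PD's RegionInfo.

```go
package main

import "fmt"

type regionInfo struct{ approximateSize int64 } // stand-in for PD's RegionInfo

func main() {
	src := map[string]map[uint64]*regionInfo{
		"miss-peer": {1: {approximateSize: 10}},
	}

	// Copy the way GetStats does: fresh outer and inner maps, shared values.
	snap := make(map[string]map[uint64]*regionInfo, len(src))
	for typ, bucket := range src {
		cp := make(map[uint64]*regionInfo, len(bucket))
		for id, ri := range bucket {
			cp[id] = ri
		}
		snap[typ] = cp
	}

	// Value mutation is shared: the copy is shallow at the pointer level.
	snap["miss-peer"][1].approximateSize = 99
	fmt.Println(src["miss-peer"][1].approximateSize) // 99

	// Structural mutation is not shared: the maps themselves were copied.
	delete(src["miss-peer"], 1)
	fmt.Println(len(snap["miss-peer"])) // 1
}
```
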
2 changes: 1 addition & 1 deletion tests/pdctl/region/region_test.go
@@ -102,7 +102,7 @@ func TestRegion(t *testing.T) {
)
defer cluster.Destroy()
tu.Eventually(re, func() bool {
return leaderServer.GetRaftCluster().GetLastRegionStats() != nil
return !leaderServer.GetRaftCluster().IsLastRegionStatsEmpty()
})
var testRegionsCases = []struct {
args []string
