async update region stats
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Oct 27, 2022
1 parent 7aba282 commit 63cfb9d
Showing 12 changed files with 145 additions and 158 deletions.
7 changes: 0 additions & 7 deletions metrics/grafana/pd.json
@@ -797,13 +797,6 @@
"legendFormat": "{{type}}",
"refId": "B"
},
{
"expr": "pd_regions_offline_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"offline-peer-region-count\", instance=\"$instance\"}",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{type}}",
"refId": "C"
}
],
"thresholds": [
{
2 changes: 1 addition & 1 deletion server/api/region.go
@@ -488,7 +488,7 @@ func (h *regionsHandler) GetLearnerPeerRegions(w http.ResponseWriter, r *http.Re
// @Router /regions/check/offline-peer [get]
func (h *regionsHandler) GetOfflinePeerRegions(w http.ResponseWriter, r *http.Request) {
handler := h.svr.GetHandler()
regions, err := handler.GetOfflinePeer(statistics.OfflinePeer)
regions, err := handler.GetRegionsByType(statistics.OfflinePeer)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
13 changes: 12 additions & 1 deletion server/api/region_test.go
@@ -24,6 +24,7 @@ import (
"net/url"
"sort"
"testing"
"time"

"github.com/docker/go-units"
"github.com/pingcap/failpoint"
@@ -168,11 +169,17 @@ func (suite *regionTestSuite) TestRegion() {
}

func (suite *regionTestSuite) TestRegionCheck() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipSleep", `return(true)`))
tu.Eventually(re, func() bool {
return suite.svr.GetRaftCluster().GetLastRegionStats() != nil
})
r := newTestRegionInfo(2, 1, []byte("a"), []byte("b"))
downPeer := &metapb.Peer{Id: 13, StoreId: 2}
r = r.Clone(core.WithAddPeer(downPeer), core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}), core.WithPendingPeers([]*metapb.Peer{downPeer}))
re := suite.Require()

mustRegionHeartbeat(re, suite.svr, r)
time.Sleep(150 * time.Millisecond)
url := fmt.Sprintf("%s/region/id/%d", suite.urlPrefix, r.GetID())
r1 := &RegionInfo{}
suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r1))
@@ -199,6 +206,7 @@ func (suite *regionTestSuite) TestRegionCheck() {

r = r.Clone(core.SetApproximateSize(1))
mustRegionHeartbeat(re, suite.svr, r)
time.Sleep(150 * time.Millisecond)
url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "empty-region")
r5 := &RegionsInfo{}
suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r5))
@@ -207,6 +215,7 @@ func (suite *regionTestSuite) TestRegionCheck() {

r = r.Clone(core.SetApproximateSize(1))
mustRegionHeartbeat(re, suite.svr, r)
time.Sleep(150 * time.Millisecond)
url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "hist-size")
r6 := make([]*histItem, 1)
suite.NoError(tu.ReadGetJSON(re, testDialClient, url, &r6))
@@ -215,11 +224,13 @@ func (suite *regionTestSuite) TestRegionCheck() {

r = r.Clone(core.SetApproximateKeys(1000))
mustRegionHeartbeat(re, suite.svr, r)
time.Sleep(150 * time.Millisecond)
url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "hist-keys")
r7 := make([]*histItem, 1)
suite.NoError(tu.ReadGetJSON(re, testDialClient, url, &r7))
histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}}
suite.Equal(histKeys, r7)
suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipSleep"))
}

func (suite *regionTestSuite) TestRegions() {
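In the region_test.go changes above, the test can no longer assert immediately after a heartbeat because region statistics are now produced by a background scan; it enables the skipSleep failpoint and polls with tu.Eventually (plus short sleeps) until the async stats have been published. As a rough illustration only, a poll-until-true helper in that spirit might look like the sketch below; the names are hypothetical and this is not the actual tu.Eventually implementation.

```go
package main

import (
	"fmt"
	"time"
)

// eventually polls cond every interval until it returns true or the timeout
// elapses; it returns the final result of cond.
func eventually(cond func() bool, timeout, interval time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(interval)
	}
	return cond()
}

func main() {
	start := time.Now()
	// Toy condition: becomes true roughly 200ms after start.
	ok := eventually(func() bool {
		return time.Since(start) > 200*time.Millisecond
	}, time.Second, 50*time.Millisecond)
	fmt.Println("condition met:", ok)
}
```

Polling against a deadline keeps such tests tolerant of scheduling jitter instead of relying on a single fixed sleep.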
51 changes: 26 additions & 25 deletions server/cluster/cluster.go
@@ -131,6 +131,7 @@ type RaftCluster struct {
coordinator *coordinator
labelLevelStats *statistics.LabelStatistics
regionStats *statistics.RegionStatistics
lastRegionStats *statistics.RegionStatistics
hotStat *statistics.HotStat
hotBuckets *buckets.HotBucketCache
ruleManager *placement.RuleManager
@@ -814,14 +815,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}

for _, item := range overlaps {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
}
c.labelLevelStats.ClearDefunctRegion(item.GetID())
}

// Update related stores.
storeMap := make(map[uint64]struct{})
for _, p := range region.GetPeers() {
@@ -839,10 +832,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
}
@@ -987,6 +976,11 @@ func (c *RaftCluster) GetAverageRegionSize() int64 {
return c.core.GetAverageRegionSize()
}

// GetLastRegionStats returns region statistics from cluster. Only for test purpose.
func (c *RaftCluster) GetLastRegionStats() *statistics.RegionStatistics {
return c.lastRegionStats
}

// DropCacheRegion removes a region from the cache.
func (c *RaftCluster) DropCacheRegion(id uint64) {
c.core.RemoveRegionIfExist(id)
@@ -1798,7 +1792,7 @@ func (c *RaftCluster) collectMetrics() {

c.coordinator.collectSchedulerMetrics()
c.coordinator.collectHotSpotMetrics()
c.collectClusterMetrics()
c.collectHotStatsMetrics()
c.collectHealthStatus()
}

@@ -1813,12 +1807,20 @@ func (c *RaftCluster) resetMetrics() {
c.resetProgressIndicator()
}

func (c *RaftCluster) collectClusterMetrics() {
func (c *RaftCluster) collectStatsMetrics() {
c.Lock()
defer c.Unlock()
if c.regionStats == nil {
return
}
c.regionStats.Collect()
c.labelLevelStats.Collect()
c.lastRegionStats = c.regionStats.Clone()
c.labelLevelStats = statistics.NewLabelStatistics()
c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager)
}

func (c *RaftCluster) collectHotStatsMetrics() {
// collect hot cache metrics
c.hotStat.CollectMetrics()
}
@@ -1861,22 +1863,21 @@ func (c *RaftCluster) resetProgressIndicator() {

// GetRegionStatsByType gets the status of the region by types.
func (c *RaftCluster) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo {
if c.regionStats == nil {
return nil
}
return c.regionStats.GetRegionStatsByType(typ)
}

// GetOfflineRegionStatsByType gets the status of the offline region by types.
func (c *RaftCluster) GetOfflineRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo {
if c.regionStats == nil {
c.RLock()
defer c.RUnlock()
if c.lastRegionStats == nil {
return nil
}
return c.regionStats.GetOfflineRegionStatsByType(typ)
return c.lastRegionStats.GetRegionStatsByType(typ)
}

func (c *RaftCluster) updateRegionsLabelLevelStats(regions []*core.RegionInfo) {
func (c *RaftCluster) updateRegionStats(regions []*core.RegionInfo) {
c.Lock()
defer c.Unlock()
for _, region := range regions {
if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}
c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.opt.GetLocationLabels())
}
}
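In the cluster.go hunks above, processRegionHeartbeat no longer observes region statistics or clears defunct regions inline; instead updateRegionStats fills the accumulators during a background scan, collectStatsMetrics publishes them by cloning into lastRegionStats and installing fresh accumulators, and GetRegionStatsByType serves reads from the last published snapshot under a read lock. Below is a minimal, self-contained sketch of that publish-and-reset pattern, with hypothetical types and names rather than PD's actual statistics API.

```go
package main

import (
	"fmt"
	"sync"
)

// statsCollector accumulates counts during a scan and publishes a snapshot
// once a full pass completes, mirroring the clone-and-reset step above.
type statsCollector struct {
	mu        sync.RWMutex
	current   map[string]int // being filled by the ongoing scan
	published map[string]int // last completed snapshot served to readers
}

func newStatsCollector() *statsCollector {
	return &statsCollector{current: make(map[string]int)}
}

// Observe records one item into the in-progress accumulator.
func (s *statsCollector) Observe(kind string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.current[kind]++
}

// Publish swaps the in-progress accumulator into the published snapshot and
// starts a fresh one for the next scan.
func (s *statsCollector) Publish() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.published = s.current
	s.current = make(map[string]int)
}

// Count reads only from the last published snapshot, so partially scanned
// data never leaks to callers.
func (s *statsCollector) Count(kind string) int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.published == nil {
		return 0
	}
	return s.published[kind]
}

func main() {
	c := newStatsCollector()
	c.Observe("empty-region")
	c.Observe("offline-peer")
	fmt.Println(c.Count("offline-peer")) // 0: nothing published yet
	c.Publish()
	fmt.Println(c.Count("offline-peer")) // 1
}
```

Serving reads from the last completed snapshot keeps callers from seeing half-finished counts while a scan is still in progress.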
6 changes: 3 additions & 3 deletions server/cluster/cluster_test.go
@@ -1089,7 +1089,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) {
r := core.NewRegionInfo(region, peers[0])
re.NoError(cluster.putRegion(r))

cluster.updateRegionsLabelLevelStats([]*core.RegionInfo{r})
cluster.labelLevelStats.Observe(r, cluster.getStoresWithoutLabelLocked(r, core.EngineKey, core.EngineTiFlash), cluster.opt.GetLocationLabels())
counter := cluster.labelLevelStats.GetLabelCounter()
re.Equal(0, counter["none"])
re.Equal(1, counter["zone"])
@@ -1261,13 +1261,13 @@ func TestOfflineAndMerge(t *testing.T) {
regions = core.SplitRegions(regions)
}
heartbeatRegions(re, cluster, regions)
re.Len(cluster.GetOfflineRegionStatsByType(statistics.OfflinePeer), len(regions))
re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions))

// Merge.
for i := 0; i < n; i++ {
regions = core.MergeRegions(regions)
heartbeatRegions(re, cluster, regions)
re.Len(cluster.GetOfflineRegionStatsByType(statistics.OfflinePeer), len(regions))
re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions))
}
}

58 changes: 54 additions & 4 deletions server/cluster/coordinator.go
@@ -45,12 +45,15 @@ import (
const (
runSchedulerCheckInterval = 3 * time.Second
checkSuspectRangesInterval = 100 * time.Millisecond
collectRegionStatsInterval = 100 * time.Millisecond
collectFactor = 0.9
collectTimeout = 5 * time.Minute
maxScheduleRetries = 10
maxLoadConfigRetries = 10

patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions.
statsScanRegionLimit = 1000
defaultScrapInterval = 15 * time.Second
// PluginLoad means action for load plugin
PluginLoad = "PluginLoad"
// PluginUnload means action for unload plugin
@@ -144,13 +147,11 @@ func (c *coordinator) patrolRegions() {
if len(regions) == 0 {
continue
}
// Updates the label level isolation statistics.
c.cluster.updateRegionsLabelLevelStats(regions)
if len(key) == 0 {
patrolCheckRegionsGauge.Set(time.Since(start).Seconds())
start = time.Now()
}
failpoint.Inject("break-patrol", func() {
failpoint.Inject("breakPatrol", func() {
failpoint.Break()
})
}
@@ -187,6 +188,54 @@ func (c *coordinator) checkWaitingRegions() {
}
}

func (c *coordinator) collectRegionStats() {
defer logutil.LogPanic()

defer c.wg.Done()
ticker := time.NewTicker(collectRegionStatsInterval)
defer ticker.Stop()
timer := time.NewTimer(defaultScrapInterval)
defer timer.Stop()
log.Info("coordinator starts collect region stats")
var key []byte
start := time.Now()
for {
select {
case <-ticker.C:
case <-c.ctx.Done():
log.Info("collect region stats has been stopped")
return
}

regions := c.cluster.ScanRegions(key, nil, statsScanRegionLimit)
length := len(regions)
if length == 0 {
c.cluster.collectStatsMetrics()
// Resets the scan key.
key = nil
continue
}

key = regions[length-1].GetEndKey()
c.cluster.updateRegionStats(regions)
if len(key) == 0 {
c.cluster.collectStatsMetrics()
failpoint.Inject("skipSleep", func() {
start = time.Time{}
})
if time.Since(start) < defaultScrapInterval {
timer.Reset(defaultScrapInterval - time.Since(start))
select {
case <-c.ctx.Done():
return
case <-timer.C:
}
}
start = time.Now()
}
}
}

// checkPriorityRegions checks priority regions
func (c *coordinator) checkPriorityRegions() {
items := c.checkers.GetPriorityRegions()
@@ -409,9 +458,10 @@ func (c *coordinator) run() {
log.Error("cannot persist schedule config", errs.ZapError(err))
}

c.wg.Add(3)
c.wg.Add(4)
// Starts to patrol regions.
go c.patrolRegions()
go c.collectRegionStats()
// Checks suspect key ranges
go c.checkSuspectRanges()
go c.drivePushOperator()
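In the coordinator.go hunks above, the new collectRegionStats goroutine ticks every collectRegionStatsInterval, scans up to statsScanRegionLimit regions per tick, and once a pass wraps around the key space it publishes the collected statistics and waits until at least defaultScrapInterval has elapsed since the pass began before starting the next one (the skipSleep failpoint short-circuits that wait in tests). A simplified, self-contained sketch of such a batch-scan loop, with hypothetical names and toy data rather than PD's coordinator types, could look like this:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const (
	batchSize        = 3                      // stand-in for statsScanRegionLimit
	scanTickInterval = 20 * time.Millisecond  // stand-in for collectRegionStatsInterval
	minFullPassGap   = 100 * time.Millisecond // stand-in for defaultScrapInterval
)

// collectLoop scans items in batches of batchSize; when a pass over all items
// completes it publishes the result and enforces a minimum gap before the
// next pass, so a small data set is not rescanned too aggressively.
func collectLoop(ctx context.Context, items []string, publish func(seen int)) {
	ticker := time.NewTicker(scanTickInterval)
	defer ticker.Stop()

	pos, seen := 0, 0
	passStart := time.Now()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		end := pos + batchSize
		if end > len(items) {
			end = len(items)
		}
		seen += end - pos
		pos = end

		if pos == len(items) { // full pass finished
			publish(seen)
			if wait := minFullPassGap - time.Since(passStart); wait > 0 {
				select {
				case <-ctx.Done():
					return
				case <-time.After(wait):
				}
			}
			pos, seen = 0, 0
			passStart = time.Now()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	items := []string{"r1", "r2", "r3", "r4", "r5"}
	collectLoop(ctx, items, func(seen int) {
		fmt.Println("published stats for", seen, "regions")
	})
}
```

Throttling full passes to roughly the scrape interval keeps the published statistics about as fresh as the metrics pipeline needs without continuously rescanning a small cluster.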
10 changes: 6 additions & 4 deletions server/cluster/coordinator_test.go
@@ -261,7 +261,8 @@ func TestCollectMetrics(t *testing.T) {
for i := 0; i < 1000; i++ {
co.collectHotSpotMetrics()
co.collectSchedulerMetrics()
co.cluster.collectClusterMetrics()
co.cluster.collectHotStatsMetrics()
co.cluster.collectStatsMetrics()
}
co.resetHotSpotMetrics()
co.resetSchedulerMetrics()
@@ -278,10 +279,11 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
}
tc := newTestCluster(ctx, opt)
hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */)
co := newCoordinator(ctx, tc.RaftCluster, hbStreams)
tc.coordinator = co
if setTc != nil {
setTc(tc)
}
co := newCoordinator(ctx, tc.RaftCluster, hbStreams)
if run != nil {
run(co)
}
@@ -487,7 +489,7 @@ func TestCheckCache(t *testing.T) {

// Add a peer with two replicas.
re.NoError(tc.addLeaderRegion(1, 2, 3))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/break-patrol", `return`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/breakPatrol", `return`))

// case 1: operator cannot be created due to replica-schedule-limit restriction
co.wg.Add(1)
@@ -521,7 +523,7 @@ func TestCheckCache(t *testing.T) {
re.Empty(co.checkers.GetWaitingRegions())

co.wg.Wait()
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/break-patrol"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/breakPatrol"))
}

func TestPeerState(t *testing.T) {
9 changes: 0 additions & 9 deletions server/handler.go
@@ -903,15 +903,6 @@ func (h *Handler) GetSchedulerConfigHandler() (http.Handler, error) {
return mux, nil
}

// GetOfflinePeer gets the region with offline peer.
func (h *Handler) GetOfflinePeer(typ statistics.RegionStatisticType) ([]*core.RegionInfo, error) {
c := h.s.GetRaftCluster()
if c == nil {
return nil, errs.ErrNotBootstrapped.FastGenByArgs()
}
return c.GetOfflineRegionStatsByType(typ), nil
}

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
log.Info("reset-ts",
9 changes: 0 additions & 9 deletions server/statistics/metrics.go
@@ -41,14 +41,6 @@ var (
Help: "Status of the regions.",
}, []string{"type"})

offlineRegionStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "regions",
Name: "offline_status",
Help: "Status of the offline regions.",
}, []string{"type"})

clusterStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
@@ -190,7 +182,6 @@ func init() {
prometheus.MustRegister(hotCacheStatusGauge)
prometheus.MustRegister(storeStatusGauge)
prometheus.MustRegister(regionStatusGauge)
prometheus.MustRegister(offlineRegionStatusGauge)
prometheus.MustRegister(clusterStatusGauge)
prometheus.MustRegister(placementStatusGauge)
prometheus.MustRegister(configStatusGauge)