statistics, schedulers: add collecting threads information and calculating store scores (#1903)

Signed-off-by: lhy1024 <admin@liudos.us>
lhy1024 authored and sre-bot committed Nov 13, 2019
1 parent 109d6b1 commit 099369e
Showing 7 changed files with 455 additions and 27 deletions.
17 changes: 15 additions & 2 deletions server/coordinator.go
@@ -16,6 +16,7 @@ package server
import (
"context"
"fmt"
"strconv"
"sync"
"time"

@@ -36,8 +37,8 @@ const (
maxScheduleRetries = 10
maxLoadConfigRetries = 10

- regionheartbeatSendChanCap = 1024
- hotRegionScheduleName = "balance-hot-region-scheduler"
+ heartbeatChanCapacity = 1024
+ hotRegionScheduleName = "balance-hot-region-scheduler"

patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions.
)
@@ -262,6 +263,7 @@ func (c *coordinator) stop() {
type hasHotStatus interface {
GetHotReadStatus() *statistics.StoreHotPeersInfos
GetHotWriteStatus() *statistics.StoreHotPeersInfos
GetStoresScore() map[uint64]float64
}

func (c *coordinator) getHotWriteRegions() *statistics.StoreHotPeersInfos {
@@ -367,6 +369,17 @@ func (c *coordinator) collectHotSpotMetrics() {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0)
}
}

// Collect each store's load score and expose it as a metric.
scores := s.Scheduler.(hasHotStatus).GetStoresScore()
for _, store := range stores {
storeAddress := store.GetAddress()
storeID := store.GetID()
score, ok := scores[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, strconv.FormatUint(storeID, 10), "store_score").Set(score)
}
}
}
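
The hotSpotStatusGauge written above is declared in server/metrics.go, which this diff does not touch. As a rough sketch of what such a GaugeVec declaration looks like (the namespace, subsystem, name, and label names below are assumptions for illustration, not taken from this commit; assume github.com/prometheus/client_golang/prometheus is imported):

    // Hypothetical declaration of the gauge used above. The three labels
    // line up with the WithLabelValues(address, store, type) calls.
    var hotSpotStatusGauge = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Namespace: "pd",
            Subsystem: "hotspot",
            Name:      "status",
            Help:      "Status of the hot spots.",
        }, []string{"address", "store", "type"})

Each store then exports one "store_score" sample per scrape, alongside the existing hot-region counters.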

func (c *coordinator) resetHotSpotMetrics() {
2 changes: 1 addition & 1 deletion server/heartbeat_streams.go
@@ -56,7 +56,7 @@ func newHeartbeatStreams(ctx context.Context, clusterID uint64, cluster *RaftCluster
hbStreamCancel: hbStreamCancel,
clusterID: clusterID,
streams: make(map[uint64]heartbeatStream),
- msgCh: make(chan *pdpb.RegionHeartbeatResponse, regionheartbeatSendChanCap),
+ msgCh: make(chan *pdpb.RegionHeartbeatResponse, heartbeatChanCapacity),
streamCh: make(chan streamUpdate, 1),
cluster: cluster,
}
33 changes: 33 additions & 0 deletions server/schedulers/hot_region.go
@@ -54,6 +54,8 @@ const (
storeHotPeersDefaultLen = 100
hotRegionScheduleFactor = 0.9
balanceHotRegionName = "balance-hot-region-scheduler"
minFlowBytes = 128 * 1024
minScoreLimit = 0.35
)

// BalanceType : the perspective of balance
@@ -89,6 +91,8 @@ type balanceHotRegionsScheduler struct {
// store id -> hot regions statistics as the role of leader
stats *storeStatistics
r *rand.Rand
// scoreInfos caches the latest load score of every store.
scoreInfos *ScoreInfos
}

func newBalanceHotRegionsScheduler(opController *schedule.OperatorController) *balanceHotRegionsScheduler {
@@ -101,6 +105,7 @@ func newBalanceHotRegionsScheduler(opController *schedule.OperatorController) *balanceHotRegionsScheduler {
stats: newStoreStaticstics(),
types: []BalanceType{hotWriteRegionBalance, hotReadRegionBalance},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
scoreInfos: NewScoreInfos(),
}
}

@@ -157,6 +162,7 @@ func (h *balanceHotRegionsScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
func (h *balanceHotRegionsScheduler) dispatch(typ BalanceType, cluster opt.Cluster) []*operator.Operator {
h.Lock()
defer h.Unlock()
h.analyzeStoreLoad(cluster.GetStoresStats())
storesStat := cluster.GetStoresStats()
switch typ {
case hotReadRegionBalance:
@@ -170,6 +176,23 @@ func (h *balanceHotRegionsScheduler) dispatch(typ BalanceType, cluster opt.Cluster) []*operator.Operator {
return nil
}

func (h *balanceHotRegionsScheduler) analyzeStoreLoad(storesStats *statistics.StoresStats) {
readFlowScoreInfos := NormalizeStoresStats(storesStats.GetStoresBytesReadStat())
writeFlowScoreInfos := NormalizeStoresStats(storesStats.GetStoresBytesWriteStat())
readFlowMean := MeanStoresStats(storesStats.GetStoresBytesReadStat())
writeFlowMean := MeanStoresStats(storesStats.GetStoresBytesWriteStat())

var weights []float64
means := readFlowMean + writeFlowMean
if means <= minFlowBytes {
weights = append(weights, 0, 0)
} else {
weights = append(weights, readFlowMean/means, writeFlowMean/means)
}

h.scoreInfos = AggregateScores([]*ScoreInfos{readFlowScoreInfos, writeFlowScoreInfos}, weights)
}
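
To make the read/write weighting above concrete, here is a small self-contained sketch of the same rule (the flow numbers are made up; flowWeights is a hypothetical helper, not part of this commit):

    package main

    import "fmt"

    const minFlowBytes = 128 * 1024 // same threshold as in hot_region.go

    // flowWeights mirrors analyzeStoreLoad: each weight is the share of the
    // total mean flow, and both collapse to zero when the cluster is so idle
    // that the scores would only reflect noise.
    func flowWeights(readFlowMean, writeFlowMean float64) (float64, float64) {
        means := readFlowMean + writeFlowMean
        if means <= minFlowBytes {
            return 0, 0
        }
        return readFlowMean / means, writeFlowMean / means
    }

    func main() {
        r, w := flowWeights(300*1024, 100*1024)
        fmt.Printf("read=%.2f write=%.2f\n", r, w) // read=0.75 write=0.25
    }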

func (h *balanceHotRegionsScheduler) balanceHotReadRegions(cluster opt.Cluster) []*operator.Operator {
// balance by leader
srcRegion, newLeader := h.balanceByLeader(cluster, h.stats.readStatAsLeader)
@@ -499,3 +522,13 @@ func (h *balanceHotRegionsScheduler) GetHotWriteStatus() *statistics.StoreHotPeersInfos {
AsPeer: asPeer,
}
}

func (h *balanceHotRegionsScheduler) GetStoresScore() map[uint64]float64 {
h.RLock()
defer h.RUnlock()
storesScore := make(map[uint64]float64, 0)
for _, info := range h.scoreInfos.ToSlice() {
storesScore[info.GetStoreID()] = info.GetScore()
}
return storesScore
}
182 changes: 172 additions & 10 deletions server/schedulers/utils.go
@@ -14,13 +14,13 @@
package schedulers

import (
"context"
"math"
"net/url"
"sort"
"time"

"github.com/montanaflynn/stats"
"github.com/pingcap/log"
"github.com/pingcap/pd/pkg/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule/operator"
"github.com/pingcap/pd/server/schedule/opt"
@@ -136,15 +136,177 @@ func adjustBalanceLimit(cluster opt.Cluster, kind core.ResourceKind) uint64 {
return maxUint64(1, uint64(limit))
}

- const (
- taintCacheGCInterval = time.Second * 5
- taintCacheTTL = time.Minute * 5
- )
// ScoreInfo stores storeID and score of a store.
type ScoreInfo struct {
storeID uint64
score float64
}

// NewScoreInfo returns a ScoreInfo.
func NewScoreInfo(storeID uint64, score float64) *ScoreInfo {
return &ScoreInfo{
storeID: storeID,
score: score,
}
}

// GetStoreID returns the storeID.
func (s *ScoreInfo) GetStoreID() uint64 {
return s.storeID
}

// GetScore returns the score.
func (s *ScoreInfo) GetScore() float64 {
return s.score
}

// SetScore sets the score.
func (s *ScoreInfo) SetScore(score float64) {
s.score = score
}

// ScoreInfos is a sortable collection of ScoreInfo.
type ScoreInfos struct {
scoreInfos []*ScoreInfo
isSorted bool
}

// NewScoreInfos returns a ScoreInfos.
func NewScoreInfos() *ScoreInfos {
return &ScoreInfos{
scoreInfos: make([]*ScoreInfo, 0),
isSorted: true,
}
}

// Add appends a scoreInfo and records whether the slice is still sorted.
func (s *ScoreInfos) Add(scoreInfo *ScoreInfo) {
infosLen := len(s.scoreInfos)
if s.isSorted && infosLen != 0 && s.scoreInfos[infosLen-1].score > scoreInfo.score {
s.isSorted = false
}
s.scoreInfos = append(s.scoreInfos, scoreInfo)
}

// Len returns the length of the slice; with Less and Swap it implements sort.Interface.
func (s *ScoreInfos) Len() int { return len(s.scoreInfos) }

// Less reports whether the score at index i is less than the score at index j.
func (s *ScoreInfos) Less(i, j int) bool { return s.scoreInfos[i].score < s.scoreInfos[j].score }

// Swap swaps the elements at indexes i and j.
func (s *ScoreInfos) Swap(i, j int) {
s.scoreInfos[i], s.scoreInfos[j] = s.scoreInfos[j], s.scoreInfos[i]
}

// Sort sorts the slice.
func (s *ScoreInfos) Sort() {
if !s.isSorted {
sort.Sort(s)
s.isSorted = true
}
}
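
Because Len, Less, and Swap above satisfy sort.Interface, Sort can delegate to the standard library, and the isSorted flag makes repeated calls cheap. A usage sketch (exampleScoreInfos is hypothetical and assumes it sits in the schedulers package):

    func exampleScoreInfos() {
        s := NewScoreInfos()
        s.Add(NewScoreInfo(1, 0.9))
        s.Add(NewScoreInfo(2, 0.1)) // appended out of order: isSorted flips to false
        s.Sort()                    // delegates to sort.Sort once
        s.Sort()                    // no-op: the slice is already sorted
        _ = s.Min().GetStoreID()    // store 2, whose score 0.1 is the minimum
    }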

// ToSlice returns the scoreInfo slice.
func (s *ScoreInfos) ToSlice() []*ScoreInfo {
return s.scoreInfos
}

// Min returns the element with the minimum score, or nil if the slice is empty.
func (s *ScoreInfos) Min() *ScoreInfo {
if s.Len() == 0 {
return nil
}
s.Sort()
return s.scoreInfos[0]
}

// Mean returns the mean of the slice.
func (s *ScoreInfos) Mean() float64 {
if s.Len() == 0 {
return 0
}

var sum float64
for _, info := range s.scoreInfos {
sum += info.score
}

return sum / float64(s.Len())
}

// StdDev returns the standard deviation of the slice.
func (s *ScoreInfos) StdDev() float64 {
if s.Len() == 0 {
return 0
}

var res float64
mean := s.Mean()
for _, info := range s.ToSlice() {
diff := info.GetScore() - mean
res += diff * diff
}
res /= float64(s.Len())
res = math.Sqrt(res)

return res
}
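
As a quick numeric check of this formula: for scores {0, 0.5, 1.0} the mean is 0.5 and the squared deviations are 0.25, 0, and 0.25, so the variance is 0.5/3 ≈ 0.167 and the standard deviation is ≈ 0.408. Note this is the population form (dividing by n rather than n-1), which fits here because the scores cover the whole set of stores rather than a sample.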

// MeanStoresStats returns the mean of stores' stats.
func MeanStoresStats(storesStats map[uint64]float64) float64 {
if len(storesStats) == 0 {
return 0.0
}

var sum float64
for _, storeStat := range storesStats {
sum += storeStat
}
return sum / float64(len(storesStats))
}

// NormalizeStoresStats returns the normalized score scoreInfos. Normalize: x_i => (x_i - x_min)/x_avg.
func NormalizeStoresStats(storesStats map[uint64]float64) *ScoreInfos {
scoreInfos := NewScoreInfos()

for storeID, score := range storesStats {
scoreInfos.Add(NewScoreInfo(storeID, score))
}

mean := scoreInfos.Mean()
if mean == 0 {
return scoreInfos
}

minScore := scoreInfos.Min().GetScore()

for _, info := range scoreInfos.ToSlice() {
info.SetScore((info.GetScore() - minScore) / mean)
}

return scoreInfos
}
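
A worked example, with made-up byte rates: for stats {1: 100, 2: 200, 3: 300}, the mean is 200 and the minimum is 100, so the normalized scores are 0, 0.5, and 1.0. Note the mean is computed over the original values, before the minimum is subtracted. As a sketch (exampleNormalize is hypothetical and assumes the schedulers package plus an fmt import):

    func exampleNormalize() {
        stats := map[uint64]float64{1: 100, 2: 200, 3: 300}
        infos := NormalizeStoresStats(stats)
        for _, info := range infos.ToSlice() {
            // Map iteration order is random, but the scores come out as
            // store 1 -> 0, store 2 -> 0.5, store 3 -> 1.0.
            fmt.Println(info.GetStoreID(), info.GetScore())
        }
    }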

// AggregateScores aggregates stores' scores by using their weights.
func AggregateScores(storesStats []*ScoreInfos, weights []float64) *ScoreInfos {
num := len(storesStats)
if num > len(weights) {
num = len(weights)
}

scoreMap := make(map[uint64]float64, 0)
for i := 0; i < num; i++ {
scoreInfos := storesStats[i]
for _, info := range scoreInfos.ToSlice() {
scoreMap[info.GetStoreID()] += info.GetScore() * weights[i]
}
}

res := NewScoreInfos()
for storeID, score := range scoreMap {
res.Add(NewScoreInfo(storeID, score))
}

- // newTaintCache creates a TTL cache to hold stores that are not able to
- // schedule operators.
- func newTaintCache(ctx context.Context) *cache.TTLUint64 {
- return cache.NewIDTTL(ctx, taintCacheGCInterval, taintCacheTTL)
+ res.Sort()
+ return res
}
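
Continuing the normalization example above (the read/write scores here are made up): aggregating read scores {1: 0, 2: 0.5} and write scores {1: 1.0, 2: 0.2} with weights 0.75 and 0.25 gives store 1 a score of 0×0.75 + 1.0×0.25 = 0.25 and store 2 a score of 0.5×0.75 + 0.2×0.25 = 0.425, returned in ascending score order. A store missing from one of the input ScoreInfos simply contributes nothing for that term, since absent keys in scoreMap read as zero.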

func getKeyRanges(args []string) ([]core.KeyRange, error) {