schedulers: improve balance-leader scheduler. (#996)
disksing authored Mar 22, 2018
1 parent 459a21a commit 8d41166
Showing 6 changed files with 161 additions and 107 deletions.
10 changes: 10 additions & 0 deletions server/cache/ttl.go
@@ -95,6 +95,16 @@ func (c *TTL) Len() int {
return len(c.items)
}

// Clear removes all items in the ttl cache.
func (c *TTL) Clear() {
c.Lock()
defer c.Unlock()

for k := range c.items {
delete(c.items, k)
}
}

func (c *TTL) doGC() {
ticker := time.NewTicker(c.gcInterval)
defer ticker.Stop()
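For reference, the scheduler's intended use of the new Clear method looks roughly like the following minimal sketch. The cache.NewIDTTL constructor and the Put/Clear calls mirror the ones appearing later in this diff; the standalone main wrapper, the literal intervals, and the assumption that these methods are promoted from the embedded TTL cache are illustrative only.

package main

import (
	"time"

	"github.com/pingcap/pd/server/cache"
)

func main() {
	// Taint cache with the same intervals as newTaintCache in utils.go below.
	taint := cache.NewIDTTL(5*time.Second, 5*time.Minute)

	taint.Put(1) // store 1 yielded no operator recently; ignore it for a while
	taint.Put(4) // store 4 likewise

	// Once every candidate store has been tainted (the cluster looks balanced),
	// the scheduler wipes the cache so it can react to a sudden change in the
	// leader distribution.
	taint.Clear()
}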
106 changes: 91 additions & 15 deletions server/schedulers/balance_leader.go
@@ -14,6 +14,9 @@
package schedulers

import (
"strconv"

"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
log "github.com/sirupsen/logrus"
@@ -25,26 +28,31 @@ func init() {
})
}

// balanceLeaderRetryLimit is the maximum number of retries when scheduling
// between the selected source store and target store.
const balanceLeaderRetryLimit = 10

type balanceLeaderScheduler struct {
*baseScheduler
limit uint64
selector schedule.Selector
selector schedule.Selector
taintStores *cache.TTLUint64
}

// newBalanceLeaderScheduler creates a scheduler that tends to keep leaders on
// each store balanced.
func newBalanceLeaderScheduler(limiter *schedule.Limiter) schedule.Scheduler {
taintStores := newTaintCache()
filters := []schedule.Filter{
schedule.NewBlockFilter(),
schedule.NewStateFilter(),
schedule.NewHealthFilter(),
schedule.NewRejectLeaderFilter(),
schedule.NewCacheFilter(taintStores),
}
base := newBaseScheduler(limiter)
return &balanceLeaderScheduler{
baseScheduler: base,
limit: 1,
selector: schedule.NewBalanceSelector(core.LeaderKind, filters),
taintStores: taintStores,
}
}

@@ -57,27 +65,95 @@ func (l *balanceLeaderScheduler) GetType() string {
}

func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
limit := minUint64(l.limit, cluster.GetLeaderScheduleLimit())
return l.limiter.OperatorCount(schedule.OpLeader) < limit
return l.limiter.OperatorCount(schedule.OpLeader) < cluster.GetLeaderScheduleLimit()
}

func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster, opInfluence schedule.OpInfluence) *schedule.Operator {
schedulerCounter.WithLabelValues(l.GetName(), "schedule").Inc()
region, newLeader := scheduleTransferLeader(cluster, l.GetName(), l.selector)
if region == nil {

stores := cluster.GetStores()

// source/target is the store with the highest/lowest leader score among the
// stores that can be selected as the balance source/target.
source := l.selector.SelectSource(cluster, stores)
target := l.selector.SelectTarget(cluster, stores)

// No store can be selected as source or target.
if source == nil || target == nil {
schedulerCounter.WithLabelValues(l.GetName(), "no_store").Inc()
// When the cluster is balanced, every store eventually ends up in the taint
// cache once it has been selected, which would keep the scheduler from
// reacting to a sudden change in a store's leader count. Clear the taint
// cache here and start over.
l.taintStores.Clear()
return nil
}

// Skip hot regions.
if cluster.IsRegionHot(region.GetId()) {
schedulerCounter.WithLabelValues(l.GetName(), "region_hot").Inc()
log.Debugf("[%s] store%d has the max leader score, store%d has the min leader score", l.GetName(), source.GetId(), target.GetId())
sourceStoreLabel := strconv.FormatUint(source.GetId(), 10)
targetStoreLabel := strconv.FormatUint(target.GetId(), 10)
balanceLeaderCounter.WithLabelValues("high_score", sourceStoreLabel).Inc()
balanceLeaderCounter.WithLabelValues("low_score", targetStoreLabel).Inc()

for i := 0; i < balanceLeaderRetryLimit; i++ {
if op := l.transferLeaderOut(source, cluster, opInfluence); op != nil {
balanceLeaderCounter.WithLabelValues("transfer_out", sourceStoreLabel).Inc()
return op
}
if op := l.transferLeaderIn(target, cluster, opInfluence); op != nil {
balanceLeaderCounter.WithLabelValues("transfer_in", targetStoreLabel).Inc()
return op
}
}

// If no operator can be created for the selected stores, ignore them for a while.
log.Debugf("[%s] no operator created for selected store%d and store%d", l.GetName(), source.GetId(), target.GetId())
balanceLeaderCounter.WithLabelValues("add_taint", strconv.FormatUint(source.GetId(), 10)).Inc()
l.taintStores.Put(source.GetId())
balanceLeaderCounter.WithLabelValues("add_taint", strconv.FormatUint(target.GetId(), 10)).Inc()
l.taintStores.Put(target.GetId())
return nil
}

func (l *balanceLeaderScheduler) transferLeaderOut(source *core.StoreInfo, cluster schedule.Cluster, opInfluence schedule.OpInfluence) *schedule.Operator {
region := cluster.RandLeaderRegion(source.GetId())
if region == nil {
log.Debugf("[%s] store%d has no leader", l.GetName(), source.GetId())
schedulerCounter.WithLabelValues(l.GetName(), "no_leader_region").Inc()
return nil
}
target := l.selector.SelectTarget(cluster, cluster.GetFollowerStores(region))
if target == nil {
log.Debugf("[%s] region %d has no target store", l.GetName(), region.GetId())
schedulerCounter.WithLabelValues(l.GetName(), "no_target_store").Inc()
return nil
}
return l.createOperator(region, source, target, cluster, opInfluence)
}

func (l *balanceLeaderScheduler) transferLeaderIn(target *core.StoreInfo, cluster schedule.Cluster, opInfluence schedule.OpInfluence) *schedule.Operator {
region := cluster.RandFollowerRegion(target.GetId())
if region == nil {
log.Debugf("[%s] store%d has no follower", l.GetName(), target.GetId())
schedulerCounter.WithLabelValues(l.GetName(), "no_follower_region").Inc()
return nil
}
source := cluster.GetStore(region.Leader.GetStoreId())
target := cluster.GetStore(newLeader.GetStoreId())
log.Debugf("[region %d] source store id is %v, target store id is %v", region.GetId(), source.GetId(), target.GetId())
if source == nil {
log.Debugf("[%s] region %d has no leader", l.GetName(), region.GetId())
schedulerCounter.WithLabelValues(l.GetName(), "no_leader").Inc()
return nil
}
return l.createOperator(region, source, target, cluster, opInfluence)
}

func (l *balanceLeaderScheduler) createOperator(region *core.RegionInfo, source, target *core.StoreInfo, cluster schedule.Cluster, opInfluence schedule.OpInfluence) *schedule.Operator {
log.Debugf("[%s] verify balance region %d, from: %d, to: %d", l.GetName(), region.GetId(), source.GetId(), target.GetId())
if cluster.IsRegionHot(region.GetId()) {
log.Debugf("[%s] region %d is hot region, ignore it", l.GetName(), region.GetId())
schedulerCounter.WithLabelValues(l.GetName(), "region_hot").Inc()
return nil
}
sourceSize := source.LeaderSize + int64(opInfluence.GetStoreInfluence(source.GetId()).LeaderSize)
targetSize := target.LeaderSize + int64(opInfluence.GetStoreInfluence(target.GetId()).LeaderSize)
regionSize := float64(region.ApproximateSize) * cluster.GetTolerantSizeRatio()
@@ -86,8 +162,8 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster, opInfluence
schedulerCounter.WithLabelValues(l.GetName(), "skip").Inc()
return nil
}
l.limit = adjustBalanceLimit(cluster, core.LeaderKind)
schedulerCounter.WithLabelValues(l.GetName(), "new_operator").Inc()
step := schedule.TransferLeader{FromStore: region.Leader.GetStoreId(), ToStore: newLeader.GetStoreId()}
return schedule.NewOperator("balance-leader", region.GetId(), schedule.OpBalance|schedule.OpLeader, step)
step := schedule.TransferLeader{FromStore: region.Leader.GetStoreId(), ToStore: target.GetId()}
log.Debugf("[%s] start balance region %d, from: %d, to: %d", l.GetName(), region.GetId(), source.GetId(), target.GetId())
return schedule.NewOperator("balanceLeader", region.GetId(), schedule.OpBalance|schedule.OpLeader, step)
}
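To summarize the new control flow in Schedule: pick the store with the highest leader score as the source and the one with the lowest as the target, retry up to balanceLeaderRetryLimit times to build an operator that moves a leader out of the source or into the target, taint both stores if neither works, and clear the taint cache when no source/target can be selected at all. A simplified, self-contained sketch of that loop follows; the types and helpers (store, operator, taintSet, pickSourceTarget, moveOut, moveIn) are hypothetical stand-ins, not PD's actual API.

package balancesketch

type store struct {
	id          uint64
	leaderScore float64
}

type operator struct{ regionID, from, to uint64 }

type taintSet map[uint64]struct{}

const retryLimit = 10 // mirrors balanceLeaderRetryLimit

// schedule tries to build one leader-transfer operator. moveOut/moveIn are
// expected to pick a random leader/follower region on each call, which is why
// retrying the same store pair can eventually succeed.
func schedule(stores []store, taint taintSet,
	moveOut func(src store) *operator,
	moveIn func(dst store) *operator) *operator {

	source, target := pickSourceTarget(stores, taint)
	if source == nil || target == nil {
		// Everything selectable is tainted (or there is nothing to balance):
		// clear the taint set so the next round can start fresh.
		for k := range taint {
			delete(taint, k)
		}
		return nil
	}

	for i := 0; i < retryLimit; i++ {
		if op := moveOut(*source); op != nil {
			return op // move a leader away from the highest-score store
		}
		if op := moveIn(*target); op != nil {
			return op // or move a leader onto the lowest-score store
		}
	}

	// No operator could be built for this pair; ignore both stores for a while.
	taint[source.id] = struct{}{}
	taint[target.id] = struct{}{}
	return nil
}

// pickSourceTarget returns the untainted stores with the highest and lowest
// leader scores.
func pickSourceTarget(stores []store, taint taintSet) (src, dst *store) {
	for i := range stores {
		s := &stores[i]
		if _, tainted := taint[s.id]; tainted {
			continue
		}
		if src == nil || s.leaderScore > src.leaderScore {
			src = s
		}
		if dst == nil || s.leaderScore < dst.leaderScore {
			dst = s
		}
	}
	if src == dst {
		return nil, nil // fewer than two usable stores
	}
	return src, dst
}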
45 changes: 38 additions & 7 deletions server/schedulers/balance_test.go
@@ -225,7 +225,8 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) {
s.tc.addLeaderStore(4, 16)
s.tc.addLeaderRegion(1, 4, 2, 3)
s.tc.addLeaderRegion(2, 3, 1, 2)
// Average leader is 5.5. Select store 4 as source.
// store4 has the max leader score, store1 has the min leader score.
// The scheduler tries to move a leader out of store4 first.
CheckTransferLeader(c, s.schedule(nil), schedule.OpBalance, 4, 2)

// Stores: 1 2 3 4
@@ -234,17 +235,47 @@
// Region2: F F L -
s.tc.updateLeaderCount(2, 14)
s.tc.updateLeaderCount(3, 15)
// Average leader is 11.5. Select store 1 as target.
// Cannot move a leader out of store4, so move a leader into store1.
CheckTransferLeader(c, s.schedule(nil), schedule.OpBalance, 3, 1)

// Stores: 1 2 3 4
// Leaders: 1 2 15 16
// Region1: - F F L
// Region2: - F L F
s.tc.addLeaderRegion(2, 3, 2, 4)
// Region1: - F L F
// Region2: L F F -
s.tc.addLeaderStore(2, 2)
// Unable to find a region in store 1. Transfer a leader out of store 4 instead.
CheckTransferLeader(c, s.schedule(nil), schedule.OpBalance, 4, 2)
s.tc.addLeaderRegion(1, 3, 2, 4)
s.tc.addLeaderRegion(2, 1, 2, 3)
// store4 has no leader region and store1 has no follower region, so no operator is created.
c.Assert(s.schedule(nil), IsNil)
// store4 and store1 are added to the taint cache.
// Now source and target are store3 and store2.
CheckTransferLeader(c, s.schedule(nil), schedule.OpBalance, 3, 2)

// Stores: 1 2 3 4
// Leaders: 9 10 10 11
// Region1: - F F L
// Region2: L F F -
s.tc.addLeaderStore(1, 10)
s.tc.addLeaderStore(2, 10)
s.tc.addLeaderStore(3, 10)
s.tc.addLeaderStore(4, 10)
s.tc.addLeaderRegion(1, 4, 2, 3)
s.tc.addLeaderRegion(2, 1, 2, 3)
// The cluster is balanced.
c.Assert(s.schedule(nil), IsNil) // store1 and store4 are added to the taint cache.
c.Assert(s.schedule(nil), IsNil) // store2 and store3 are added to the taint cache.

// store3's leader count drops:
// Stores: 1 2 3 4
// Leaders: 11 13 0 16
// Region1: - F F L
// Region2: L F F -
s.tc.addLeaderStore(1, 11)
s.tc.addLeaderStore(2, 13)
s.tc.addLeaderStore(3, 0)
s.tc.addLeaderStore(4, 16)
c.Assert(s.schedule(nil), IsNil) // All stores are in the taint cache, so the cache is cleared.
CheckTransferLeader(c, s.schedule(nil), schedule.OpBalance, 4, 3) // With the taint cache cleared, an operator is created.
}

var _ = Suite(&testBalanceRegionSchedulerSuite{})
2 changes: 1 addition & 1 deletion server/schedulers/base_scheduler.go
@@ -22,7 +22,7 @@ import (

// options for interval of schedulers
const (
MaxScheduleInterval = time.Minute
MaxScheduleInterval = time.Second * 5
MinScheduleInterval = time.Millisecond * 10
MinSlowScheduleInterval = time.Second * 3

9 changes: 9 additions & 0 deletions server/schedulers/metrics.go
@@ -31,7 +31,16 @@ var schedulerStatus = prometheus.NewGaugeVec(
Help: "Inner status of the scheduler.",
}, []string{"type", "name"})

var balanceLeaderCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "scheduler",
Name: "balance_leader",
Help: "Counter of balance leader scheduler.",
}, []string{"type", "store"})

func init() {
prometheus.MustRegister(schedulerCounter)
prometheus.MustRegister(schedulerStatus)
prometheus.MustRegister(balanceLeaderCounter)
}
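balanceLeaderCounter is a standard Prometheus CounterVec keyed by event type and store ID. A small, self-contained usage sketch follows; the metric definition copies the one above, the call pattern mirrors the call sites in balance_leader.go, and the store ID is illustrative.

package main

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

// Same label set ("type", "store") as the counter added in metrics.go above.
var balanceLeaderCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd",
		Subsystem: "scheduler",
		Name:      "balance_leader",
		Help:      "Counter of balance leader scheduler.",
	}, []string{"type", "store"})

func main() {
	prometheus.MustRegister(balanceLeaderCounter)

	storeID := uint64(4) // illustrative store ID
	store := strconv.FormatUint(storeID, 10)

	// One increment per scheduling event, mirroring balance_leader.go.
	balanceLeaderCounter.WithLabelValues("high_score", store).Inc()
	balanceLeaderCounter.WithLabelValues("transfer_out", store).Inc()
	balanceLeaderCounter.WithLabelValues("add_taint", store).Inc()
}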
96 changes: 12 additions & 84 deletions server/schedulers/utils.go
@@ -14,99 +14,16 @@
package schedulers

import (
"math"
"time"

"github.com/montanaflynn/stats"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
log "github.com/sirupsen/logrus"
)

// scheduleTransferLeader schedules a region to transfer leader to the peer.
func scheduleTransferLeader(cluster schedule.Cluster, schedulerName string, s schedule.Selector, filters ...schedule.Filter) (region *core.RegionInfo, peer *metapb.Peer) {
stores := cluster.GetStores()
if len(stores) == 0 {
schedulerCounter.WithLabelValues(schedulerName, "no_store").Inc()
return nil, nil
}

var averageLeader float64
count := 0
for _, s := range stores {
if schedule.FilterSource(cluster, s, filters) {
continue
}
averageLeader += float64(s.LeaderScore())
count++
}
averageLeader /= float64(count)
log.Debugf("[%s] averageLeader is %v", schedulerName, averageLeader)

mostLeaderStore := s.SelectSource(cluster, stores, filters...)
leastLeaderStore := s.SelectTarget(cluster, stores, filters...)
log.Debugf("[%s] mostLeaderStore is %v, leastLeaderStore is %v", schedulerName, mostLeaderStore, leastLeaderStore)

var mostLeaderDistance, leastLeaderDistance float64
if mostLeaderStore != nil {
mostLeaderDistance = math.Abs(mostLeaderStore.LeaderScore() - averageLeader)
}
if leastLeaderStore != nil {
leastLeaderDistance = math.Abs(leastLeaderStore.LeaderScore() - averageLeader)
}
log.Debugf("[%s] mostLeaderDistance is %v, leastLeaderDistance is %v", schedulerName, mostLeaderDistance, leastLeaderDistance)
if mostLeaderDistance == 0 && leastLeaderDistance == 0 {
schedulerCounter.WithLabelValues(schedulerName, "already_balanced").Inc()
return nil, nil
}

if mostLeaderDistance > leastLeaderDistance {
region, peer = scheduleRemoveLeader(cluster, schedulerName, mostLeaderStore.GetId(), s)
if region == nil {
region, peer = scheduleAddLeader(cluster, schedulerName, leastLeaderStore.GetId())
}
} else {
region, peer = scheduleAddLeader(cluster, schedulerName, leastLeaderStore.GetId())
if region == nil {
region, peer = scheduleRemoveLeader(cluster, schedulerName, mostLeaderStore.GetId(), s)
}
}
if region == nil {
log.Debugf("[%v] select no region", schedulerName)
} else {
log.Debugf("[region %v][%v] select %v to be new leader", region.GetId(), schedulerName, peer)
}
return region, peer
}

// scheduleAddLeader transfers a leader into the store.
func scheduleAddLeader(cluster schedule.Cluster, schedulerName string, storeID uint64) (*core.RegionInfo, *metapb.Peer) {
region := cluster.RandFollowerRegion(storeID)
if region == nil {
schedulerCounter.WithLabelValues(schedulerName, "no_target_peer").Inc()
return nil, nil
}
return region, region.GetStorePeer(storeID)
}

// scheduleRemoveLeader transfers a leader out of the store.
func scheduleRemoveLeader(cluster schedule.Cluster, schedulerName string, storeID uint64, s schedule.Selector) (*core.RegionInfo, *metapb.Peer) {
region := cluster.RandLeaderRegion(storeID)
if region == nil {
schedulerCounter.WithLabelValues(schedulerName, "no_leader_region").Inc()
return nil, nil
}
targetStores := cluster.GetFollowerStores(region)
target := s.SelectTarget(cluster, targetStores)
if target == nil {
schedulerCounter.WithLabelValues(schedulerName, "no_target_store").Inc()
return nil, nil
}

return region, region.GetStorePeer(target.GetId())
}

// scheduleRemovePeer schedules a region to remove the peer.
func scheduleRemovePeer(cluster schedule.Cluster, schedulerName string, s schedule.Selector, filters ...schedule.Filter) (*core.RegionInfo, *metapb.Peer) {
stores := cluster.GetStores()
@@ -190,3 +107,14 @@ func adjustBalanceLimit(cluster schedule.Cluster, kind core.ResourceKind) uint64
limit, _ := stats.StandardDeviation(stats.Float64Data(counts))
return maxUint64(1, uint64(limit))
}

const (
taintCacheGCInterval = time.Second * 5
taintCacheTTL = time.Minute * 5
)

// newTaintCache creates a TTL cache that holds stores for which the scheduler
// was recently unable to create operators.
func newTaintCache() *cache.TTLUint64 {
return cache.NewIDTTL(taintCacheGCInterval, taintCacheTTL)
}
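The taint cache is wired into store selection via schedule.NewCacheFilter(taintStores) in balance_leader.go above. A minimal illustration of the idea behind such a cache-backed filter follows; the names here are hypothetical and do not reflect PD's real Filter interface.

package filtersketch

// Hypothetical illustration of what a cache-backed filter does: any store that
// is still present in the taint cache is skipped when choosing a balance
// source or target. This is not the real schedule.NewCacheFilter code.

type idCache interface {
	// Exists reports whether the given store ID is currently cached.
	Exists(id uint64) bool
}

type cacheFilter struct {
	taint idCache
}

// skip reports whether the store should be excluded from selection.
func (f cacheFilter) skip(storeID uint64) bool {
	return f.taint.Exists(storeID)
}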
