scheduler: dynamically adjust the retry limit according to the operator (#4007) (#4046)

* scheduler: dynamically adjust the retry limit according to the operator

Signed-off-by: HunDunDM <hundundm@gmail.com>

* address comment

Signed-off-by: HunDunDM <hundundm@gmail.com>

* fix license

Signed-off-by: HunDunDM <hundundm@gmail.com>

Co-authored-by: HunDunDM <hundundm@gmail.com>
ti-chi-bot and HunDunDM authored Oct 13, 2021
1 parent 6f3ff3f commit 46534a1
Showing 4 changed files with 128 additions and 9 deletions.
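Before the per-file diffs, here is a minimal standalone sketch of the retry-quota mechanism this commit introduces. The helper names (newRetryQuota, GetLimit, ResetLimit, Attenuate) and the constants (minimum limit 1, attenuation factor 2) are taken from the utils.go diff below; purely for illustration, this sketch keys the quota by a bare store ID instead of *core.StoreInfo and omits the GC method, so it is not the committed implementation.

package main

import "fmt"

// retryQuota mirrors the helper added to server/schedulers/utils.go in this
// commit, simplified to be keyed by a uint64 store ID so the sketch stays
// self-contained.
type retryQuota struct {
	initialLimit int
	minLimit     int
	attenuation  int
	limits       map[uint64]int
}

func newRetryQuota(initialLimit, minLimit, attenuation int) *retryQuota {
	return &retryQuota{
		initialLimit: initialLimit,
		minLimit:     minLimit,
		attenuation:  attenuation,
		limits:       make(map[uint64]int),
	}
}

// GetLimit returns the current retry limit for a store, initializing it to
// initialLimit the first time the store is seen.
func (q *retryQuota) GetLimit(id uint64) int {
	if limit, ok := q.limits[id]; ok {
		return limit
	}
	q.limits[id] = q.initialLimit
	return q.initialLimit
}

// ResetLimit restores the full limit once a store yields an operator again.
func (q *retryQuota) ResetLimit(id uint64) {
	q.limits[id] = q.initialLimit
}

// Attenuate divides the limit by the attenuation factor (2 in this commit)
// each time a store fails to yield an operator, but never drops below minLimit.
func (q *retryQuota) Attenuate(id uint64) {
	newLimit := q.GetLimit(id) / q.attenuation
	if newLimit < q.minLimit {
		newLimit = q.minLimit
	}
	q.limits[id] = newLimit
}

func main() {
	// Same shape as the commit: defaultMinRetryLimit = 1, defaultRetryQuotaAttenuation = 2.
	q := newRetryQuota(10, 1, 2)
	for i := 0; i < 5; i++ {
		q.Attenuate(1)
		fmt.Println(q.GetLimit(1)) // prints 5, 2, 1, 1, 1
	}
	q.ResetLimit(1)
	fmt.Println(q.GetLimit(1)) // prints 10
}

In the scheduler diffs that follow, Schedule reads a store's limit before its retry loop, resets it when an operator is produced, attenuates it when the loop comes up empty, and finally calls GC with the candidate stores so limits for stores no longer under consideration are dropped.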
16 changes: 12 additions & 4 deletions server/schedulers/balance_leader.go
@@ -70,6 +70,7 @@ type balanceLeaderSchedulerConfig struct {

type balanceLeaderScheduler struct {
*BaseScheduler
*retryQuota
conf *balanceLeaderSchedulerConfig
opController *schedule.OperatorController
filters []filter.Filter
@@ -83,6 +84,7 @@ func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *

s := &balanceLeaderScheduler{
BaseScheduler: base,
retryQuota: newRetryQuota(balanceLeaderRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
conf: conf,
opController: opController,
counter: balanceLeaderCounter,
@@ -153,40 +155,46 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
})
sort.Slice(targets, func(i, j int) bool {
iOp := plan.GetOpInfluence(targets[i].GetID())
jOp := plan.GetOpInfluence(targets[i].GetID())
jOp := plan.GetOpInfluence(targets[j].GetID())
return targets[i].LeaderScore(leaderSchedulePolicy, iOp) <
targets[j].LeaderScore(leaderSchedulePolicy, jOp)
})

for i := 0; i < len(sources) || i < len(targets); i++ {
if i < len(sources) {
plan.source, plan.target = sources[i], nil
retryLimit := l.retryQuota.GetLimit(plan.source)
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("source-store", plan.SourceStoreID()))
l.counter.WithLabelValues("high-score", plan.SourceMetricLabel()).Inc()
for j := 0; j < balanceLeaderRetryLimit; j++ {
for j := 0; j < retryLimit; j++ {
schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
if ops := l.transferLeaderOut(plan); len(ops) > 0 {
l.retryQuota.ResetLimit(plan.source)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-out", plan.SourceMetricLabel()))
return ops
}
}
l.Attenuate(plan.source)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", plan.SourceStoreID()))
}
if i < len(targets) {
plan.source, plan.target = nil, targets[i]
retryLimit := l.retryQuota.GetLimit(plan.target)
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("target-store", plan.TargetStoreID()))
l.counter.WithLabelValues("low-score", plan.TargetMetricLabel()).Inc()

for j := 0; j < balanceLeaderRetryLimit; j++ {
for j := 0; j < retryLimit; j++ {
schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
if ops := l.transferLeaderIn(plan); len(ops) > 0 {
l.retryQuota.ResetLimit(plan.target)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-in", plan.TargetMetricLabel()))
return ops
}
}
l.Attenuate(plan.target)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("target", plan.TargetStoreID()))
}
}
l.retryQuota.GC(append(sources, targets...))
return nil
}

8 changes: 7 additions & 1 deletion server/schedulers/balance_region.go
@@ -70,6 +70,7 @@ type balanceRegionSchedulerConfig struct {

type balanceRegionScheduler struct {
*BaseScheduler
*retryQuota
conf *balanceRegionSchedulerConfig
opController *schedule.OperatorController
filters []filter.Filter
@@ -82,6 +83,7 @@ func newBalanceRegionScheduler(opController *schedule.OperatorController, conf *
base := NewBaseScheduler(opController)
scheduler := &balanceRegionScheduler{
BaseScheduler: base,
retryQuota: newRetryQuota(balanceRegionRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
conf: conf,
opController: opController,
counter: balanceRegionCounter,
@@ -161,7 +163,8 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
}

for _, plan.source = range stores {
for i := 0; i < balanceRegionRetryLimit; i++ {
retryLimit := s.retryQuota.GetLimit(plan.source)
for i := 0; i < retryLimit; i++ {
schedulerCounter.WithLabelValues(s.GetName(), "total").Inc()
// Prioritize picking a region that has a pending peer.
// A pending region may mean the disk is overloaded, so remove the pending region first.
@@ -198,11 +201,14 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
}

if op := s.transferPeer(plan); op != nil {
s.retryQuota.ResetLimit(plan.source)
op.Counters = append(op.Counters, schedulerCounter.WithLabelValues(s.GetName(), "new-operator"))
return []*operator.Operator{op}
}
}
s.retryQuota.Attenuate(plan.source)
}
s.retryQuota.GC(stores)
return nil
}

60 changes: 56 additions & 4 deletions server/schedulers/utils.go
@@ -34,10 +34,12 @@ import (

const (
// adjustRatio is used to adjust TolerantSizeRatio according to region count.
adjustRatio float64 = 0.005
leaderTolerantSizeRatio float64 = 5.0
minTolerantSizeRatio float64 = 1.0
influenceAmp int64 = 100
adjustRatio float64 = 0.005
leaderTolerantSizeRatio float64 = 5.0
minTolerantSizeRatio float64 = 1.0
influenceAmp int64 = 100
defaultMinRetryLimit = 1
defaultRetryQuotaAttenuation = 2
)

type balancePlan struct {
@@ -753,3 +755,53 @@ func filterHotPeers(kind core.ResourceKind, peers []*statistics.HotPeerStat) []*
}
return ret
}

type retryQuota struct {
initialLimit int
minLimit int
attenuation int

limits map[uint64]int
}

func newRetryQuota(initialLimit, minLimit, attenuation int) *retryQuota {
return &retryQuota{
initialLimit: initialLimit,
minLimit: minLimit,
attenuation: attenuation,
limits: make(map[uint64]int),
}
}

func (q *retryQuota) GetLimit(store *core.StoreInfo) int {
id := store.GetID()
if limit, ok := q.limits[id]; ok {
return limit
}
q.limits[id] = q.initialLimit
return q.initialLimit
}

func (q *retryQuota) ResetLimit(store *core.StoreInfo) {
q.limits[store.GetID()] = q.initialLimit
}

func (q *retryQuota) Attenuate(store *core.StoreInfo) {
newLimit := q.GetLimit(store) / q.attenuation
if newLimit < q.minLimit {
newLimit = q.minLimit
}
q.limits[store.GetID()] = newLimit
}

func (q *retryQuota) GC(keepStores []*core.StoreInfo) {
set := make(map[uint64]struct{}, len(keepStores))
for _, store := range keepStores {
set[store.GetID()] = struct{}{}
}
for id := range q.limits {
if _, ok := set[id]; !ok {
delete(q.limits, id)
}
}
}
53 changes: 53 additions & 0 deletions server/schedulers/utils_test.go
@@ -0,0 +1,53 @@
// Copyright 2021 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/server/core"
)

var _ = Suite(&testUtilsSuite{})

type testUtilsSuite struct{}

func (s *testUtilsSuite) TestRetryQuota(c *C) {
q := newRetryQuota(10, 1, 2)
store1 := core.NewStoreInfo(&metapb.Store{Id: 1})
store2 := core.NewStoreInfo(&metapb.Store{Id: 2})
keepStores := []*core.StoreInfo{store1}

// test GetLimit
c.Assert(q.GetLimit(store1), Equals, 10)

// test Attenuate
for _, expected := range []int{5, 2, 1, 1, 1} {
q.Attenuate(store1)
c.Assert(q.GetLimit(store1), Equals, expected)
}

// test GC
c.Assert(q.GetLimit(store2), Equals, 10)
q.Attenuate(store2)
c.Assert(q.GetLimit(store2), Equals, 5)
q.GC(keepStores)
c.Assert(q.GetLimit(store1), Equals, 1)
c.Assert(q.GetLimit(store2), Equals, 10)

// test ResetLimit
q.ResetLimit(store1)
c.Assert(q.GetLimit(store1), Equals, 10)
}
