remove priority from limit (#6089)
ref #5467

Unify the priority level: move the `PriorityLevel` type from `pkg/core` into the new `pkg/core/constant` package and drop the duplicate definition in `pkg/core/storelimit`, so every caller references a single set of constants.

Signed-off-by: bufferflies <1045931706@qq.com>
bufferflies authored Mar 3, 2023
1 parent d63eada commit c40e319
Show file tree
Hide file tree
Showing 40 changed files with 260 additions and 233 deletions.
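For reference, a minimal sketch of the unified definition this commit introduces, mirroring the moved pkg/core/constant/kind.go (the doc comments here are illustrative, not copied from the file):

```go
package constant

// PriorityLevel is the single shared priority type that callers such as
// op.SetPriorityLevel and the storelimit.StoreLimit interface now reference.
type PriorityLevel int

const (
	Low PriorityLevel = iota
	Medium
	High
	Urgent

	// PriorityLevelLen counts the levels and is used to size per-level
	// structures such as the sliding windows further down in this diff.
	PriorityLevelLen
)
```

Both pkg/core and pkg/core/storelimit previously carried their own copy of these constants; after this change they import pkg/core/constant instead, so the two copies can no longer drift apart.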
4 changes: 3 additions & 1 deletion pkg/core/kind.go → pkg/core/constant/kind.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package core
package constant

// PriorityLevel higher level means higher priority
type PriorityLevel int
@@ -23,6 +23,8 @@ const (
Medium
High
Urgent

PriorityLevelLen
)

// ScheduleKind distinguishes resources and schedule policy.
27 changes: 14 additions & 13 deletions pkg/core/store.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/typeutil"
@@ -139,7 +140,7 @@ func (s *StoreInfo) IsEvictedAsSlowTrend() bool {
func (s *StoreInfo) IsAvailable(limitType storelimit.Type) bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.limiter.Available(storelimit.RegionInfluence[limitType], limitType, storelimit.Low)
return s.limiter.Available(storelimit.RegionInfluence[limitType], limitType, constant.Low)
}

// IsTiFlash returns true if the store is tiflash.
@@ -309,11 +310,11 @@ const minWeight = 1e-6
const maxScore = 1024 * 1024 * 1024

// LeaderScore returns the store's leader score.
func (s *StoreInfo) LeaderScore(policy SchedulePolicy, delta int64) float64 {
func (s *StoreInfo) LeaderScore(policy constant.SchedulePolicy, delta int64) float64 {
switch policy {
case BySize:
case constant.BySize:
return float64(s.GetLeaderSize()+delta) / math.Max(s.GetLeaderWeight(), minWeight)
case ByCount:
case constant.ByCount:
return float64(int64(s.GetLeaderCount())+delta) / math.Max(s.GetLeaderWeight(), minWeight)
default:
return 0
@@ -447,39 +448,39 @@ func (s *StoreInfo) IsLowSpace(lowSpaceRatio float64) bool {
}

// ResourceCount returns count of leader/region in the store.
func (s *StoreInfo) ResourceCount(kind ResourceKind) uint64 {
func (s *StoreInfo) ResourceCount(kind constant.ResourceKind) uint64 {
switch kind {
case LeaderKind:
case constant.LeaderKind:
return uint64(s.GetLeaderCount())
case RegionKind:
case constant.RegionKind:
return uint64(s.GetRegionCount())
default:
return 0
}
}

// ResourceSize returns size of leader/region in the store
func (s *StoreInfo) ResourceSize(kind ResourceKind) int64 {
func (s *StoreInfo) ResourceSize(kind constant.ResourceKind) int64 {
switch kind {
case LeaderKind:
case constant.LeaderKind:
return s.GetLeaderSize()
case RegionKind:
case constant.RegionKind:
return s.GetRegionSize()
default:
return 0
}
}

// ResourceWeight returns weight of leader/region in the score
func (s *StoreInfo) ResourceWeight(kind ResourceKind) float64 {
func (s *StoreInfo) ResourceWeight(kind constant.ResourceKind) float64 {
switch kind {
case LeaderKind:
case constant.LeaderKind:
leaderWeight := s.GetLeaderWeight()
if leaderWeight <= 0 {
return minWeight
}
return leaderWeight
case RegionKind:
case constant.RegionKind:
regionWeight := s.GetRegionWeight()
if regionWeight <= 0 {
return minWeight
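To make the scoring above concrete, here is a hedged, self-contained sketch of the LeaderScore formula with example numbers; the helper function is illustrative only and is not part of the PD API:

```go
package example

import "math"

const minWeight = 1e-6

// leaderScore mirrors the formula in StoreInfo.LeaderScore:
// score = (resource + delta) / max(weight, minWeight),
// where the resource is the leader size under BySize and the leader count under ByCount.
func leaderScore(bySize bool, size, count, delta int64, weight float64) float64 {
	if bySize {
		return float64(size+delta) / math.Max(weight, minWeight)
	}
	return float64(count+delta) / math.Max(weight, minWeight)
}

// For example, leaderScore(true, 300, 0, 0, 2.0) == 150 while
// leaderScore(true, 300, 0, 0, 1.0) == 300: raising a store's leader weight
// lowers its score, which is how the balancer is steered toward that store.
```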
24 changes: 6 additions & 18 deletions pkg/core/storelimit/limit.go
@@ -14,6 +14,10 @@

package storelimit

import (
"github.com/tikv/pd/pkg/core/constant"
)

// Type indicates the type of store limit
type Type int

@@ -26,28 +30,12 @@ const (
storeLimitTypeLen
)

// PriorityLevel higher level means higher priority
type PriorityLevel int

const (
// Low represent the lowest level.
Low PriorityLevel = iota
// Medium represent the medium level.
Medium
// High represent the high level.
High
// Urgent represent the urgent level.
Urgent

priorityLevelLen
)

// StoreLimit is an interface to control the operator rate of store
type StoreLimit interface {
// Available returns true if the store can accept the operator
Available(cost int64, typ Type, level PriorityLevel) bool
Available(cost int64, typ Type, level constant.PriorityLevel) bool
// Take takes the cost of the operator, it returns false if the store can't accept any operators.
Take(count int64, typ Type, level PriorityLevel) bool
Take(count int64, typ Type, level constant.PriorityLevel) bool
// Reset resets the store limit
Reset(rate float64, typ Type)
}
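With the level now coming from the constant package, a caller of this interface looks roughly as follows. This is a hedged sketch: tryDispatch and its arguments are hypothetical, while StoreLimit, the AddPeer type, and constant.Medium are taken from the code in this diff:

```go
package example

import (
	"github.com/tikv/pd/pkg/core/constant"
	"github.com/tikv/pd/pkg/core/storelimit"
)

// tryDispatch checks capacity first so an over-limit store can be skipped
// cheaply, and only then consumes the tokens for the operator.
func tryDispatch(limit storelimit.StoreLimit, cost int64) bool {
	if !limit.Available(cost, storelimit.AddPeer, constant.Medium) {
		return false
	}
	return limit.Take(cost, storelimit.AddPeer, constant.Medium)
}
```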
43 changes: 22 additions & 21 deletions pkg/core/storelimit/limit_test.go
@@ -18,31 +18,32 @@ import (
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core/constant"
)

func TestStoreLimit(t *testing.T) {
re := require.New(t)
rate := int64(15)
limit := NewStoreRateLimit(float64(rate))
re.True(limit.Available(influence*rate, AddPeer, Low))
re.True(limit.Take(influence*rate, AddPeer, Low))
re.False(limit.Take(influence, AddPeer, Low))
re.True(limit.Available(influence*rate, AddPeer, constant.Low))
re.True(limit.Take(influence*rate, AddPeer, constant.Low))
re.False(limit.Take(influence, AddPeer, constant.Low))

limit.Reset(float64(rate), AddPeer)
re.False(limit.Available(influence, AddPeer, Low))
re.False(limit.Take(influence, AddPeer, Low))
re.False(limit.Available(influence, AddPeer, constant.Low))
re.False(limit.Take(influence, AddPeer, constant.Low))

limit.Reset(0, AddPeer)
re.True(limit.Available(influence, AddPeer, Low))
re.True(limit.Take(influence, AddPeer, Low))
re.True(limit.Available(influence, AddPeer, constant.Low))
re.True(limit.Take(influence, AddPeer, constant.Low))
}

func TestSlidingWindow(t *testing.T) {
t.Parallel()
re := require.New(t)
capacity := int64(10)
s := NewSlidingWindows(float64(capacity))
re.Len(s.windows, int(priorityLevelLen))
re.Len(s.windows, int(constant.PriorityLevelLen))
// capacity:[10, 10, 10, 10]
for i, v := range s.windows {
cap := capacity >> i
@@ -51,27 +52,27 @@ func TestSlidingWindow(t *testing.T) {
}
re.EqualValues(v.capacity, cap)
}
// case 0: test low level
re.True(s.Available(capacity, AddPeer, Low))
re.True(s.Take(capacity, AddPeer, Low))
re.False(s.Available(capacity, AddPeer, Low))
// case 0: test constant.Low level
re.True(s.Available(capacity, AddPeer, constant.Low))
re.True(s.Take(capacity, AddPeer, constant.Low))
re.False(s.Available(capacity, AddPeer, constant.Low))
s.Ack(capacity)
re.True(s.Available(capacity, AddPeer, Low))
re.True(s.Available(capacity, AddPeer, constant.Low))

// case 1: it will occupy the normal window size not the high window.
re.True(s.Take(capacity, AddPeer, High))
// case 1: it will occupy the normal window size, not the constant.High window.
re.True(s.Take(capacity, AddPeer, constant.High))
re.EqualValues(capacity, s.GetUsed())
re.EqualValues(0, s.windows[High].getUsed())
re.EqualValues(0, s.windows[constant.High].getUsed())
s.Ack(capacity)
re.EqualValues(s.GetUsed(), 0)

// case 2: it will occupy the high window size if the normal window is full.
// case 2: it will occupy the constant.High window size if the normal window is full.
capacity = 1000
s.Reset(float64(capacity), AddPeer)
re.True(s.Take(capacity, AddPeer, Low))
re.False(s.Take(capacity, AddPeer, Low))
re.True(s.Take(capacity-100, AddPeer, Medium))
re.False(s.Take(capacity-100, AddPeer, Medium))
re.True(s.Take(capacity, AddPeer, constant.Low))
re.False(s.Take(capacity, AddPeer, constant.Low))
re.True(s.Take(capacity-100, AddPeer, constant.Medium))
re.False(s.Take(capacity-100, AddPeer, constant.Medium))
re.EqualValues(s.GetUsed(), capacity+capacity-100)
s.Ack(capacity)
re.Equal(s.GetUsed(), capacity-100)
15 changes: 9 additions & 6 deletions pkg/core/storelimit/sliding_window.go
@@ -14,7 +14,10 @@

package storelimit

import "github.com/tikv/pd/pkg/utils/syncutil"
import (
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/utils/syncutil"
)

const (
// minSnapSize is the min value to check the windows has enough size.
@@ -32,8 +35,8 @@ func NewSlidingWindows(cap float64) *SlidingWindows {
if cap < 0 {
cap = minSnapSize
}
windows := make([]*window, priorityLevelLen)
for i := 0; i < int(priorityLevelLen); i++ {
windows := make([]*window, constant.PriorityLevelLen)
for i := 0; i < int(constant.PriorityLevelLen); i++ {
windows[i] = newWindow(int64(cap) >> i)
}
return &SlidingWindows{
@@ -68,7 +71,7 @@ func (s *SlidingWindows) GetUsed() int64 {
// Available returns whether the token can be taken.
// The order of checking windows is from low to high.
// It checks the given window finally if the lower window has no free size.
func (s *SlidingWindows) Available(_ int64, _ Type, level PriorityLevel) bool {
func (s *SlidingWindows) Available(_ int64, _ Type, level constant.PriorityLevel) bool {
s.mu.RLock()
defer s.mu.RUnlock()
for i := 0; i <= int(level); i++ {
@@ -81,7 +84,7 @@ func (s *SlidingWindows) Available(_ int64, _ Type, level PriorityLevel) bool {

// Take tries to take the token.
// It will consume the given window finally if the lower window has no free size.
func (s *SlidingWindows) Take(token int64, _ Type, level PriorityLevel) bool {
func (s *SlidingWindows) Take(token int64, _ Type, level constant.PriorityLevel) bool {
s.mu.Lock()
defer s.mu.Unlock()
for i := 0; i <= int(level); i++ {
@@ -98,7 +101,7 @@ func (s *SlidingWindows) Take(token int64, _ Type, level PriorityLevel) bool {
func (s *SlidingWindows) Ack(token int64) {
s.mu.Lock()
defer s.mu.Unlock()
for i := priorityLevelLen - 1; i >= 0; i-- {
for i := constant.PriorityLevelLen - 1; i >= 0; i-- {
if token = s.windows[i].ack(token); token <= 0 {
break
}
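Taken together, Available and Take walk the windows from index 0 (Low) up to the caller's level, while Ack returns tokens starting from the highest window. A short usage sketch, assuming the per-level capacities shown above (each higher level gets half the capacity of the previous one):

```go
package example

import (
	"github.com/tikv/pd/pkg/core/constant"
	"github.com/tikv/pd/pkg/core/storelimit"
)

func slidingWindowSketch() {
	capacity := int64(1000)
	// Window capacities are capacity >> level: Low=1000, Medium=500, High=250, Urgent=125.
	s := storelimit.NewSlidingWindows(float64(capacity))

	_ = s.Take(capacity, storelimit.AddPeer, constant.Low) // fills the Low window
	_ = s.Take(capacity, storelimit.AddPeer, constant.Low) // false: the Low window is exhausted
	_ = s.Take(400, storelimit.AddPeer, constant.Medium)   // true: spills into the Medium window
	s.Ack(capacity)                                        // frees the higher windows first, then Low
}
```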
5 changes: 3 additions & 2 deletions pkg/core/storelimit/store_limit.go
@@ -15,6 +15,7 @@
package storelimit

import (
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/ratelimit"
)

@@ -80,13 +81,13 @@ func NewStoreRateLimit(ratePerSec float64) StoreLimit {

// Available returns the number of available tokens.
// notice that the priority level is not used.
func (l *StoreRateLimit) Available(cost int64, typ Type, _ PriorityLevel) bool {
func (l *StoreRateLimit) Available(cost int64, typ Type, _ constant.PriorityLevel) bool {
return l.limits[typ].Available(cost)
}

// Take takes count tokens from the bucket without blocking.
// notice that the priority level is not used.
func (l *StoreRateLimit) Take(cost int64, typ Type, _ PriorityLevel) bool {
func (l *StoreRateLimit) Take(cost int64, typ Type, _ constant.PriorityLevel) bool {
return l.limits[typ].Take(cost)
}

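Since StoreRateLimit ignores the level while SlidingWindows uses it to decide how many windows may be drawn from, the same call means different things on the two implementations. A hedged sketch (the variable names are hypothetical; the constructors and methods are the ones shown in this diff):

```go
package example

import (
	"github.com/tikv/pd/pkg/core/constant"
	"github.com/tikv/pd/pkg/core/storelimit"
)

func compareLimits() {
	rate := storelimit.NewStoreRateLimit(15)      // token bucket: the level argument is discarded
	windows := storelimit.NewSlidingWindows(1000) // per-level windows: the level selects the cascade depth

	_ = rate.Available(10, storelimit.AddPeer, constant.Urgent)    // Urgent changes nothing here
	_ = windows.Available(10, storelimit.AddPeer, constant.Urgent) // Urgent may borrow from all four windows
}
```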
3 changes: 2 additions & 1 deletion plugin/scheduler_example/evict_leader.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/apiutil"
@@ -235,7 +236,7 @@ func (s *evictLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) (
log.Debug("fail to create evict leader operator", errs.ZapError(err))
continue
}
op.SetPriorityLevel(core.High)
op.SetPriorityLevel(constant.High)
ops = append(ops, op)
}

3 changes: 2 additions & 1 deletion server/api/store.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/apiutil"
@@ -113,7 +114,7 @@ func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo
UsedSize: typeutil.ByteSize(store.GetUsedSize()),
LeaderCount: store.GetLeaderCount(),
LeaderWeight: store.GetLeaderWeight(),
LeaderScore: store.LeaderScore(core.StringToSchedulePolicy(opt.LeaderSchedulePolicy), 0),
LeaderScore: store.LeaderScore(constant.StringToSchedulePolicy(opt.LeaderSchedulePolicy), 0),
LeaderSize: store.GetLeaderSize(),
RegionCount: store.GetRegionCount(),
RegionWeight: store.GetRegionWeight(),
9 changes: 5 additions & 4 deletions server/cluster/coordinator_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/mock/mockhbstream"
"github.com/tikv/pd/pkg/storage"
@@ -1071,7 +1072,7 @@ func TestOperatorCount(t *testing.T) {
re.Equal(uint64(1), oc.OperatorCount(operator.OpRegion)) // 1:region 2:leader
re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader))
op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion)
op2.SetPriorityLevel(core.High)
op2.SetPriorityLevel(constant.High)
oc.AddWaitingOperator(op2)
re.Equal(uint64(2), oc.OperatorCount(operator.OpRegion)) // 1:region 2:region
re.Equal(uint64(0), oc.OperatorCount(operator.OpLeader))
@@ -1154,7 +1155,7 @@ func TestStoreOverloadedWithReplace(t *testing.T) {
op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 1})
re.True(oc.AddOperator(op1))
op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 2})
op2.SetPriorityLevel(core.High)
op2.SetPriorityLevel(constant.High)
re.True(oc.AddOperator(op2))
op3 := newTestOperator(1, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 3})
re.False(oc.AddOperator(op3))
@@ -1266,7 +1267,7 @@ func TestController(t *testing.T) {
// add a PriorityKind operator will remove old operator
{
op3 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpHotRegion)
op3.SetPriorityLevel(core.High)
op3.SetPriorityLevel(constant.High)
re.Equal(1, oc.AddWaitingOperator(op11))
re.False(sc.AllowSchedule(false))
re.Equal(1, oc.AddWaitingOperator(op3))
@@ -1280,7 +1281,7 @@
re.Equal(1, oc.AddWaitingOperator(op2))
re.False(sc.AllowSchedule(false))
op4 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpAdmin)
op4.SetPriorityLevel(core.High)
op4.SetPriorityLevel(constant.High)
re.Equal(1, oc.AddWaitingOperator(op4))
re.True(sc.AllowSchedule(false))
re.True(oc.RemoveOperator(op4))