Skip to content

Commit

Permalink
storelimit: impl fead back by snapshot report (#6161) (#6349)
Browse files Browse the repository at this point in the history
close #6147, ref #6161

Signed-off-by: bufferflies <1045931706@qq.com>

Co-authored-by: buffer <1045931706@qq.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
ti-chi-bot and bufferflies authored Apr 21, 2023
1 parent 5f99e0c commit 715a78d
Show file tree
Hide file tree
Showing 15 changed files with 273 additions and 28 deletions.
15 changes: 15 additions & 0 deletions pkg/core/constant/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ const (
PriorityLevelLen
)

func (p PriorityLevel) String() string {
switch p {
case Low:
return "low"
case Medium:
return "medium"
case High:
return "high"
case Urgent:
return "urgent"
default:
return "unknown"
}
}

// ScheduleKind distinguishes resources and schedule policy.
type ScheduleKind struct {
Resource ResourceKind
Expand Down
18 changes: 17 additions & 1 deletion pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,27 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
store := *s
store.meta = typeutil.DeepClone(s.meta, StoreFactory)
for _, opt := range opts {
opt(&store)
if opt != nil {
opt(&store)
}
}
return &store
}

// LimitVersion returns the limit version of the store.
func (s *StoreInfo) LimitVersion() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.limiter.Version()
}

// Feedback is used to update the store's limit.
func (s *StoreInfo) Feedback(e float64) {
if limit := s.limiter; limit != nil {
limit.Feedback(e)
}
}

// ShallowClone creates a copy of current StoreInfo, but not clone 'meta'.
func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
store := *s
Expand Down
11 changes: 11 additions & 0 deletions pkg/core/storelimit/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ const (
storeLimitTypeLen
)

const (
// VersionV1 represents the rate limit version of the store limit
VersionV1 = "v1"
// VersionV2 represents the sliding window version of the store limit
VersionV2 = "v2"
)

// StoreLimit is an interface to control the operator rate of store
// TODO: add a method to control the rate of store
// the normal control flow is:
Expand All @@ -48,7 +55,11 @@ type StoreLimit interface {
Take(count int64, typ Type, level constant.PriorityLevel) bool
// Reset resets the store limit
Reset(rate float64, typ Type)
// Feedback update limit capacity by auto-tuning.
Feedback(e float64)
// Ack put back the cost into the limit for the next waiting operator after the operator is finished.
// only snapshot type can use this method.
Ack(cost int64, typ Type)
// Version returns the version of the store limit
Version() string
}
98 changes: 91 additions & 7 deletions pkg/core/storelimit/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package storelimit

import (
"container/list"
"context"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core/constant"
Expand All @@ -41,8 +45,8 @@ func TestStoreLimit(t *testing.T) {
func TestSlidingWindow(t *testing.T) {
t.Parallel()
re := require.New(t)
capacity := int64(10)
s := NewSlidingWindows(float64(capacity))
capacity := int64(defaultWindowSize)
s := NewSlidingWindows()
re.Len(s.windows, int(constant.PriorityLevelLen))
// capacity:[10, 10, 10, 10]
for i, v := range s.windows {
Expand All @@ -61,23 +65,23 @@ func TestSlidingWindow(t *testing.T) {

// case 1: it will occupy the normal window size not the core.High window.
re.True(s.Take(capacity, SendSnapshot, constant.High))
re.EqualValues(capacity, s.GetUsed())
re.EqualValues([]int64{capacity, 0, 0, 0}, s.GetUsed())
re.EqualValues(0, s.windows[constant.High].getUsed())
s.Ack(capacity, SendSnapshot)
re.EqualValues(s.GetUsed(), 0)
re.EqualValues([]int64{0, 0, 0, 0}, s.GetUsed())

// case 2: it will occupy the core.High window size if the normal window is full.
capacity = 2000
s.Reset(float64(capacity), SendSnapshot)
s.set(float64(capacity), SendSnapshot)
re.True(s.Take(capacity-minSnapSize, SendSnapshot, constant.Low))
re.True(s.Take(capacity-minSnapSize, SendSnapshot, constant.Low))
re.False(s.Take(capacity, SendSnapshot, constant.Low))
re.True(s.Take(capacity-minSnapSize, SendSnapshot, constant.Medium))
re.False(s.Take(capacity-minSnapSize, SendSnapshot, constant.Medium))
re.EqualValues(s.GetUsed(), capacity+capacity+capacity-minSnapSize*3)
re.EqualValues([]int64{capacity + capacity - minSnapSize*2, capacity - minSnapSize, 0, 0}, s.GetUsed())
s.Ack(capacity-minSnapSize, SendSnapshot)
s.Ack(capacity-minSnapSize, SendSnapshot)
re.Equal(s.GetUsed(), capacity-minSnapSize)
re.Equal([]int64{capacity - minSnapSize, 0, 0, 0}, s.GetUsed())

// case 3: skip the type is not the SendSnapshot
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -108,3 +112,83 @@ func TestWindow(t *testing.T) {
re.EqualValues(s.ack(minSnapSize*2), minSnapSize)
re.EqualValues(s.getUsed(), 0)
}

func TestFeedback(t *testing.T) {
s := NewSlidingWindows()
re := require.New(t)
type SnapshotStats struct {
total int64
remaining int64
size int64
start int64
}
// region size is 10GB, snapshot write limit is 100MB/s and the snapshot concurrency is 3.
// the best strategy is that the tikv executing queue equals the wait.
regionSize, limit, wait := int64(10000), int64(100), int64(4)
iter := 100
ops := make(chan int64, 10)
ctx, cancel := context.WithCancel(context.Background())

// generate the operator
go func() {
for {
if s.Available(regionSize, SendSnapshot, constant.Low) && iter > 0 {
iter--
size := regionSize - rand.Int63n(regionSize/10)
s.Take(size, SendSnapshot, constant.Low)
ops <- size
}
if iter == 0 {
cancel()
return
}
}
}()

// receive the operator
queue := list.List{}
interval := time.Microsecond * 100
ticker := time.NewTicker(interval)
defer ticker.Stop()

// tick is the time that the snapshot has been executed.
tick := int64(0)
for {
select {
case op := <-ops:
stats := &SnapshotStats{
total: op / limit,
remaining: op,
size: op,
start: tick,
}
queue.PushBack(stats)
case <-ctx.Done():
return
case <-ticker.C:
tick++
first := queue.Front()
if first == nil {
continue
}
stats := first.Value.(*SnapshotStats)
if stats.remaining > 0 {
stats.remaining -= limit
continue
}
cost := tick - stats.start
exec := stats.total
if exec < 5 {
exec = 5
}
err := exec*wait - cost
queue.Remove(first)
s.Feedback(float64(err))
if iter < 5 {
re.Greater(float64(s.GetCap()), float64(regionSize*(wait-2)))
re.Less(float64(s.GetCap()), float64(regionSize*wait))
}
s.Ack(stats.size, SendSnapshot)
}
}
}
63 changes: 49 additions & 14 deletions pkg/core/storelimit/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
const (
// minSnapSize is the min value to check the windows has enough size.
minSnapSize = 10
// defaultWindowSize is the default window size.
defaultWindowSize = 100

defaultProportion = 20
defaultIntegral = 10
)

var _ StoreLimit = &SlidingWindows{}
Expand All @@ -30,30 +35,53 @@ var _ StoreLimit = &SlidingWindows{}
type SlidingWindows struct {
mu syncutil.RWMutex
windows []*window
lastSum float64
}

// NewSlidingWindows is the construct of SlidingWindows.
func NewSlidingWindows(cap float64) *SlidingWindows {
if cap < 0 {
cap = minSnapSize
}
func NewSlidingWindows() *SlidingWindows {
windows := make([]*window, constant.PriorityLevelLen)
for i := 0; i < int(constant.PriorityLevelLen); i++ {
windows[i] = newWindow(int64(cap) >> i)
windows[i] = newWindow(int64(defaultWindowSize) >> i)
}
return &SlidingWindows{
windows: windows,
}
}

// Reset resets the capacity of the sliding windows.
// It doesn't clear all the used, only set the capacity.
func (s *SlidingWindows) Reset(cap float64, typ Type) {
// Version returns v2
func (s *SlidingWindows) Version() string {
return VersionV2
}

// Feedback is used to update the capacity of the sliding windows.
func (s *SlidingWindows) Feedback(e float64) {
s.mu.Lock()
defer s.mu.Unlock()
// If the limiter is available, we don't need to update the capacity.
if s.windows[constant.Low].available() {
return
}
s.lastSum += e
// There are two constants to control the proportion of the sum and the current error.
// The sum of the error is used to ensure the capacity is more stable even if the error is zero.
// In the final scene, the sum of the error should be stable and the current error should be zero.
cap := defaultProportion*e + defaultIntegral*s.lastSum
// The capacity should be at least the default window size.
if cap < defaultWindowSize {
cap = defaultWindowSize
}
s.set(cap, SendSnapshot)
}

// Reset does nothing because the capacity depends on the feedback.
func (s *SlidingWindows) Reset(_ float64, _ Type) {
}

func (s *SlidingWindows) set(cap float64, typ Type) {
if typ != SendSnapshot {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if cap < 0 {
cap = minSnapSize
}
Expand All @@ -62,13 +90,20 @@ func (s *SlidingWindows) Reset(cap float64, typ Type) {
}
}

// GetCap returns the capacity of the sliding windows.
func (s *SlidingWindows) GetCap() int64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.windows[0].capacity
}

// GetUsed returns the used size in the sliding windows.
func (s *SlidingWindows) GetUsed() int64 {
func (s *SlidingWindows) GetUsed() []int64 {
s.mu.RLock()
defer s.mu.RUnlock()
used := int64(0)
for _, v := range s.windows {
used += v.getUsed()
used := make([]int64, len(s.windows))
for i, v := range s.windows {
used[i] = v.getUsed()
}
return used
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/core/storelimit/store_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ func NewStoreRateLimit(ratePerSec float64) StoreLimit {
// Ack does nothing.
func (l *StoreRateLimit) Ack(_ int64, _ Type) {}

// Version returns v1
func (l *StoreRateLimit) Version() string {
return VersionV1
}

// Feedback does nothing.
func (l *StoreRateLimit) Feedback(_ float64) {}

// Available returns the number of available tokens.
// notice that the priority level is not used.
func (l *StoreRateLimit) Available(cost int64, typ Type, _ constant.PriorityLevel) bool {
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Config interface {
CheckLabelProperty(string, []*metapb.StoreLabel) bool
IsDebugMetricsEnabled() bool
GetClusterVersion() *semver.Version
GetStoreLimitVersion() string
// for test purpose
SetPlacementRuleEnabled(bool)
SetSplitMergeInterval(time.Duration)
Expand Down
3 changes: 1 addition & 2 deletions pkg/schedule/filter/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,8 @@ func TestRuleFitFilter(t *testing.T) {

func TestSendStateFilter(t *testing.T) {
re := require.New(t)
store := core.NewStoreInfoWithLabel(1, map[string]string{}).Clone(core.SetStoreLimit(storelimit.NewSlidingWindows(1000)))
store := core.NewStoreInfoWithLabel(1, map[string]string{}).Clone(core.SetStoreLimit(storelimit.NewSlidingWindows()))
region := core.NewTestRegionInfo(1, 1, []byte(""), []byte(""))

snapshotFilter := NewSnapshotSendFilter([]*core.StoreInfo{store}, constant.Medium)
re.NotNil(SelectOneRegion([]*core.RegionInfo{region}, nil, snapshotFilter))
re.True(store.GetStoreLimit().Take(1000, storelimit.SendSnapshot, constant.Medium))
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/operator/influence.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewOpInfluence() *OpInfluence {
}

// Add adds another influence.
func (m OpInfluence) Add(other *OpInfluence) {
func (m *OpInfluence) Add(other *OpInfluence) {
for id, v := range other.StoresInfluence {
m.GetStoreInfluence(id).add(v)
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/server/config"
)

Expand Down Expand Up @@ -111,6 +113,16 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
s.RegionCount += store.GetRegionCount()
s.LeaderCount += store.GetLeaderCount()
s.WitnessCount += store.GetWitnessCount()
limit, ok := store.GetStoreLimit().(*storelimit.SlidingWindows)
if ok {
cap := limit.GetCap()
storeStatusGauge.WithLabelValues(storeAddress, id, "windows_size").Set(float64(cap))
for i, use := range limit.GetUsed() {
priority := constant.PriorityLevel(i).String()
storeStatusGauge.WithLabelValues(storeAddress, id, "windows_used_level_"+priority).Set(float64(use))
}
}

// TODO: pre-allocate gauge metrics
storeStatusGauge.WithLabelValues(storeAddress, id, "region_score").Set(store.RegionScore(s.opt.GetRegionScoreFormulaVersion(), s.opt.GetHighSpaceRatio(), s.opt.GetLowSpaceRatio(), 0))
storeStatusGauge.WithLabelValues(storeAddress, id, "leader_score").Set(store.LeaderScore(s.opt.GetLeaderSchedulePolicy(), 0))
Expand Down
Loading

0 comments on commit 715a78d

Please sign in to comment.