resource_manager: impl resource controller for tokens client (#5811)
ref #5851

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
CabinfeverB and ti-chi-bot authored Jan 19, 2023
1 parent 6e35a4d commit 402c2bf
Showing 8 changed files with 1,105 additions and 51 deletions.
785 changes: 785 additions & 0 deletions pkg/mcs/resource_manager/client/client.go

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions pkg/mcs/resource_manager/client/config.go
@@ -14,6 +14,42 @@

package client

import (
"time"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
)

var (
requestUnitList map[rmpb.RequestUnitType]struct{} = map[rmpb.RequestUnitType]struct{}{
rmpb.RequestUnitType_RRU: {},
rmpb.RequestUnitType_WRU: {},
}
requestResourceList map[rmpb.RawResourceType]struct{} = map[rmpb.RawResourceType]struct{}{
rmpb.RawResourceType_IOReadFlow: {},
rmpb.RawResourceType_IOWriteFlow: {},
rmpb.RawResourceType_CPU: {},
}
)

const (
initialRequestUnits = 10000
bufferRUs = 2000
// movingAvgFactor is the weight applied to a new "sample" of RU usage (with one
// sample per mainLoopUpdateInterval).
//
// If we want a factor of 0.5 per second, this should be:
//
// 0.5^(1 second / mainLoopUpdateInterval)
movingAvgFactor = 0.5
notifyFraction = 0.1
consumptionsReportingThreshold = 100
extendedReportingPeriodFactor = 4
defaultGroupLoopUpdateInterval = 1 * time.Second
defaultTargetPeriod = 10 * time.Second
defaultMaxRequestTokens = 1e8
)

const (
defaultReadBaseCost = 1
defaultReadCostPerByte = 1. / 1024 / 1024
@@ -55,6 +91,10 @@ func DefaultRequestUnitConfig() *RequestUnitConfig {
// units or request resource cost standards. It should be calculated by a given `RequestUnitConfig`
// or `RequestResourceConfig`.
type Config struct {
groupLoopUpdateInterval time.Duration
targetPeriod time.Duration
maxRequestTokens float64

ReadBaseCost RequestUnit
ReadBytesCost RequestUnit
ReadCPUMsCost RequestUnit
@@ -79,5 +119,8 @@ func generateConfig(ruConfig *RequestUnitConfig) *Config {
WriteBaseCost: RequestUnit(ruConfig.WriteBaseCost),
WriteBytesCost: RequestUnit(ruConfig.WriteCostPerByte),
}
cfg.groupLoopUpdateInterval = defaultGroupLoopUpdateInterval
cfg.targetPeriod = defaultTargetPeriod
cfg.maxRequestTokens = defaultMaxRequestTokens
return cfg
}
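A note on the constants added above: movingAvgFactor is the per-interval smoothing weight for RU usage, and its comment gives the rule for deriving it from a desired per-second decay. A minimal sketch of that smoothing, assuming a hypothetical update loop with illustrative names (sampleRUPerSec, avgRUPerSec) that are not part of this patch:

	// Sketch only; the real update loop lives in client.go, which is not rendered above.
	// With defaultGroupLoopUpdateInterval = 1s, 0.5^(1s / interval) is simply 0.5.
	factor := math.Pow(0.5, time.Second.Seconds()/defaultGroupLoopUpdateInterval.Seconds())
	// movingAvgFactor weights the newest sample; the previous average keeps the rest.
	avgRUPerSec = factor*sampleRUPerSec + (1-factor)*avgRUPerSec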
20 changes: 14 additions & 6 deletions pkg/mcs/resource_manager/client/limiter.go
@@ -174,7 +174,7 @@ func (r *Reservation) CancelAt(now time.Time) {
// Act()
//
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
func (lim *Limiter) Reserve(ctx context.Context, now time.Time, n float64) *Reservation {
func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now time.Time, n float64) *Reservation {
// Check if ctx is already cancelled
select {
case <-ctx.Done():
@@ -184,7 +184,7 @@ func (lim *Limiter) Reserve(ctx context.Context, now time.Time, n float64) *Rese
default:
}
// Determine wait limit
waitLimit := InfDuration
waitLimit := waitDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
@@ -194,6 +194,8 @@ func (lim *Limiter) Reserve(ctx context.Context, now time.Time, n float64) *Rese

// SetupNotificationThreshold enables the notification at the given threshold.
func (lim *Limiter) SetupNotificationThreshold(now time.Time, threshold float64) {
lim.mu.Lock()
defer lim.mu.Unlock()
lim.advance(now)
lim.notifyThreshold = threshold
}
@@ -215,19 +217,25 @@ func (lim *Limiter) notify() {
// maybeNotify checks if it's time to send the notification and if so, performs
// the notification.
func (lim *Limiter) maybeNotify() {
if lim.IsLowTokens() {
if lim.isLowTokensLocked() {
lim.notify()
}
}

// IsLowTokens returns whether the limiter is in low tokens
func (lim *Limiter) IsLowTokens() bool {
func (lim *Limiter) isLowTokensLocked() bool {
if lim.isLowProcess || (lim.notifyThreshold > 0 && lim.tokens < lim.notifyThreshold) {
return true
}
return false
}

// IsLowTokens returns whether the limiter is running low on tokens.
func (lim *Limiter) IsLowTokens() bool {
lim.mu.Lock()
defer lim.mu.Unlock()
return lim.isLowTokensLocked()
}

// RemoveTokens decreases the amount of tokens currently available.
func (lim *Limiter) RemoveTokens(now time.Time, amount float64) {
lim.mu.Lock()
@@ -373,7 +381,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
for _, res := range reservations {
if !res.ok {
cancel()
return fmt.Errorf("[resource group controller] limiter has no enough token")
return fmt.Errorf("[resource group controller] limiter has no enough token or needs wait too long")
}
delay := res.DelayFrom(now)
if delay > longestDelayDuration {
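The visible change to Reserve adds an explicit wait bound alongside the context deadline, and WaitReservations now reports both the "not enough tokens" and the "would wait too long" case. A minimal usage sketch, assuming an already-constructed *Limiter named lim and a context ctx, neither of which appears in this hunk:

	now := time.Now()
	// Bound the wait at 5s; the tests below pass InfDuration for no extra bound.
	r1 := lim.Reserve(ctx, 5*time.Second, now, 10)
	r2 := lim.Reserve(ctx, InfDuration, now, 3)
	if err := WaitReservations(ctx, now, []*Reservation{r1, r2}); err != nil {
		// A limiter had too few tokens or the required wait exceeded the bound.
	}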
8 changes: 4 additions & 4 deletions pkg/mcs/resource_manager/client/limiter_test.go
@@ -140,8 +140,8 @@ func TestCancel(t *testing.T) {
r1.CancelAt(t1)
checkTokens(re, lim1, t1, 11)

r1 = lim1.Reserve(ctx, t1, 5)
r2 := lim2.Reserve(ctx1, t1, 5)
r1 = lim1.Reserve(ctx, InfDuration, t1, 5)
r2 := lim2.Reserve(ctx1, InfDuration, t1, 5)
checkTokens(re, lim1, t2, 7)
checkTokens(re, lim2, t2, 2)
err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
@@ -151,8 +151,8 @@
cancel1()

ctx2, cancel2 := context.WithCancel(ctx)
r1 = lim1.Reserve(ctx, t3, 5)
r2 = lim2.Reserve(ctx2, t3, 5)
r1 = lim1.Reserve(ctx, InfDuration, t3, 5)
r2 = lim2.Reserve(ctx2, InfDuration, t3, 5)
checkTokens(re, lim1, t3, 8)
checkTokens(re, lim2, t3, -2)
var wg sync.WaitGroup
56 changes: 50 additions & 6 deletions pkg/mcs/resource_manager/client/model.go
@@ -59,9 +59,9 @@ type KVCalculator struct {

var _ ResourceCalculator = (*KVCalculator)(nil)

// func newKVCalculator(cfg *Config) *KVCalculator {
// return &KVCalculator{Config: cfg}
// }
func newKVCalculator(cfg *Config) *KVCalculator {
return &KVCalculator{Config: cfg}
}

// Trickle ...
func (kc *KVCalculator) Trickle(ctx context.Context, consumption *rmpb.Consumption) {
@@ -104,9 +104,9 @@ type SQLCalculator struct {

var _ ResourceCalculator = (*SQLCalculator)(nil)

// func newSQLCalculator(cfg *Config) *SQLCalculator {
// return &SQLCalculator{Config: cfg}
// }
func newSQLCalculator(cfg *Config) *SQLCalculator {
return &SQLCalculator{Config: cfg}
}

// Trickle ...
// TODO: calculate the SQL CPU cost and related resource consumption.
@@ -120,3 +120,47 @@ func (dsc *SQLCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req Req
// AfterKVRequest ...
func (dsc *SQLCalculator) AfterKVRequest(consumption *rmpb.Consumption, req RequestInfo, res ResponseInfo) {
}

func getRUValueFromConsumption(custom *rmpb.Consumption, typ rmpb.RequestUnitType) float64 {
switch typ {
case 0:
return custom.RRU
case 1:
return custom.WRU
}
return 0
}

func getRawResourceValueFromConsumption(custom *rmpb.Consumption, typ rmpb.RawResourceType) float64 {
switch typ {
case 0:
return custom.TotalCpuTimeMs
case 1:
return custom.ReadBytes
case 2:
return custom.WriteBytes
}
return 0
}

func add(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) {
custom1.RRU += custom2.RRU
custom1.WRU += custom2.WRU
custom1.ReadBytes += custom2.ReadBytes
custom1.WriteBytes += custom2.WriteBytes
custom1.TotalCpuTimeMs += custom2.TotalCpuTimeMs
custom1.SqlLayerCpuTimeMs += custom2.SqlLayerCpuTimeMs
custom1.KvReadRpcCount += custom2.KvReadRpcCount
custom1.KvWriteRpcCount += custom2.KvWriteRpcCount
}

func sub(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) {
custom1.RRU -= custom2.RRU
custom1.WRU -= custom2.WRU
custom1.ReadBytes -= custom2.ReadBytes
custom1.WriteBytes -= custom2.WriteBytes
custom1.TotalCpuTimeMs -= custom2.TotalCpuTimeMs
custom1.SqlLayerCpuTimeMs -= custom2.SqlLayerCpuTimeMs
custom1.KvReadRpcCount -= custom2.KvReadRpcCount
custom1.KvWriteRpcCount -= custom2.KvWriteRpcCount
}
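getRUValueFromConsumption and getRawResourceValueFromConsumption switch on the raw enum values (0, 1, 2) of rmpb.RequestUnitType and rmpb.RawResourceType, mirroring the requestUnitList and requestResourceList maps added in config.go. A hedged sketch of how a caller might total RU consumption with these helpers; the totalRU wrapper is illustrative and not part of this patch:

	func totalRU(c *rmpb.Consumption) float64 {
		var total float64
		for typ := range requestUnitList { // RRU and WRU, per config.go
			total += getRUValueFromConsumption(c, typ)
		}
		return total
	}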
10 changes: 5 additions & 5 deletions pkg/mcs/resource_manager/server/token_buckets_test.go
@@ -66,13 +66,13 @@ func TestGroupTokenBucketRequest(t *testing.T) {

gtb := NewGroupTokenBucket(tbSetting)
time1 := time.Now()
tb, trickle := gtb.request(time1, 100000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-100000), 1e-7)
tb, trickle := gtb.request(time1, 190000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-190000), 1e-7)
re.Equal(trickle, int64(0))
// need to lend token
tb, trickle = gtb.request(time1, 101000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-101000), 1e-7)
re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
tb, trickle = gtb.request(time1, 11000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-11000), 1e-7)
re.Equal(trickle, int64(time.Second)*11000./4000./int64(time.Millisecond))
tb, trickle = gtb.request(time1, 35000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-35000), 1e-7)
re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
14 changes: 13 additions & 1 deletion pkg/mcs/resource_manager/server/token_bukets.go
@@ -15,6 +15,7 @@
package server

import (
"math"
"time"

"github.com/gogo/protobuf/proto"
@@ -117,10 +118,12 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe

// Firstly allocate the remaining tokens
var grantedTokens float64
hasRemaining := false
if t.Tokens > 0 {
grantedTokens = t.Tokens
neededTokens -= grantedTokens
t.Tokens = 0
hasRemaining = true
}

var targetPeriodTime = time.Duration(targetPeriodMs) * time.Millisecond
@@ -155,6 +158,7 @@
if roundReserveTokens > neededTokens {
t.Tokens -= neededTokens
grantedTokens += neededTokens
trickleTime += grantedTokens / fillRate
neededTokens = 0
} else {
roundReserveTime := roundReserveTokens / fillRate
@@ -177,5 +181,13 @@
grantedTokens = defaultReserveRatio * float64(t.Settings.FillRate) * targetPeriodTime.Seconds()
}
res.Tokens = grantedTokens
return &res, targetPeriodTime.Milliseconds()
var trickleDuration time.Duration
// We can't simply use targetPeriodTime as the trickle time when there are tokens remaining;
// doing so would slow the client down even though its consumption could actually increase.
if hasRemaining {
trickleDuration = time.Duration(math.Min(trickleTime, targetPeriodTime.Seconds()) * float64(time.Second))
} else {
trickleDuration = targetPeriodTime
}
return &res, trickleDuration.Milliseconds()
}
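The effect of the new trickle logic: when the bucket still had a positive balance (hasRemaining), the trickle period is capped at grantedTokens / fillRate instead of always spanning the full target period. A worked sketch consistent with the updated test above; the 4000 tokens/s fill rate is an assumption inferred from the test expectation, not stated in this hunk:

	// Hypothetical values: fillRate = 4000 tokens/s, target period = 10s,
	// 11000 tokens granted while the bucket still held tokens (hasRemaining == true).
	fillRate := 4000.0
	targetPeriod := 10 * time.Second
	trickleTime := 11000.0 / fillRate // 2.75s
	trickleDuration := time.Duration(math.Min(trickleTime, targetPeriod.Seconds()) * float64(time.Second))
	_ = trickleDuration.Milliseconds() // 2750ms, matching time.Second*11000/4000/time.Millisecond in the test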
