Skip to content

Commit

Permalink
resource_manager: Remove buffer and initialRequestUnits var (#6176)
Browse files Browse the repository at this point in the history
close #6185, close #6187

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
CabinfeverB and ti-chi-bot authored Mar 21, 2023
1 parent 1e49be5 commit af5b019
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 33 deletions.
3 changes: 1 addition & 2 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ var (
)

const (
initialRequestUnits = 10000
bufferRUs = 2000
// movingAvgFactor is the weight applied to a new "sample" of RU usage (with one
// sample per mainLoopUpdateInterval).
//
Expand All @@ -42,6 +40,7 @@ const (
// 0.5^(1 second / mainLoopUpdateInterval)
movingAvgFactor = 0.5
notifyFraction = 0.1
tokenReserveFraction = 0.8
consumptionsReportingThreshold = 100
extendedReportingPeriodFactor = 4
// defaultGroupCleanupInterval is the interval to clean up the deleted resource groups in memory.
Expand Down
58 changes: 29 additions & 29 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ import (
)

const (
controllerConfigPath = "resource_group/controller"
maxRetry = 3
maxNotificationChanLen = 200
controllerConfigPath = "resource_group/controller"
maxRetry = 3
maxNotificationChanLen = 200
needTokensAmplification = 1.1
)

type selectType int
Expand Down Expand Up @@ -188,6 +189,10 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
failpoint.Inject("fastCleanup", func() {
cleanupTicker.Stop()
cleanupTicker = time.NewTicker(100 * time.Millisecond)
// Because `gc.run.consumption` is checked on each cleanupTicker tick,
// the stateUpdateTicker interval must be shortened accordingly.
stateUpdateTicker.Stop()
stateUpdateTicker = time.NewTicker(200 * time.Millisecond)
})

for {
Expand Down Expand Up @@ -366,8 +371,8 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
request := gc.collectRequestAndConsumption(typ)
if request != nil {
c.run.currentRequests = append(c.run.currentRequests, request)
gc.tokenRequestCounter.Inc()
}
gc.tokenRequestCounter.Inc()
return true
})
if len(c.run.currentRequests) > 0 {
Expand All @@ -380,6 +385,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
req := &rmpb.TokenBucketsRequest{
Requests: requests,
TargetRequestPeriodMs: uint64(defaultTargetPeriod / time.Millisecond),
ClientUniqueId: c.clientUniqueID,
}
if c.config.DegradedModeWaitDuration > 0 && c.responseDeadlineCh == nil {
c.run.responseDeadline.Reset(c.config.DegradedModeWaitDuration)
Expand Down Expand Up @@ -556,21 +562,26 @@ func newGroupCostController(
func (gc *groupCostController) initRunState() {
now := time.Now()
gc.run.now = now
gc.run.lastRequestTime = now
gc.run.lastRequestTime = now.Add(-defaultTargetPeriod)
gc.run.targetPeriod = defaultTargetPeriod
gc.run.consumption = &rmpb.Consumption{}
gc.run.lastRequestConsumption = &rmpb.Consumption{SqlLayerCpuTimeMs: getSQLProcessCPUTime(gc.mainCfg.isSingleGroupByKeyspace)}

isBurstable := true
cfgFunc := func(tb *rmpb.TokenBucket) tokenBucketReconfigureArgs {
initialToken := float64(tb.Settings.FillRate)
cfg := tokenBucketReconfigureArgs{
NewTokens: initialRequestUnits,
NewTokens: initialToken,
NewBurst: tb.Settings.BurstLimit,
// This is to trigger token requests as soon as the resource group starts consuming tokens.
NotifyThreshold: math.Max(initialRequestUnits-float64(tb.Settings.FillRate)*0.2, 1),
NotifyThreshold: math.Max(initialToken*tokenReserveFraction, 1),
}
if cfg.NewBurst >= 0 {
cfg.NewBurst = 0
}
if tb.Settings.BurstLimit >= 0 {
isBurstable = false
}
return cfg
}

Expand Down Expand Up @@ -602,6 +613,7 @@ func (gc *groupCostController) initRunState() {
gc.run.resourceTokens[typ] = counter
}
}
gc.burstable.Store(isBurstable)
}

// applyDegradedMode is used to apply degraded mode for resource group which is in low-process.
Expand Down Expand Up @@ -711,9 +723,6 @@ func (gc *groupCostController) updateAvgRUPerSec() {

func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool {
deltaDuration := gc.run.now.Sub(counter.avgLastTime)
if deltaDuration <= 500*time.Millisecond {
return false
}
delta := (new - counter.avgRUPerSecLastRU) / deltaDuration.Seconds()
counter.avgRUPerSec = movingAvgFactor*counter.avgRUPerSec + (1-movingAvgFactor)*delta
counter.avgLastTime = gc.run.now
Expand All @@ -722,6 +731,9 @@ func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool
}

func (gc *groupCostController) shouldReportConsumption() bool {
if !gc.run.initialRequestCompleted {
return true
}
timeSinceLastRequest := gc.run.now.Sub(gc.run.lastRequestTime)
if timeSinceLastRequest >= defaultTargetPeriod {
if timeSinceLastRequest >= extendedReportingPeriodFactor*defaultTargetPeriod {
Expand All @@ -748,17 +760,7 @@ func (gc *groupCostController) shouldReportConsumption() bool {
func (gc *groupCostController) handleTokenBucketResponse(resp *rmpb.TokenBucketResponse) {
gc.run.requestInProgress = false
gc.handleRespFunc(resp)
if !gc.run.initialRequestCompleted {
gc.run.initialRequestCompleted = true
// This is the first successful request. Take back the initial RUs that we
// used to pre-fill the bucket.
for _, counter := range gc.run.resourceTokens {
counter.limiter.RemoveTokens(gc.run.now, initialRequestUnits)
}
for _, counter := range gc.run.requestUnitTokens {
counter.limiter.RemoveTokens(gc.run.now, initialRequestUnits)
}
}
gc.run.initialRequestCompleted = true
}

func (gc *groupCostController) handleRawResourceTokenResponse(resp *rmpb.TokenBucketResponse) {
Expand Down Expand Up @@ -833,20 +835,15 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
}
initCounterNotify(counter)
counter.inDegradedMode = false
notifyThreshold := granted * notifyFraction
if notifyThreshold < bufferRUs {
notifyThreshold = bufferRUs
}

var cfg tokenBucketReconfigureArgs
cfg.NewBurst = bucket.GetSettings().GetBurstLimit()
// When trickleTimeMs equals zero, the server has enough tokens and does not need to
// limit the client's token consumption, so all tokens are granted to the client immediately.
if trickleTimeMs == 0 {
cfg.NewTokens = granted
cfg.NewRate = float64(bucket.GetSettings().FillRate)
cfg.NotifyThreshold = notifyThreshold
counter.lastDeadline = time.Time{}
cfg.NotifyThreshold = math.Min((granted+counter.limiter.AvailableTokens(gc.run.now)), counter.avgRUPerSec*float64(defaultTargetPeriod)) * notifyFraction
// In the non-trickle case, clients can be allowed to accumulate more tokens.
if cfg.NewBurst >= 0 {
cfg.NewBurst = 0
Expand All @@ -865,7 +862,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
counter.notify.mu.Lock()
counter.notify.setupNotificationTimer = time.NewTimer(timerDuration)
counter.notify.setupNotificationCh = counter.notify.setupNotificationTimer.C
counter.notify.setupNotificationThreshold = notifyThreshold
counter.notify.setupNotificationThreshold = 1
counter.notify.mu.Unlock()
counter.lastDeadline = deadline
select {
Expand Down Expand Up @@ -958,7 +955,10 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
}

func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {
value := counter.avgRUPerSec*gc.run.targetPeriod.Seconds() + bufferRUs
// `needTokensAmplification` is used to amplify the requested amount appropriately. The reason is that,
// in the current implementation, the tokens returned from the server determine the average consumption speed.
// Therefore, when the fill rate of a resource group increases, `needTokensAmplification` enables the client to obtain more tokens.
value := counter.avgRUPerSec * gc.run.targetPeriod.Seconds() * needTokensAmplification
value -= counter.limiter.AvailableTokens(gc.run.now)
if value < 0 {
value = 0
Expand Down
14 changes: 12 additions & 2 deletions tests/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,10 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
_, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
re.NoError(err)
_, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(cas.resourceGroupName, rreq, rres)
controller.OnResponse(cas.resourceGroupName, wreq, wres)
Expand Down Expand Up @@ -706,9 +708,17 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo
rruTokensAtATime: 0,
wruTokensAtATime: 10000,
}
tc2 := tokenConsumptionPerSecond{
rruTokensAtATime: 0,
wruTokensAtATime: 2,
}
controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
time.Sleep(time.Second * 2)
beginTime := time.Now()
// This is used to make sure the resource group is in the low-RU state.
for i := 0; i < 100; i++ {
controller.OnRequestWait(suite.ctx, "modetest", tc2.makeWriteRequest())
}
for i := 0; i < 100; i++ {
controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
}
Expand Down

0 comments on commit af5b019

Please sign in to comment.