From 402c2bfee2f38832f7d390751b46e2721baa7fb5 Mon Sep 17 00:00:00 2001
From: Yongbo Jiang
Date: Thu, 19 Jan 2023 19:41:49 +0800
Subject: [PATCH] resource_manager: impl resource controller for tokens client (#5811)

ref tikv/pd#5851

Signed-off-by: Cabinfever_B
Co-authored-by: Ti Chi Robot
---
 pkg/mcs/resource_manager/client/client.go     | 785 ++++++++++++++++++
 pkg/mcs/resource_manager/client/config.go     |  43 +
 pkg/mcs/resource_manager/client/limiter.go    |  20 +-
 .../resource_manager/client/limiter_test.go   |   8 +-
 pkg/mcs/resource_manager/client/model.go      |  56 +-
 .../server/token_buckets_test.go              |  10 +-
 .../resource_manager/server/token_bukets.go   |  14 +-
 .../resource_manager/resource_manager_test.go | 220 ++++-
 8 files changed, 1105 insertions(+), 51 deletions(-)
 create mode 100644 pkg/mcs/resource_manager/client/client.go

diff --git a/pkg/mcs/resource_manager/client/client.go b/pkg/mcs/resource_manager/client/client.go
new file mode 100644
index 00000000000..f5c60b7d653
--- /dev/null
+++ b/pkg/mcs/resource_manager/client/client.go
@@ -0,0 +1,785 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package client
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/pingcap/errors"
+	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
+	"github.com/pingcap/log"
+	"go.uber.org/zap"
+)
+
+const (
+	defaultMaxWaitDuration = time.Second
+	maxRetry               = 3
+)
+
+// ResourceGroupKVInterceptor is used as a quota limit controller for resource groups using a KV store.
+type ResourceGroupKVInterceptor interface {
+	// OnRequestWait is used to check whether the resource group has enough tokens. It may need to wait for some time.
+	OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) error
+	// OnResponse is used to consume tokens after receiving the response.
+	OnResponse(ctx context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) error
+}
+
+// ResourceGroupProvider provides APIs to interact with the resource manager server.
+type ResourceGroupProvider interface {
+	ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error)
+	GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error)
+	AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
+	ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
+	DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
+	AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
+}
+
+var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil)
+
+// ResourceGroupsController implements ResourceGroupKVInterceptor.
+type ResourceGroupsController struct {
+	clientUniqueID   uint64
+	provider         ResourceGroupProvider
+	groupsController sync.Map
+	config           *Config
+
+	loopCtx    context.Context
+	loopCancel func()
+
+	calculators []ResourceCalculator
+
+	// tokenResponseChan receives token bucket responses from the server.
+	// All of them are handled in the main loop.
+	tokenResponseChan chan []*rmpb.TokenBucketResponse
+
+	// lowTokenNotifyChan receives a notification when the number of available tokens is low.
+	lowTokenNotifyChan chan struct{}
+
+	run struct {
+		now             time.Time
+		lastRequestTime time.Time
+
+		// requestInProgress is true if we are in the process of sending a request.
+		// It is set to false when we receive the response in the main loop,
+		// even in error cases.
+		requestInProgress bool
+
+		// requestNeedsRetry is set if the last token bucket request encountered an
+		// error. This triggers a retry attempt on the next tick.
+		//
+		// Note: requestNeedsRetry and requestInProgress are never true at the same time.
+		requestNeedsRetry bool
+
+		// targetPeriod indicates how long the tokens acquired by a request are
+		// expected to last; it holds the target period setting at the last update.
+		targetPeriod time.Duration
+	}
+}
+
+// NewResourceGroupController returns a new ResourceGroupsController, which implements ResourceGroupKVInterceptor.
+func NewResourceGroupController(clientUniqueID uint64, provider ResourceGroupProvider, requestUnitConfig *RequestUnitConfig) (*ResourceGroupsController, error) {
+	var config *Config
+	if requestUnitConfig != nil {
+		config = generateConfig(requestUnitConfig)
+	} else {
+		config = DefaultConfig()
+	}
+	return &ResourceGroupsController{
+		clientUniqueID:     clientUniqueID,
+		provider:           provider,
+		config:             config,
+		lowTokenNotifyChan: make(chan struct{}, 1),
+		tokenResponseChan:  make(chan []*rmpb.TokenBucketResponse, 1),
+		calculators:        []ResourceCalculator{newKVCalculator(config), newSQLCalculator(config)},
+	}, nil
+}
+
+// Start starts the ResourceGroupsController service.
+func (c *ResourceGroupsController) Start(ctx context.Context) {
+	if err := c.updateAllResourceGroups(ctx); err != nil {
+		log.Error("update ResourceGroup failed", zap.Error(err))
+	}
+	c.initRunState()
+	c.loopCtx, c.loopCancel = context.WithCancel(ctx)
+	// Run the main loop on loopCtx so that Stop can actually cancel it.
+	go c.mainLoop(c.loopCtx)
+}
+
+// Stop stops the ResourceGroupsController service.
+func (c *ResourceGroupsController) Stop() error {
+	if c.loopCancel == nil {
+		return errors.Errorf("resourceGroupsController has not started")
+	}
+	c.loopCancel()
+	return nil
+}
+
+func (c *ResourceGroupsController) putResourceGroup(ctx context.Context, name string) (*groupCostController, error) {
+	group, err := c.provider.GetResourceGroup(ctx, name)
+	if err != nil {
+		return nil, err
+	}
+	log.Info("create resource group cost controller", zap.String("name", group.GetName()))
+	gc := newGroupCostController(group, c.config, c.lowTokenNotifyChan)
+	// A future case: if a user switches a group from RU mode to RAW mode, how should we re-initialize?
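+	// Note (assumption from initRunState below): the new controller's limiters are
+	// pre-filled with initialRequestUnits, so it can admit requests before the first
+	// token bucket response arrives.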
+	gc.initRunState()
+	c.groupsController.Store(group.GetName(), gc)
+	return gc, nil
+}
+
+func (c *ResourceGroupsController) updateAllResourceGroups(ctx context.Context) error {
+	groups, err := c.provider.ListResourceGroups(ctx)
+	if err != nil {
+		return err
+	}
+	latestGroups := make(map[string]struct{})
+	for _, group := range groups {
+		log.Info("create resource group cost controller", zap.String("name", group.GetName()))
+		gc := newGroupCostController(group, c.config, c.lowTokenNotifyChan)
+		c.groupsController.Store(group.GetName(), gc)
+		latestGroups[group.GetName()] = struct{}{}
+	}
+	c.groupsController.Range(func(key, value any) bool {
+		resourceGroupName := key.(string)
+		if _, ok := latestGroups[resourceGroupName]; !ok {
+			c.groupsController.Delete(key)
+		}
+		return true
+	})
+	return nil
+}
+
+func (c *ResourceGroupsController) initRunState() {
+	now := time.Now()
+	c.run.now = now
+	c.run.lastRequestTime = now
+	c.run.targetPeriod = c.config.targetPeriod
+	c.groupsController.Range(func(name, value any) bool {
+		gc := value.(*groupCostController)
+		gc.initRunState()
+		return true
+	})
+}
+
+func (c *ResourceGroupsController) updateRunState(ctx context.Context) {
+	c.run.now = time.Now()
+	c.groupsController.Range(func(name, value any) bool {
+		gc := value.(*groupCostController)
+		gc.updateRunState(ctx)
+		return true
+	})
+}
+
+func (c *ResourceGroupsController) shouldReportConsumption() bool {
+	if c.run.requestInProgress {
+		return false
+	}
+	timeSinceLastRequest := c.run.now.Sub(c.run.lastRequestTime)
+	if timeSinceLastRequest >= c.run.targetPeriod {
+		if timeSinceLastRequest >= extendedReportingPeriodFactor*c.run.targetPeriod {
+			return true
+		}
+		ret := false
+		c.groupsController.Range(func(name, value any) bool {
+			gc := value.(*groupCostController)
+			ret = ret || gc.shouldReportConsumption()
+			return !ret
+		})
+		return ret
+	}
+	return false
+}
+
+func (c *ResourceGroupsController) updateAvgRequestResourcePerSec() {
+	c.groupsController.Range(func(name, value any) bool {
+		gc := value.(*groupCostController)
+		gc.updateAvgRequestResourcePerSec()
+		return true
+	})
+}
+
+func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenBucketResponse) {
+	for _, res := range resp {
+		name := res.GetResourceGroupName()
+		v, ok := c.groupsController.Load(name)
+		if !ok {
+			log.Warn("a non-existent resource group was found when handling the token bucket response", zap.String("name", name))
+			continue
+		}
+		gc := v.(*groupCostController)
+		gc.handleTokenBucketResponse(res)
+	}
+}
+
+func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, low bool) {
+	requests := make([]*rmpb.TokenBucketRequest, 0)
+	c.groupsController.Range(func(name, value any) bool {
+		gc := value.(*groupCostController)
+		request := gc.collectRequestAndConsumption(low)
+		if request != nil {
+			requests = append(requests, request)
+		}
+		return true
+	})
+	if len(requests) > 0 {
+		c.sendTokenBucketRequests(ctx, requests, source)
+	}
+}
+
+func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string) {
+	now := time.Now()
+	c.run.lastRequestTime = now
+	c.run.requestInProgress = true
+	req := &rmpb.TokenBucketsRequest{
+		Requests:              requests,
+		TargetRequestPeriodMs: uint64(c.config.targetPeriod / time.Millisecond),
+	}
+	go func() {
+		log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
+		resp, err := c.provider.AcquireTokenBuckets(ctx, req)
+		if err != nil {
+			// Don't log any errors caused by the stopper canceling the context.
+			if !errors.ErrorEqual(err, context.Canceled) {
+				log.L().Sugar().Infof("TokenBucket RPC error: %v", err)
+			}
+			resp = nil
+		}
+		log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", time.Since(now)))
+		c.tokenResponseChan <- resp
+	}()
+}
+
+func (c *ResourceGroupsController) handleTokenBucketTrickEvent(ctx context.Context) {
+	c.groupsController.Range(func(name, value any) bool {
+		gc := value.(*groupCostController)
+		gc.handleTokenBucketTrickEvent(ctx)
+		return true
+	})
+}
+
+func (c *ResourceGroupsController) mainLoop(ctx context.Context) {
+	interval := c.config.groupLoopUpdateInterval
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	c.updateRunState(ctx)
+	c.collectTokenBucketRequests(ctx, "init", false /* select all */)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case resp := <-c.tokenResponseChan:
+			c.run.requestInProgress = false
+			if resp != nil {
+				c.updateRunState(ctx)
+				c.handleTokenBucketResponse(resp)
+			} else {
+				// A nil response indicates a failure (which would have been logged).
+				c.run.requestNeedsRetry = true
+			}
+		case <-ticker.C:
+			c.updateRunState(ctx)
+			c.updateAvgRequestResourcePerSec()
+			if c.run.requestNeedsRetry || c.shouldReportConsumption() {
+				c.run.requestNeedsRetry = false
+				c.collectTokenBucketRequests(ctx, "report", false /* select all */)
+			}
+		case <-c.lowTokenNotifyChan:
+			c.updateRunState(ctx)
+			c.updateAvgRequestResourcePerSec()
+			if !c.run.requestInProgress {
+				c.collectTokenBucketRequests(ctx, "low_ru", true /* only select low tokens resource group */)
+			}
+		default:
+			c.handleTokenBucketTrickEvent(ctx)
+		}
+	}
+}
+
+// OnRequestWait is used to check whether the resource group has enough tokens. It may need to wait for some time.
+func (c *ResourceGroupsController) OnRequestWait(
+	ctx context.Context, resourceGroupName string, info RequestInfo,
+) (err error) {
+	var gc *groupCostController
+	if tmp, ok := c.groupsController.Load(resourceGroupName); ok {
+		gc = tmp.(*groupCostController)
+	} else {
+		gc, err = c.putResourceGroup(ctx, resourceGroupName)
+		if err != nil {
+			return errors.Errorf("[resource group] resource group %s does not exist", resourceGroupName)
+		}
+	}
+	err = gc.onRequestWait(ctx, info)
+	return err
+}
+
+// OnResponse is used to consume tokens after receiving the response.
+func (c *ResourceGroupsController) OnResponse(_ context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) error {
+	tmp, ok := c.groupsController.Load(resourceGroupName)
+	if !ok {
+		log.Warn("[resource group] resource group does not exist", zap.String("resourceGroupName", resourceGroupName))
+		// Return early to avoid a nil type assertion panic below.
+		return nil
+	}
+	gc := tmp.(*groupCostController)
+	gc.onResponse(req, resp)
+	return nil
+}
+
+type groupCostController struct {
+	*rmpb.ResourceGroup
+	mainCfg     *Config
+	calculators []ResourceCalculator
+	mode        rmpb.GroupMode
+
+	handleRespFunc func(*rmpb.TokenBucketResponse)
+
+	mu struct {
+		sync.Mutex
+		consumption *rmpb.Consumption
+	}
+
+	lowRUNotifyChan chan struct{}
+	// run contains the state that is updated by the main loop.
+	run struct {
+		now time.Time
+
+		// targetPeriod stores the value of the TargetPeriodSetting setting at the
+		// last update.
+		targetPeriod time.Duration
+
+		// consumption stores the last value of mu.consumption.
+		// requestUnitConsumptions []*rmpb.RequestUnitItem
+		// resourceConsumptions []*rmpb.ResourceItem
+		consumption *rmpb.Consumption
+
+		// lastRequestUnitConsumptions []*rmpb.RequestUnitItem
+		// lastResourceConsumptions []*rmpb.ResourceItem
+		lastRequestConsumption *rmpb.Consumption
+
+		// initialRequestCompleted is set to true when the first token bucket
+		// request completes successfully.
+		initialRequestCompleted bool
+
+		resourceTokens    map[rmpb.RawResourceType]*tokenCounter
+		requestUnitTokens map[rmpb.RequestUnitType]*tokenCounter
+	}
+}
+
+type tokenCounter struct {
+	// avgRUPerSec is an exponentially-weighted moving average of the RU
+	// consumption per second; used to estimate the RU requirements for the next
+	// request.
+	avgRUPerSec float64
+	// avgRUPerSecLastRU is the consumption.RU value when avgRUPerSec was last updated.
+	avgRUPerSecLastRU float64
+	avgLastTime       time.Time
+
+	setupNotificationCh        <-chan time.Time
+	setupNotificationThreshold float64
+	setupNotificationTimer     *time.Timer
+
+	lastDeadline time.Time
+	lastRate     float64
+
+	limiter *Limiter
+}
+
+func newGroupCostController(group *rmpb.ResourceGroup, mainCfg *Config, lowRUNotifyChan chan struct{}) *groupCostController {
+	gc := &groupCostController{
+		ResourceGroup:   group,
+		mainCfg:         mainCfg,
+		calculators:     []ResourceCalculator{newKVCalculator(mainCfg), newSQLCalculator(mainCfg)},
+		mode:            group.GetMode(),
+		lowRUNotifyChan: lowRUNotifyChan,
+	}
+
+	switch gc.mode {
+	case rmpb.GroupMode_RUMode:
+		gc.handleRespFunc = gc.handleRUTokenResponse
+	case rmpb.GroupMode_RawMode:
+		gc.handleRespFunc = gc.handleResourceTokenResponse
+	}
+
+	gc.mu.consumption = &rmpb.Consumption{}
+	return gc
+}
+
+func (gc *groupCostController) initRunState() {
+	now := time.Now()
+	gc.run.now = now
+	gc.run.targetPeriod = gc.mainCfg.targetPeriod
+
+	gc.run.consumption = &rmpb.Consumption{}
+
+	gc.run.lastRequestConsumption = &rmpb.Consumption{}
+
+	switch gc.mode {
+	case rmpb.GroupMode_RUMode:
+		gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
+		for typ := range requestUnitList {
+			counter := &tokenCounter{
+				limiter:     NewLimiter(now, 0, initialRequestUnits, gc.mainCfg.maxRequestTokens, gc.lowRUNotifyChan),
+				avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
+				avgLastTime: now,
+			}
+			gc.run.requestUnitTokens[typ] = counter
+		}
+	case rmpb.GroupMode_RawMode:
+		gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
+		for typ := range requestResourceList {
+			counter := &tokenCounter{
+				limiter:     NewLimiter(now, 0, initialRequestUnits, gc.mainCfg.maxRequestTokens, gc.lowRUNotifyChan),
+				avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
+				avgLastTime: now,
+			}
+			gc.run.resourceTokens[typ] = counter
+		}
+	}
+}
+
+func (gc *groupCostController) updateRunState(ctx context.Context) {
+	newTime := time.Now()
+	deltaConsumption := &rmpb.Consumption{}
+	for _, calc := range gc.calculators {
+		calc.Trickle(ctx, deltaConsumption)
+	}
+	gc.mu.Lock()
+	add(gc.mu.consumption, deltaConsumption)
+	*gc.run.consumption = *gc.mu.consumption
+	gc.mu.Unlock()
+	// remove tokens
+	switch gc.mode {
+	case rmpb.GroupMode_RUMode:
+		for typ, counter := range gc.run.requestUnitTokens {
+			if v := getRUValueFromConsumption(deltaConsumption, typ); v > 0 {
+				counter.limiter.RemoveTokens(newTime, v)
+			}
+		}
+	case rmpb.GroupMode_RawMode:
+		for typ, counter := range gc.run.resourceTokens {
+			if v := getRawResourceValueFromConsumption(deltaConsumption, typ); v > 0 {
+				counter.limiter.RemoveTokens(newTime, v)
+			}
+		}
+	}
+	log.Debug("update run state", zap.Any("request unit consumption", gc.run.consumption))
+	gc.run.now = newTime
+}
+
+func (gc *groupCostController) updateAvgRequestResourcePerSec() {
+	switch gc.mode {
+	case rmpb.GroupMode_RawMode:
+		gc.updateAvgRaWResourcePerSec()
+	case rmpb.GroupMode_RUMode:
+		gc.updateAvgRUPerSec()
+	}
+}
+
+func (gc *groupCostController) handleTokenBucketTrickEvent(ctx context.Context) {
+	switch gc.mode {
+	case rmpb.GroupMode_RawMode:
+		for _, counter := range gc.run.resourceTokens {
+			select {
+			case <-counter.setupNotificationCh:
+				counter.setupNotificationTimer = nil
+				counter.setupNotificationCh = nil
+				counter.limiter.SetupNotificationThreshold(gc.run.now, counter.setupNotificationThreshold)
+				gc.updateRunState(ctx)
+			default:
+			}
+		}
+	case rmpb.GroupMode_RUMode:
+		for _, counter := range gc.run.requestUnitTokens {
+			select {
+			case <-counter.setupNotificationCh:
+				counter.setupNotificationTimer = nil
+				counter.setupNotificationCh = nil
+				counter.limiter.SetupNotificationThreshold(gc.run.now, counter.setupNotificationThreshold)
+				gc.updateRunState(ctx)
+			default:
+			}
+		}
+	}
+}
+
+func (gc *groupCostController) updateAvgRaWResourcePerSec() {
+	for typ, counter := range gc.run.resourceTokens {
+		if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) {
+			continue
+		}
+		log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
+	}
+}
+
+func (gc *groupCostController) updateAvgRUPerSec() {
+	for typ, counter := range gc.run.requestUnitTokens {
+		if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) {
+			continue
+		}
+		log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
+	}
+}
+
+func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool {
+	deltaDuration := gc.run.now.Sub(counter.avgLastTime)
+	if deltaDuration <= 500*time.Millisecond {
+		return false
+	}
+	delta := (new - counter.avgRUPerSecLastRU) / deltaDuration.Seconds()
+	counter.avgRUPerSec = movingAvgFactor*counter.avgRUPerSec + (1-movingAvgFactor)*delta
+	counter.avgLastTime = gc.run.now
+	counter.avgRUPerSecLastRU = new
+	return true
+}
+
+func (gc *groupCostController) shouldReportConsumption() bool {
+	switch gc.Mode {
+	case rmpb.GroupMode_RUMode:
+		for typ := range requestUnitList {
+			if getRUValueFromConsumption(gc.run.consumption, typ)-getRUValueFromConsumption(gc.run.lastRequestConsumption, typ) >= consumptionsReportingThreshold {
+				return true
+			}
+		}
+	case rmpb.GroupMode_RawMode:
+		for typ := range requestResourceList {
+			if getRawResourceValueFromConsumption(gc.run.consumption, typ)-getRawResourceValueFromConsumption(gc.run.lastRequestConsumption, typ) >= consumptionsReportingThreshold {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+func (gc *groupCostController) handleTokenBucketResponse(resp *rmpb.TokenBucketResponse) {
+	gc.handleRespFunc(resp)
+	if !gc.run.initialRequestCompleted {
+		gc.run.initialRequestCompleted = true
+		// This is the first successful request. Take back the initial RUs that we
+		// used to pre-fill the bucket.
+		for _, counter := range gc.run.resourceTokens {
+			counter.limiter.RemoveTokens(gc.run.now, initialRequestUnits)
+		}
+		for _, counter := range gc.run.requestUnitTokens {
+			counter.limiter.RemoveTokens(gc.run.now, initialRequestUnits)
+		}
+	}
+}
+
+func (gc *groupCostController) handleResourceTokenResponse(resp *rmpb.TokenBucketResponse) {
+	for _, grantedTB := range resp.GetGrantedResourceTokens() {
+		typ := grantedTB.GetType()
+		counter, ok := gc.run.resourceTokens[typ]
+		if !ok {
+			log.Warn("unsupported resource type", zap.String("type", rmpb.RawResourceType_name[int32(typ)]))
+			continue
+		}
+		gc.modifyTokenCounter(counter, grantedTB.GetGrantedTokens(), grantedTB.GetTrickleTimeMs())
+	}
+}
+
+func (gc *groupCostController) handleRUTokenResponse(resp *rmpb.TokenBucketResponse) {
+	for _, grantedTB := range resp.GetGrantedRUTokens() {
+		typ := grantedTB.GetType()
+		counter, ok := gc.run.requestUnitTokens[typ]
+		if !ok {
+			log.Warn("unsupported request unit type", zap.String("type", rmpb.RequestUnitType_name[int32(typ)]))
+			continue
+		}
+		gc.modifyTokenCounter(counter, grantedTB.GetGrantedTokens(), grantedTB.GetTrickleTimeMs())
+	}
+}
+
+func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket *rmpb.TokenBucket, trickleTimeMs int64) {
+	granted := bucket.Tokens
+	if !counter.lastDeadline.IsZero() {
+		// If the last request came with a trickle duration, we may have RUs that were
+		// not made available to the bucket yet; throw them together with the newly
+		// granted RUs.
+		if since := counter.lastDeadline.Sub(gc.run.now); since > 0 {
+			granted += counter.lastRate * since.Seconds()
+		}
+	}
+	if counter.setupNotificationTimer != nil {
+		counter.setupNotificationTimer.Stop()
+		counter.setupNotificationTimer = nil
+		counter.setupNotificationCh = nil
+	}
+	notifyThreshold := granted * notifyFraction
+	if notifyThreshold < bufferRUs {
+		notifyThreshold = bufferRUs
+	}
+
+	var cfg tokenBucketReconfigureArgs
+	// When trickleTimeMs is zero, the server has enough tokens and does not need to
+	// throttle the client's consumption, so all granted tokens are handed to the
+	// client right now.
+	if trickleTimeMs == 0 {
+		cfg.NewTokens = granted
+		cfg.NewRate = float64(bucket.GetSettings().FillRate)
+		cfg.NotifyThreshold = notifyThreshold
+		counter.lastDeadline = time.Time{}
+	} else {
+		// Otherwise, the granted tokens are trickled to the client through the fill rate.
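+		// For example (hypothetical numbers): with granted = 5000 tokens,
+		// trickleTimeMs = 10000, and a server-side fill rate of 100, the client-side
+		// rate below becomes 100 + 5000/10 = 600 tokens/sec until the trickle deadline.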
+		cfg.NewTokens = 0
+		trickleDuration := time.Duration(trickleTimeMs) * time.Millisecond
+		deadline := gc.run.now.Add(trickleDuration)
+		cfg.NewRate = float64(bucket.GetSettings().FillRate) + granted/trickleDuration.Seconds()
+
+		timerDuration := trickleDuration - time.Second
+		if timerDuration <= 0 {
+			timerDuration = (trickleDuration + time.Second) / 2
+		}
+		counter.setupNotificationTimer = time.NewTimer(timerDuration)
+		counter.setupNotificationCh = counter.setupNotificationTimer.C
+		counter.setupNotificationThreshold = notifyThreshold
+
+		counter.lastDeadline = deadline
+	}
+	counter.lastRate = cfg.NewRate
+	counter.limiter.Reconfigure(gc.run.now, cfg)
+}
+
+func (gc *groupCostController) collectRequestAndConsumption(low bool) *rmpb.TokenBucketRequest {
+	req := &rmpb.TokenBucketRequest{
+		ResourceGroupName: gc.ResourceGroup.GetName(),
+	}
+	// collect request resource
+	selected := !low
+	switch gc.mode {
+	case rmpb.GroupMode_RawMode:
+		requests := make([]*rmpb.RawResourceItem, 0, len(requestResourceList))
+		for typ, counter := range gc.run.resourceTokens {
+			if low && counter.limiter.IsLowTokens() {
+				selected = true
+			}
+			request := &rmpb.RawResourceItem{
+				Type:  typ,
+				Value: gc.calcRequest(counter),
+			}
+			requests = append(requests, request)
+		}
+		req.Request = &rmpb.TokenBucketRequest_RawResourceItems{
+			RawResourceItems: &rmpb.TokenBucketRequest_RequestRawResource{
+				RequestRawResource: requests,
+			},
+		}
+	case rmpb.GroupMode_RUMode:
+		requests := make([]*rmpb.RequestUnitItem, 0, len(requestUnitList))
+		for typ, counter := range gc.run.requestUnitTokens {
+			if low && counter.limiter.IsLowTokens() {
+				selected = true
+			}
+			request := &rmpb.RequestUnitItem{
+				Type:  typ,
+				Value: gc.calcRequest(counter),
+			}
+			requests = append(requests, request)
+		}
+		req.Request = &rmpb.TokenBucketRequest_RuItems{
+			RuItems: &rmpb.TokenBucketRequest_RequestRU{
+				RequestRU: requests,
+			},
+		}
+	}
+	if !selected {
+		return nil
+	}
+
+	deltaConsumption := &rmpb.Consumption{}
+	*deltaConsumption = *gc.run.consumption
+	sub(deltaConsumption, gc.run.lastRequestConsumption)
+	req.ConsumptionSinceLastRequest = deltaConsumption
+
+	*gc.run.lastRequestConsumption = *gc.run.consumption
+	return req
+}
+
+func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {
+	value := counter.avgRUPerSec*gc.run.targetPeriod.Seconds() + bufferRUs
+	value -= counter.limiter.AvailableTokens(gc.run.now)
+	if value < 0 {
+		value = 0
+	}
+	return value
+}
+
+func (gc *groupCostController) onRequestWait(
+	ctx context.Context, info RequestInfo,
+) (err error) {
+	delta := &rmpb.Consumption{}
+	for _, calc := range gc.calculators {
+		calc.BeforeKVRequest(delta, info)
+	}
+	now := time.Now()
+	// retry
+retryLoop:
+	for i := 0; i < maxRetry; i++ {
+		switch gc.mode {
+		case rmpb.GroupMode_RawMode:
+			res := make([]*Reservation, 0, len(requestResourceList))
+			for typ, counter := range gc.run.resourceTokens {
+				if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
+					res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v))
+				}
+			}
+			if err = WaitReservations(ctx, now, res); err == nil {
+				break retryLoop
+			}
+		case rmpb.GroupMode_RUMode:
+			res := make([]*Reservation, 0, len(requestUnitList))
+			for typ, counter := range gc.run.requestUnitTokens {
+				if v := getRUValueFromConsumption(delta, typ); v > 0 {
+					res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v))
+				}
+			}
+			if err = WaitReservations(ctx, now, res); err == nil {
+				break retryLoop
+			}
+		}
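+		// The reservations could not be satisfied within defaultMaxWaitDuration;
+		// back off briefly so the buckets can refill before the next retry.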
+		time.Sleep(100 * time.Millisecond)
+	}
+	if err != nil {
+		return err
+	}
+	gc.mu.Lock()
+	add(gc.mu.consumption, delta)
+	gc.mu.Unlock()
+	return nil
+}
+
+func (gc *groupCostController) onResponse(req RequestInfo, resp ResponseInfo) {
+	delta := &rmpb.Consumption{}
+	for _, calc := range gc.calculators {
+		calc.AfterKVRequest(delta, req, resp)
+	}
+
+	switch gc.mode {
+	case rmpb.GroupMode_RawMode:
+		for typ, counter := range gc.run.resourceTokens {
+			if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
+				counter.limiter.RemoveTokens(time.Now(), v)
+			}
+		}
+	case rmpb.GroupMode_RUMode:
+		for typ, counter := range gc.run.requestUnitTokens {
+			if v := getRUValueFromConsumption(delta, typ); v > 0 {
+				counter.limiter.RemoveTokens(time.Now(), v)
+			}
+		}
+	}
+	gc.mu.Lock()
+	add(gc.mu.consumption, delta)
+	gc.mu.Unlock()
+}
diff --git a/pkg/mcs/resource_manager/client/config.go b/pkg/mcs/resource_manager/client/config.go
index dd578d56557..5267eddce09 100644
--- a/pkg/mcs/resource_manager/client/config.go
+++ b/pkg/mcs/resource_manager/client/config.go
@@ -14,6 +14,42 @@
 
 package client
 
+import (
+	"time"
+
+	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
+)
+
+var (
+	requestUnitList map[rmpb.RequestUnitType]struct{} = map[rmpb.RequestUnitType]struct{}{
+		rmpb.RequestUnitType_RRU: {},
+		rmpb.RequestUnitType_WRU: {},
+	}
+	requestResourceList map[rmpb.RawResourceType]struct{} = map[rmpb.RawResourceType]struct{}{
+		rmpb.RawResourceType_IOReadFlow:  {},
+		rmpb.RawResourceType_IOWriteFlow: {},
+		rmpb.RawResourceType_CPU:         {},
+	}
+)
+
+const (
+	initialRequestUnits = 10000
+	bufferRUs           = 2000
+	// movingAvgFactor is the weight applied to a new "sample" of RU usage (with one
+	// sample per mainLoopUpdateInterval).
+	//
+	// If we want a factor of 0.5 per second, this should be:
+	//
+	//	0.5^(1 second / mainLoopUpdateInterval)
+	movingAvgFactor                = 0.5
+	notifyFraction                 = 0.1
+	consumptionsReportingThreshold = 100
+	extendedReportingPeriodFactor  = 4
+	defaultGroupLoopUpdateInterval = 1 * time.Second
+	defaultTargetPeriod            = 10 * time.Second
+	defaultMaxRequestTokens        = 1e8
+)
+
 const (
 	defaultReadBaseCost    = 1
 	defaultReadCostPerByte = 1. / 1024 / 1024
@@ -55,6 +91,10 @@ func DefaultRequestUnitConfig() *RequestUnitConfig {
 // units or request resource cost standards. It should be calculated by a given `RequestUnitConfig`
 // or `RequestResourceConfig`.
 type Config struct {
+	groupLoopUpdateInterval time.Duration
+	targetPeriod            time.Duration
+	maxRequestTokens        float64
+
 	ReadBaseCost  RequestUnit
 	ReadBytesCost RequestUnit
 	ReadCPUMsCost RequestUnit
@@ -79,5 +119,8 @@ func generateConfig(ruConfig *RequestUnitConfig) *Config {
 		WriteBaseCost:  RequestUnit(ruConfig.WriteBaseCost),
 		WriteBytesCost: RequestUnit(ruConfig.WriteCostPerByte),
 	}
+	cfg.groupLoopUpdateInterval = defaultGroupLoopUpdateInterval
+	cfg.targetPeriod = defaultTargetPeriod
+	cfg.maxRequestTokens = defaultMaxRequestTokens
 	return cfg
 }
diff --git a/pkg/mcs/resource_manager/client/limiter.go b/pkg/mcs/resource_manager/client/limiter.go
index 76825a2e5e5..c87bf8c7424 100644
--- a/pkg/mcs/resource_manager/client/limiter.go
+++ b/pkg/mcs/resource_manager/client/limiter.go
@@ -174,7 +174,7 @@ func (r *Reservation) CancelAt(now time.Time) {
 //	Act()
 //
 // Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
-func (lim *Limiter) Reserve(ctx context.Context, now time.Time, n float64) *Reservation {
+func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now time.Time, n float64) *Reservation {
 	// Check if ctx is already cancelled
 	select {
 	case <-ctx.Done():
@@ -184,7 +184,7 @@ func (lim *Limiter) Reserve(ctx context.Context, now time.Time, n float64) *Rese
 	default:
 	}
 	// Determine wait limit
-	waitLimit := InfDuration
+	waitLimit := waitDuration
 	if deadline, ok := ctx.Deadline(); ok {
 		waitLimit = deadline.Sub(now)
 	}
@@ -194,6 +194,8 @@ func (lim *Limiter) Reserve(ctx context.Context, now time.Time, n float64) *Rese
 
 // SetupNotificationThreshold enables the notification at the given threshold.
 func (lim *Limiter) SetupNotificationThreshold(now time.Time, threshold float64) {
+	lim.mu.Lock()
+	defer lim.mu.Unlock()
 	lim.advance(now)
 	lim.notifyThreshold = threshold
 }
@@ -215,19 +217,25 @@ func (lim *Limiter) notify() {
 // maybeNotify checks if it's time to send the notification and if so, performs
 // the notification.
 func (lim *Limiter) maybeNotify() {
-	if lim.IsLowTokens() {
+	if lim.isLowTokensLocked() {
 		lim.notify()
 	}
 }
 
-// IsLowTokens returns whether the limiter is in low tokens
-func (lim *Limiter) IsLowTokens() bool {
+func (lim *Limiter) isLowTokensLocked() bool {
 	if lim.isLowProcess || (lim.notifyThreshold > 0 && lim.tokens < lim.notifyThreshold) {
 		return true
 	}
 	return false
 }
 
+// IsLowTokens returns whether the limiter is in low tokens.
+func (lim *Limiter) IsLowTokens() bool {
+	lim.mu.Lock()
+	defer lim.mu.Unlock()
+	return lim.isLowTokensLocked()
+}
+
 // RemoveTokens decreases the amount of tokens currently available.
 func (lim *Limiter) RemoveTokens(now time.Time, amount float64) {
 	lim.mu.Lock()
@@ -373,7 +381,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
 	for _, res := range reservations {
 		if !res.ok {
 			cancel()
-			return fmt.Errorf("[resource group controller] limiter has no enough token")
+			return fmt.Errorf("[resource group controller] limiter does not have enough tokens or would need to wait too long")
 		}
 		delay := res.DelayFrom(now)
 		if delay > longestDelayDuration {
diff --git a/pkg/mcs/resource_manager/client/limiter_test.go b/pkg/mcs/resource_manager/client/limiter_test.go
index 0a0ae48d7c6..9a50b0db918 100644
--- a/pkg/mcs/resource_manager/client/limiter_test.go
+++ b/pkg/mcs/resource_manager/client/limiter_test.go
@@ -140,8 +140,8 @@ func TestCancel(t *testing.T) {
 	r1.CancelAt(t1)
 	checkTokens(re, lim1, t1, 11)
 
-	r1 = lim1.Reserve(ctx, t1, 5)
-	r2 := lim2.Reserve(ctx1, t1, 5)
+	r1 = lim1.Reserve(ctx, InfDuration, t1, 5)
+	r2 := lim2.Reserve(ctx1, InfDuration, t1, 5)
 	checkTokens(re, lim1, t2, 7)
 	checkTokens(re, lim2, t2, 2)
 	err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
@@ -151,8 +151,8 @@ func TestCancel(t *testing.T) {
 	cancel1()
 
 	ctx2, cancel2 := context.WithCancel(ctx)
-	r1 = lim1.Reserve(ctx, t3, 5)
-	r2 = lim2.Reserve(ctx2, t3, 5)
+	r1 = lim1.Reserve(ctx, InfDuration, t3, 5)
+	r2 = lim2.Reserve(ctx2, InfDuration, t3, 5)
 	checkTokens(re, lim1, t3, 8)
 	checkTokens(re, lim2, t3, -2)
 	var wg sync.WaitGroup
diff --git a/pkg/mcs/resource_manager/client/model.go b/pkg/mcs/resource_manager/client/model.go
index 64a6303774c..918cb31ca6c 100644
--- a/pkg/mcs/resource_manager/client/model.go
+++ b/pkg/mcs/resource_manager/client/model.go
@@ -59,9 +59,9 @@ type KVCalculator struct {
 
 var _ ResourceCalculator = (*KVCalculator)(nil)
 
-// func newKVCalculator(cfg *Config) *KVCalculator {
-// 	return &KVCalculator{Config: cfg}
-// }
+func newKVCalculator(cfg *Config) *KVCalculator {
+	return &KVCalculator{Config: cfg}
+}
 
 // Trickle ...
 func (kc *KVCalculator) Trickle(ctx context.Context, consumption *rmpb.Consumption) {
@@ -104,9 +104,9 @@ type SQLCalculator struct {
 
 var _ ResourceCalculator = (*SQLCalculator)(nil)
 
-// func newSQLCalculator(cfg *Config) *SQLCalculator {
-// 	return &SQLCalculator{Config: cfg}
-// }
+func newSQLCalculator(cfg *Config) *SQLCalculator {
+	return &SQLCalculator{Config: cfg}
+}
 
 // Trickle ...
 // TODO: calculate the SQL CPU cost and related resource consumption.
@@ -120,3 +120,47 @@ func (dsc *SQLCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req Req
 
 // AfterKVRequest ...
 func (dsc *SQLCalculator) AfterKVRequest(consumption *rmpb.Consumption, req RequestInfo, res ResponseInfo) {
 }
+
+func getRUValueFromConsumption(custom *rmpb.Consumption, typ rmpb.RequestUnitType) float64 {
+	switch typ {
+	case 0:
+		return custom.RRU
+	case 1:
+		return custom.WRU
+	}
+	return 0
+}
+
+func getRawResourceValueFromConsumption(custom *rmpb.Consumption, typ rmpb.RawResourceType) float64 {
+	switch typ {
+	case 0:
+		return custom.TotalCpuTimeMs
+	case 1:
+		return custom.ReadBytes
+	case 2:
+		return custom.WriteBytes
+	}
+	return 0
+}
+
+func add(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) {
+	custom1.RRU += custom2.RRU
+	custom1.WRU += custom2.WRU
+	custom1.ReadBytes += custom2.ReadBytes
+	custom1.WriteBytes += custom2.WriteBytes
+	custom1.TotalCpuTimeMs += custom2.TotalCpuTimeMs
+	custom1.SqlLayerCpuTimeMs += custom2.SqlLayerCpuTimeMs
+	custom1.KvReadRpcCount += custom2.KvReadRpcCount
+	custom1.KvWriteRpcCount += custom2.KvWriteRpcCount
+}
+
+func sub(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) {
+	custom1.RRU -= custom2.RRU
+	custom1.WRU -= custom2.WRU
+	custom1.ReadBytes -= custom2.ReadBytes
+	custom1.WriteBytes -= custom2.WriteBytes
+	custom1.TotalCpuTimeMs -= custom2.TotalCpuTimeMs
+	custom1.SqlLayerCpuTimeMs -= custom2.SqlLayerCpuTimeMs
+	custom1.KvReadRpcCount -= custom2.KvReadRpcCount
+	custom1.KvWriteRpcCount -= custom2.KvWriteRpcCount
+}
diff --git a/pkg/mcs/resource_manager/server/token_buckets_test.go b/pkg/mcs/resource_manager/server/token_buckets_test.go
index a7ecbe81d77..e064a6504fe 100644
--- a/pkg/mcs/resource_manager/server/token_buckets_test.go
+++ b/pkg/mcs/resource_manager/server/token_buckets_test.go
@@ -66,13 +66,13 @@ func TestGroupTokenBucketRequest(t *testing.T) {
 	gtb := NewGroupTokenBucket(tbSetting)
 	time1 := time.Now()
 
-	tb, trickle := gtb.request(time1, 100000, uint64(time.Second)*10/uint64(time.Millisecond))
-	re.LessOrEqual(math.Abs(tb.Tokens-100000), 1e-7)
+	tb, trickle := gtb.request(time1, 190000, uint64(time.Second)*10/uint64(time.Millisecond))
+	re.LessOrEqual(math.Abs(tb.Tokens-190000), 1e-7)
 	re.Equal(trickle, int64(0))
 	// need to lend token
-	tb, trickle = gtb.request(time1, 101000, uint64(time.Second)*10/uint64(time.Millisecond))
-	re.LessOrEqual(math.Abs(tb.Tokens-101000), 1e-7)
-	re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
+	tb, trickle = gtb.request(time1, 11000, uint64(time.Second)*10/uint64(time.Millisecond))
+	re.LessOrEqual(math.Abs(tb.Tokens-11000), 1e-7)
+	re.Equal(trickle, int64(time.Second)*11000./4000./int64(time.Millisecond))
 	tb, trickle = gtb.request(time1, 35000, uint64(time.Second)*10/uint64(time.Millisecond))
 	re.LessOrEqual(math.Abs(tb.Tokens-35000), 1e-7)
 	re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go
index 92edc5d347e..9ca81ed5dae 100644
--- a/pkg/mcs/resource_manager/server/token_bukets.go
+++ b/pkg/mcs/resource_manager/server/token_bukets.go
@@ -15,6 +15,7 @@
 package server
 
 import (
+	"math"
 	"time"
 
 	"github.com/gogo/protobuf/proto"
@@ -117,10 +118,12 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
 
 	// Firstly allocate the remaining tokens
 	var grantedTokens float64
+	hasRemaining := false
 	if t.Tokens > 0 {
 		grantedTokens = t.Tokens
 		neededTokens -= grantedTokens
 		t.Tokens = 0
+		hasRemaining = true
 	}
 
 	var targetPeriodTime = time.Duration(targetPeriodMs) * time.Millisecond
@@ -155,6 +158,7 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
 		if roundReserveTokens > neededTokens {
 			t.Tokens -= neededTokens
 			grantedTokens += neededTokens
+			trickleTime += grantedTokens / fillRate
 			neededTokens = 0
 		} else {
 			roundReserveTime := roundReserveTokens / fillRate
@@ -177,5 +181,13 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
 		grantedTokens = defaultReserveRatio * float64(t.Settings.FillRate) * targetPeriodTime.Seconds()
 	}
 	res.Tokens = grantedTokens
-	return &res, targetPeriodTime.Milliseconds()
+	var trickleDuration time.Duration
+	// We can't directly treat targetPeriodTime as the trickle time when there are tokens remaining.
+	// If we did, client consumption would be slowed down (actually, it could even be increased).
+	if hasRemaining {
+		trickleDuration = time.Duration(math.Min(trickleTime, targetPeriodTime.Seconds()) * float64(time.Second))
+	} else {
+		trickleDuration = targetPeriodTime
+	}
+	return &res, trickleDuration.Milliseconds()
 }
diff --git a/tests/msc/resource_manager/resource_manager_test.go b/tests/msc/resource_manager/resource_manager_test.go
index 0bff664820a..8b45125574f 100644
--- a/tests/msc/resource_manager/resource_manager_test.go
+++ b/tests/msc/resource_manager/resource_manager_test.go
@@ -27,6 +27,7 @@ import (
 	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
 	"github.com/stretchr/testify/suite"
 	pd "github.com/tikv/pd/client"
+	rgcli "github.com/tikv/pd/pkg/mcs/resource_manager/client"
 	"github.com/tikv/pd/pkg/mcs/resource_manager/server"
 	"github.com/tikv/pd/pkg/utils/testutil"
 	"github.com/tikv/pd/tests"
@@ -43,10 +44,11 @@ func TestMain(m *testing.M) {
 
 type resourceManagerClientTestSuite struct {
 	suite.Suite
-	ctx     context.Context
-	clean   context.CancelFunc
-	cluster *tests.TestCluster
-	client  pd.Client
+	ctx        context.Context
+	clean      context.CancelFunc
+	cluster    *tests.TestCluster
+	client     pd.Client
+	initGroups []*rmpb.ResourceGroup
 }
 
 func TestResourceManagerClientTestSuite(t *testing.T) {
@@ -68,6 +70,45 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() {
 	leaderName := suite.cluster.WaitLeader()
 	leader := suite.cluster.GetServer(leaderName)
 	suite.client, err = pd.NewClientWithContext(suite.ctx, []string{leader.GetAddr()}, pd.SecurityOption{})
+	suite.initGroups = []*rmpb.ResourceGroup{
+		{
+			Name: "test1",
+			Mode: rmpb.GroupMode_RUMode,
+			RUSettings: &rmpb.GroupRequestUnitSettings{
+				RRU: &rmpb.TokenBucket{
+					Settings: &rmpb.TokenLimitSettings{
+						FillRate: 10000,
+					},
+					Tokens: 100000,
+				},
+				WRU: &rmpb.TokenBucket{
+					Settings: &rmpb.TokenLimitSettings{
+						FillRate: 20000,
+					},
+					Tokens: 50000,
+				},
+			},
+		},
+		{
+			Name: "test2",
+			Mode: rmpb.GroupMode_RUMode,
+			RUSettings: &rmpb.GroupRequestUnitSettings{
+				RRU: &rmpb.TokenBucket{
+					Settings: &rmpb.TokenLimitSettings{
+						FillRate: 40000,
+					},
+					Tokens: 100000,
+				},
+				WRU: &rmpb.TokenBucket{
+					Settings: &rmpb.TokenLimitSettings{
+						FillRate: 20000,
+					},
+					Tokens: 50000,
+				},
+			},
+		},
+	}
+
 	re.NoError(err)
 }
@@ -158,36 +199,157 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {
 	}
 }
 
-func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
+const buffDuration = time.Millisecond * 200
+
+type testRequestInfo struct {
+	isWrite    bool
+	writeBytes uint64
+}
+
+func (ti *testRequestInfo) IsWrite() bool {
+	return ti.isWrite
+}
+
+func (ti *testRequestInfo) WriteBytes() uint64 {
+	return ti.writeBytes
+}
+
+type testResponseInfo struct {
+	cpuMs     uint64
+	readBytes uint64
+}
+
+func (tri *testResponseInfo) ReadBytes() uint64 {
+	return tri.readBytes
+}
+
+func (tri *testResponseInfo) KVCPUMs() uint64 {
+	return tri.cpuMs
+}
+
+type tokenConsumptionPerSecond struct {
+	rruTokensAtATime float64
+	wruTokensAtATime float64
+	times            int
+	waitDuration     time.Duration
+}
+
+func (t tokenConsumptionPerSecond) makeReadRequest() *testRequestInfo {
+	return &testRequestInfo{
+		isWrite:    false,
+		writeBytes: 0,
+	}
+}
+
+func (t tokenConsumptionPerSecond) makeWriteRequest() *testRequestInfo {
+	return &testRequestInfo{
+		isWrite:    true,
+		writeBytes: uint64(t.wruTokensAtATime - 1),
+	}
+}
+
+func (t tokenConsumptionPerSecond) makeReadResponse() *testResponseInfo {
+	return &testResponseInfo{
+		readBytes: uint64((t.rruTokensAtATime - 1) / 2),
+		cpuMs:     uint64(t.rruTokensAtATime / 2),
+	}
+}
+
+func (t tokenConsumptionPerSecond) makeWriteResponse() *testResponseInfo {
+	return &testResponseInfo{
+		readBytes: 0,
+		cpuMs:     0,
+	}
+}
+
+func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
 	re := suite.Require()
 	cli := suite.client
 
-	groups := []*rmpb.ResourceGroup{
-		{
-			Name: "test1",
-			Mode: rmpb.GroupMode_RUMode,
-			RUSettings: &rmpb.GroupRequestUnitSettings{
-				RRU: &rmpb.TokenBucket{
-					Settings: &rmpb.TokenLimitSettings{
-						FillRate: 10000,
-					},
-					Tokens: 100000,
-				},
-			},
-		},
+	for _, group := range suite.initGroups {
+		resp, err := cli.AddResourceGroup(suite.ctx, group)
+		re.NoError(err)
+		re.Contains(resp, "Success!")
+	}
+
+	cfg := &rgcli.RequestUnitConfig{
+		ReadBaseCost:     1,
+		ReadCostPerByte:  1,
+		ReadCPUMsCost:    1,
+		WriteBaseCost:    1,
+		WriteCostPerByte: 1,
+	}
+
+	controller, _ := rgcli.NewResourceGroupController(1, cli, cfg)
+	controller.Start(suite.ctx)
+
+	testCases := []struct {
+		resourceGroupName string
+		tcs               []tokenConsumptionPerSecond
+		len               int
+	}{
 		{
-			Name: "test2",
-			Mode: rmpb.GroupMode_RUMode,
-			RUSettings: &rmpb.GroupRequestUnitSettings{
-				RRU: &rmpb.TokenBucket{
-					Settings: &rmpb.TokenLimitSettings{
-						FillRate: 40000,
-					},
-					Tokens: 100000,
-				},
+			resourceGroupName: suite.initGroups[0].Name,
+			len:               8,
+			tcs: []tokenConsumptionPerSecond{
+				{rruTokensAtATime: 50, wruTokensAtATime: 20, times: 200, waitDuration: 0},
+				{rruTokensAtATime: 50, wruTokensAtATime: 100, times: 200, waitDuration: 0},
+				{rruTokensAtATime: 50, wruTokensAtATime: 100, times: 200, waitDuration: 0},
+				{rruTokensAtATime: 20, wruTokensAtATime: 40, times: 500, waitDuration: 0},
+				{rruTokensAtATime: 25, wruTokensAtATime: 50, times: 400, waitDuration: 0},
+				{rruTokensAtATime: 30, wruTokensAtATime: 60, times: 330, waitDuration: 0},
+				{rruTokensAtATime: 40, wruTokensAtATime: 80, times: 250, waitDuration: 0},
+				{rruTokensAtATime: 50, wruTokensAtATime: 100, times: 200, waitDuration: 0},
 			},
 		},
 	}
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+	i := 0
+	for {
+		v := false
+		<-ticker.C
+		for _, cas := range testCases {
+			if i >= cas.len {
+				continue
+			}
+			v = true
+			sum := time.Duration(0)
+			for j := 0; j < cas.tcs[i].times; j++ {
+				rreq := cas.tcs[i].makeReadRequest()
+				wreq := cas.tcs[i].makeWriteRequest()
+				rres := cas.tcs[i].makeReadResponse()
+				wres := cas.tcs[i].makeWriteResponse()
+				startTime := time.Now()
+				controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
+				controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
+				endTime := time.Now()
+				sum += endTime.Sub(startTime)
+				controller.OnResponse(suite.ctx, cas.resourceGroupName, rreq, rres)
+				controller.OnResponse(suite.ctx, cas.resourceGroupName, wreq, wres)
+				time.Sleep(1000 * time.Microsecond)
+			}
+			re.LessOrEqual(sum, buffDuration+cas.tcs[i].waitDuration)
+		}
+		i++
+		if !v {
+			break
+		}
+	}
+	for _, g := range suite.initGroups {
+		// Delete Resource Group
+		dresp, err := cli.DeleteResourceGroup(suite.ctx, g.Name)
+		re.NoError(err)
+		re.Contains(dresp, "Success!")
+	}
+}
+
+func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
+	re := suite.Require()
+	cli := suite.client
+
+	groups := make([]*rmpb.ResourceGroup, 0)
+	groups = append(groups, suite.initGroups...)
 	for _, group := range groups {
 		resp, err := cli.AddResourceGroup(suite.ctx, group)
 		re.NoError(err)
@@ -203,7 +365,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
 	requests := make([]*rmpb.RequestUnitItem, 0)
 	requests = append(requests, &rmpb.RequestUnitItem{
 		Type:  rmpb.RequestUnitType_RRU,
-		Value: 10000,
+		Value: 100,
 	})
 	req := &rmpb.TokenBucketRequest{
 		ResourceGroupName: group.Name,
@@ -219,7 +381,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
 	re.NoError(err)
 	for _, resp := range aresp {
 		re.Len(resp.GrantedRUTokens, 1)
-		re.Equal(resp.GrantedRUTokens[0].GrantedTokens.Tokens, float64(10000.))
+		re.Equal(resp.GrantedRUTokens[0].GrantedTokens.Tokens, float64(100.))
 	}
 	gresp, err := cli.GetResourceGroup(suite.ctx, groups[0].GetName())
 	re.NoError(err)
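-- 
A minimal usage sketch of the controller introduced by this patch (illustrative, not part
of the diff): pdClient stands for any ResourceGroupProvider implementation (e.g. the PD
client), "test1" for an existing resource group, and reqInfo/respInfo for implementations
of the RequestInfo/ResponseInfo interfaces from model.go.

	controller, err := client.NewResourceGroupController(1, pdClient, nil /* nil falls back to DefaultConfig */)
	if err != nil {
		return err
	}
	controller.Start(ctx)
	defer controller.Stop()
	// Block until the group has enough tokens to admit the request...
	if err := controller.OnRequestWait(ctx, "test1", reqInfo); err != nil {
		return err
	}
	// ...execute the KV request, then report its actual cost back to the controller.
	_ = controller.OnResponse(ctx, "test1", reqInfo, respInfo)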