Skip to content

Commit

Permalink
resource_control: supports dynamictlly change the controller config
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Sep 4, 2023
1 parent 91648e5 commit d449b8b
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 31 deletions.
15 changes: 11 additions & 4 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/tikv/pd/pkg/typeutil"
)

var (
Expand Down Expand Up @@ -51,7 +52,7 @@ const (
// According to the resource control Grafana panel and Prometheus sampling period, the period should be the factor of 15.
defaultTargetPeriod = 5 * time.Second
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
defaultMaxWaitDuration = time.Second
defaultMaxWaitDuration = 30 * time.Second
)

const (
Expand Down Expand Up @@ -82,6 +83,9 @@ type Config struct {
// EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect.
DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

// LTBMaxWaitDuration is the max wait time duration for local token bucket.
LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`

// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
Expand All @@ -91,6 +95,7 @@ type Config struct {
func DefaultConfig() *Config {
return &Config{
DegradedModeWaitDuration: defaultDegradedModeWaitDuration,
LTBMaxWaitDuration: defaultMaxWaitDuration,
RequestUnit: DefaultRequestUnitConfig(),
}
}
Expand Down Expand Up @@ -143,8 +148,10 @@ type RUConfig struct {
WriteBytesCost RequestUnit
CPUMsCost RequestUnit
// The CPU statistics need to distinguish between different environments.
isSingleGroupByKeyspace bool
maxWaitDuration time.Duration
isSingleGroupByKeyspace bool

// some config for client
LTBMaxWaitDuration time.Duration
DegradedModeWaitDuration time.Duration
}

Expand All @@ -165,7 +172,7 @@ func GenerateRUConfig(config *Config) *RUConfig {
WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
maxWaitDuration: defaultMaxWaitDuration,
LTBMaxWaitDuration: config.LTBMaxWaitDuration,
}
duration, err := time.ParseDuration(config.DegradedModeWaitDuration)
if err != nil {
Expand Down
100 changes: 75 additions & 25 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func EnableSingleGroupByKeyspace() ResourceControlCreateOption {
// WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets.
func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption {
return func(controller *ResourceGroupsController) {
controller.ruConfig.maxWaitDuration = d
controller.ruConfig.LTBMaxWaitDuration = d
}
}

Expand Down Expand Up @@ -122,6 +122,8 @@ type ResourceGroupsController struct {
// Currently, we don't do multiple `AcquireTokenBuckets`` at the same time, so there are no concurrency problems with `currentRequests`.
currentRequests []*rmpb.TokenBucketRequest
}

opts []ResourceControlCreateOption
}

// NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor
Expand All @@ -148,6 +150,7 @@ func NewResourceGroupController(
lowTokenNotifyChan: make(chan struct{}, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
opts: opts,
}
for _, opt := range opts {
opt(controller)
Expand Down Expand Up @@ -213,22 +216,61 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
stateUpdateTicker = time.NewTicker(time.Millisecond * 100)
})

_, revision, err := c.provider.LoadResourceGroups(ctx)
_, metaRevision, err := c.provider.LoadResourceGroups(ctx)
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))
}
_, cfgRevision, err := c.provider.LoadGlobalConfig(ctx, nil, controllerConfigPath)
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))
}
var watchChannel chan []*meta_storagepb.Event
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
if !c.ruConfig.isSingleGroupByKeyspace {
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
}
}
watchRetryTimer := time.NewTimer(watchRetryInterval)
if err == nil || c.ruConfig.isSingleGroupByKeyspace {
watchRetryTimer.Stop()
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
}
watchRetryTimer := time.NewTimer(watchRetryInterval)
defer watchRetryTimer.Stop()

for {
select {
/* tickers */
case <-cleanupTicker.C:
c.cleanUpResourceGroup()
case <-stateUpdateTicker.C:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
}
}
if watchConfigChannel == nil {
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
}
}

case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
/* channels */
case <-c.loopCtx.Done():
resourceGroupStatusGauge.Reset()
return
Expand All @@ -242,14 +284,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
c.handleTokenBucketResponse(resp)
}
c.run.currentRequests = nil
case <-cleanupTicker.C:
c.cleanUpResourceGroup()
case <-stateUpdateTicker.C:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
}
case <-c.lowTokenNotifyChan:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
Expand All @@ -259,24 +293,22 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if c.run.inDegradedMode {
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
}
case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
case resp, ok := <-watchChannel:
case resp, ok := <-watchMetaChannel:
failpoint.Inject("disableWatch", func() {
if c.ruConfig.isSingleGroupByKeyspace {
panic("disableWatch")
}
})
if !ok {
watchChannel = nil
watchMetaChannel = nil
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
continue
}
for _, item := range resp {
revision = item.Kv.ModRevision
metaRevision = item.Kv.ModRevision
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
Expand All @@ -293,14 +325,32 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
}
}
case <-watchRetryTimer.C:
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
if err != nil {
case resp, ok := <-watchConfigChannel:
if !ok {
watchConfigChannel = nil
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
continue
}
for _, item := range resp {
cfgRevision = item.Kv.ModRevision
if !strings.HasPrefix(string(item.Kv.Key), controllerConfigPath) {
continue
}
config := &Config{}
if err := json.Unmarshal(item.Kv.Value, config); err != nil {
continue
}
c.ruConfig = GenerateRUConfig(config)
// Stay compatible with serverless
for _, opt := range c.opts {
opt(c)
}
log.Info("load resource controller config after config changed", zap.Reflect("config", config))
}

case gc := <-c.tokenBucketUpdateChan:
now := gc.run.now
go gc.handleTokenBucketUpdateEvent(c.loopCtx, now)
Expand Down Expand Up @@ -1127,7 +1177,7 @@ func (gc *groupCostController) onRequestWait(
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
Expand All @@ -1137,7 +1187,7 @@ func (gc *groupCostController) onRequestWait(
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
Expand Down
4 changes: 4 additions & 0 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
add actionType = 0
modify actionType = 1
groupSettingsPathPrefix = "resource_group/settings"
controllerPathPrefix = "resource_manager/controller"
// errNotPrimary is returned when the requested server is not primary.
errNotPrimary = "not primary"
// errNotLeader is returned when the requested server is not pd leader.
Expand All @@ -43,6 +44,9 @@ const (
// GroupSettingsPathPrefixBytes is used to watch or get resource groups.
var GroupSettingsPathPrefixBytes = []byte(groupSettingsPathPrefix)

// ControllerPathPrefixBytes is used to watch or get controller config.
var ControllerPathPrefixBytes = []byte(controllerPathPrefix)

// ResourceManagerClient manages resource group info and token request.
type ResourceManagerClient interface {
ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error)
Expand Down
45 changes: 45 additions & 0 deletions pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package apis

import (
"errors"
"fmt"
"net/http"
"reflect"
"sync"

"github.com/gin-contrib/cors"
Expand All @@ -29,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/reflectutil"
)

// APIPathPrefix is the prefix of the API path.
Expand Down Expand Up @@ -97,6 +100,8 @@ func (s *Service) RegisterRouter() {
configEndpoint.GET("/group/:name", s.getResourceGroup)
configEndpoint.GET("/groups", s.getResourceGroupList)
configEndpoint.DELETE("/group/:name", s.deleteResourceGroup)
configEndpoint.GET("/controller", s.getControllerConfig)
configEndpoint.POST("/controller", s.setControllerConfig)
}

func (s *Service) handler() http.Handler {
Expand Down Expand Up @@ -191,3 +196,43 @@ func (s *Service) deleteResourceGroup(c *gin.Context) {
}
c.String(http.StatusOK, "Success!")
}

// GetControllerConfig
//
// @Tags ResourceManager
// @Summary Get the resource controller config.
// @Success 200 {string} json format of rmserver.ControllerConfig
// @Failure 400 {string} error
// @Router /config/controller [GET]
func (s *Service) getControllerConfig(c *gin.Context) {
config := s.manager.GetControllerConfig()
c.IndentedJSON(http.StatusOK, config)
}

// SetControllerConfig
//
// @Tags ResourceManager
// @Summary Set the resource controller config.
// @Param config body object true "json params, rmserver.ControllerConfig"
// @Success 200 {string} string "Success!"
// @Failure 400 {string} error
// @Router /config/controller [POST]
func (s *Service) setControllerConfig(c *gin.Context) {
conf := make(map[string]interface{})
if err := c.ShouldBindJSON(&conf); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
for k, v := range conf {
key := reflectutil.FindJSONFullTagByChildTag(reflect.TypeOf(rmserver.ControllerConfig{}), k)
if key == "" {
c.String(http.StatusBadRequest, fmt.Sprintf("config item %s not found", k))
return
}
if err := s.manager.UpdateControllerConfigItem(key, v); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
}
c.String(http.StatusOK, "Success!")
}
3 changes: 3 additions & 0 deletions pkg/mcs/resourcemanager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ type ControllerConfig struct {
// EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect.
DegradedModeWaitDuration typeutil.Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

// LTBMaxWaitDuration is the max wait time duration for local token bucket.
LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`

// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
Expand Down
2 changes: 2 additions & 0 deletions pkg/mcs/resourcemanager/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestControllerConfig(t *testing.T) {
re := require.New(t)
cfgData := `
[controller]
ltb-max-wait-duration = "60s"
degraded-mode-wait-duration = "2s"
[controller.request-unit]
read-base-cost = 1.0
Expand All @@ -42,6 +43,7 @@ read-cpu-ms-cost = 5.0
re.NoError(err)

re.Equal(cfg.Controller.DegradedModeWaitDuration.Duration, time.Second*2)
re.Equal(cfg.Controller.LTBMaxWaitDuration.Duration, time.Second*60)
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7)
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7)
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7)
Expand Down
Loading

0 comments on commit d449b8b

Please sign in to comment.