Skip to content

Commit

Permalink
resource_controller: load RU config from the remote server when initi…
Browse files Browse the repository at this point in the history
…alizing controller (#6048)

ref #6038, ref #6041

- Load RU config from the remote server when initializing controller.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored Mar 1, 2023
1 parent f999ad5 commit c82b237
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 11 deletions.
6 changes: 3 additions & 3 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ type Config struct {

// DefaultConfig returns the default configuration.
func DefaultConfig() *Config {
cfg := generateConfig(
return GenerateConfig(
defaultRequestUnitConfig(),
)
return cfg
}

func generateConfig(ruConfig *RequestUnitConfig) *Config {
// GenerateConfig generates the configuration by the given request unit configuration.
func GenerateConfig(ruConfig *RequestUnitConfig) *Config {
cfg := &Config{
ReadBaseCost: RequestUnit(ruConfig.ReadBaseCost),
ReadBytesCost: RequestUnit(ruConfig.ReadCostPerByte),
Expand Down
45 changes: 38 additions & 7 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controller

import (
"context"
"encoding/json"
"math"
"sync"
"sync/atomic"
Expand All @@ -24,11 +25,13 @@ import (
"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"go.uber.org/zap"
)

const (
requestUnitConfigPath = "resource_group/ru_config"
defaultMaxWaitDuration = time.Second
maxRetry = 3
maxNotificationChanLen = 200
Expand All @@ -50,6 +53,7 @@ type ResourceGroupProvider interface {
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error)
}

var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil)
Expand Down Expand Up @@ -91,14 +95,20 @@ type ResourceGroupsController struct {
}

// NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor
func NewResourceGroupController(clientUniqueID uint64, provider ResourceGroupProvider, requestUnitConfig *RequestUnitConfig) (*ResourceGroupsController, error) {
// TODO: initialize `requestUnitConfig`` from the remote manager server.
var config *Config
if requestUnitConfig != nil {
config = generateConfig(requestUnitConfig)
} else {
config = DefaultConfig()
func NewResourceGroupController(
ctx context.Context,
clientUniqueID uint64,
provider ResourceGroupProvider,
requestUnitConfig *RequestUnitConfig,
) (*ResourceGroupsController, error) {
if requestUnitConfig == nil {
var err error
requestUnitConfig, err = loadRequestUnitConfig(ctx, provider)
if err != nil {
return nil, err
}
}
config := GenerateConfig(requestUnitConfig)
return &ResourceGroupsController{
clientUniqueID: clientUniqueID,
provider: provider,
Expand All @@ -110,6 +120,27 @@ func NewResourceGroupController(clientUniqueID uint64, provider ResourceGroupPro
}, nil
}

func loadRequestUnitConfig(ctx context.Context, provider ResourceGroupProvider) (*RequestUnitConfig, error) {
items, _, err := provider.LoadGlobalConfig(ctx, nil, requestUnitConfigPath)
if err != nil {
return nil, err
}
if len(items) == 0 {
return nil, errors.Errorf("failed to load the ru config from remote server")
}
ruConfig := &RequestUnitConfig{}
err = json.Unmarshal(items[0].PayLoad, ruConfig)
if err != nil {
return nil, err
}
return ruConfig, nil
}

// GetConfig returns the config of controller. It's only used for test.
func (c *ResourceGroupsController) GetConfig() *Config {
return c.config
}

// Start starts ResourceGroupController service.
func (c *ResourceGroupsController) Start(ctx context.Context) {
c.initRunState()
Expand Down
36 changes: 35 additions & 1 deletion tests/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
CPUMsCost: 1,
}

controller, _ := controller.NewResourceGroupController(1, cli, cfg)
controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg)
controller.Start(suite.ctx)

testCases := []struct {
Expand Down Expand Up @@ -700,3 +700,37 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover()
// Cleanup the resource group.
suite.cleanupResourceGroups()
}

func (suite *resourceManagerClientTestSuite) TestLoadRequestUnitConfig() {
re := suite.Require()
cli := suite.client
// Test load from resource manager.
ctr, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil)
re.NoError(err)
config := ctr.GetConfig()
re.NotNil(config)
expectedConfig := controller.DefaultConfig()
re.Equal(expectedConfig.ReadBaseCost, config.ReadBaseCost)
re.Equal(expectedConfig.ReadBytesCost, config.ReadBytesCost)
re.Equal(expectedConfig.WriteBaseCost, config.WriteBaseCost)
re.Equal(expectedConfig.WriteBytesCost, config.WriteBytesCost)
re.Equal(expectedConfig.CPUMsCost, config.CPUMsCost)
// Test init from given config.
ruConfig := &controller.RequestUnitConfig{
ReadBaseCost: 1,
ReadCostPerByte: 2,
WriteBaseCost: 3,
WriteCostPerByte: 4,
CPUMsCost: 5,
}
ctr, err = controller.NewResourceGroupController(suite.ctx, 1, cli, ruConfig)
re.NoError(err)
config = ctr.GetConfig()
re.NotNil(config)
expectedConfig = controller.GenerateConfig(ruConfig)
re.Equal(expectedConfig.ReadBaseCost, config.ReadBaseCost)
re.Equal(expectedConfig.ReadBytesCost, config.ReadBytesCost)
re.Equal(expectedConfig.WriteBaseCost, config.WriteBaseCost)
re.Equal(expectedConfig.WriteBytesCost, config.WriteBytesCost)
re.Equal(expectedConfig.CPUMsCost, config.CPUMsCost)
}

0 comments on commit c82b237

Please sign in to comment.