From c82b237672a0497cea4146c11d86ec2fa3728a6d Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 1 Mar 2023 17:45:09 +0800 Subject: [PATCH] resource_controller: load RU config from the remote server when initializing controller (#6048) ref tikv/pd#6038, ref tikv/pd#6041 - Load RU config from the remote server when initializing controller. Signed-off-by: JmPotato --- client/resource_group/controller/config.go | 6 +-- .../resource_group/controller/controller.go | 45 ++++++++++++++++--- .../resource_manager/resource_manager_test.go | 36 ++++++++++++++- 3 files changed, 76 insertions(+), 11 deletions(-) diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index 6bad72f1fec..c8c0b5e2936 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -106,13 +106,13 @@ type Config struct { // DefaultConfig returns the default configuration. func DefaultConfig() *Config { - cfg := generateConfig( + return GenerateConfig( defaultRequestUnitConfig(), ) - return cfg } -func generateConfig(ruConfig *RequestUnitConfig) *Config { +// GenerateConfig generates the configuration by the given request unit configuration. +func GenerateConfig(ruConfig *RequestUnitConfig) *Config { cfg := &Config{ ReadBaseCost: RequestUnit(ruConfig.ReadBaseCost), ReadBytesCost: RequestUnit(ruConfig.ReadCostPerByte), diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 46d773a2591..24203809215 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -16,6 +16,7 @@ package controller import ( "context" + "encoding/json" "math" "sync" "sync/atomic" @@ -24,11 +25,13 @@ import ( "github.com/pingcap/errors" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" + pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" "go.uber.org/zap" ) const ( + requestUnitConfigPath = "resource_group/ru_config" defaultMaxWaitDuration = time.Second maxRetry = 3 maxNotificationChanLen = 200 @@ -50,6 +53,7 @@ type ResourceGroupProvider interface { ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) + LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error) } var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil) @@ -91,14 +95,20 @@ type ResourceGroupsController struct { } // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor -func NewResourceGroupController(clientUniqueID uint64, provider ResourceGroupProvider, requestUnitConfig *RequestUnitConfig) (*ResourceGroupsController, error) { - // TODO: initialize `requestUnitConfig`` from the remote manager server. - var config *Config - if requestUnitConfig != nil { - config = generateConfig(requestUnitConfig) - } else { - config = DefaultConfig() +func NewResourceGroupController( + ctx context.Context, + clientUniqueID uint64, + provider ResourceGroupProvider, + requestUnitConfig *RequestUnitConfig, +) (*ResourceGroupsController, error) { + if requestUnitConfig == nil { + var err error + requestUnitConfig, err = loadRequestUnitConfig(ctx, provider) + if err != nil { + return nil, err + } } + config := GenerateConfig(requestUnitConfig) return &ResourceGroupsController{ clientUniqueID: clientUniqueID, provider: provider, @@ -110,6 +120,27 @@ func NewResourceGroupController(clientUniqueID uint64, provider ResourceGroupPro }, nil } +func loadRequestUnitConfig(ctx context.Context, provider ResourceGroupProvider) (*RequestUnitConfig, error) { + items, _, err := provider.LoadGlobalConfig(ctx, nil, requestUnitConfigPath) + if err != nil { + return nil, err + } + if len(items) == 0 { + return nil, errors.Errorf("failed to load the ru config from remote server") + } + ruConfig := &RequestUnitConfig{} + err = json.Unmarshal(items[0].PayLoad, ruConfig) + if err != nil { + return nil, err + } + return ruConfig, nil +} + +// GetConfig returns the config of controller. It's only used for test. +func (c *ResourceGroupsController) GetConfig() *Config { + return c.config +} + // Start starts ResourceGroupController service. func (c *ResourceGroupsController) Start(ctx context.Context) { c.initRunState() diff --git a/tests/mcs/resource_manager/resource_manager_test.go b/tests/mcs/resource_manager/resource_manager_test.go index e36d1900fc3..523fe527d71 100644 --- a/tests/mcs/resource_manager/resource_manager_test.go +++ b/tests/mcs/resource_manager/resource_manager_test.go @@ -301,7 +301,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { CPUMsCost: 1, } - controller, _ := controller.NewResourceGroupController(1, cli, cfg) + controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) controller.Start(suite.ctx) testCases := []struct { @@ -700,3 +700,37 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover() // Cleanup the resource group. suite.cleanupResourceGroups() } + +func (suite *resourceManagerClientTestSuite) TestLoadRequestUnitConfig() { + re := suite.Require() + cli := suite.client + // Test load from resource manager. + ctr, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + re.NoError(err) + config := ctr.GetConfig() + re.NotNil(config) + expectedConfig := controller.DefaultConfig() + re.Equal(expectedConfig.ReadBaseCost, config.ReadBaseCost) + re.Equal(expectedConfig.ReadBytesCost, config.ReadBytesCost) + re.Equal(expectedConfig.WriteBaseCost, config.WriteBaseCost) + re.Equal(expectedConfig.WriteBytesCost, config.WriteBytesCost) + re.Equal(expectedConfig.CPUMsCost, config.CPUMsCost) + // Test init from given config. + ruConfig := &controller.RequestUnitConfig{ + ReadBaseCost: 1, + ReadCostPerByte: 2, + WriteBaseCost: 3, + WriteCostPerByte: 4, + CPUMsCost: 5, + } + ctr, err = controller.NewResourceGroupController(suite.ctx, 1, cli, ruConfig) + re.NoError(err) + config = ctr.GetConfig() + re.NotNil(config) + expectedConfig = controller.GenerateConfig(ruConfig) + re.Equal(expectedConfig.ReadBaseCost, config.ReadBaseCost) + re.Equal(expectedConfig.ReadBytesCost, config.ReadBytesCost) + re.Equal(expectedConfig.WriteBaseCost, config.WriteBaseCost) + re.Equal(expectedConfig.WriteBytesCost, config.WriteBytesCost) + re.Equal(expectedConfig.CPUMsCost, config.CPUMsCost) +}