From 4a631f65cd8c5effdc93ecaa7ab62ec753e160e6 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 4 Sep 2023 23:18:14 +0800 Subject: [PATCH 1/6] resource_control: supports dynamically change the controller config Signed-off-by: nolouch --- client/go.mod | 1 + client/go.sum | 1 + client/resource_group/controller/config.go | 44 ++++---- .../resource_group/controller/controller.go | 103 +++++++++++++----- client/resource_group/controller/util.go | 68 ++++++++++++ client/resource_group/controller/util_test.go | 51 +++++++++ client/resource_manager_client.go | 10 +- pkg/mcs/resourcemanager/server/apis/v1/api.go | 45 ++++++++ pkg/mcs/resourcemanager/server/config.go | 6 + pkg/mcs/resourcemanager/server/config_test.go | 2 + pkg/mcs/resourcemanager/server/manager.go | 54 ++++++++- pkg/storage/endpoint/resource_group.go | 6 + .../resourcemanager/resource_manager_test.go | 94 ++++++++++++++++ 13 files changed, 432 insertions(+), 53 deletions(-) create mode 100644 client/resource_group/controller/util.go create mode 100644 client/resource_group/controller/util_test.go diff --git a/client/go.mod b/client/go.mod index 9eb066d0fcc..099bcf86296 100644 --- a/client/go.mod +++ b/client/go.mod @@ -3,6 +3,7 @@ module github.com/tikv/pd/client go 1.21 require ( + github.com/BurntSushi/toml v0.3.1 github.com/cloudfoundry/gosigar v1.3.6 github.com/gogo/protobuf v1.3.2 github.com/opentracing/opentracing-go v1.2.0 diff --git a/client/go.sum b/client/go.sum index 33ba3254d53..9261ab4a999 100644 --- a/client/go.sum +++ b/client/go.sum @@ -1,4 +1,5 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index 2095bc60601..16a2525cd0d 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -51,7 +51,7 @@ const ( // According to the resource control Grafana panel and Prometheus sampling period, the period should be the factor of 15. defaultTargetPeriod = 5 * time.Second // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. - defaultMaxWaitDuration = time.Second + defaultMaxWaitDuration = 30 * time.Second ) const ( @@ -73,14 +73,17 @@ const ( // Because the resource manager has not been deployed in microservice mode, // do not enable this function. - defaultDegradedModeWaitDuration = "0s" + defaultDegradedModeWaitDuration = 0 defaultAvgBatchProportion = 0.7 ) // Config is the configuration of the resource manager controller which includes some option for client needed. type Config struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. - DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + + // LTBMaxWaitDuration is the max wait time duration for local token bucket. + LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. @@ -90,7 +93,8 @@ type Config struct { // DefaultConfig returns the default resource manager controller configuration. func DefaultConfig() *Config { return &Config{ - DegradedModeWaitDuration: defaultDegradedModeWaitDuration, + DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), + LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), RequestUnit: DefaultRequestUnitConfig(), } } @@ -143,8 +147,10 @@ type RUConfig struct { WriteBytesCost RequestUnit CPUMsCost RequestUnit // The CPU statistics need to distinguish between different environments. - isSingleGroupByKeyspace bool - maxWaitDuration time.Duration + isSingleGroupByKeyspace bool + + // some config for client + LTBMaxWaitDuration time.Duration DegradedModeWaitDuration time.Duration } @@ -157,21 +163,15 @@ func DefaultRUConfig() *RUConfig { // GenerateRUConfig generates the configuration by the given request unit configuration. func GenerateRUConfig(config *Config) *RUConfig { - cfg := &RUConfig{ - ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost), - ReadPerBatchBaseCost: RequestUnit(config.RequestUnit.ReadPerBatchBaseCost), - ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte), - WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost), - WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost), - WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), - CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), - maxWaitDuration: defaultMaxWaitDuration, - } - duration, err := time.ParseDuration(config.DegradedModeWaitDuration) - if err != nil { - cfg.DegradedModeWaitDuration, _ = time.ParseDuration(defaultDegradedModeWaitDuration) - } else { - cfg.DegradedModeWaitDuration = duration + return &RUConfig{ + ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost), + ReadPerBatchBaseCost: RequestUnit(config.RequestUnit.ReadPerBatchBaseCost), + ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte), + WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost), + WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost), + WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), + CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), + LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration, + DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration, } - return cfg } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index c79bfec1e56..356ca998935 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -89,7 +89,7 @@ func EnableSingleGroupByKeyspace() ResourceControlCreateOption { // WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets. func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption { return func(controller *ResourceGroupsController) { - controller.ruConfig.maxWaitDuration = d + controller.ruConfig.LTBMaxWaitDuration = d } } @@ -122,6 +122,8 @@ type ResourceGroupsController struct { // Currently, we don't do multiple `AcquireTokenBuckets`` at the same time, so there are no concurrency problems with `currentRequests`. currentRequests []*rmpb.TokenBucketRequest } + + opts []ResourceControlCreateOption } // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor @@ -139,7 +141,7 @@ func NewResourceGroupController( if requestUnitConfig != nil { config.RequestUnit = *requestUnitConfig } - log.Info("load resource controller config", zap.Reflect("config", config)) + ruConfig := GenerateRUConfig(config) controller := &ResourceGroupsController{ clientUniqueID: clientUniqueID, @@ -148,10 +150,12 @@ func NewResourceGroupController( lowTokenNotifyChan: make(chan struct{}, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), + opts: opts, } for _, opt := range opts { opt(controller) } + log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig)) controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)} return controller, nil } @@ -213,22 +217,61 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { stateUpdateTicker = time.NewTicker(time.Millisecond * 100) }) - _, revision, err := c.provider.LoadResourceGroups(ctx) + _, metaRevision, err := c.provider.LoadResourceGroups(ctx) + if err != nil { + log.Warn("load resource group revision failed", zap.Error(err)) + } + _, cfgRevision, err := c.provider.LoadGlobalConfig(ctx, nil, controllerConfigPath) if err != nil { log.Warn("load resource group revision failed", zap.Error(err)) } - var watchChannel chan []*meta_storagepb.Event + var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event if !c.ruConfig.isSingleGroupByKeyspace { - watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix()) + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group meta failed", zap.Error(err)) + } } - watchRetryTimer := time.NewTimer(watchRetryInterval) - if err == nil || c.ruConfig.isSingleGroupByKeyspace { - watchRetryTimer.Stop() + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group config failed", zap.Error(err)) } + watchRetryTimer := time.NewTimer(watchRetryInterval) defer watchRetryTimer.Stop() for { select { + /* tickers */ + case <-cleanupTicker.C: + c.cleanUpResourceGroup() + case <-stateUpdateTicker.C: + c.executeOnAllGroups((*groupCostController).updateRunState) + c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) + if len(c.run.currentRequests) == 0 { + c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) + } + case <-watchRetryTimer.C: + if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group meta failed", zap.Error(err)) + watchRetryTimer.Reset(watchRetryInterval) + failpoint.Inject("watchStreamError", func() { + watchRetryTimer.Reset(20 * time.Millisecond) + }) + } + } + if watchConfigChannel == nil { + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group config failed", zap.Error(err)) + watchRetryTimer.Reset(watchRetryInterval) + } + } + + case <-emergencyTokenAcquisitionTicker.C: + c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) + /* channels */ case <-c.loopCtx.Done(): resourceGroupStatusGauge.Reset() return @@ -242,14 +285,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.handleTokenBucketResponse(resp) } c.run.currentRequests = nil - case <-cleanupTicker.C: - c.cleanUpResourceGroup() - case <-stateUpdateTicker.C: - c.executeOnAllGroups((*groupCostController).updateRunState) - c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) - if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) - } case <-c.lowTokenNotifyChan: c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) @@ -259,16 +294,14 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if c.run.inDegradedMode { c.executeOnAllGroups((*groupCostController).applyDegradedMode) } - case <-emergencyTokenAcquisitionTicker.C: - c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) - case resp, ok := <-watchChannel: + case resp, ok := <-watchMetaChannel: failpoint.Inject("disableWatch", func() { if c.ruConfig.isSingleGroupByKeyspace { panic("disableWatch") } }) if !ok { - watchChannel = nil + watchMetaChannel = nil watchRetryTimer.Reset(watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) @@ -276,7 +309,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { continue } for _, item := range resp { - revision = item.Kv.ModRevision + metaRevision = item.Kv.ModRevision group := &rmpb.ResourceGroup{} if err := proto.Unmarshal(item.Kv.Value, group); err != nil { continue @@ -293,14 +326,32 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } } } - case <-watchRetryTimer.C: - watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix()) - if err != nil { + case resp, ok := <-watchConfigChannel: + if !ok { + watchConfigChannel = nil watchRetryTimer.Reset(watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) + continue } + for _, item := range resp { + cfgRevision = item.Kv.ModRevision + if !strings.HasPrefix(string(item.Kv.Key), controllerConfigPath) { + continue + } + config := &Config{} + if err := json.Unmarshal(item.Kv.Value, config); err != nil { + continue + } + c.ruConfig = GenerateRUConfig(config) + // Stay compatible with serverless + for _, opt := range c.opts { + opt(c) + } + log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig)) + } + case gc := <-c.tokenBucketUpdateChan: now := gc.run.now go gc.handleTokenBucketUpdateEvent(c.loopCtx, now) @@ -1127,7 +1178,7 @@ func (gc *groupCostController) onRequestWait( res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) for typ, counter := range gc.run.resourceTokens { if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } if d, err = WaitReservations(ctx, now, res); err == nil { @@ -1137,7 +1188,7 @@ func (gc *groupCostController) onRequestWait( res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) for typ, counter := range gc.run.requestUnitTokens { if v := getRUValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } if d, err = WaitReservations(ctx, now, res); err == nil { diff --git a/client/resource_group/controller/util.go b/client/resource_group/controller/util.go new file mode 100644 index 00000000000..e3450e0ae0d --- /dev/null +++ b/client/resource_group/controller/util.go @@ -0,0 +1,68 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "fmt" + "strconv" + "time" + + "github.com/pingcap/errors" +) + +// Duration is a wrapper of time.Duration for TOML and JSON. +type Duration struct { + time.Duration +} + +// NewDuration creates a Duration from time.Duration. +func NewDuration(duration time.Duration) Duration { + return Duration{Duration: duration} +} + +// MarshalJSON returns the duration as a JSON string. +func (d *Duration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, d.String())), nil +} + +// UnmarshalJSON parses a JSON string into the duration. +func (d *Duration) UnmarshalJSON(text []byte) error { + s, err := strconv.Unquote(string(text)) + if err != nil { + return errors.WithStack(err) + } + duration, err := time.ParseDuration(s) + if err != nil { + return errors.WithStack(err) + } + d.Duration = duration + return nil +} + +// UnmarshalText parses a TOML string into the duration. +func (d *Duration) UnmarshalText(text []byte) error { + var err error + d.Duration, err = time.ParseDuration(string(text)) + return errors.WithStack(err) +} + +// MarshalText returns the duration as a JSON string. +func (d Duration) MarshalText() ([]byte, error) { + return []byte(d.String()), nil +} diff --git a/client/resource_group/controller/util_test.go b/client/resource_group/controller/util_test.go new file mode 100644 index 00000000000..b542e6713dc --- /dev/null +++ b/client/resource_group/controller/util_test.go @@ -0,0 +1,51 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "encoding/json" + "testing" + + "github.com/BurntSushi/toml" + "github.com/stretchr/testify/require" +) + +type example struct { + Interval Duration `json:"interval" toml:"interval"` +} + +func TestDurationJSON(t *testing.T) { + t.Parallel() + re := require.New(t) + example := &example{} + + text := []byte(`{"interval":"1h1m1s"}`) + re.NoError(json.Unmarshal(text, example)) + re.Equal(float64(60*60+60+1), example.Interval.Seconds()) + + b, err := json.Marshal(example) + re.NoError(err) + re.Equal(string(text), string(b)) +} + +func TestDurationTOML(t *testing.T) { + t.Parallel() + re := require.New(t) + example := &example{} + + text := []byte(`interval = "1h1m1s"`) + re.Nil(toml.Unmarshal(text, example)) + re.Equal(float64(60*60+60+1), example.Interval.Seconds()) +} diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 61919a2ccb2..68b2de66ae2 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -31,9 +31,10 @@ import ( type actionType int const ( - add actionType = 0 - modify actionType = 1 - groupSettingsPathPrefix = "resource_group/settings" + add actionType = 0 + modify actionType = 1 + groupSettingsPathPrefix = "resource_group/settings" + controllerConfigPathPrefix = "resource_group/controller" // errNotPrimary is returned when the requested server is not primary. errNotPrimary = "not primary" // errNotLeader is returned when the requested server is not pd leader. @@ -43,6 +44,9 @@ const ( // GroupSettingsPathPrefixBytes is used to watch or get resource groups. var GroupSettingsPathPrefixBytes = []byte(groupSettingsPathPrefix) +// ControllerConfigPathPrefixBytes is used to watch or get controller config. +var ControllerConfigPathPrefixBytes = []byte(controllerConfigPathPrefix) + // ResourceManagerClient manages resource group info and token request. type ResourceManagerClient interface { ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) diff --git a/pkg/mcs/resourcemanager/server/apis/v1/api.go b/pkg/mcs/resourcemanager/server/apis/v1/api.go index 411933e55c3..b769685b54f 100644 --- a/pkg/mcs/resourcemanager/server/apis/v1/api.go +++ b/pkg/mcs/resourcemanager/server/apis/v1/api.go @@ -16,7 +16,9 @@ package apis import ( "errors" + "fmt" "net/http" + "reflect" "sync" "github.com/gin-contrib/cors" @@ -29,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" + "github.com/tikv/pd/pkg/utils/reflectutil" ) // APIPathPrefix is the prefix of the API path. @@ -97,6 +100,8 @@ func (s *Service) RegisterRouter() { configEndpoint.GET("/group/:name", s.getResourceGroup) configEndpoint.GET("/groups", s.getResourceGroupList) configEndpoint.DELETE("/group/:name", s.deleteResourceGroup) + configEndpoint.GET("/controller", s.getControllerConfig) + configEndpoint.POST("/controller", s.setControllerConfig) } func (s *Service) handler() http.Handler { @@ -191,3 +196,43 @@ func (s *Service) deleteResourceGroup(c *gin.Context) { } c.String(http.StatusOK, "Success!") } + +// GetControllerConfig +// +// @Tags ResourceManager +// @Summary Get the resource controller config. +// @Success 200 {string} json format of rmserver.ControllerConfig +// @Failure 400 {string} error +// @Router /config/controller [GET] +func (s *Service) getControllerConfig(c *gin.Context) { + config := s.manager.GetControllerConfig() + c.IndentedJSON(http.StatusOK, config) +} + +// SetControllerConfig +// +// @Tags ResourceManager +// @Summary Set the resource controller config. +// @Param config body object true "json params, rmserver.ControllerConfig" +// @Success 200 {string} string "Success!" +// @Failure 400 {string} error +// @Router /config/controller [POST] +func (s *Service) setControllerConfig(c *gin.Context) { + conf := make(map[string]interface{}) + if err := c.ShouldBindJSON(&conf); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + for k, v := range conf { + key := reflectutil.FindJSONFullTagByChildTag(reflect.TypeOf(rmserver.ControllerConfig{}), k) + if key == "" { + c.String(http.StatusBadRequest, fmt.Sprintf("config item %s not found", k)) + return + } + if err := s.manager.UpdateControllerConfigItem(key, v); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + } + c.String(http.StatusOK, "Success!") +} diff --git a/pkg/mcs/resourcemanager/server/config.go b/pkg/mcs/resourcemanager/server/config.go index 51fbe388458..3f64b2987fd 100644 --- a/pkg/mcs/resourcemanager/server/config.go +++ b/pkg/mcs/resourcemanager/server/config.go @@ -57,6 +57,8 @@ const ( // Because the resource manager has not been deployed in microservice mode, // do not enable this function. defaultDegradedModeWaitDuration = time.Second * 0 + // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. + defaultMaxWaitDuration = 30 * time.Second ) // Config is the configuration for the resource manager. @@ -94,6 +96,9 @@ type ControllerConfig struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. DegradedModeWaitDuration typeutil.Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + // LTBMaxWaitDuration is the max wait time duration for local token bucket. + LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` @@ -107,6 +112,7 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) { rmc.RequestUnit.Adjust() configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration) + configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration) failpoint.Inject("enableDegradedMode", func() { configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, time.Second) }) diff --git a/pkg/mcs/resourcemanager/server/config_test.go b/pkg/mcs/resourcemanager/server/config_test.go index c0cac4da9c0..dd8dd2d2814 100644 --- a/pkg/mcs/resourcemanager/server/config_test.go +++ b/pkg/mcs/resourcemanager/server/config_test.go @@ -27,6 +27,7 @@ func TestControllerConfig(t *testing.T) { re := require.New(t) cfgData := ` [controller] +ltb-max-wait-duration = "60s" degraded-mode-wait-duration = "2s" [controller.request-unit] read-base-cost = 1.0 @@ -42,6 +43,7 @@ read-cpu-ms-cost = 5.0 re.NoError(err) re.Equal(cfg.Controller.DegradedModeWaitDuration.Duration, time.Second*2) + re.Equal(cfg.Controller.LTBMaxWaitDuration.Duration, time.Second*60) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7) diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index a9e53f347fa..320e124a942 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -19,6 +19,7 @@ import ( "encoding/json" "math" "sort" + "strings" "sync" "time" @@ -26,10 +27,12 @@ import ( "github.com/pingcap/failpoint" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" + "github.com/pkg/errors" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/jsonutil" "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" ) @@ -102,8 +105,14 @@ func (m *Manager) GetBasicServer() bs.Server { // Init initializes the resource group manager. func (m *Manager) Init(ctx context.Context) { - // Todo: If we can modify following configs in the future, we should reload these configs. - // Store the controller config into the storage. + v, err := m.storage.LoadControllerConfig() + if err != nil { + log.Error("resource controller config load failed, fallback to default config", zap.Error(err), zap.String("v", v)) + } else if err = json.Unmarshal([]byte(v), &m.controllerConfig); err != nil { + log.Error("un-marshall controller config failed", zap.Error(err), zap.String("v", v)) + } + + // re-save the config to make sure the config has been persisted. m.storage.SaveControllerConfig(m.controllerConfig) // Load resource group meta info from storage. m.groups = make(map[string]*ResourceGroup) @@ -158,6 +167,47 @@ func (m *Manager) Init(ctx context.Context) { log.Info("resource group manager finishes initialization") } +// UpdateControllerConfigItem updates the controller config item. +func (m *Manager) UpdateControllerConfigItem(key string, value interface{}) error { + kp := strings.Split(key, ".") + if len(kp) == 0 { + return errors.Errorf("invalid key %s", key) + } + m.Lock() + var config interface{} + switch kp[0] { + case "request-unit": + config = &m.controllerConfig.RequestUnit + default: + config = m.controllerConfig + } + updated, found, err := jsonutil.AddKeyValue(config, kp[len(kp)-1], value) + if err != nil { + m.Unlock() + return err + } + + if !found { + m.Unlock() + return errors.Errorf("config item %s not found", key) + } + m.Unlock() + if updated { + if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil { + log.Error("save controller config failed", zap.Error(err)) + } + log.Info("updated controller config item", zap.String("key", key), zap.Any("value", value)) + } + return nil +} + +// GetControllerConfig returns the controller config. +func (m *Manager) GetControllerConfig() *ControllerConfig { + m.RLock() + defer m.RUnlock() + return m.controllerConfig +} + // AddResourceGroup puts a resource group. // NOTE: AddResourceGroup should also be idempotent because tidb depends // on this retry mechanism. diff --git a/pkg/storage/endpoint/resource_group.go b/pkg/storage/endpoint/resource_group.go index f1b3feb36aa..150ea77a1c7 100644 --- a/pkg/storage/endpoint/resource_group.go +++ b/pkg/storage/endpoint/resource_group.go @@ -27,6 +27,7 @@ type ResourceGroupStorage interface { SaveResourceGroupStates(name string, obj interface{}) error DeleteResourceGroupStates(name string) error SaveControllerConfig(config interface{}) error + LoadControllerConfig() (string, error) } var _ ResourceGroupStorage = (*StorageEndpoint)(nil) @@ -65,3 +66,8 @@ func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error { func (se *StorageEndpoint) SaveControllerConfig(config interface{}) error { return se.saveJSON(controllerConfigPath, config) } + +// LoadControllerConfig loads the resource controller config from storage. +func (se *StorageEndpoint) LoadControllerConfig() (string, error) { + return se.Load(controllerConfigPath) +} diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index e0d295c825a..497c55c176a 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "net/http" "strconv" "strings" @@ -1265,3 +1266,96 @@ func (suite *resourceManagerClientTestSuite) TestSkipConsumptionForBackgroundJob c.Stop() } + +func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigChanged() { + re := suite.Require() + cli := suite.client + for _, group := range suite.initGroups { + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + } + c1, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + re.NoError(err) + c1.Start(suite.ctx) + // with client option + c2, err := controller.NewResourceGroupController(suite.ctx, 2, cli, nil, controller.WithMaxWaitDuration(time.Hour)) + re.NoError(err) + c2.Start(suite.ctx) + + // helper function for sending HTTP requests and checking responses + sendRequest := func(method, url string, body io.Reader) []byte { + req, err := http.NewRequest(method, url, body) + re.NoError(err) + resp, err := http.DefaultClient.Do(req) + re.NoError(err) + defer resp.Body.Close() + bytes, err := io.ReadAll(resp.Body) + re.NoError(err) + if resp.StatusCode != http.StatusOK { + re.Fail(string(bytes)) + } + return bytes + } + + getAddr := func() string { + server := suite.cluster.GetServer(suite.cluster.GetLeader()) + if rand.Intn(100)%2 == 1 { + server = suite.cluster.GetServer(suite.cluster.GetFollower()) + } + return server.GetAddr() + } + + configURL := "/resource-manager/api/v1/config/controller" + waitDuration := 10 * time.Second + readBaseCost := 1.5 + defaultCfg := controller.DefaultConfig() + // failpoint enableDegradedMode will setup and set it be 1s. + defaultCfg.DegradedModeWaitDuration.Duration = time.Second + expectRUCfg := controller.GenerateRUConfig(defaultCfg) + // initial config verification + respString := sendRequest("GET", getAddr()+configURL, nil) + defaultString, err := json.Marshal(defaultCfg) + re.NoError(err) + re.JSONEq(string(respString), string(defaultString)) + re.EqualValues(expectRUCfg, c1.GetConfig()) + + testCases := []struct { + configJSON string + value interface{} + expected func(ruConfig *controller.RUConfig) + }{ + { + configJSON: fmt.Sprintf(`{"degraded-mode-wait-duration": "%v"}`, waitDuration), + value: waitDuration, + expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = waitDuration }, + }, + { + configJSON: fmt.Sprintf(`{"ltb-max-wait-duration": "%v"}`, waitDuration), + value: waitDuration, + expected: func(ruConfig *controller.RUConfig) { ruConfig.LTBMaxWaitDuration = waitDuration }, + }, + { + configJSON: fmt.Sprintf(`{"read-base-cost": %v}`, readBaseCost), + value: readBaseCost, + expected: func(ruConfig *controller.RUConfig) { ruConfig.ReadBaseCost = controller.RequestUnit(readBaseCost) }, + }, + { + configJSON: fmt.Sprintf(`{"write-base-cost": %v}`, readBaseCost*2), + value: readBaseCost * 2, + expected: func(ruConfig *controller.RUConfig) { ruConfig.WriteBaseCost = controller.RequestUnit(readBaseCost * 2) }, + }, + } + // change properties one by one and verify each time + for _, t := range testCases { + sendRequest("POST", getAddr()+configURL, strings.NewReader(t.configJSON)) + time.Sleep(500 * time.Millisecond) + t.expected(expectRUCfg) + re.EqualValues(expectRUCfg, c1.GetConfig()) + + expectRUCfg2 := *expectRUCfg + // always apply the client option + expectRUCfg2.LTBMaxWaitDuration = time.Hour + re.EqualValues(&expectRUCfg2, c2.GetConfig()) + } +} From 423b58301571214f838118a5195777479cadefdf Mon Sep 17 00:00:00 2001 From: nolouch Date: Wed, 6 Sep 2023 16:53:37 +0800 Subject: [PATCH 2/6] stable test Signed-off-by: nolouch --- tests/integrations/mcs/resourcemanager/resource_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 497c55c176a..64771220872 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -1282,7 +1282,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh c2, err := controller.NewResourceGroupController(suite.ctx, 2, cli, nil, controller.WithMaxWaitDuration(time.Hour)) re.NoError(err) c2.Start(suite.ctx) - + time.Sleep(500 * time.Millisecond) // helper function for sending HTTP requests and checking responses sendRequest := func(method, url string, body io.Reader) []byte { req, err := http.NewRequest(method, url, body) From 43ca2fd55698d26a923d056eb945654992b9c9a6 Mon Sep 17 00:00:00 2001 From: nolouch Date: Wed, 6 Sep 2023 18:19:05 +0800 Subject: [PATCH 3/6] fix race Signed-off-by: nolouch --- client/resource_group/controller/controller.go | 11 +++++++++-- .../mcs/resourcemanager/resource_manager_test.go | 7 ++++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 356ca998935..382b48f14f2 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -124,6 +124,9 @@ type ResourceGroupsController struct { } opts []ResourceControlCreateOption + + // a cache for ru config and make concurrency safe. + safeRuConfig atomic.Pointer[RUConfig] } // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor @@ -157,6 +160,7 @@ func NewResourceGroupController( } log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig)) controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)} + controller.safeRuConfig.Store(controller.ruConfig) return controller, nil } @@ -177,9 +181,9 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con return config, nil } -// GetConfig returns the config of controller. It's only used for test. +// GetConfig returns the config of controller. func (c *ResourceGroupsController) GetConfig() *RUConfig { - return c.ruConfig + return c.safeRuConfig.Load() } // Source List @@ -345,10 +349,13 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { continue } c.ruConfig = GenerateRUConfig(config) + // Stay compatible with serverless for _, opt := range c.opts { opt(c) } + copyCfg := *c.ruConfig + c.safeRuConfig.Store(©Cfg) log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig)) } diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index d07c7ec3c87..ae88435658c 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -1282,7 +1282,6 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh c2, err := controller.NewResourceGroupController(suite.ctx, 2, cli, nil, controller.WithMaxWaitDuration(time.Hour)) re.NoError(err) c2.Start(suite.ctx) - time.Sleep(500 * time.Millisecond) // helper function for sending HTTP requests and checking responses sendRequest := func(method, url string, body io.Reader) []byte { req, err := http.NewRequest(method, url, body) @@ -1345,6 +1344,12 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh value: readBaseCost * 2, expected: func(ruConfig *controller.RUConfig) { ruConfig.WriteBaseCost = controller.RequestUnit(readBaseCost * 2) }, }, + { + // reset the degraded-mode-wait-duration to default in test. + configJSON: fmt.Sprintf(`{"degraded-mode-wait-duration": "%v"}`, time.Second), + value: time.Second, + expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = time.Second }, + }, } // change properties one by one and verify each time for _, t := range testCases { From e9177df9e2ca4188db65a3b310eb6a1d15d2b820 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 11 Sep 2023 16:28:17 +0800 Subject: [PATCH 4/6] address Signed-off-by: nolouch --- client/resource_group/controller/controller.go | 14 +++++++++----- .../mcs/resourcemanager/resource_manager_test.go | 5 +++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 382b48f14f2..972f2097140 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -71,9 +71,11 @@ type ResourceGroupProvider interface { ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) - LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) + + // meta storage client Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) + Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) } // ResourceControlCreateOption create a ResourceGroupsController with the optional settings. @@ -165,16 +167,16 @@ func NewResourceGroupController( } func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Config, error) { - items, _, err := provider.LoadGlobalConfig(ctx, nil, controllerConfigPath) + resp, err := provider.Get(ctx, []byte(controllerConfigPath)) if err != nil { return nil, err } - if len(items) == 0 { + if len(resp.Kvs) == 0 { log.Warn("[resource group controller] server does not save config, load config failed") return DefaultConfig(), nil } config := &Config{} - err = json.Unmarshal(items[0].PayLoad, config) + err = json.Unmarshal(resp.Kvs[0].GetValue(), config) if err != nil { return nil, err } @@ -225,10 +227,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if err != nil { log.Warn("load resource group revision failed", zap.Error(err)) } - _, cfgRevision, err := c.provider.LoadGlobalConfig(ctx, nil, controllerConfigPath) + resp, err := c.provider.Get(ctx, []byte(controllerConfigPath)) if err != nil { log.Warn("load resource group revision failed", zap.Error(err)) } + cfgRevision := resp.GetHeader().GetRevision() var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event if !c.ruConfig.isSingleGroupByKeyspace { watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix()) @@ -236,6 +239,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { log.Warn("watch resource group meta failed", zap.Error(err)) } } + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) if err != nil { log.Warn("watch resource group config failed", zap.Error(err)) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index ae88435658c..546339bee0f 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -1363,4 +1363,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh expectRUCfg2.LTBMaxWaitDuration = time.Hour re.EqualValues(&expectRUCfg2, c2.GetConfig()) } + // restart c1 + c1.Stop() + c1, err = controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + re.NoError(err) + re.EqualValues(expectRUCfg, c1.GetConfig()) } From a40b3c9416b09d1d9fc63cd592eee47ec550a954 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 11 Sep 2023 17:22:45 +0800 Subject: [PATCH 5/6] fix Signed-off-by: nolouch --- go.mod | 2 +- pkg/mcs/resourcemanager/server/manager.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 05f60191210..503ac53f33f 100644 --- a/go.mod +++ b/go.mod @@ -145,7 +145,7 @@ require ( github.com/pelletier/go-toml/v2 v2.0.1 // indirect github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect - github.com/pkg/errors v0.9.1 + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_model v0.2.0 // indirect diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 3e328abb01b..21866ee1156 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -24,10 +24,10 @@ import ( "time" "github.com/gogo/protobuf/proto" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" - "github.com/pkg/errors" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" From d154fbf8ee07ce2dac49f575c3735abf2aed2dec Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 14 Sep 2023 16:23:39 +0800 Subject: [PATCH 6/6] address Signed-off-by: nolouch --- client/resource_group/controller/controller.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 972f2097140..528369df229 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -345,9 +345,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } for _, item := range resp { cfgRevision = item.Kv.ModRevision - if !strings.HasPrefix(string(item.Kv.Key), controllerConfigPath) { - continue - } config := &Config{} if err := json.Unmarshal(item.Kv.Value, config); err != nil { continue