diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index 8c54532cbc6..6bad72f1fec 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -54,7 +54,7 @@ const ( const ( defaultReadBaseCost = 0.25 - defaultWriteBaseCost = 1.5 + defaultWriteBaseCost = 1 // 1 RU = 64 KiB read bytes defaultReadCostPerByte = 1. / (64 * 1024) // 1 RU = 1 KiB written bytes @@ -81,8 +81,8 @@ type RequestUnitConfig struct { CPUMsCost float64 `toml:"read-cpu-ms-cost" json:"read-cpu-ms-cost"` } -// DefaultRequestUnitConfig returns the default request unit configuration. -func DefaultRequestUnitConfig() *RequestUnitConfig { +// defaultRequestUnitConfig returns the default request unit configuration. +func defaultRequestUnitConfig() *RequestUnitConfig { return &RequestUnitConfig{ ReadBaseCost: defaultReadBaseCost, ReadCostPerByte: defaultReadCostPerByte, @@ -107,7 +107,7 @@ type Config struct { // DefaultConfig returns the default configuration. func DefaultConfig() *Config { cfg := generateConfig( - DefaultRequestUnitConfig(), + defaultRequestUnitConfig(), ) return cfg } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 15d3b74dd19..46d773a2591 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -92,6 +92,7 @@ type ResourceGroupsController struct { // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor func NewResourceGroupController(clientUniqueID uint64, provider ResourceGroupProvider, requestUnitConfig *RequestUnitConfig) (*ResourceGroupsController, error) { + // TODO: initialize `requestUnitConfig`` from the remote manager server. var config *Config if requestUnitConfig != nil { config = generateConfig(requestUnitConfig) diff --git a/pkg/mcs/resource_manager/server/config.go b/pkg/mcs/resource_manager/server/config.go index 04cf89481d4..1a69fc67b72 100644 --- a/pkg/mcs/resource_manager/server/config.go +++ b/pkg/mcs/resource_manager/server/config.go @@ -38,6 +38,15 @@ const ( defaultLogFormat = "text" defaultDisableErrorVerbose = true + + defaultReadBaseCost = 0.25 + defaultWriteBaseCost = 1 + // 1 RU = 64 KiB read bytes + defaultReadCostPerByte = 1. / (64 * 1024) + // 1 RU = 1 KiB written bytes + defaultWriteCostPerByte = 1. / 1024 + // 1 RU = 3 millisecond CPU time + defaultCPUMsCost = 1. / 3 ) // Config is the configuration for the resource manager. @@ -51,11 +60,55 @@ type Config struct { Metric metricutil.MetricConfig `toml:"metric" json:"metric"` // Log related config. - Log log.Config `toml:"log" json:"log"` - + Log log.Config `toml:"log" json:"log"` Logger *zap.Logger LogProps *log.ZapProperties + Security configutil.SecurityConfig `toml:"security" json:"security"` + + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. + // This configuration should be modified carefully. + RequestUnit RequestUnitConfig +} + +// RequestUnitConfig is the configuration of the request units, which determines the coefficients of +// the RRU and WRU cost. +type RequestUnitConfig struct { + // ReadBaseCost is the base cost for a read request. No matter how many bytes read/written or + // the CPU times taken for a request, this cost is inevitable. + ReadBaseCost float64 `toml:"read-base-cost" json:"read-base-cost"` + // ReadCostPerByte is the cost for each byte read. It's 1 RU = 64 KiB by default. + ReadCostPerByte float64 `toml:"read-cost-per-byte" json:"read-cost-per-byte"` + // WriteBaseCost is the base cost for a write request. No matter how many bytes read/written or + // the CPU times taken for a request, this cost is inevitable. + WriteBaseCost float64 `toml:"write-base-cost" json:"write-base-cost"` + // WriteCostPerByte is the cost for each byte written. It's 1 RU = 1 KiB by default. + WriteCostPerByte float64 `toml:"write-cost-per-byte" json:"write-cost-per-byte"` + // CPUMsCost is the cost for each millisecond of CPU time taken. + // It's 1 RU = 3 millisecond by default. + CPUMsCost float64 `toml:"read-cpu-ms-cost" json:"read-cpu-ms-cost"` +} + +// Adjust adjusts the configuration and initializes it with the default value if necessary. +func (ruc *RequestUnitConfig) Adjust() { + if ruc == nil { + return + } + if ruc.ReadBaseCost == 0 { + ruc.ReadBaseCost = defaultReadBaseCost + } + if ruc.ReadCostPerByte == 0 { + ruc.ReadCostPerByte = defaultReadCostPerByte + } + if ruc.WriteBaseCost == 0 { + ruc.WriteBaseCost = defaultWriteBaseCost + } + if ruc.WriteCostPerByte == 0 { + ruc.WriteCostPerByte = defaultWriteCostPerByte + } + if ruc.CPUMsCost == 0 { + ruc.CPUMsCost = defaultCPUMsCost + } } // NewConfig creates a new config. @@ -90,7 +143,7 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error { return c.Adjust(meta, false) } -// Adjust is used to adjust the PD configurations. +// Adjust is used to adjust the resource manager configurations. func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { configMetaData := configutil.NewConfigMetadata(meta) warningMsgs := make([]string, 0) @@ -107,7 +160,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname)) } configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name)) - adjustPath(&c.DataDir) + c.adjustPath() if err := c.Validate(); err != nil { return err @@ -127,13 +180,15 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { c.Log.Format = defaultLogFormat } + c.RequestUnit.Adjust() + return nil } -func adjustPath(p *string) { - absPath, err := filepath.Abs(*p) +func (c *Config) adjustPath() { + absPath, err := filepath.Abs(c.DataDir) if err == nil { - *p = absPath + c.DataDir = absPath } } diff --git a/pkg/mcs/resource_manager/server/grpc_service.go b/pkg/mcs/resource_manager/server/grpc_service.go index f57c2d3b41a..9d711ce8369 100644 --- a/pkg/mcs/resource_manager/server/grpc_service.go +++ b/pkg/mcs/resource_manager/server/grpc_service.go @@ -59,8 +59,8 @@ type Service struct { } // NewService creates a new resource manager service. -func NewService(svr bs.Server) registry.RegistrableService { - manager := NewManager(svr) +func NewService[T RUConfigProvider](svr bs.Server) registry.RegistrableService { + manager := NewManager[T](svr) return &Service{ ctx: svr.Context(), diff --git a/pkg/mcs/resource_manager/server/install/install.go b/pkg/mcs/resource_manager/server/install/install.go index 6cc47d21382..e8410f12b3b 100644 --- a/pkg/mcs/resource_manager/server/install/install.go +++ b/pkg/mcs/resource_manager/server/install/install.go @@ -28,5 +28,5 @@ func init() { // Install registers the API group and grpc service. func Install(register *registry.ServiceRegistry) { - register.RegisterService("ResourceManager", rm_server.NewService) + register.RegisterService("ResourceManager", rm_server.NewService[*rm_server.Server]) } diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go index 424699d5d46..de2a131eaae 100644 --- a/pkg/mcs/resource_manager/server/manager.go +++ b/pkg/mcs/resource_manager/server/manager.go @@ -32,19 +32,19 @@ import ( "go.uber.org/zap" ) -const defaultConsumptionChanSize = 1024 - const ( - metricsCleanupInterval = time.Minute - metricsCleanupTimeout = 20 * time.Minute + defaultConsumptionChanSize = 1024 + metricsCleanupInterval = time.Minute + metricsCleanupTimeout = 20 * time.Minute ) // Manager is the manager of resource group. type Manager struct { sync.RWMutex - srv bs.Server - groups map[string]*ResourceGroup - storage endpoint.ResourceGroupStorage + srv bs.Server + ruConfig *RequestUnitConfig + groups map[string]*ResourceGroup + storage endpoint.ResourceGroupStorage // consumptionChan is used to send the consumption // info to the background metrics flusher. consumptionDispatcher chan struct { @@ -52,18 +52,26 @@ type Manager struct { *rmpb.Consumption } // record update time of each resource group - comsumptionRecord map[string]time.Time + consumptionRecord map[string]time.Time +} + +// RUConfigProvider is used to get RU config from the given +// `bs.server` without modifying its interface. +type RUConfigProvider interface { + GetRequestUnitConfig() *RequestUnitConfig } -// NewManager returns a new Manager. -func NewManager(srv bs.Server) *Manager { +// NewManager returns a new manager base on the given server, +// which should implement the `RUConfigProvider` interface. +func NewManager[T RUConfigProvider](srv bs.Server) *Manager { m := &Manager{ - groups: make(map[string]*ResourceGroup), + ruConfig: srv.(T).GetRequestUnitConfig(), + groups: make(map[string]*ResourceGroup), consumptionDispatcher: make(chan struct { resourceGroupName string *rmpb.Consumption }, defaultConsumptionChanSize), - comsumptionRecord: make(map[string]time.Time), + consumptionRecord: make(map[string]time.Time), } // The first initialization after the server is started. srv.AddStartCallback(func() { @@ -86,7 +94,9 @@ func (m *Manager) GetBasicServer() bs.Server { // Init initializes the resource group manager. func (m *Manager) Init(ctx context.Context) { - // Reset the resource groups first. + // Store the RU model config into the storage. + m.storage.SaveRequestUnitConfig(m.ruConfig) + // Load resource group meta info from storage. m.groups = make(map[string]*ResourceGroup) handler := func(k, v string) { group := &rmpb.ResourceGroup{} @@ -97,6 +107,7 @@ func (m *Manager) Init(ctx context.Context) { m.groups[group.Name] = FromProtoResourceGroup(group) } m.storage.LoadResourceGroupSettings(handler) + // Load resource group states from storage. tokenHandler := func(k, v string) { tokens := &GroupStates{} if err := json.Unmarshal([]byte(v), tokens); err != nil { @@ -289,11 +300,11 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { writeRequestCountMetrics.Add(consumption.KvWriteRpcCount) } - m.comsumptionRecord[name] = time.Now() + m.consumptionRecord[name] = time.Now() case <-ticker.C: // Clean up the metrics that have not been updated for a long time. - for name, lastTime := range m.comsumptionRecord { + for name, lastTime := range m.consumptionRecord { if time.Since(lastTime) > metricsCleanupTimeout { readRequestUnitCost.DeleteLabelValues(name) writeRequestUnitCost.DeleteLabelValues(name) @@ -303,7 +314,7 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { sqlCPUCost.DeleteLabelValues(name) requestCount.DeleteLabelValues(name, readTypeLabel) requestCount.DeleteLabelValues(name, writeTypeLabel) - delete(m.comsumptionRecord, name) + delete(m.consumptionRecord, name) } } } diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go index e854685d5f4..f9c108fd695 100644 --- a/pkg/mcs/resource_manager/server/server.go +++ b/pkg/mcs/resource_manager/server/server.go @@ -126,6 +126,11 @@ func (s *Server) Close() { log.Info("resource manager server is closed") } +// GetRequestUnitConfig returns the RU config. +func (s *Server) GetRequestUnitConfig() *RequestUnitConfig { + return &s.cfg.RequestUnit +} + // GetClient returns builtin etcd client. func (s *Server) GetClient() *clientv3.Client { return s.etcdClient @@ -253,7 +258,7 @@ func (s *Server) GetPrimary() bs.MemberProvider { } func (s *Server) startServer() error { - manager := NewManager(s) + manager := NewManager[*Server](s) s.service = &Service{ ctx: s.ctx, manager: manager, diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 39b29979b89..bec75d5c78f 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -102,7 +102,11 @@ func (s *Server) Run() error { } // Close closes the server. -func (s *Server) Close() { +func (s *Server) Close() {} + +// GetConfigAny returns the config with any type. +func (s *Server) GetConfigAny() any { + return nil } // GetClient returns builtin etcd client. diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 8fd6cfc750f..13ae012813a 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -45,6 +45,7 @@ const ( // resource group storage endpoint has prefix `resource_group` resourceGroupSettingsPath = "settings" resourceGroupStatesPath = "states" + requestUnitConfigPath = "ru_config" // tso storage endpoint has prefix `tso` microserviceKey = "microservice" tsoServiceKey = "tso" diff --git a/pkg/storage/endpoint/resource_group.go b/pkg/storage/endpoint/resource_group.go index 8cbc56df8e9..5527d6d4fd2 100644 --- a/pkg/storage/endpoint/resource_group.go +++ b/pkg/storage/endpoint/resource_group.go @@ -26,6 +26,7 @@ type ResourceGroupStorage interface { LoadResourceGroupStates(f func(k, v string)) error SaveResourceGroupStates(name string, obj interface{}) error DeleteResourceGroupStates(name string) error + SaveRequestUnitConfig(config interface{}) error } var _ ResourceGroupStorage = (*StorageEndpoint)(nil) @@ -59,3 +60,8 @@ func (se *StorageEndpoint) DeleteResourceGroupStates(name string) error { func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error { return se.loadRangeByPrefix(resourceGroupStatesPath+"/", f) } + +// SaveRequestUnitConfig stores the request unit config to storage. +func (se *StorageEndpoint) SaveRequestUnitConfig(config interface{}) error { + return se.saveJSON(requestUnitConfigPath, config) +} diff --git a/server/config/config.go b/server/config/config.go index e505b1c3eb9..8464a82c464 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -29,6 +29,7 @@ import ( "github.com/spf13/pflag" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" + rm "github.com/tikv/pd/pkg/mcs/resource_manager/server" "github.com/tikv/pd/pkg/utils/configutil" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/metricutil" @@ -157,6 +158,8 @@ type Config struct { ReplicationMode ReplicationModeConfig `toml:"replication-mode" json:"replication-mode"` Keyspace KeyspaceConfig `toml:"keyspace" json:"keyspace"` + + RequestUnit rm.RequestUnitConfig `toml:"request-unit" json:"request-unit"` } // NewConfig creates a new config. @@ -502,6 +505,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { c.Log.Format = defaultLogFormat } + c.RequestUnit.Adjust() + return nil } diff --git a/server/server.go b/server/server.go index 4ea0ef231f0..95aec4f7042 100644 --- a/server/server.go +++ b/server/server.go @@ -236,7 +236,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, legacyServiceBuilders failpoint.Inject("useGlobalRegistry", func() { s.registry = registry.ServerServiceRegistry }) - s.registry.RegisterService("ResourceManager", rm_server.NewService) + s.registry.RegisterService("ResourceManager", rm_server.NewService[*Server]) // Register the micro services REST path. s.registry.InstallAllRESTHandler(s, etcdCfg.UserHandlers) @@ -1149,6 +1149,11 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { return &s.cfg.Security.TLSConfig } +// GetRequestUnitConfig gets the RU config. +func (s *Server) GetRequestUnitConfig() *rm_server.RequestUnitConfig { + return &s.cfg.RequestUnit +} + // GetRaftCluster gets Raft cluster. // If cluster has not been bootstrapped, return nil. func (s *Server) GetRaftCluster() *cluster.RaftCluster {