Skip to content

Commit

Permalink
*: write RU model config into the storage (#6041)
Browse files Browse the repository at this point in the history
ref #6038

- Introduce the RU model config.
- Write the RU model config into the storage while initializing the resource manager.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
JmPotato and ti-chi-bot authored Feb 27, 2023
1 parent 10eeab1 commit 8d4438e
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 33 deletions.
8 changes: 4 additions & 4 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (

const (
defaultReadBaseCost = 0.25
defaultWriteBaseCost = 1.5
defaultWriteBaseCost = 1
// 1 RU = 64 KiB read bytes
defaultReadCostPerByte = 1. / (64 * 1024)
// 1 RU = 1 KiB written bytes
Expand All @@ -81,8 +81,8 @@ type RequestUnitConfig struct {
CPUMsCost float64 `toml:"read-cpu-ms-cost" json:"read-cpu-ms-cost"`
}

// DefaultRequestUnitConfig returns the default request unit configuration.
func DefaultRequestUnitConfig() *RequestUnitConfig {
// defaultRequestUnitConfig returns the default request unit configuration.
func defaultRequestUnitConfig() *RequestUnitConfig {
return &RequestUnitConfig{
ReadBaseCost: defaultReadBaseCost,
ReadCostPerByte: defaultReadCostPerByte,
Expand All @@ -107,7 +107,7 @@ type Config struct {
// DefaultConfig returns the default configuration.
func DefaultConfig() *Config {
cfg := generateConfig(
DefaultRequestUnitConfig(),
defaultRequestUnitConfig(),
)
return cfg
}
Expand Down
1 change: 1 addition & 0 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type ResourceGroupsController struct {

// NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor
func NewResourceGroupController(clientUniqueID uint64, provider ResourceGroupProvider, requestUnitConfig *RequestUnitConfig) (*ResourceGroupsController, error) {
// TODO: initialize `requestUnitConfig`` from the remote manager server.
var config *Config
if requestUnitConfig != nil {
config = generateConfig(requestUnitConfig)
Expand Down
69 changes: 62 additions & 7 deletions pkg/mcs/resource_manager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ const (

defaultLogFormat = "text"
defaultDisableErrorVerbose = true

defaultReadBaseCost = 0.25
defaultWriteBaseCost = 1
// 1 RU = 64 KiB read bytes
defaultReadCostPerByte = 1. / (64 * 1024)
// 1 RU = 1 KiB written bytes
defaultWriteCostPerByte = 1. / 1024
// 1 RU = 3 millisecond CPU time
defaultCPUMsCost = 1. / 3
)

// Config is the configuration for the resource manager.
Expand All @@ -51,11 +60,55 @@ type Config struct {
Metric metricutil.MetricConfig `toml:"metric" json:"metric"`

// Log related config.
Log log.Config `toml:"log" json:"log"`

Log log.Config `toml:"log" json:"log"`
Logger *zap.Logger
LogProps *log.ZapProperties

Security configutil.SecurityConfig `toml:"security" json:"security"`

// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig
}

// RequestUnitConfig is the configuration of the request units, which determines the coefficients of
// the RRU and WRU cost.
type RequestUnitConfig struct {
// ReadBaseCost is the base cost for a read request. No matter how many bytes read/written or
// the CPU times taken for a request, this cost is inevitable.
ReadBaseCost float64 `toml:"read-base-cost" json:"read-base-cost"`
// ReadCostPerByte is the cost for each byte read. It's 1 RU = 64 KiB by default.
ReadCostPerByte float64 `toml:"read-cost-per-byte" json:"read-cost-per-byte"`
// WriteBaseCost is the base cost for a write request. No matter how many bytes read/written or
// the CPU times taken for a request, this cost is inevitable.
WriteBaseCost float64 `toml:"write-base-cost" json:"write-base-cost"`
// WriteCostPerByte is the cost for each byte written. It's 1 RU = 1 KiB by default.
WriteCostPerByte float64 `toml:"write-cost-per-byte" json:"write-cost-per-byte"`
// CPUMsCost is the cost for each millisecond of CPU time taken.
// It's 1 RU = 3 millisecond by default.
CPUMsCost float64 `toml:"read-cpu-ms-cost" json:"read-cpu-ms-cost"`
}

// Adjust adjusts the configuration and initializes it with the default value if necessary.
func (ruc *RequestUnitConfig) Adjust() {
if ruc == nil {
return
}
if ruc.ReadBaseCost == 0 {
ruc.ReadBaseCost = defaultReadBaseCost
}
if ruc.ReadCostPerByte == 0 {
ruc.ReadCostPerByte = defaultReadCostPerByte
}
if ruc.WriteBaseCost == 0 {
ruc.WriteBaseCost = defaultWriteBaseCost
}
if ruc.WriteCostPerByte == 0 {
ruc.WriteCostPerByte = defaultWriteCostPerByte
}
if ruc.CPUMsCost == 0 {
ruc.CPUMsCost = defaultCPUMsCost
}
}

// NewConfig creates a new config.
Expand Down Expand Up @@ -90,7 +143,7 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error {
return c.Adjust(meta, false)
}

// Adjust is used to adjust the PD configurations.
// Adjust is used to adjust the resource manager configurations.
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
configMetaData := configutil.NewConfigMetadata(meta)
warningMsgs := make([]string, 0)
Expand All @@ -107,7 +160,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname))
}
configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name))
adjustPath(&c.DataDir)
c.adjustPath()

if err := c.Validate(); err != nil {
return err
Expand All @@ -127,13 +180,15 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
c.Log.Format = defaultLogFormat
}

c.RequestUnit.Adjust()

return nil
}

func adjustPath(p *string) {
absPath, err := filepath.Abs(*p)
func (c *Config) adjustPath() {
absPath, err := filepath.Abs(c.DataDir)
if err == nil {
*p = absPath
c.DataDir = absPath
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ type Service struct {
}

// NewService creates a new resource manager service.
func NewService(svr bs.Server) registry.RegistrableService {
manager := NewManager(svr)
func NewService[T RUConfigProvider](svr bs.Server) registry.RegistrableService {
manager := NewManager[T](svr)

return &Service{
ctx: svr.Context(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/server/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ func init() {

// Install registers the API group and grpc service.
func Install(register *registry.ServiceRegistry) {
register.RegisterService("ResourceManager", rm_server.NewService)
register.RegisterService("ResourceManager", rm_server.NewService[*rm_server.Server])
}
43 changes: 27 additions & 16 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,46 @@ import (
"go.uber.org/zap"
)

const defaultConsumptionChanSize = 1024

const (
metricsCleanupInterval = time.Minute
metricsCleanupTimeout = 20 * time.Minute
defaultConsumptionChanSize = 1024
metricsCleanupInterval = time.Minute
metricsCleanupTimeout = 20 * time.Minute
)

// Manager is the manager of resource group.
type Manager struct {
sync.RWMutex
srv bs.Server
groups map[string]*ResourceGroup
storage endpoint.ResourceGroupStorage
srv bs.Server
ruConfig *RequestUnitConfig
groups map[string]*ResourceGroup
storage endpoint.ResourceGroupStorage
// consumptionChan is used to send the consumption
// info to the background metrics flusher.
consumptionDispatcher chan struct {
resourceGroupName string
*rmpb.Consumption
}
// record update time of each resource group
comsumptionRecord map[string]time.Time
consumptionRecord map[string]time.Time
}

// RUConfigProvider is used to get RU config from the given
// `bs.server` without modifying its interface.
type RUConfigProvider interface {
GetRequestUnitConfig() *RequestUnitConfig
}

// NewManager returns a new Manager.
func NewManager(srv bs.Server) *Manager {
// NewManager returns a new manager base on the given server,
// which should implement the `RUConfigProvider` interface.
func NewManager[T RUConfigProvider](srv bs.Server) *Manager {
m := &Manager{
groups: make(map[string]*ResourceGroup),
ruConfig: srv.(T).GetRequestUnitConfig(),
groups: make(map[string]*ResourceGroup),
consumptionDispatcher: make(chan struct {
resourceGroupName string
*rmpb.Consumption
}, defaultConsumptionChanSize),
comsumptionRecord: make(map[string]time.Time),
consumptionRecord: make(map[string]time.Time),
}
// The first initialization after the server is started.
srv.AddStartCallback(func() {
Expand All @@ -86,7 +94,9 @@ func (m *Manager) GetBasicServer() bs.Server {

// Init initializes the resource group manager.
func (m *Manager) Init(ctx context.Context) {
// Reset the resource groups first.
// Store the RU model config into the storage.
m.storage.SaveRequestUnitConfig(m.ruConfig)
// Load resource group meta info from storage.
m.groups = make(map[string]*ResourceGroup)
handler := func(k, v string) {
group := &rmpb.ResourceGroup{}
Expand All @@ -97,6 +107,7 @@ func (m *Manager) Init(ctx context.Context) {
m.groups[group.Name] = FromProtoResourceGroup(group)
}
m.storage.LoadResourceGroupSettings(handler)
// Load resource group states from storage.
tokenHandler := func(k, v string) {
tokens := &GroupStates{}
if err := json.Unmarshal([]byte(v), tokens); err != nil {
Expand Down Expand Up @@ -289,11 +300,11 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
writeRequestCountMetrics.Add(consumption.KvWriteRpcCount)
}

m.comsumptionRecord[name] = time.Now()
m.consumptionRecord[name] = time.Now()

case <-ticker.C:
// Clean up the metrics that have not been updated for a long time.
for name, lastTime := range m.comsumptionRecord {
for name, lastTime := range m.consumptionRecord {
if time.Since(lastTime) > metricsCleanupTimeout {
readRequestUnitCost.DeleteLabelValues(name)
writeRequestUnitCost.DeleteLabelValues(name)
Expand All @@ -303,7 +314,7 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
sqlCPUCost.DeleteLabelValues(name)
requestCount.DeleteLabelValues(name, readTypeLabel)
requestCount.DeleteLabelValues(name, writeTypeLabel)
delete(m.comsumptionRecord, name)
delete(m.consumptionRecord, name)
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ func (s *Server) Close() {
log.Info("resource manager server is closed")
}

// GetRequestUnitConfig returns the RU config.
func (s *Server) GetRequestUnitConfig() *RequestUnitConfig {
return &s.cfg.RequestUnit
}

// GetClient returns builtin etcd client.
func (s *Server) GetClient() *clientv3.Client {
return s.etcdClient
Expand Down Expand Up @@ -253,7 +258,7 @@ func (s *Server) GetPrimary() bs.MemberProvider {
}

func (s *Server) startServer() error {
manager := NewManager(s)
manager := NewManager[*Server](s)
s.service = &Service{
ctx: s.ctx,
manager: manager,
Expand Down
6 changes: 5 additions & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ func (s *Server) Run() error {
}

// Close closes the server.
func (s *Server) Close() {
func (s *Server) Close() {}

// GetConfigAny returns the config with any type.
func (s *Server) GetConfigAny() any {
return nil
}

// GetClient returns builtin etcd client.
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
// resource group storage endpoint has prefix `resource_group`
resourceGroupSettingsPath = "settings"
resourceGroupStatesPath = "states"
requestUnitConfigPath = "ru_config"
// tso storage endpoint has prefix `tso`
microserviceKey = "microservice"
tsoServiceKey = "tso"
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/endpoint/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ResourceGroupStorage interface {
LoadResourceGroupStates(f func(k, v string)) error
SaveResourceGroupStates(name string, obj interface{}) error
DeleteResourceGroupStates(name string) error
SaveRequestUnitConfig(config interface{}) error
}

var _ ResourceGroupStorage = (*StorageEndpoint)(nil)
Expand Down Expand Up @@ -59,3 +60,8 @@ func (se *StorageEndpoint) DeleteResourceGroupStates(name string) error {
func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error {
return se.loadRangeByPrefix(resourceGroupStatesPath+"/", f)
}

// SaveRequestUnitConfig stores the request unit config to storage.
func (se *StorageEndpoint) SaveRequestUnitConfig(config interface{}) error {
return se.saveJSON(requestUnitConfigPath, config)
}
5 changes: 5 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/spf13/pflag"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
rm "github.com/tikv/pd/pkg/mcs/resource_manager/server"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand Down Expand Up @@ -157,6 +158,8 @@ type Config struct {
ReplicationMode ReplicationModeConfig `toml:"replication-mode" json:"replication-mode"`

Keyspace KeyspaceConfig `toml:"keyspace" json:"keyspace"`

RequestUnit rm.RequestUnitConfig `toml:"request-unit" json:"request-unit"`
}

// NewConfig creates a new config.
Expand Down Expand Up @@ -502,6 +505,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
c.Log.Format = defaultLogFormat
}

c.RequestUnit.Adjust()

return nil
}

Expand Down
7 changes: 6 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, legacyServiceBuilders
failpoint.Inject("useGlobalRegistry", func() {
s.registry = registry.ServerServiceRegistry
})
s.registry.RegisterService("ResourceManager", rm_server.NewService)
s.registry.RegisterService("ResourceManager", rm_server.NewService[*Server])
// Register the micro services REST path.
s.registry.InstallAllRESTHandler(s, etcdCfg.UserHandlers)

Expand Down Expand Up @@ -1149,6 +1149,11 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}

// GetRequestUnitConfig gets the RU config.
func (s *Server) GetRequestUnitConfig() *rm_server.RequestUnitConfig {
return &s.cfg.RequestUnit
}

// GetRaftCluster gets Raft cluster.
// If cluster has not been bootstrapped, return nil.
func (s *Server) GetRaftCluster() *cluster.RaftCluster {
Expand Down

0 comments on commit 8d4438e

Please sign in to comment.