Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: write RU model config into the storage #6041

Merged
merged 5 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (

const (
defaultReadBaseCost = 0.25
defaultWriteBaseCost = 1.5
defaultWriteBaseCost = 1
// 1 RU = 64 KiB read bytes
defaultReadCostPerByte = 1. / (64 * 1024)
// 1 RU = 1 KiB written bytes
Expand All @@ -81,8 +81,8 @@ type RequestUnitConfig struct {
CPUMsCost float64 `toml:"read-cpu-ms-cost" json:"read-cpu-ms-cost"`
}

// DefaultRequestUnitConfig returns the default request unit configuration.
func DefaultRequestUnitConfig() *RequestUnitConfig {
// defaultRequestUnitConfig returns the default request unit configuration.
func defaultRequestUnitConfig() *RequestUnitConfig {
return &RequestUnitConfig{
ReadBaseCost: defaultReadBaseCost,
ReadCostPerByte: defaultReadCostPerByte,
Expand All @@ -107,7 +107,7 @@ type Config struct {
// DefaultConfig returns the default configuration.
func DefaultConfig() *Config {
cfg := generateConfig(
DefaultRequestUnitConfig(),
defaultRequestUnitConfig(),
)
return cfg
}
Expand Down
1 change: 1 addition & 0 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type ResourceGroupsController struct {

// NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor
func NewResourceGroupController(clientUniqueID uint64, provider ResourceGroupProvider, requestUnitConfig *RequestUnitConfig) (*ResourceGroupsController, error) {
// TODO: initialize `requestUnitConfig` from the remote manager server.
var config *Config
if requestUnitConfig != nil {
config = generateConfig(requestUnitConfig)
Expand Down
69 changes: 62 additions & 7 deletions pkg/mcs/resource_manager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ const (

defaultLogFormat = "text"
defaultDisableErrorVerbose = true

defaultReadBaseCost = 0.25
defaultWriteBaseCost = 1
// 1 RU = 64 KiB read bytes
defaultReadCostPerByte = 1. / (64 * 1024)
// 1 RU = 1 KiB written bytes
defaultWriteCostPerByte = 1. / 1024
// 1 RU = 3 millisecond CPU time
defaultCPUMsCost = 1. / 3
)

// Config is the configuration for the resource manager.
Expand All @@ -51,11 +60,55 @@ type Config struct {
Metric metricutil.MetricConfig `toml:"metric" json:"metric"`

// Log related config.
Log log.Config `toml:"log" json:"log"`

Log log.Config `toml:"log" json:"log"`
Logger *zap.Logger
LogProps *log.ZapProperties

Security configutil.SecurityConfig `toml:"security" json:"security"`

// RequestUnit is the configuration that determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig
}

// RequestUnitConfig is the configuration of the request units (RU). It holds the
// coefficients that determine the RRU and WRU cost of a request.
// Every field is a plain float64; a field left at 0 is replaced with its
// default value when Adjust is called.
type RequestUnitConfig struct {
// ReadBaseCost is the base cost for a read request. No matter how many bytes are
// read/written or how much CPU time a request takes, this cost is always charged.
// It's 0.25 RU by default.
ReadBaseCost float64 `toml:"read-base-cost" json:"read-base-cost"`
// ReadCostPerByte is the cost for each byte read. It's 1 RU = 64 KiB by default.
ReadCostPerByte float64 `toml:"read-cost-per-byte" json:"read-cost-per-byte"`
// WriteBaseCost is the base cost for a write request. No matter how many bytes are
// read/written or how much CPU time a request takes, this cost is always charged.
// It's 1 RU by default.
WriteBaseCost float64 `toml:"write-base-cost" json:"write-base-cost"`
// WriteCostPerByte is the cost for each byte written. It's 1 RU = 1 KiB by default.
WriteCostPerByte float64 `toml:"write-cost-per-byte" json:"write-cost-per-byte"`
// CPUMsCost is the cost for each millisecond of CPU time taken.
// It's 1 RU = 3 millisecond by default.
// Note that its TOML/JSON key is `read-cpu-ms-cost`.
CPUMsCost float64 `toml:"read-cpu-ms-cost" json:"read-cpu-ms-cost"`
}

// Adjust fills in every coefficient that is still zero with its package default.
// Calling it on a nil receiver is a no-op. Non-zero values are left untouched.
func (ruc *RequestUnitConfig) Adjust() {
	if ruc == nil {
		return
	}
	// Each entry pairs a coefficient with the default it falls back to.
	fallbacks := []struct {
		coefficient *float64
		byDefault   float64
	}{
		{&ruc.ReadBaseCost, defaultReadBaseCost},
		{&ruc.ReadCostPerByte, defaultReadCostPerByte},
		{&ruc.WriteBaseCost, defaultWriteBaseCost},
		{&ruc.WriteCostPerByte, defaultWriteCostPerByte},
		{&ruc.CPUMsCost, defaultCPUMsCost},
	}
	for _, fb := range fallbacks {
		if *fb.coefficient == 0 {
			*fb.coefficient = fb.byDefault
		}
	}
}

// NewConfig creates a new config.
Expand Down Expand Up @@ -90,7 +143,7 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error {
return c.Adjust(meta, false)
}

// Adjust is used to adjust the PD configurations.
// Adjust is used to adjust the resource manager configurations.
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
configMetaData := configutil.NewConfigMetadata(meta)
warningMsgs := make([]string, 0)
Expand All @@ -107,7 +160,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname))
}
configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name))
adjustPath(&c.DataDir)
c.adjustPath()

if err := c.Validate(); err != nil {
return err
Expand All @@ -127,13 +180,15 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
c.Log.Format = defaultLogFormat
}

c.RequestUnit.Adjust()

return nil
}

func adjustPath(p *string) {
absPath, err := filepath.Abs(*p)
func (c *Config) adjustPath() {
absPath, err := filepath.Abs(c.DataDir)
if err == nil {
*p = absPath
c.DataDir = absPath
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ type Service struct {
}

// NewService creates a new resource manager service.
func NewService(svr bs.Server) registry.RegistrableService {
manager := NewManager(svr)
func NewService[T RUConfigProvider](svr bs.Server) registry.RegistrableService {
manager := NewManager[T](svr)

return &Service{
ctx: svr.Context(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/server/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ func init() {

// Install registers the API group and grpc service.
func Install(register *registry.ServiceRegistry) {
register.RegisterService("ResourceManager", rm_server.NewService)
register.RegisterService("ResourceManager", rm_server.NewService[*rm_server.Server])
}
43 changes: 27 additions & 16 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,46 @@ import (
"go.uber.org/zap"
)

const defaultConsumptionChanSize = 1024

const (
metricsCleanupInterval = time.Minute
metricsCleanupTimeout = 20 * time.Minute
defaultConsumptionChanSize = 1024
metricsCleanupInterval = time.Minute
metricsCleanupTimeout = 20 * time.Minute
)

// Manager is the manager of resource group.
type Manager struct {
sync.RWMutex
srv bs.Server
groups map[string]*ResourceGroup
storage endpoint.ResourceGroupStorage
srv bs.Server
ruConfig *RequestUnitConfig
groups map[string]*ResourceGroup
storage endpoint.ResourceGroupStorage
// consumptionChan is used to send the consumption
// info to the background metrics flusher.
consumptionDispatcher chan struct {
resourceGroupName string
*rmpb.Consumption
}
// record update time of each resource group
comsumptionRecord map[string]time.Time
consumptionRecord map[string]time.Time
}

// RUConfigProvider is implemented by servers that can supply the RU config.
// It lets the manager obtain the RU config from a given `bs.Server`
// without modifying the `bs.Server` interface itself.
type RUConfigProvider interface {
// GetRequestUnitConfig returns the request unit configuration of the provider.
GetRequestUnitConfig() *RequestUnitConfig
}

// NewManager returns a new Manager.
func NewManager(srv bs.Server) *Manager {
// NewManager returns a new manager based on the given server,
// which should implement the `RUConfigProvider` interface.
func NewManager[T RUConfigProvider](srv bs.Server) *Manager {
m := &Manager{
groups: make(map[string]*ResourceGroup),
ruConfig: srv.(T).GetRequestUnitConfig(),
groups: make(map[string]*ResourceGroup),
consumptionDispatcher: make(chan struct {
resourceGroupName string
*rmpb.Consumption
}, defaultConsumptionChanSize),
comsumptionRecord: make(map[string]time.Time),
consumptionRecord: make(map[string]time.Time),
}
// The first initialization after the server is started.
srv.AddStartCallback(func() {
Expand All @@ -86,7 +94,9 @@ func (m *Manager) GetBasicServer() bs.Server {

// Init initializes the resource group manager.
func (m *Manager) Init(ctx context.Context) {
// Reset the resource groups first.
// Store the RU model config into the storage.
m.storage.SaveRequestUnitConfig(m.ruConfig)
// Load resource group meta info from storage.
m.groups = make(map[string]*ResourceGroup)
handler := func(k, v string) {
group := &rmpb.ResourceGroup{}
Expand All @@ -97,6 +107,7 @@ func (m *Manager) Init(ctx context.Context) {
m.groups[group.Name] = FromProtoResourceGroup(group)
}
m.storage.LoadResourceGroupSettings(handler)
// Load resource group states from storage.
tokenHandler := func(k, v string) {
tokens := &GroupStates{}
if err := json.Unmarshal([]byte(v), tokens); err != nil {
Expand Down Expand Up @@ -289,11 +300,11 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
writeRequestCountMetrics.Add(consumption.KvWriteRpcCount)
}

m.comsumptionRecord[name] = time.Now()
m.consumptionRecord[name] = time.Now()

case <-ticker.C:
// Clean up the metrics that have not been updated for a long time.
for name, lastTime := range m.comsumptionRecord {
for name, lastTime := range m.consumptionRecord {
if time.Since(lastTime) > metricsCleanupTimeout {
readRequestUnitCost.DeleteLabelValues(name)
writeRequestUnitCost.DeleteLabelValues(name)
Expand All @@ -303,7 +314,7 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
sqlCPUCost.DeleteLabelValues(name)
requestCount.DeleteLabelValues(name, readTypeLabel)
requestCount.DeleteLabelValues(name, writeTypeLabel)
delete(m.comsumptionRecord, name)
delete(m.consumptionRecord, name)
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ func (s *Server) Close() {
log.Info("resource manager server is closed")
}

// GetRequestUnitConfig returns a pointer to the RequestUnit section of the
// server's configuration. The returned pointer refers to the config held by
// the server itself, not to a copy.
func (s *Server) GetRequestUnitConfig() *RequestUnitConfig {
	ruConfig := &s.cfg.RequestUnit
	return ruConfig
}

// GetClient returns builtin etcd client.
func (s *Server) GetClient() *clientv3.Client {
return s.etcdClient
Expand Down Expand Up @@ -253,7 +258,7 @@ func (s *Server) GetPrimary() bs.MemberProvider {
}

func (s *Server) startServer() error {
manager := NewManager(s)
manager := NewManager[*Server](s)
s.service = &Service{
ctx: s.ctx,
manager: manager,
Expand Down
6 changes: 5 additions & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ func (s *Server) Run() error {
}

// Close closes the server.
func (s *Server) Close() {
func (s *Server) Close() {}

// GetConfigAny returns the config with any type.
// This implementation has no config to expose and always returns nil.
func (s *Server) GetConfigAny() any {
	var noConfig any
	return noConfig
}

// GetClient returns builtin etcd client.
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
// resource group storage endpoint has prefix `resource_group`
resourceGroupSettingsPath = "settings"
resourceGroupStatesPath = "states"
requestUnitConfigPath = "ru_config"
// tso storage endpoint has prefix `tso`
microserviceKey = "microservice"
tsoServiceKey = "tso"
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/endpoint/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ResourceGroupStorage interface {
LoadResourceGroupStates(f func(k, v string)) error
SaveResourceGroupStates(name string, obj interface{}) error
DeleteResourceGroupStates(name string) error
SaveRequestUnitConfig(config interface{}) error
}

var _ ResourceGroupStorage = (*StorageEndpoint)(nil)
Expand Down Expand Up @@ -59,3 +60,8 @@ func (se *StorageEndpoint) DeleteResourceGroupStates(name string) error {
func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error {
return se.loadRangeByPrefix(resourceGroupStatesPath+"/", f)
}

// SaveRequestUnitConfig stores the given request unit config to storage under
// the `requestUnitConfigPath` key, delegating to saveJSON (presumably encoding
// the value as JSON — confirm against saveJSON). Any error from saveJSON is
// returned unchanged.
func (se *StorageEndpoint) SaveRequestUnitConfig(config interface{}) error {
	err := se.saveJSON(requestUnitConfigPath, config)
	return err
}
5 changes: 5 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/spf13/pflag"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
rm "github.com/tikv/pd/pkg/mcs/resource_manager/server"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand Down Expand Up @@ -157,6 +158,8 @@ type Config struct {
ReplicationMode ReplicationModeConfig `toml:"replication-mode" json:"replication-mode"`

Keyspace KeyspaceConfig `toml:"keyspace" json:"keyspace"`

RequestUnit rm.RequestUnitConfig `toml:"request-unit" json:"request-unit"`
}

// NewConfig creates a new config.
Expand Down Expand Up @@ -502,6 +505,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
c.Log.Format = defaultLogFormat
}

c.RequestUnit.Adjust()

return nil
}

Expand Down
7 changes: 6 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, legacyServiceBuilders
failpoint.Inject("useGlobalRegistry", func() {
s.registry = registry.ServerServiceRegistry
})
s.registry.RegisterService("ResourceManager", rm_server.NewService)
s.registry.RegisterService("ResourceManager", rm_server.NewService[*Server])
// Register the micro services REST path.
s.registry.InstallAllRESTHandler(s, etcdCfg.UserHandlers)

Expand Down Expand Up @@ -1149,6 +1149,11 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}

// GetRequestUnitConfig gets the RU config. The returned pointer refers to the
// RequestUnit section of the server's own configuration, not to a copy.
func (s *Server) GetRequestUnitConfig() *rm_server.RequestUnitConfig {
	ruConfig := &s.cfg.RequestUnit
	return ruConfig
}

// GetRaftCluster gets Raft cluster.
// If cluster has not been bootstrapped, return nil.
func (s *Server) GetRaftCluster() *cluster.RaftCluster {
Expand Down