Skip to content

Commit

Permalink
store-limit-v2
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Oct 17, 2022
1 parent eeaa078 commit f0d40c6
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 16 deletions.
30 changes: 25 additions & 5 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/etcdutil"
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/pkg/netutil"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/syncutil"
Expand Down Expand Up @@ -321,7 +320,7 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bo
continue
}
// it will try next store if the current store is failed.
address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
address := stores[index].GetStatusAddress()
if err := manager.ObserveConfig(address); err != nil {
storeSyncConfigEvent.WithLabelValues(address, "fail").Inc()
log.Debug("sync store config failed, it will try next store", zap.Error(err))
Expand Down Expand Up @@ -2245,7 +2244,21 @@ func (c *RaftCluster) GetAllStoreLimit() map[uint64]config.StoreLimitConfig {
}

func (c *RaftCluster) getAllStoreLimitV2() map[uint64]config.StoreLimitConfig {
return nil
limits := make(map[uint64]config.StoreLimitConfig)
for _, store := range c.GetStores() {
statusAddress := store.GetStatusAddress()
storeID := store.GetID()
cfg, err := c.storeConfigManager.GetConfig(statusAddress)
if err != nil {
log.Error("sync store config error", zap.Uint64("store-id", storeID), zap.Error(err))
}
rate := typeutil.ParseMBFromText(cfg.Server.SnapMaxWriteBytesPerSec, 100)
limits[storeID] = config.StoreLimitConfig{
AddPeer: float64(rate),
RemovePeer: float64(rate),
}
}
return limits
}

// GetAllStoreLimit returns all store limit.
Expand All @@ -2255,7 +2268,7 @@ func (c *RaftCluster) getAllStoreLimit() map[uint64]config.StoreLimitConfig {
return c.opt.GetScheduleConfig().StoreLimit
}

// SetStoreLimitByVersion sets store limit by version.
// SetStoreLimit sets store limit by version.
func (c *RaftCluster) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) error {
version := c.opt.GetStoreLimitFormulaVersion()
switch version {
Expand All @@ -2268,7 +2281,14 @@ func (c *RaftCluster) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePer
}
}

func (c *RaftCluster) setStoreLimitV2(_ uint64, _ float64) error {
func (c *RaftCluster) setStoreLimitV2(storeID uint64, rate float64) error {
address := c.GetStore(storeID).GetStatusAddress()
config := &config.StoreConfig{
Server: config.ServerConfig{
SnapMaxWriteBytesPerSec: strconv.FormatFloat(rate, 'f', -1, 64) + "MiB",
},
}
c.storeConfigManager.UpdateConfig(address, config)
return nil
}

Expand Down
45 changes: 35 additions & 10 deletions server/config/store_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package config

import (
"bytes"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -201,8 +202,14 @@ func (m *StoreConfigManager) ObserveConfig(address string) error {
return nil
}

func (m *StoreConfigManager) UpdateConfig(address string, cfg *StoreConfig) error {
// GetConfig returns the store config.
func (m *StoreConfigManager) GetConfig(address string) (*StoreConfig, error) {
return m.source.GetConfig(address)
}

// UpdateConfig is used to update the config.
func (m *StoreConfigManager) UpdateConfig(address string, cfg *StoreConfig) error {
return m.source.UpdateConfig(address, cfg)
}

// GetStoreConfig returns the current store configuration.
Expand All @@ -217,6 +224,7 @@ func (m *StoreConfigManager) GetStoreConfig() *StoreConfig {
// Source is used to get the store config.
type Source interface {
GetConfig(statusAddress string) (*StoreConfig, error)
UpdateConfig(statusAddress string, config *StoreConfig) error
}

// TiKVConfigSource is used to get the store config from TiKV.
Expand All @@ -234,8 +242,16 @@ func newTiKVConfigSource(schema string, client *http.Client) *TiKVConfigSource {

// UpdateConfig is used to update the TIKV config.
func (s *TiKVConfigSource) UpdateConfig(statusAddress string, config *StoreConfig) error {
if len(statusAddress) == 0 {
return fmt.Errorf("status address is empty")
}
url := fmt.Sprintf("%s://%s/config", s.schema, statusAddress)

body, err := json.Marshal(config)
if err != nil {
return err
}
_, err = s.client.Post(url, "application/json", bytes.NewBuffer(body))
return err
}

// GetConfig returns the store config from TiKV.
Expand All @@ -260,11 +276,20 @@ func (s *TiKVConfigSource) GetConfig(statusAddress string) (*StoreConfig, error)
// FakeSource is used to test.
type FakeSource struct {
whiteList []string
config *StoreConfig
}

func newFakeSource(whiteList []string) *FakeSource {
return &FakeSource{
whiteList: whiteList,
config: &StoreConfig{
Coprocessor{
RegionMaxSize: "10MiB",
},
ServerConfig{
SnapMaxWriteBytesPerSec: "100MiB",
},
},
}
}

Expand All @@ -273,13 +298,13 @@ func (f *FakeSource) GetConfig(url string) (*StoreConfig, error) {
if !slice.Contains(f.whiteList, url) {
return nil, fmt.Errorf("[url:%s] is not in white list", url)
}
config := &StoreConfig{
Coprocessor{
RegionMaxSize: "10MiB",
},
ServerConfig{
SnapMaxWriteBytesPerSec: "100MiB",
},
return f.config, nil
}

func (f *FakeSource) UpdateConfig(url string, config *StoreConfig) error {
if !slice.Contains(f.whiteList, url) {
return fmt.Errorf("[url:%s] is not in white list", url)
}
return config, nil
f.config = config
return nil
}
3 changes: 2 additions & 1 deletion server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package core

import (
"github.com/tikv/pd/pkg/netutil"
"github.com/tikv/pd/server/config"
"math"
"strings"
Expand Down Expand Up @@ -190,7 +191,7 @@ func (s *StoreInfo) GetNodeState() metapb.NodeState {

// GetStatusAddress returns the http address of the store.
func (s *StoreInfo) GetStatusAddress() string {
return s.meta.GetStatusAddress()
return netutil.ResolveLoopBackAddr(s.meta.GetStatusAddress(), s.GetAddress())
}

// GetAddress returns the address of the store.
Expand Down

0 comments on commit f0d40c6

Please sign in to comment.