Skip to content

Commit

Permalink
scheduling/config: watch StoreConfig in scheduling config watcher (#6921
Browse files Browse the repository at this point in the history
)

ref #5839

Watch `StoreConfig` in scheduling config watcher.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored Aug 9, 2023
1 parent d4138c8 commit 68a3e95
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
18 changes: 18 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/server/config"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -193,6 +194,7 @@ type PersistConfig struct {
clusterVersion unsafe.Pointer
schedule atomic.Value
replication atomic.Value
storeConfig atomic.Value
}

// NewPersistConfig creates a new PersistConfig instance.
Expand All @@ -201,6 +203,9 @@ func NewPersistConfig(cfg *Config) *PersistConfig {
o.SetClusterVersion(&cfg.ClusterVersion)
o.schedule.Store(&cfg.Schedule)
o.replication.Store(&cfg.Replication)
// storeConfig will be fetched from TiKV by PD API server,
// so we just set an empty value here first.
o.storeConfig.Store(&config.StoreConfig{})
return o
}

Expand Down Expand Up @@ -234,6 +239,19 @@ func (o *PersistConfig) SetReplicationConfig(cfg *sc.ReplicationConfig) {
o.replication.Store(cfg)
}

// SetStoreConfig sets the TiKV store configuration.
func (o *PersistConfig) SetStoreConfig(cfg *config.StoreConfig) {
// Some of the fields won't be persisted and watched,
// so we need to adjust it here before storing it.
cfg.Adjust()
o.storeConfig.Store(cfg)
}

// GetStoreConfig returns the TiKV store configuration.
func (o *PersistConfig) GetStoreConfig() *config.StoreConfig {
return o.storeConfig.Load().(*config.StoreConfig)
}

// GetMaxReplicas returns the max replicas.
func (o *PersistConfig) GetMaxReplicas() int {
return int(o.GetReplicationConfig().MaxReplicas)
Expand Down
7 changes: 5 additions & 2 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand All @@ -42,9 +43,10 @@ type Watcher struct {
}

type persistedConfig struct {
ClusterVersion semver.Version `json:"cluster-version"`
Schedule sc.ScheduleConfig `json:"schedule"`
Replication sc.ReplicationConfig `json:"replication"`
ClusterVersion semver.Version `json:"cluster-version"`
Store config.StoreConfig `json:"store"`
}

// NewWatcher creates a new watcher to watch the config meta change from PD API server.
Expand All @@ -71,9 +73,10 @@ func NewWatcher(
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetScheduleConfig(&cfg.Schedule)
cw.SetReplicationConfig(&cfg.Replication)
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetStoreConfig(&cfg.Store)
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
Expand Down
21 changes: 18 additions & 3 deletions tests/integrations/mcs/scheduling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/versioninfo"
severcfg "github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
)

Expand Down Expand Up @@ -81,17 +82,31 @@ func (suite *configTestSuite) TestConfigWatch() {
re.Equal(sc.DefaultSplitMergeInterval, watcher.GetScheduleConfig().SplitMergeInterval.Duration)
re.Equal("0.0.0", watcher.GetClusterVersion().String())
// Update the config and check if the scheduling config watcher can get the latest value.
suite.pdLeaderServer.GetPersistOptions().SetMaxReplicas(5)
persistOpts := suite.pdLeaderServer.GetPersistOptions()
persistOpts.SetMaxReplicas(5)
persistConfig(re, suite.pdLeaderServer)
testutil.Eventually(re, func() bool {
return watcher.GetReplicationConfig().MaxReplicas == 5
})
suite.pdLeaderServer.GetPersistOptions().SetSplitMergeInterval(2 * sc.DefaultSplitMergeInterval)
persistOpts.SetSplitMergeInterval(2 * sc.DefaultSplitMergeInterval)
persistConfig(re, suite.pdLeaderServer)
testutil.Eventually(re, func() bool {
return watcher.GetScheduleConfig().SplitMergeInterval.Duration == 2*sc.DefaultSplitMergeInterval
})
suite.pdLeaderServer.GetPersistOptions().SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
persistOpts.SetStoreConfig(&severcfg.StoreConfig{
Coprocessor: severcfg.Coprocessor{
RegionMaxSize: "144MiB",
},
Storage: severcfg.Storage{
Engine: severcfg.RaftstoreV2,
},
})
persistConfig(re, suite.pdLeaderServer)
testutil.Eventually(re, func() bool {
return watcher.GetStoreConfig().GetRegionMaxSize() == 144 &&
watcher.GetStoreConfig().IsRaftKV2()
})
persistOpts.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
persistConfig(re, suite.pdLeaderServer)
testutil.Eventually(re, func() bool {
return watcher.GetClusterVersion().String() == "4.0.0"
Expand Down

0 comments on commit 68a3e95

Please sign in to comment.