Skip to content

Commit

Permalink
*: decouple the dependency between config and schedule (#5792)
Browse files Browse the repository at this point in the history
ref #5838

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: 混沌DM <hundundm@gmail.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3 people authored Mar 3, 2023
1 parent d85a0e4 commit 05228ba
Show file tree
Hide file tree
Showing 104 changed files with 555 additions and 455 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Require review from domain experts when the PR modified significant config files.
/server/config/config.go @tikv/pd-configuration-reviewer
/server/schedulers/hot_region_config.go @tikv/pd-configuration-reviewer
/server/schedule/schedulers/hot_region_config.go @tikv/pd-configuration-reviewer
/conf/config.toml @tikv/pd-configuration-reviewer
/metrics/grafana/pd.json @tikv/pd-configuration-reviewer
2 changes: 1 addition & 1 deletion cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"github.com/tikv/pd/server/apiv2"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/join"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedule/schedulers"
"go.uber.org/zap"
)

Expand Down
10 changes: 5 additions & 5 deletions pkg/autoscaling/calculation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/pkg/mock/mockconfig"
)

func TestGetScaledTiKVGroups(t *testing.T) {
Expand All @@ -34,7 +34,7 @@ func TestGetScaledTiKVGroups(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// case1 indicates the tikv cluster with not any group existed
case1 := mockcluster.NewCluster(ctx, config.NewTestOptions())
case1 := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())
case1.AddLabelsStore(1, 1, map[string]string{})
case1.AddLabelsStore(2, 1, map[string]string{
"foo": "bar",
Expand All @@ -44,7 +44,7 @@ func TestGetScaledTiKVGroups(t *testing.T) {
})

// case2 indicates the tikv cluster with 1 auto-scaling group existed
case2 := mockcluster.NewCluster(ctx, config.NewTestOptions())
case2 := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())
case2.AddLabelsStore(1, 1, map[string]string{})
case2.AddLabelsStore(2, 1, map[string]string{
groupLabelKey: fmt.Sprintf("%s-%s-0", autoScalingGroupLabelKeyPrefix, TiKV.String()),
Expand All @@ -56,7 +56,7 @@ func TestGetScaledTiKVGroups(t *testing.T) {
})

// case3 indicates the tikv cluster with other group existed
case3 := mockcluster.NewCluster(ctx, config.NewTestOptions())
case3 := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())
case3.AddLabelsStore(1, 1, map[string]string{})
case3.AddLabelsStore(2, 1, map[string]string{
groupLabelKey: "foo",
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestStrategyChangeCount(t *testing.T) {
// tikv cluster with 1 auto-scaling group existed
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster := mockcluster.NewCluster(ctx, config.NewTestOptions())
cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())
cluster.AddLabelsStore(1, 1, map[string]string{})
cluster.AddLabelsStore(2, 1, map[string]string{
groupLabelKey: fmt.Sprintf("%s-%s-0", autoScalingGroupLabelKeyPrefix, TiKV.String()),
Expand Down
16 changes: 16 additions & 0 deletions pkg/mock/mockcluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,19 @@ func (mc *Cluster) SetMaxReplicasWithLabel(enablePlacementRules bool, num int, l
mc.SetLocationLabels(labels)
}
}

// SetRegionMaxSize sets the region max size.
func (mc *Cluster) SetRegionMaxSize(v string) {
mc.updateStoreConfig(func(r *config.StoreConfig) { r.RegionMaxSize = v })
}

// SetRegionSizeMB sets the region max size.
func (mc *Cluster) SetRegionSizeMB(v uint64) {
mc.updateStoreConfig(func(r *config.StoreConfig) { r.RegionMaxSizeMB = v })
}

func (mc *Cluster) updateStoreConfig(f func(*config.StoreConfig)) {
r := mc.StoreConfigManager.GetStoreConfig().Clone()
f(r)
mc.SetStoreConfig(r)
}
5 changes: 3 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/pkg/versioninfo"
"github.com/tikv/pd/server/config"
sc "github.com/tikv/pd/server/schedule/config"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/statistics"
Expand Down Expand Up @@ -81,12 +82,12 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
}

// GetStoreConfig returns the store config.
func (mc *Cluster) GetStoreConfig() *config.StoreConfig {
func (mc *Cluster) GetStoreConfig() sc.StoreConfig {
return mc.StoreConfigManager.GetStoreConfig()
}

// GetOpts returns the cluster configuration.
func (mc *Cluster) GetOpts() *config.PersistOptions {
func (mc *Cluster) GetOpts() sc.Config {
return mc.PersistOptions
}

Expand Down
31 changes: 31 additions & 0 deletions pkg/mock/mockconfig/mockconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mockconfig

import (
"github.com/tikv/pd/server/config"
sc "github.com/tikv/pd/server/schedule/config"
)

// NewTestOptions creates default options for testing.
func NewTestOptions() *config.PersistOptions {
// register default schedulers in case config check fail.
for _, d := range config.DefaultSchedulers {
sc.RegisterScheduler(d.Type)
}
c := config.NewConfig()
c.Adjust(nil, false)
return config.NewPersistOptions(c)
}
4 changes: 2 additions & 2 deletions pkg/mock/mockhbstream/mockhbstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/mock/mockconfig"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/schedule/hbstream"
)

Expand All @@ -36,7 +36,7 @@ func TestActivity(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cluster := mockcluster.NewCluster(ctx, config.NewTestOptions())
cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())
cluster.AddRegionStore(1, 1)
cluster.AddRegionStore(2, 0)
cluster.AddLeaderRegion(1, 1)
Expand Down
2 changes: 1 addition & 1 deletion plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/tikv/pd/server/schedule/filter"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/plan"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedule/schedulers"
"github.com/unrolled/render"
)

Expand Down
2 changes: 1 addition & 1 deletion server/api/diagnostic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedule/schedulers"
)

type diagnosticTestSuite struct {
Expand Down
2 changes: 1 addition & 1 deletion server/api/min_resolved_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type minResolvedTS struct {
func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) {
c := h.svr.GetRaftCluster()
value := c.GetMinResolvedTS()
persistInterval := c.GetOpts().GetPDServerConfig().MinResolvedTSPersistenceInterval
persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval
h.rd.JSON(w, http.StatusOK, minResolvedTS{
MinResolvedTS: value,
PersistInterval: persistInterval,
Expand Down
8 changes: 4 additions & 4 deletions server/api/min_resolved_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (suite *minResolvedTSTestSuite) TearDownSuite() {

func (suite *minResolvedTSTestSuite) TestMinResolvedTS() {
// case1: default run job
interval := suite.svr.GetRaftCluster().GetOpts().GetPDServerConfig().MinResolvedTSPersistenceInterval
interval := suite.svr.GetRaftCluster().GetPDServerConfig().MinResolvedTSPersistenceInterval
suite.checkMinResolvedTS(&minResolvedTS{
MinResolvedTS: 0,
IsRealTime: true,
Expand All @@ -83,7 +83,7 @@ func (suite *minResolvedTSTestSuite) TestMinResolvedTS() {
interval = typeutil.Duration{Duration: suite.defaultInterval}
suite.setMinResolvedTSPersistenceInterval(interval)
suite.Eventually(func() bool {
return interval == suite.svr.GetRaftCluster().GetOpts().GetPDServerConfig().MinResolvedTSPersistenceInterval
return interval == suite.svr.GetRaftCluster().GetPDServerConfig().MinResolvedTSPersistenceInterval
}, time.Second*10, time.Millisecond*20)
suite.checkMinResolvedTS(&minResolvedTS{
MinResolvedTS: 0,
Expand Down Expand Up @@ -116,9 +116,9 @@ func (suite *minResolvedTSTestSuite) TestMinResolvedTS() {
}

func (suite *minResolvedTSTestSuite) setMinResolvedTSPersistenceInterval(duration typeutil.Duration) {
cfg := suite.svr.GetRaftCluster().GetOpts().GetPDServerConfig().Clone()
cfg := suite.svr.GetRaftCluster().GetPDServerConfig().Clone()
cfg.MinResolvedTSPersistenceInterval = duration
suite.svr.GetRaftCluster().GetOpts().SetPDServerConfig(cfg)
suite.svr.GetRaftCluster().SetPDServerConfig(cfg)
}

func (suite *minResolvedTSTestSuite) checkMinResolvedTS(expect *minResolvedTS) {
Expand Down
2 changes: 1 addition & 1 deletion server/api/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ func (suite *regionRuleTestSuite) TestRegionPlacementRule() {
re, "invalid region id"))
suite.NoError(err)

suite.svr.GetRaftCluster().GetOpts().GetReplicationConfig().EnablePlacementRules = false
suite.svr.GetRaftCluster().GetReplicationConfig().EnablePlacementRules = false
url = fmt.Sprintf("%s/config/rules/region/%d/detail", suite.urlPrefix, 1)
err = tu.CheckGetJSON(testDialClient, url, nil, tu.Status(re, http.StatusPreconditionFailed), tu.StringContain(
re, "placement rules feature is disabled"))
Expand Down
2 changes: 1 addition & 1 deletion server/api/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedule/schedulers"
"github.com/unrolled/render"
)

Expand Down
4 changes: 2 additions & 2 deletions server/api/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ func (suite *scheduleTestSuite) TestOriginAPI() {
input1["store_id"] = 2
body, err = json.Marshal(input1)
suite.NoError(err)
suite.NoError(failpoint.Enable("github.com/tikv/pd/server/schedulers/persistFail", "return(true)"))
suite.NoError(failpoint.Enable("github.com/tikv/pd/server/schedule/schedulers/persistFail", "return(true)"))
suite.NoError(tu.CheckPostJSON(testDialClient, addURL, body, tu.StatusNotOK(re)))
suite.Len(rc.GetSchedulers(), 1)
resp = make(map[string]interface{})
suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp))
suite.Len(resp["store-id-ranges"], 1)
suite.NoError(failpoint.Disable("github.com/tikv/pd/server/schedulers/persistFail"))
suite.NoError(failpoint.Disable("github.com/tikv/pd/server/schedule/schedulers/persistFail"))
suite.NoError(tu.CheckPostJSON(testDialClient, addURL, body, tu.StatusOK(re)))
suite.Len(rc.GetSchedulers(), 1)
resp = make(map[string]interface{})
Expand Down
32 changes: 29 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ import (
"github.com/tikv/pd/server/replication"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/checker"
sc "github.com/tikv/pd/server/schedule/config"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedule/schedulers"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/statistics/buckets"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -178,7 +179,7 @@ func NewRaftCluster(ctx context.Context, clusterID uint64, regionSyncer *syncer.
}

// GetStoreConfig returns the store config.
func (c *RaftCluster) GetStoreConfig() *config.StoreConfig {
func (c *RaftCluster) GetStoreConfig() sc.StoreConfig {
return c.storeConfigManager.GetStoreConfig()
}

Expand Down Expand Up @@ -737,10 +738,35 @@ func (c *RaftCluster) SetStorage(s storage.Storage) {

// GetOpts returns cluster's configuration.
// There is no need a lock since it won't changed.
func (c *RaftCluster) GetOpts() *config.PersistOptions {
func (c *RaftCluster) GetOpts() sc.Config {
return c.opt
}

// GetScheduleConfig returns scheduling configurations.
func (c *RaftCluster) GetScheduleConfig() *config.ScheduleConfig {
return c.opt.GetScheduleConfig()
}

// SetScheduleConfig sets the PD scheduling configuration.
func (c *RaftCluster) SetScheduleConfig(cfg *config.ScheduleConfig) {
c.opt.SetScheduleConfig(cfg)
}

// GetReplicationConfig returns replication configurations.
func (c *RaftCluster) GetReplicationConfig() *config.ReplicationConfig {
return c.opt.GetReplicationConfig()
}

// GetPDServerConfig returns pd server configurations.
func (c *RaftCluster) GetPDServerConfig() *config.PDServerConfig {
return c.opt.GetPDServerConfig()
}

// SetPDServerConfig sets the PD configuration.
func (c *RaftCluster) SetPDServerConfig(cfg *config.PDServerConfig) {
c.opt.SetPDServerConfig(cfg)
}

// AddSuspectRegions adds regions to suspect list.
func (c *RaftCluster) AddSuspectRegions(regionIDs ...uint64) {
c.coordinator.checkers.AddSuspectRegions(regionIDs...)
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
"github.com/tikv/pd/server/schedule/filter"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedule/schedulers"
"github.com/tikv/pd/server/statistics"
)

Expand Down
6 changes: 3 additions & 3 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/plan"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedule/schedulers"
"github.com/tikv/pd/server/statistics"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -93,7 +93,7 @@ func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstre
cancel: cancel,
cluster: cluster,
prepareChecker: newPrepareChecker(),
checkers: checker.NewController(ctx, cluster, cluster.ruleManager, cluster.regionLabeler, opController),
checkers: checker.NewController(ctx, cluster, cluster.opt, cluster.ruleManager, cluster.regionLabeler, opController),
regionScatterer: schedule.NewRegionScatterer(ctx, cluster, opController),
regionSplitter: schedule.NewRegionSplitter(cluster, schedule.NewSplitRegionsHandler(cluster, opController)),
schedulers: schedulers,
Expand Down Expand Up @@ -768,7 +768,7 @@ func (c *coordinator) isSchedulerDisabled(name string) (bool, error) {
return false, errs.ErrSchedulerNotFound.FastGenByArgs()
}
t := s.GetType()
scheduleConfig := c.cluster.GetOpts().GetScheduleConfig()
scheduleConfig := c.cluster.GetScheduleConfig()
for _, s := range scheduleConfig.Schedulers {
if t == s.Type {
return s.Disable, nil
Expand Down
7 changes: 3 additions & 4 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedule/schedulers"
"github.com/tikv/pd/server/statistics"
)

Expand Down Expand Up @@ -536,10 +536,9 @@ func TestCheckCache(t *testing.T) {
re.Len(co.checkers.GetWaitingRegions(), 1)

// cancel the replica-schedule-limit restriction
opt := tc.GetOpts()
cfg := opt.GetScheduleConfig()
cfg := tc.GetScheduleConfig()
cfg.ReplicaScheduleLimit = 10
tc.GetOpts().SetScheduleConfig(cfg)
tc.SetScheduleConfig(cfg)
co.wg.Add(1)
co.patrolRegions()
oc := co.opController
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/diagnostic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/tikv/pd/pkg/movingaverage"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/plan"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedule/schedulers"
"go.uber.org/zap"
)

Expand Down
Loading

0 comments on commit 05228ba

Please sign in to comment.