From d53107a08b14f13379e054982051ecbcd0186ae6 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 26 Oct 2023 18:19:45 +0800 Subject: [PATCH 01/10] dynamically enable scheduling service Signed-off-by: Ryan Leung --- pkg/utils/apiutil/serverapi/middleware.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index e26327cb3ff..6d45213c2ee 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -122,6 +122,9 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri // It will be helpful when matching the redirect rules "schedulers" or "schedulers/{name}" r.URL.Path = strings.TrimRight(r.URL.Path, "/") for _, rule := range h.microserviceRedirectRules { + if !h.s.IsServiceIndependent(rule.targetServiceName) { + continue + } if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) { if rule.filter != nil && !rule.filter(r) { continue From 8c264b420b89e8c72a768ac89b5cc1679067bb35 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 2 Nov 2023 11:15:22 +0800 Subject: [PATCH 02/10] extract scheduling control flow from cluster Signed-off-by: Ryan Leung --- tests/pdctl/hot/hot_test.go | 3 +++ tests/pdctl/scheduler/scheduler_test.go | 3 +++ tests/server/api/operator_test.go | 1 + tests/server/cluster/cluster_test.go | 2 ++ tests/server/region_syncer/region_syncer_test.go | 4 ++++ 5 files changed, 13 insertions(+) diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index 8cab8ea9ab2..4b7ab421c32 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/docker/go-units" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" @@ -49,6 +50,7 @@ func TestHotTestSuite(t *testing.T) { } func (suite *hotTestSuite) TestHot() { + suite.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob", `return(true)`)) var start time.Time start = start.Add(time.Hour) opts := []tests.ConfigOption{ @@ -66,6 +68,7 @@ func (suite *hotTestSuite) TestHot() { env.RunTestInTwoModes(suite.checkHotWithoutHotPeer) env = tests.NewSchedulingTestEnvironment(suite.T(), opts...) 
env.RunTestInTwoModes(suite.checkHotWithStoreID) + suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob")) } func (suite *hotTestSuite) checkHot(cluster *tests.TestCluster) { diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 7098637c84a..0de796bedcc 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/spf13/cobra" "github.com/stretchr/testify/require" @@ -639,6 +640,7 @@ func TestForwardSchedulerRequest(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob", `return(true)`)) cluster, err := tests.NewTestAPICluster(ctx, 1) re.NoError(err) re.NoError(cluster.RunInitialServers()) @@ -675,4 +677,5 @@ func TestForwardSchedulerRequest(t *testing.T) { checkSchedulerWithStatusCommand("paused", []string{ "balance-leader-scheduler", }) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob")) } diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index 14b8618f6a6..b6cdafb298d 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 6d233a8c8ab..8edc94e3835 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -1282,6 +1282,7 @@ func TestTransferLeaderForScheduler(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) tc, err := tests.NewTestCluster(ctx, 2) defer tc.Destroy() @@ -1364,6 +1365,7 @@ func TestTransferLeaderForScheduler(t *testing.T) { checkEvictLeaderSchedulerExist(re, schedulersController, true) checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker")) } diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index b73d4abb9b5..57b999a2fd8 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -202,6 +202,7 @@ func TestPrepareChecker(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true }) defer cluster.Destroy() @@ -243,6 +244,7 @@ func TestPrepareChecker(t *testing.T) { } 
time.Sleep(time.Second) re.True(rc.IsPrepared()) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker")) } @@ -251,6 +253,7 @@ func TestPrepareCheckerWithTransferLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true }) defer cluster.Destroy() @@ -289,6 +292,7 @@ func TestPrepareCheckerWithTransferLeader(t *testing.T) { re.NoError(err) re.Equal("pd1", cluster.WaitLeader()) re.True(rc.IsPrepared()) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker")) } From 937fc476bdca5f229b4f614eaaea97121a039dd0 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 7 Nov 2023 14:32:10 +0800 Subject: [PATCH 03/10] fix tests Signed-off-by: Ryan Leung --- tests/pdctl/hot/hot_test.go | 3 --- tests/pdctl/scheduler/scheduler_test.go | 3 --- tests/server/api/operator_test.go | 1 - tests/server/cluster/cluster_test.go | 2 -- tests/server/region_syncer/region_syncer_test.go | 4 ---- 5 files changed, 13 deletions(-) diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index 4b7ab421c32..8cab8ea9ab2 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -22,7 +22,6 @@ import ( "time" "github.com/docker/go-units" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" @@ -50,7 +49,6 @@ func TestHotTestSuite(t *testing.T) { } func (suite *hotTestSuite) TestHot() { - suite.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob", `return(true)`)) var start time.Time start = start.Add(time.Hour) opts := []tests.ConfigOption{ @@ -68,7 +66,6 @@ func (suite *hotTestSuite) TestHot() { env.RunTestInTwoModes(suite.checkHotWithoutHotPeer) env = tests.NewSchedulingTestEnvironment(suite.T(), opts...) 
env.RunTestInTwoModes(suite.checkHotWithStoreID) - suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob")) } func (suite *hotTestSuite) checkHot(cluster *tests.TestCluster) { diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 0de796bedcc..7098637c84a 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -23,7 +23,6 @@ import ( "testing" "time" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/spf13/cobra" "github.com/stretchr/testify/require" @@ -640,7 +639,6 @@ func TestForwardSchedulerRequest(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob", `return(true)`)) cluster, err := tests.NewTestAPICluster(ctx, 1) re.NoError(err) re.NoError(cluster.RunInitialServers()) @@ -677,5 +675,4 @@ func TestForwardSchedulerRequest(t *testing.T) { checkSchedulerWithStatusCommand("paused", []string{ "balance-leader-scheduler", }) - re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob")) } diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index b6cdafb298d..14b8618f6a6 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -24,7 +24,6 @@ import ( "testing" "time" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 8edc94e3835..6d233a8c8ab 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -1282,7 +1282,6 @@ func TestTransferLeaderForScheduler(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) tc, err := tests.NewTestCluster(ctx, 2) defer tc.Destroy() @@ -1365,7 +1364,6 @@ func TestTransferLeaderForScheduler(t *testing.T) { checkEvictLeaderSchedulerExist(re, schedulersController, true) checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) - re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker")) } diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index 57b999a2fd8..b73d4abb9b5 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -202,7 +202,6 @@ func TestPrepareChecker(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true }) defer cluster.Destroy() @@ -244,7 +243,6 @@ func TestPrepareChecker(t *testing.T) { } 
time.Sleep(time.Second) re.True(rc.IsPrepared()) - re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker")) } @@ -253,7 +251,6 @@ func TestPrepareCheckerWithTransferLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true }) defer cluster.Destroy() @@ -292,7 +289,6 @@ func TestPrepareCheckerWithTransferLeader(t *testing.T) { re.NoError(err) re.Equal("pd1", cluster.WaitLeader()) re.True(rc.IsPrepared()) - re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyServiceCheckJob")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker")) } From 7f048f34eaefe32afd06c5b80216d486c3732d9e Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 6 Nov 2023 18:12:26 +0800 Subject: [PATCH 04/10] dynamic enable scheduling jobs Signed-off-by: Ryan Leung --- pkg/utils/apiutil/serverapi/middleware.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 6d45213c2ee..e9f84a6e3d3 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/server" @@ -112,7 +113,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, } func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, string) { - if !h.s.IsAPIServiceMode() { + if !h.s.IsServiceEnabled(utils.SchedulingServiceName) { return false, "" } if len(h.microserviceRedirectRules) == 0 { From cf08a8b0cc2b1877f997539b6cfa8f39abc5abfd Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 14 Nov 2023 12:15:21 +0800 Subject: [PATCH 05/10] add test Signed-off-by: Ryan Leung --- pkg/utils/apiutil/serverapi/middleware.go | 4 ++-- server/cluster/scheduling_controller.go | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index e9f84a6e3d3..64d0c3ea06a 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -113,7 +113,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, } func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, string) { - if !h.s.IsServiceEnabled(utils.SchedulingServiceName) { + if !h.s.IsAPIServiceMode() { return false, "" } if len(h.microserviceRedirectRules) == 0 { @@ -123,7 +123,7 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri // It will be helpful when matching the redirect rules "schedulers" or "schedulers/{name}" r.URL.Path = strings.TrimRight(r.URL.Path, "/") for _, rule := range h.microserviceRedirectRules { - if !h.s.IsServiceIndependent(rule.targetServiceName) { + if rule.targetServiceName == 
utils.SchedulingServiceName && !h.s.IsServiceIndependent(utils.SchedulingServiceName) { continue } if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) { diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index bb6470252b0..c19e88f67b5 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -76,14 +76,13 @@ func (sc *schedulingController) stopSchedulingJobs() bool { sc.mu.Lock() defer sc.mu.Unlock() if !sc.running { - return false + return } sc.coordinator.Stop() sc.cancel() sc.wg.Wait() sc.running = false log.Info("scheduling service is stopped") - return true } func (sc *schedulingController) startSchedulingJobs(cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) { From 7159ce095d9bbeb0956bce917800a2c64766aff4 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 14 Nov 2023 18:08:31 +0800 Subject: [PATCH 06/10] resolve conflicts Signed-off-by: Ryan Leung --- .../schedulers/scheduler_controller.go | 4 +- pkg/utils/apiutil/serverapi/middleware.go | 4 -- server/cluster/cluster.go | 2 +- server/cluster/scheduling_controller.go | 44 +++++++++---------- tests/testutil.go | 1 + 5 files changed, 26 insertions(+), 29 deletions(-) diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index b65173c1f5b..f2d35b2b4f4 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -70,8 +70,8 @@ func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage e // Wait waits on all schedulers to exit. func (c *Controller) Wait() { - c.Lock() - defer c.Unlock() + c.RLock() + defer c.RUnlock() c.wg.Wait() } diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 64d0c3ea06a..e26327cb3ff 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/server" @@ -123,9 +122,6 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri // It will be helpful when matching the redirect rules "schedulers" or "schedulers/{name}" r.URL.Path = strings.TrimRight(r.URL.Path, "/") for _, rule := range h.microserviceRedirectRules { - if rule.targetServiceName == utils.SchedulingServiceName && !h.s.IsServiceIndependent(utils.SchedulingServiceName) { - continue - } if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) { if rule.filter != nil && !rule.filter(r) { continue diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 3b826d8d33e..c63e08419d6 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -156,7 +156,7 @@ type RaftCluster struct { core *core.BasicCluster // cached cluster info opt *config.PersistOptions limiter *StoreLimiter - *schedulingController + *SchedulingController ruleManager *placement.RuleManager regionLabeler *labeler.RegionLabeler replicationMode *replication.ModeManager diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index c19e88f67b5..eddc6ab7e09 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ 
-113,13 +113,13 @@ func (sc *schedulingController) initCoordinatorLocked(ctx context.Context, clust } // runCoordinator runs the main scheduling loop. -func (sc *schedulingController) runCoordinator() { +func (sc *SchedulingController) runCoordinator() { defer logutil.LogPanic() defer sc.wg.Done() sc.coordinator.RunUntilStop() } -func (sc *schedulingController) runStatsBackgroundJobs() { +func (sc *SchedulingController) runStatsBackgroundJobs() { defer logutil.LogPanic() defer sc.wg.Done() @@ -141,7 +141,7 @@ func (sc *schedulingController) runStatsBackgroundJobs() { } } -func (sc *schedulingController) runSchedulingMetricsCollectionJob() { +func (sc *SchedulingController) runSchedulingMetricsCollectionJob() { defer logutil.LogPanic() defer sc.wg.Done() @@ -165,14 +165,14 @@ func (sc *schedulingController) runSchedulingMetricsCollectionJob() { } } -func (sc *schedulingController) resetSchedulingMetrics() { +func (sc *SchedulingController) resetSchedulingMetrics() { statistics.Reset() schedulers.ResetSchedulerMetrics() schedule.ResetHotSpotMetrics() sc.resetStatisticsMetrics() } -func (sc *schedulingController) collectSchedulingMetrics() { +func (sc *SchedulingController) collectSchedulingMetrics() { statsMap := statistics.NewStoreStatisticsMap(sc.opt) stores := sc.GetStores() for _, s := range stores { @@ -185,7 +185,7 @@ func (sc *schedulingController) collectSchedulingMetrics() { sc.collectStatisticsMetrics() } -func (sc *schedulingController) resetStatisticsMetrics() { +func (sc *SchedulingController) resetStatisticsMetrics() { if sc.regionStats == nil { return } @@ -195,7 +195,7 @@ func (sc *schedulingController) resetStatisticsMetrics() { sc.hotStat.ResetMetrics() } -func (sc *schedulingController) collectStatisticsMetrics() { +func (sc *SchedulingController) collectStatisticsMetrics() { if sc.regionStats == nil { return } @@ -205,33 +205,33 @@ func (sc *schedulingController) collectStatisticsMetrics() { sc.hotStat.CollectMetrics() } -func (sc *schedulingController) removeStoreStatistics(storeID uint64) { +func (sc *SchedulingController) removeStoreStatistics(storeID uint64) { sc.hotStat.RemoveRollingStoreStats(storeID) sc.slowStat.RemoveSlowStoreStatus(storeID) } -func (sc *schedulingController) updateStoreStatistics(storeID uint64, isSlow bool) { +func (sc *SchedulingController) updateStoreStatistics(storeID uint64, isSlow bool) { sc.hotStat.GetOrCreateRollingStoreStats(storeID) sc.slowStat.ObserveSlowStoreStatus(storeID, isSlow) } // GetHotStat gets hot stat. -func (sc *schedulingController) GetHotStat() *statistics.HotStat { +func (sc *SchedulingController) GetHotStat() *statistics.HotStat { return sc.hotStat } // GetRegionStats gets region statistics. -func (sc *schedulingController) GetRegionStats() *statistics.RegionStatistics { +func (sc *SchedulingController) GetRegionStats() *statistics.RegionStatistics { return sc.regionStats } // GetLabelStats gets label statistics. -func (sc *schedulingController) GetLabelStats() *statistics.LabelStatistics { +func (sc *SchedulingController) GetLabelStats() *statistics.LabelStatistics { return sc.labelStats } // GetRegionStatsByType gets the status of the region by types. 
-func (sc *schedulingController) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { +func (sc *SchedulingController) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { if sc.regionStats == nil { return nil } @@ -239,13 +239,13 @@ func (sc *schedulingController) GetRegionStatsByType(typ statistics.RegionStatis } // UpdateRegionsLabelLevelStats updates the status of the region label level by types. -func (sc *schedulingController) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { +func (sc *SchedulingController) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { for _, region := range regions { sc.labelStats.Observe(region, sc.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), sc.opt.GetLocationLabels()) } } -func (sc *schedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { +func (sc *SchedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) for _, p := range region.GetPeers() { if store := sc.GetStore(p.StoreId); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { @@ -257,29 +257,29 @@ func (sc *schedulingController) getStoresWithoutLabelLocked(region *core.RegionI // GetStoresStats returns stores' statistics from cluster. // And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat -func (sc *schedulingController) GetStoresStats() *statistics.StoresStats { +func (sc *SchedulingController) GetStoresStats() *statistics.StoresStats { return sc.hotStat.StoresStats } // GetStoresLoads returns load stats of all stores. -func (sc *schedulingController) GetStoresLoads() map[uint64][]float64 { +func (sc *SchedulingController) GetStoresLoads() map[uint64][]float64 { return sc.hotStat.GetStoresLoads() } // IsRegionHot checks if a region is in hot state. -func (sc *schedulingController) IsRegionHot(region *core.RegionInfo) bool { +func (sc *SchedulingController) IsRegionHot(region *core.RegionInfo) bool { return sc.hotStat.IsRegionHot(region, sc.opt.GetHotRegionCacheHitsThreshold()) } // GetHotPeerStat returns hot peer stat with specified regionID and storeID. -func (sc *schedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { +func (sc *SchedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { return sc.hotStat.GetHotPeerStat(rw, regionID, storeID) } // RegionReadStats returns hot region's read stats. // The result only includes peers that are hot enough. // RegionStats is a thread-safe method -func (sc *schedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat { +func (sc *SchedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat { // As read stats are reported by store heartbeat, the threshold needs to be adjusted. threshold := sc.opt.GetHotRegionCacheHitsThreshold() * (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) @@ -288,13 +288,13 @@ func (sc *schedulingController) RegionReadStats() map[uint64][]*statistics.HotPe // RegionWriteStats returns hot region's write stats. // The result only includes peers that are hot enough. 
-func (sc *schedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { +func (sc *SchedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { // RegionStats is a thread-safe method return sc.hotStat.RegionStats(utils.Write, sc.opt.GetHotRegionCacheHitsThreshold()) } // BucketsStats returns hot region's buckets stats. -func (sc *schedulingController) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { +func (sc *SchedulingController) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { return sc.hotStat.BucketsStats(degree, regionIDs...) } diff --git a/tests/testutil.go b/tests/testutil.go index 2ccf6fb76be..d4de26914fd 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -311,6 +311,7 @@ func (s *SchedulingTestEnvironment) startCluster(m mode) { // start scheduling cluster tc, err := NewTestSchedulingCluster(s.ctx, 1, leaderServer.GetAddr()) re.NoError(err) + re.NoError(leaderServer.BootstrapCluster()) tc.WaitForPrimaryServing(re) s.cluster.SetSchedulingCluster(tc) time.Sleep(200 * time.Millisecond) // wait for scheduling cluster to update member From 2a1c103ed5bef4f123d3d6657f7eadd9b4fdc3fc Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 15 Nov 2023 11:51:23 +0800 Subject: [PATCH 07/10] address the comment Signed-off-by: Ryan Leung --- pkg/schedule/schedulers/scheduler_controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index f2d35b2b4f4..b65173c1f5b 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -70,8 +70,8 @@ func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage e // Wait waits on all schedulers to exit. func (c *Controller) Wait() { - c.RLock() - defer c.RUnlock() + c.Lock() + defer c.Unlock() c.wg.Wait() } From 1ee573d24d54955d4b255d3de03af4444963f777 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 15 Nov 2023 14:34:47 +0800 Subject: [PATCH 08/10] clean up handling metrics process Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/cluster.go | 16 ---------------- pkg/statistics/hot_cache.go | 4 ++-- pkg/statistics/region_collection.go | 8 ++++---- pkg/statistics/store_collection.go | 3 +++ server/cluster/cluster_test.go | 14 ++++++++++---- server/cluster/scheduling_controller.go | 19 ++++--------------- 6 files changed, 23 insertions(+), 41 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 96ee88259da..59b2e691658 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -485,10 +485,6 @@ func (c *Cluster) collectMetrics() { c.coordinator.GetSchedulersController().CollectSchedulerMetrics() c.coordinator.CollectHotSpotMetrics() - c.collectClusterMetrics() -} - -func (c *Cluster) collectClusterMetrics() { if c.regionStats == nil { return } @@ -500,20 +496,8 @@ func (c *Cluster) collectClusterMetrics() { func (c *Cluster) resetMetrics() { statistics.Reset() - schedulers.ResetSchedulerMetrics() schedule.ResetHotSpotMetrics() - c.resetClusterMetrics() -} - -func (c *Cluster) resetClusterMetrics() { - if c.regionStats == nil { - return - } - c.regionStats.Reset() - c.labelStats.Reset() - // reset hot cache metrics - c.hotStat.ResetMetrics() } // StartBackgroundJobs starts background jobs. 
diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index de7189a1332..1868e323b0f 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -125,8 +125,8 @@ func (w *HotCache) CollectMetrics() { w.CheckReadAsync(newCollectMetricsTask()) } -// ResetMetrics resets the hot cache metrics. -func (w *HotCache) ResetMetrics() { +// ResetHotCacheStatusMetrics resets the hot cache metrics. +func ResetHotCacheStatusMetrics() { hotCacheStatusGauge.Reset() } diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 26cbea9ef92..21af8e152fd 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -272,8 +272,8 @@ func (r *RegionStatistics) Collect() { regionWitnessLeaderRegionCounter.Set(float64(len(r.stats[WitnessLeader]))) } -// Reset resets the metrics of the regions' status. -func (r *RegionStatistics) Reset() { +// ResetRegionStatsMetrics resets the metrics of the regions' status. +func ResetRegionStatsMetrics() { regionMissPeerRegionCounter.Set(0) regionExtraPeerRegionCounter.Set(0) regionDownPeerRegionCounter.Set(0) @@ -326,8 +326,8 @@ func (l *LabelStatistics) Collect() { } } -// Reset resets the metrics of the label status. -func (l *LabelStatistics) Reset() { +// ResetLabelStatsMetrics resets the metrics of the label status. +func ResetLabelStatsMetrics() { regionLabelLevelGauge.Reset() } diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index dcdd77d9112..aacd45338d1 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -322,4 +322,7 @@ func Reset() { storeStatusGauge.Reset() clusterStatusGauge.Reset() placementStatusGauge.Reset() + ResetRegionStatsMetrics() + ResetLabelStatsMetrics() + ResetHotCacheStatusMetrics() } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index e9ce35dfb54..67a998f8d03 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2499,14 +2499,17 @@ func TestCollectMetricsConcurrent(t *testing.T) { }(i) } controller := co.GetSchedulersController() + rc := co.GetCluster().(*RaftCluster) + rc.SchedulingController = NewSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) + rc.SchedulingController.coordinator = co for i := 0; i < 1000; i++ { co.CollectHotSpotMetrics() controller.CollectSchedulerMetrics() - co.GetCluster().(*RaftCluster).collectStatisticsMetrics() + rc.collectSchedulingMetrics() } schedule.ResetHotSpotMetrics() schedulers.ResetSchedulerMetrics() - co.GetCluster().(*RaftCluster).resetStatisticsMetrics() + rc.resetSchedulingMetrics() wg.Wait() } @@ -2534,10 +2537,13 @@ func TestCollectMetrics(t *testing.T) { } } controller := co.GetSchedulersController() + rc := co.GetCluster().(*RaftCluster) + rc.SchedulingController = NewSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) + rc.SchedulingController.coordinator = co for i := 0; i < 1000; i++ { co.CollectHotSpotMetrics() controller.CollectSchedulerMetrics() - co.GetCluster().(*RaftCluster).collectStatisticsMetrics() + rc.collectSchedulingMetrics() } stores := co.GetCluster().GetStores() regionStats := co.GetCluster().RegionWriteStats() @@ -2552,7 +2558,7 @@ func TestCollectMetrics(t *testing.T) { re.Equal(status1, status2) schedule.ResetHotSpotMetrics() schedulers.ResetSchedulerMetrics() - co.GetCluster().(*RaftCluster).resetStatisticsMetrics() + rc.resetSchedulingMetrics() } func prepare(setCfg 
func(*sc.ScheduleConfig), setTc func(*testCluster), run func(*schedule.Coordinator), re *require.Assertions) (*testCluster, *schedule.Coordinator, func()) { diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index eddc6ab7e09..7c780662d4f 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -169,7 +169,10 @@ func (sc *SchedulingController) resetSchedulingMetrics() { statistics.Reset() schedulers.ResetSchedulerMetrics() schedule.ResetHotSpotMetrics() - sc.resetStatisticsMetrics() + statistics.ResetRegionStatsMetrics() + statistics.ResetLabelStatsMetrics() + // reset hot cache metrics + statistics.ResetHotCacheStatusMetrics() } func (sc *SchedulingController) collectSchedulingMetrics() { @@ -182,20 +185,6 @@ func (sc *SchedulingController) collectSchedulingMetrics() { statsMap.Collect() sc.coordinator.GetSchedulersController().CollectSchedulerMetrics() sc.coordinator.CollectHotSpotMetrics() - sc.collectStatisticsMetrics() -} - -func (sc *SchedulingController) resetStatisticsMetrics() { - if sc.regionStats == nil { - return - } - sc.regionStats.Reset() - sc.labelStats.Reset() - // reset hot cache metrics - sc.hotStat.ResetMetrics() -} - -func (sc *SchedulingController) collectStatisticsMetrics() { if sc.regionStats == nil { return } From 09360e54994be67963eb3850362aa37b760adfe1 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 20 Nov 2023 14:19:52 +0800 Subject: [PATCH 09/10] resolve conflicts Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 2 +- server/cluster/cluster_test.go | 19 +++++----- server/cluster/scheduling_controller.go | 48 ++++++++++++++----------- tests/testutil.go | 1 - 4 files changed, 38 insertions(+), 32 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c63e08419d6..3b826d8d33e 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -156,7 +156,7 @@ type RaftCluster struct { core *core.BasicCluster // cached cluster info opt *config.PersistOptions limiter *StoreLimiter - *SchedulingController + *schedulingController ruleManager *placement.RuleManager regionLabeler *labeler.RegionLabeler replicationMode *replication.ModeManager diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 67a998f8d03..b1a3535e90f 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2485,7 +2485,10 @@ func TestCollectMetricsConcurrent(t *testing.T) { nil) }, func(co *schedule.Coordinator) { co.Run() }, re) defer cleanup() - + rc := co.GetCluster().(*RaftCluster) + rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) + rc.schedulingController.coordinator = co + controller := co.GetSchedulersController() // Make sure there are no problem when concurrent write and read var wg sync.WaitGroup count := 10 @@ -2498,10 +2501,6 @@ func TestCollectMetricsConcurrent(t *testing.T) { } }(i) } - controller := co.GetSchedulersController() - rc := co.GetCluster().(*RaftCluster) - rc.SchedulingController = NewSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) - rc.SchedulingController.coordinator = co for i := 0; i < 1000; i++ { co.CollectHotSpotMetrics() controller.CollectSchedulerMetrics() @@ -2523,6 +2522,11 @@ func TestCollectMetrics(t *testing.T) { nil) }, func(co *schedule.Coordinator) { co.Run() }, re) defer cleanup() + + rc := co.GetCluster().(*RaftCluster) + rc.schedulingController = 
newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) + rc.schedulingController.coordinator = co + controller := co.GetSchedulersController() count := 10 for i := 0; i <= count; i++ { for k := 0; k < 200; k++ { @@ -2536,10 +2540,7 @@ func TestCollectMetrics(t *testing.T) { tc.hotStat.HotCache.Update(item, utils.Write) } } - controller := co.GetSchedulersController() - rc := co.GetCluster().(*RaftCluster) - rc.SchedulingController = NewSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) - rc.SchedulingController.coordinator = co + for i := 0; i < 1000; i++ { co.CollectHotSpotMetrics() controller.CollectSchedulerMetrics() diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index 7c780662d4f..9cb6515e8c6 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -76,13 +76,14 @@ func (sc *schedulingController) stopSchedulingJobs() bool { sc.mu.Lock() defer sc.mu.Unlock() if !sc.running { - return + return false } sc.coordinator.Stop() sc.cancel() sc.wg.Wait() sc.running = false log.Info("scheduling service is stopped") + return true } func (sc *schedulingController) startSchedulingJobs(cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) { @@ -113,13 +114,18 @@ func (sc *schedulingController) initCoordinatorLocked(ctx context.Context, clust } // runCoordinator runs the main scheduling loop. -func (sc *SchedulingController) runCoordinator() { +func (sc *schedulingController) runCoordinator() { defer logutil.LogPanic() defer sc.wg.Done() + select { + case <-sc.ctx.Done(): + return + default: + } sc.coordinator.RunUntilStop() } -func (sc *SchedulingController) runStatsBackgroundJobs() { +func (sc *schedulingController) runStatsBackgroundJobs() { defer logutil.LogPanic() defer sc.wg.Done() @@ -141,7 +147,7 @@ func (sc *SchedulingController) runStatsBackgroundJobs() { } } -func (sc *SchedulingController) runSchedulingMetricsCollectionJob() { +func (sc *schedulingController) runSchedulingMetricsCollectionJob() { defer logutil.LogPanic() defer sc.wg.Done() @@ -165,7 +171,7 @@ func (sc *SchedulingController) runSchedulingMetricsCollectionJob() { } } -func (sc *SchedulingController) resetSchedulingMetrics() { +func (sc *schedulingController) resetSchedulingMetrics() { statistics.Reset() schedulers.ResetSchedulerMetrics() schedule.ResetHotSpotMetrics() @@ -175,7 +181,7 @@ func (sc *SchedulingController) resetSchedulingMetrics() { statistics.ResetHotCacheStatusMetrics() } -func (sc *SchedulingController) collectSchedulingMetrics() { +func (sc *schedulingController) collectSchedulingMetrics() { statsMap := statistics.NewStoreStatisticsMap(sc.opt) stores := sc.GetStores() for _, s := range stores { @@ -194,33 +200,33 @@ func (sc *SchedulingController) collectSchedulingMetrics() { sc.hotStat.CollectMetrics() } -func (sc *SchedulingController) removeStoreStatistics(storeID uint64) { +func (sc *schedulingController) removeStoreStatistics(storeID uint64) { sc.hotStat.RemoveRollingStoreStats(storeID) sc.slowStat.RemoveSlowStoreStatus(storeID) } -func (sc *SchedulingController) updateStoreStatistics(storeID uint64, isSlow bool) { +func (sc *schedulingController) updateStoreStatistics(storeID uint64, isSlow bool) { sc.hotStat.GetOrCreateRollingStoreStats(storeID) sc.slowStat.ObserveSlowStoreStatus(storeID, isSlow) } // GetHotStat gets hot stat. 
-func (sc *SchedulingController) GetHotStat() *statistics.HotStat { +func (sc *schedulingController) GetHotStat() *statistics.HotStat { return sc.hotStat } // GetRegionStats gets region statistics. -func (sc *SchedulingController) GetRegionStats() *statistics.RegionStatistics { +func (sc *schedulingController) GetRegionStats() *statistics.RegionStatistics { return sc.regionStats } // GetLabelStats gets label statistics. -func (sc *SchedulingController) GetLabelStats() *statistics.LabelStatistics { +func (sc *schedulingController) GetLabelStats() *statistics.LabelStatistics { return sc.labelStats } // GetRegionStatsByType gets the status of the region by types. -func (sc *SchedulingController) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { +func (sc *schedulingController) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { if sc.regionStats == nil { return nil } @@ -228,13 +234,13 @@ func (sc *SchedulingController) GetRegionStatsByType(typ statistics.RegionStatis } // UpdateRegionsLabelLevelStats updates the status of the region label level by types. -func (sc *SchedulingController) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { +func (sc *schedulingController) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { for _, region := range regions { sc.labelStats.Observe(region, sc.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), sc.opt.GetLocationLabels()) } } -func (sc *SchedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { +func (sc *schedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) for _, p := range region.GetPeers() { if store := sc.GetStore(p.StoreId); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { @@ -246,29 +252,29 @@ func (sc *SchedulingController) getStoresWithoutLabelLocked(region *core.RegionI // GetStoresStats returns stores' statistics from cluster. // And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat -func (sc *SchedulingController) GetStoresStats() *statistics.StoresStats { +func (sc *schedulingController) GetStoresStats() *statistics.StoresStats { return sc.hotStat.StoresStats } // GetStoresLoads returns load stats of all stores. -func (sc *SchedulingController) GetStoresLoads() map[uint64][]float64 { +func (sc *schedulingController) GetStoresLoads() map[uint64][]float64 { return sc.hotStat.GetStoresLoads() } // IsRegionHot checks if a region is in hot state. -func (sc *SchedulingController) IsRegionHot(region *core.RegionInfo) bool { +func (sc *schedulingController) IsRegionHot(region *core.RegionInfo) bool { return sc.hotStat.IsRegionHot(region, sc.opt.GetHotRegionCacheHitsThreshold()) } // GetHotPeerStat returns hot peer stat with specified regionID and storeID. -func (sc *SchedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { +func (sc *schedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { return sc.hotStat.GetHotPeerStat(rw, regionID, storeID) } // RegionReadStats returns hot region's read stats. // The result only includes peers that are hot enough. 
// RegionStats is a thread-safe method -func (sc *SchedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat { +func (sc *schedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat { // As read stats are reported by store heartbeat, the threshold needs to be adjusted. threshold := sc.opt.GetHotRegionCacheHitsThreshold() * (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) @@ -277,13 +283,13 @@ func (sc *SchedulingController) RegionReadStats() map[uint64][]*statistics.HotPe // RegionWriteStats returns hot region's write stats. // The result only includes peers that are hot enough. -func (sc *SchedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { +func (sc *schedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { // RegionStats is a thread-safe method return sc.hotStat.RegionStats(utils.Write, sc.opt.GetHotRegionCacheHitsThreshold()) } // BucketsStats returns hot region's buckets stats. -func (sc *SchedulingController) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { +func (sc *schedulingController) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { return sc.hotStat.BucketsStats(degree, regionIDs...) } diff --git a/tests/testutil.go b/tests/testutil.go index d4de26914fd..2ccf6fb76be 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -311,7 +311,6 @@ func (s *SchedulingTestEnvironment) startCluster(m mode) { // start scheduling cluster tc, err := NewTestSchedulingCluster(s.ctx, 1, leaderServer.GetAddr()) re.NoError(err) - re.NoError(leaderServer.BootstrapCluster()) tc.WaitForPrimaryServing(re) s.cluster.SetSchedulingCluster(tc) time.Sleep(200 * time.Millisecond) // wait for scheduling cluster to update member From cabf1b8cb6edfcd90ded65ce3561fb38406c68d5 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 21 Nov 2023 11:44:58 +0800 Subject: [PATCH 10/10] fix the test Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 4 ++-- server/cluster/scheduling_controller.go | 7 +------ tests/server/cluster/cluster_test.go | 2 -- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 3b826d8d33e..e9cece0faa7 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -654,7 +654,7 @@ func (c *RaftCluster) runMetricsCollectionJob() { ticker := time.NewTicker(metricsCollectionJobInterval) failpoint.Inject("highFrequencyClusterJobs", func() { ticker.Stop() - ticker = time.NewTicker(time.Microsecond) + ticker = time.NewTicker(time.Millisecond) }) defer ticker.Stop() @@ -734,10 +734,10 @@ func (c *RaftCluster) Stop() { return } c.running = false + c.cancel() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.stopSchedulingJobs() } - c.cancel() c.Unlock() c.wg.Wait() diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index 9cb6515e8c6..04c77498948 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -117,11 +117,6 @@ func (sc *schedulingController) initCoordinatorLocked(ctx context.Context, clust func (sc *schedulingController) runCoordinator() { defer logutil.LogPanic() defer sc.wg.Done() - select { - case <-sc.ctx.Done(): - return - default: - } sc.coordinator.RunUntilStop() } @@ -154,7 +149,7 @@ func (sc *schedulingController) runSchedulingMetricsCollectionJob() { ticker := time.NewTicker(metricsCollectionJobInterval) 
failpoint.Inject("highFrequencyClusterJobs", func() { ticker.Stop() - ticker = time.NewTicker(time.Microsecond) + ticker = time.NewTicker(time.Millisecond) }) defer ticker.Stop() diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 6d233a8c8ab..a5861f1ba43 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -518,8 +518,6 @@ func TestRaftClusterMultipleRestart(t *testing.T) { err = rc.Start(leaderServer.GetServer()) re.NoError(err) time.Sleep(time.Millisecond) - rc = leaderServer.GetRaftCluster() - re.NotNil(rc) rc.Stop() } re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))