From ebc342cb4eb548636a0de337dd2000a69e77c277 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Tue, 6 Feb 2024 11:49:13 +0800 Subject: [PATCH] This is an automated cherry-pick of #7748 close tikv/pd#7728 Signed-off-by: ti-chi-bot --- pkg/cluster/cluster.go | 62 +++ pkg/core/region.go | 20 +- pkg/core/region_test.go | 2 +- pkg/mcs/scheduling/server/cluster.go | 605 ++++++++++++++++++++++ pkg/mcs/scheduling/server/grpc_service.go | 367 +++++++++++++ pkg/statistics/region_collection.go | 32 +- server/api/region_test.go | 15 + server/cluster/cluster.go | 9 +- server/region_syncer/client.go | 2 +- tests/server/cluster/cluster_test.go | 94 ++++ 10 files changed, 1191 insertions(+), 17 deletions(-) create mode 100644 pkg/cluster/cluster.go create mode 100644 pkg/mcs/scheduling/server/cluster.go create mode 100644 pkg/mcs/scheduling/server/grpc_service.go diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go new file mode 100644 index 000000000000..916200bfa3ec --- /dev/null +++ b/pkg/cluster/cluster.go @@ -0,0 +1,62 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/statistics" +) + +// Cluster provides an overview of a cluster's basic information. +type Cluster interface { + GetHotStat() *statistics.HotStat + GetRegionStats() *statistics.RegionStatistics + GetLabelStats() *statistics.LabelStatistics + GetCoordinator() *schedule.Coordinator + GetRuleManager() *placement.RuleManager +} + +// HandleStatsAsync handles the flow asynchronously. +func HandleStatsAsync(c Cluster, region *core.RegionInfo) { + c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) + c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + for _, peer := range region.GetPeers() { + peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) + c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) + } + c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) +} + +// HandleOverlaps handles the overlap regions. +func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) { + for _, item := range overlaps { + if c.GetRegionStats() != nil { + c.GetRegionStats().ClearDefunctRegion(item.GetID()) + } + c.GetLabelStats().ClearDefunctRegion(item.GetID()) + c.GetRuleManager().InvalidCache(item.GetID()) + } +} + +// Collect collects the cluster information. 
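+// It currently only feeds the region and its stores into RegionStatistics when
+// region stats are enabled. A rough usage sketch, mirroring how the heartbeat
+// path later in this patch uses these helpers (overlaps are cleared first, then
+// the fresh observation is recorded):
+//
+//	cluster.HandleOverlaps(c, overlaps)
+//	cluster.Collect(c, region, stores, hasRegionStats)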
+func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats bool) { + if hasRegionStats { + c.GetRegionStats().Observe(region, stores) + } +} diff --git a/pkg/core/region.go b/pkg/core/region.go index cfe24ba02137..990116a638af 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -95,6 +95,12 @@ func (r *RegionInfo) LoadedFromStorage() bool { return r.source == Storage } +// LoadedFromSync means this region's meta info loaded from region syncer. +// Only used for test. +func (r *RegionInfo) LoadedFromSync() bool { + return r.source == Sync +} + // NewRegionInfo creates RegionInfo with region's meta and leader peer. func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCreateOption) *RegionInfo { regionInfo := &RegionInfo{ @@ -668,7 +674,7 @@ func (r *RegionInfo) isRegionRecreated() bool { // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) +type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. @@ -681,19 +687,15 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. - // Mark isNew if the region in cache does not have leader. - return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) { + return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) { if origin == nil { if log.GetLevel() <= zap.DebugLevel { debug("insert new region", zap.Uint64("region-id", region.GetID()), logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) } - saveKV, saveCache, isNew = true, true, true + saveKV, saveCache = true, true } else { - if origin.LoadedFromStorage() { - isNew = true - } r := region.GetRegionEpoch() o := origin.GetRegionEpoch() if r.GetVersion() > o.GetVersion() { @@ -719,9 +721,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { saveKV, saveCache = true, true } if region.GetLeader().GetId() != origin.GetLeader().GetId() { - if origin.GetLeader().GetId() == 0 { - isNew = true - } else if log.GetLevel() <= zap.InfoLevel { + if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel { info("leader changed", zap.Uint64("region-id", region.GetID()), zap.Uint64("from", origin.GetLeader().GetStoreId()), diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 1624697e8939..f857970c71e9 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -362,7 +362,7 @@ func TestNeedSync(t *testing.T) { for _, testCase := range testCases { regionA := region.Clone(testCase.optionsA...) regionB := region.Clone(testCase.optionsB...) 
- _, _, _, needSync := RegionGuide(regionA, regionB) + _, _, needSync := RegionGuide(regionA, regionB) re.Equal(testCase.needSync, needSync) } } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go new file mode 100644 index 000000000000..9e75057621ec --- /dev/null +++ b/pkg/mcs/scheduling/server/cluster.go @@ -0,0 +1,605 @@ +package server + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/schedulingpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cluster" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/schedule" + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/schedule/hbstream" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/schedule/splitter" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/statistics/utils" + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/logutil" + "go.uber.org/zap" +) + +// Cluster is used to manage all information for scheduling purpose. +type Cluster struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + *core.BasicCluster + persistConfig *config.PersistConfig + ruleManager *placement.RuleManager + labelerManager *labeler.RegionLabeler + regionStats *statistics.RegionStatistics + labelStats *statistics.LabelStatistics + hotStat *statistics.HotStat + storage storage.Storage + coordinator *schedule.Coordinator + checkMembershipCh chan struct{} + apiServerLeader atomic.Value + clusterID uint64 + running atomic.Bool +} + +const ( + regionLabelGCInterval = time.Hour + requestTimeout = 3 * time.Second + collectWaitTime = time.Minute +) + +// NewCluster creates a new cluster. +func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) { + ctx, cancel := context.WithCancel(parentCtx) + labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval) + if err != nil { + cancel() + return nil, err + } + ruleManager := placement.NewRuleManager(ctx, storage, basicCluster, persistConfig) + c := &Cluster{ + ctx: ctx, + cancel: cancel, + BasicCluster: basicCluster, + ruleManager: ruleManager, + labelerManager: labelerManager, + persistConfig: persistConfig, + hotStat: statistics.NewHotStat(ctx), + labelStats: statistics.NewLabelStatistics(), + regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), + storage: storage, + clusterID: clusterID, + checkMembershipCh: checkMembershipCh, + } + c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) + err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) + if err != nil { + cancel() + return nil, err + } + return c, nil +} + +// GetCoordinator returns the coordinator +func (c *Cluster) GetCoordinator() *schedule.Coordinator { + return c.coordinator +} + +// GetHotStat gets hot stat. 
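+// HotStat aggregates the read/write flow items reported through store and
+// region heartbeats; the hot-region related getters below and the hot-region
+// scheduling all read from this single instance.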
+func (c *Cluster) GetHotStat() *statistics.HotStat { + return c.hotStat +} + +// GetStoresStats returns stores' statistics from cluster. +// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat +func (c *Cluster) GetStoresStats() *statistics.StoresStats { + return c.hotStat.StoresStats +} + +// GetRegionStats gets region statistics. +func (c *Cluster) GetRegionStats() *statistics.RegionStatistics { + return c.regionStats +} + +// GetLabelStats gets label statistics. +func (c *Cluster) GetLabelStats() *statistics.LabelStatistics { + return c.labelStats +} + +// GetBasicCluster returns the basic cluster. +func (c *Cluster) GetBasicCluster() *core.BasicCluster { + return c.BasicCluster +} + +// GetSharedConfig returns the shared config. +func (c *Cluster) GetSharedConfig() sc.SharedConfigProvider { + return c.persistConfig +} + +// GetRuleManager returns the rule manager. +func (c *Cluster) GetRuleManager() *placement.RuleManager { + return c.ruleManager +} + +// GetRegionLabeler returns the region labeler. +func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler { + return c.labelerManager +} + +// GetRegionSplitter returns the region splitter. +func (c *Cluster) GetRegionSplitter() *splitter.RegionSplitter { + return c.coordinator.GetRegionSplitter() +} + +// GetRegionScatterer returns the region scatter. +func (c *Cluster) GetRegionScatterer() *scatter.RegionScatterer { + return c.coordinator.GetRegionScatterer() +} + +// GetStoresLoads returns load stats of all stores. +func (c *Cluster) GetStoresLoads() map[uint64][]float64 { + return c.hotStat.GetStoresLoads() +} + +// IsRegionHot checks if a region is in hot state. +func (c *Cluster) IsRegionHot(region *core.RegionInfo) bool { + return c.hotStat.IsRegionHot(region, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// GetHotPeerStat returns hot peer stat with specified regionID and storeID. +func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { + return c.hotStat.GetHotPeerStat(rw, regionID, storeID) +} + +// RegionReadStats returns hot region's read stats. +// The result only includes peers that are hot enough. +// RegionStats is a thread-safe method +func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. + threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + return c.hotStat.RegionStats(utils.Read, threshold) +} + +// RegionWriteStats returns hot region's write stats. +// The result only includes peers that are hot enough. +func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { + // RegionStats is a thread-safe method + return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// BucketsStats returns hot region's buckets stats. +func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { + return c.hotStat.BucketsStats(degree, regionIDs...) +} + +// GetStorage returns the storage. +func (c *Cluster) GetStorage() storage.Storage { + return c.storage +} + +// GetCheckerConfig returns the checker config. +func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return c.persistConfig } + +// GetSchedulerConfig returns the scheduler config. 
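+// Note that GetCheckerConfig, GetSchedulerConfig, GetStoreConfig and
+// GetSharedConfig are all backed by the same persistConfig instance, so a
+// configuration update applied to it is visible to every consumer at once.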
+func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.persistConfig } + +// GetStoreConfig returns the store config. +func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig } + +// AllocID allocates a new ID. +func (c *Cluster) AllocID() (uint64, error) { + client, err := c.getAPIServerLeaderClient() + if err != nil { + return 0, err + } + ctx, cancel := context.WithTimeout(c.ctx, requestTimeout) + defer cancel() + resp, err := client.AllocID(ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) + if err != nil { + c.triggerMembershipCheck() + return 0, err + } + return resp.GetId(), nil +} + +func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) { + cli := c.apiServerLeader.Load() + if cli == nil { + c.triggerMembershipCheck() + return nil, errors.New("API server leader is not found") + } + return cli.(pdpb.PDClient), nil +} + +func (c *Cluster) triggerMembershipCheck() { + select { + case c.checkMembershipCh <- struct{}{}: + default: // avoid blocking + } +} + +// SwitchAPIServerLeader switches the API server leader. +func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { + old := c.apiServerLeader.Load() + return c.apiServerLeader.CompareAndSwap(old, new) +} + +func trySend(notifier chan struct{}) { + select { + case notifier <- struct{}{}: + // If the channel is not empty, it means the check is triggered. + default: + } +} + +// updateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion. +func (c *Cluster) updateScheduler() { + defer logutil.LogPanic() + defer c.wg.Done() + + // Make sure the coordinator has initialized all the existing schedulers. + c.waitSchedulersInitialized() + // Establish a notifier to listen the schedulers updating. + notifier := make(chan struct{}, 1) + // Make sure the check will be triggered once later. + trySend(notifier) + c.persistConfig.SetSchedulersUpdatingNotifier(notifier) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop listening the schedulers updating notifier") + return + case <-notifier: + // This is triggered by the watcher when the schedulers are updated. + } + + if !c.running.Load() { + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop listening the schedulers updating notifier") + return + case <-ticker.C: + // retry + trySend(notifier) + continue + } + } + + log.Info("schedulers updating notifier is triggered, try to update the scheduler") + var ( + schedulersController = c.coordinator.GetSchedulersController() + latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers + ) + // Create the newly added schedulers. 
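+		// For each persisted scheduler entry: build the scheduler from its type and
+		// args, skip it if the controller already holds one with the same name,
+		// otherwise register it. Entries removed from the config are reconciled by
+		// the deletion loop right after this one.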
+ for _, scheduler := range latestSchedulersConfig { + s, err := schedulers.CreateScheduler( + scheduler.Type, + c.coordinator.GetOperatorController(), + c.storage, + schedulers.ConfigSliceDecoder(scheduler.Type, scheduler.Args), + schedulersController.RemoveScheduler, + ) + if err != nil { + log.Error("failed to create scheduler", + zap.String("scheduler-type", scheduler.Type), + zap.Strings("scheduler-args", scheduler.Args), + errs.ZapError(err)) + continue + } + name := s.GetName() + if existed, _ := schedulersController.IsSchedulerExisted(name); existed { + log.Info("scheduler has already existed, skip adding it", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args)) + continue + } + if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil { + log.Error("failed to add scheduler", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args), + errs.ZapError(err)) + continue + } + log.Info("add scheduler successfully", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args)) + } + // Remove the deleted schedulers. + for _, name := range schedulersController.GetSchedulerNames() { + scheduler := schedulersController.GetScheduler(name) + if slice.AnyOf(latestSchedulersConfig, func(i int) bool { + return latestSchedulersConfig[i].Type == scheduler.GetType() + }) { + continue + } + if err := schedulersController.RemoveScheduler(name); err != nil { + log.Error("failed to remove scheduler", + zap.String("scheduler-name", name), + errs.ZapError(err)) + continue + } + log.Info("remove scheduler successfully", + zap.String("scheduler-name", name)) + } + } +} + +func (c *Cluster) waitSchedulersInitialized() { + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + for { + if c.coordinator.AreSchedulersInitialized() { + return + } + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop waiting the schedulers initialization") + return + case <-ticker.C: + } + } +} + +// TODO: implement the following methods + +// UpdateRegionsLabelLevelStats updates the status of the region label level by types. +func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { + for _, region := range regions { + c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) + } +} + +func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { + stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) + for _, p := range region.GetPeers() { + if store := c.GetStore(p.GetStoreId()); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { + stores = append(stores, store) + } + } + return stores +} + +// HandleStoreHeartbeat updates the store status. 
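+// It clones the store with the reported stats, refreshes the hot-stat view of
+// that store, and turns the per-peer read stats in the heartbeat into async
+// check tasks; peers that were reported before but are missing from this
+// heartbeat are re-checked via NewCollectUnReportedPeerTask.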
+func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatRequest) error { + stats := heartbeat.GetStats() + storeID := stats.GetStoreId() + store := c.GetStore(storeID) + if store == nil { + return errors.Errorf("store %v not found", storeID) + } + + nowTime := time.Now() + newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime)) + + if store := c.GetStore(storeID); store != nil { + statistics.UpdateStoreHeartbeatMetrics(store) + } + c.PutStore(newStore) + c.hotStat.Observe(storeID, newStore.GetStoreStats()) + c.hotStat.FilterUnhealthyStore(c) + reportInterval := stats.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + + regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) + for _, peerStat := range stats.GetPeerStats() { + regionID := peerStat.GetRegionId() + region := c.GetRegion(regionID) + regions[regionID] = region + if region == nil { + log.Warn("discard hot peer stat for unknown region", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + peer := region.GetStorePeer(storeID) + if peer == nil { + log.Warn("discard hot peer stat for unknown region peer", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats()) + loads := []float64{ + utils.RegionReadBytes: float64(peerStat.GetReadBytes()), + utils.RegionReadKeys: float64(peerStat.GetReadKeys()), + utils.RegionReadQueryNum: float64(readQueryNum), + utils.RegionWriteBytes: 0, + utils.RegionWriteKeys: 0, + utils.RegionWriteQueryNum: 0, + } + peerInfo := core.NewPeerInfo(peer, loads, interval) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + } + + // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. + c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) + return nil +} + +// runUpdateStoreStats updates store stats periodically. +func (c *Cluster) runUpdateStoreStats() { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(9 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("update store stats background jobs has been stopped") + return + case <-ticker.C: + c.UpdateAllStoreStatus() + } + } +} + +// runCoordinator runs the main scheduling loop. 
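+// It blocks until the coordinator is stopped; collectWaitTime delays the prepare
+// checker so scheduling does not start before enough region heartbeats have been
+// collected (tests shorten it via the changeRunCollectWaitTime failpoint).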
+func (c *Cluster) runCoordinator() { + defer logutil.LogPanic() + defer c.wg.Done() + // force wait for 1 minute to make prepare checker won't be directly skipped + runCollectWaitTime := collectWaitTime + failpoint.Inject("changeRunCollectWaitTime", func() { + runCollectWaitTime = 1 * time.Second + }) + c.coordinator.RunUntilStop(runCollectWaitTime) +} + +func (c *Cluster) runMetricsCollectionJob() { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("metrics are reset") + c.resetMetrics() + log.Info("metrics collection job has been stopped") + return + case <-ticker.C: + c.collectMetrics() + } + } +} + +func (c *Cluster) collectMetrics() { + statsMap := statistics.NewStoreStatisticsMap(c.persistConfig) + stores := c.GetStores() + for _, s := range stores { + statsMap.Observe(s) + statsMap.ObserveHotStat(s, c.hotStat.StoresStats) + } + statsMap.Collect() + + c.coordinator.GetSchedulersController().CollectSchedulerMetrics() + c.coordinator.CollectHotSpotMetrics() + if c.regionStats == nil { + return + } + c.regionStats.Collect() + c.labelStats.Collect() + // collect hot cache metrics + c.hotStat.CollectMetrics() +} + +func (c *Cluster) resetMetrics() { + statistics.Reset() + schedulers.ResetSchedulerMetrics() + schedule.ResetHotSpotMetrics() +} + +// StartBackgroundJobs starts background jobs. +func (c *Cluster) StartBackgroundJobs() { + c.wg.Add(4) + go c.updateScheduler() + go c.runUpdateStoreStats() + go c.runCoordinator() + go c.runMetricsCollectionJob() + c.running.Store(true) +} + +// StopBackgroundJobs stops background jobs. +func (c *Cluster) StopBackgroundJobs() { + if !c.running.Load() { + return + } + c.running.Store(false) + c.coordinator.Stop() + c.cancel() + c.wg.Wait() +} + +// IsBackgroundJobsRunning returns whether the background jobs are running. Only for test purpose. +func (c *Cluster) IsBackgroundJobsRunning() bool { + return c.running.Load() +} + +// HandleRegionHeartbeat processes RegionInfo reports from client. +func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { + if err := c.processRegionHeartbeat(region); err != nil { + return err + } + + c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) + return nil +} + +// processRegionHeartbeat updates the region information. +func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { + origin, _, err := c.PreCheckPutRegion(region) + if err != nil { + return err + } + region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) + + cluster.HandleStatsAsync(c, region) + + hasRegionStats := c.regionStats != nil + // Save to storage if meta is updated, except for flashback. + // Save to cache if meta or leader is updated, or contains any down/pending peer. + _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin) + if !saveCache { + // Due to some config changes need to update the region stats as well, + // so we do some extra checks here. + if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { + c.regionStats.Observe(region, c.GetRegionStores(region)) + } + return nil + } + + var overlaps []*core.RegionInfo + if saveCache { + // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, + // check its validation again here. + // + // However, it can't solve the race condition of concurrent heartbeats from the same region. 
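+		// AtomicCheckAndPutRegion re-validates the region against the cache before
+		// overwriting it and returns the regions it replaced, so their defunct
+		// statistics and rule caches can be cleared via HandleOverlaps below.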
+ if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil { + return err + } + + cluster.HandleOverlaps(c, overlaps) + } + + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats) + return nil +} + +// IsPrepared return true if the prepare checker is ready. +func (c *Cluster) IsPrepared() bool { + return c.coordinator.GetPrepareChecker().IsPrepared() +} + +// SetPrepared set the prepare check to prepared. Only for test purpose. +func (c *Cluster) SetPrepared() { + c.coordinator.GetPrepareChecker().SetPrepared() +} + +// DropCacheAllRegion removes all cached regions. +func (c *Cluster) DropCacheAllRegion() { + c.ResetRegionCache() +} + +// DropCacheRegion removes a region from the cache. +func (c *Cluster) DropCacheRegion(id uint64) { + c.RemoveRegionIfExist(id) +} diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go new file mode 100644 index 000000000000..ebce73e3303e --- /dev/null +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -0,0 +1,367 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "io" + "net/http" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/schedulingpb" + "github.com/pingcap/log" + bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/registry" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/versioninfo" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// gRPC errors +var ( + ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") + ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched") +) + +// SetUpRestHandler is a hook to sets up the REST service. +var SetUpRestHandler = func(srv *Service) (http.Handler, apiutil.APIServiceGroup) { + return dummyRestService{}, apiutil.APIServiceGroup{} +} + +type dummyRestService struct{} + +func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotImplemented) + w.Write([]byte("not implemented")) +} + +// ConfigProvider is used to get scheduling config from the given +// `bs.server` without modifying its interface. +type ConfigProvider any + +// Service is the scheduling grpc service. +type Service struct { + *Server +} + +// NewService creates a new scheduling service. +func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService { + server, ok := svr.(*Server) + if !ok { + log.Fatal("create scheduling server failed") + } + return &Service{ + Server: server, + } +} + +// heartbeatServer wraps Scheduling_RegionHeartbeatServer to ensure when any error +// occurs on Send() or Recv(), both endpoints will be closed. 
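+// Send runs in its own goroutine guarded by a 5-second timer so a stuck stream
+// cannot block the heartbeat loop; once either direction fails, the closed flag
+// makes subsequent Send and Recv calls return io.EOF.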
+type heartbeatServer struct { + stream schedulingpb.Scheduling_RegionHeartbeatServer + closed int32 +} + +func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error { + if atomic.LoadInt32(&s.closed) == 1 { + return io.EOF + } + done := make(chan error, 1) + go func() { + defer logutil.LogPanic() + done <- s.stream.Send(m.(*schedulingpb.RegionHeartbeatResponse)) + }() + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { + case err := <-done: + if err != nil { + atomic.StoreInt32(&s.closed, 1) + } + return errors.WithStack(err) + case <-timer.C: + atomic.StoreInt32(&s.closed, 1) + return status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") + } +} + +func (s *heartbeatServer) Recv() (*schedulingpb.RegionHeartbeatRequest, error) { + if atomic.LoadInt32(&s.closed) == 1 { + return nil, io.EOF + } + req, err := s.stream.Recv() + if err != nil { + atomic.StoreInt32(&s.closed, 1) + return nil, errors.WithStack(err) + } + return req, nil +} + +// RegionHeartbeat implements gRPC SchedulingServer. +func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeatServer) error { + var ( + server = &heartbeatServer{stream: stream} + cancel context.CancelFunc + lastBind time.Time + ) + defer func() { + // cancel the forward stream + if cancel != nil { + cancel() + } + }() + + for { + request, err := server.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return errors.WithStack(err) + } + + c := s.GetCluster() + if c == nil { + resp := &schedulingpb.RegionHeartbeatResponse{Header: s.notBootstrappedHeader()} + err := server.Send(resp) + return errors.WithStack(err) + } + + storeID := request.GetLeader().GetStoreId() + store := c.GetStore(storeID) + if store == nil { + return errors.Errorf("invalid store ID %d, not found", storeID) + } + + if time.Since(lastBind) > time.Minute { + s.hbStreams.BindStream(storeID, server) + lastBind = time.Now() + } + region := core.RegionFromHeartbeat(request) + err = c.HandleRegionHeartbeat(region) + if err != nil { + // TODO: if we need to send the error back to API server. + log.Error("failed handle region heartbeat", zap.Error(err)) + continue + } + } +} + +// StoreHeartbeat implements gRPC SchedulingServer. 
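+// If the store in the heartbeat is not known yet, the store watcher is forced to
+// reload meta first, so a freshly added store can be recognized before its stats
+// are handed to the cluster.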
+func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.StoreHeartbeatRequest) (*schedulingpb.StoreHeartbeatResponse, error) { + c := s.GetCluster() + if c == nil { + // TODO: add metrics + log.Info("cluster isn't initialized") + return &schedulingpb.StoreHeartbeatResponse{Header: s.notBootstrappedHeader()}, nil + } + + if c.GetStore(request.GetStats().GetStoreId()) == nil { + s.metaWatcher.GetStoreWatcher().ForceLoad() + } + + // TODO: add metrics + if err := c.HandleStoreHeartbeat(request); err != nil { + log.Error("handle store heartbeat failed", zap.Error(err)) + } + return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil +} + +// SplitRegions split regions by the given split keys +func (s *Service) SplitRegions(ctx context.Context, request *schedulingpb.SplitRegionsRequest) (*schedulingpb.SplitRegionsResponse, error) { + c := s.GetCluster() + if c == nil { + return &schedulingpb.SplitRegionsResponse{Header: s.notBootstrappedHeader()}, nil + } + finishedPercentage, newRegionIDs := c.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit())) + return &schedulingpb.SplitRegionsResponse{ + Header: s.header(), + RegionsId: newRegionIDs, + FinishedPercentage: uint64(finishedPercentage), + }, nil +} + +// ScatterRegions implements gRPC SchedulingServer. +func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.ScatterRegionsRequest) (*schedulingpb.ScatterRegionsResponse, error) { + c := s.GetCluster() + if c == nil { + return &schedulingpb.ScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil + } + + opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit()) + if err != nil { + header := s.errorHeader(&schedulingpb.Error{ + Type: schedulingpb.ErrorType_UNKNOWN, + Message: err.Error(), + }) + return &schedulingpb.ScatterRegionsResponse{Header: header}, nil + } + percentage := 100 + if len(failures) > 0 { + percentage = 100 - 100*len(failures)/(opsCount+len(failures)) + log.Debug("scatter regions", zap.Errors("failures", func() []error { + r := make([]error, 0, len(failures)) + for _, err := range failures { + r = append(r, err) + } + return r + }())) + } + return &schedulingpb.ScatterRegionsResponse{ + Header: s.header(), + FinishedPercentage: uint64(percentage), + }, nil +} + +// GetOperator gets information about the operator belonging to the specify region. +func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOperatorRequest) (*schedulingpb.GetOperatorResponse, error) { + c := s.GetCluster() + if c == nil { + return &schedulingpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil + } + + opController := c.GetCoordinator().GetOperatorController() + requestID := request.GetRegionId() + r := opController.GetOperatorStatus(requestID) + if r == nil { + header := s.errorHeader(&schedulingpb.Error{ + Type: schedulingpb.ErrorType_UNKNOWN, + Message: "region not found", + }) + return &schedulingpb.GetOperatorResponse{Header: header}, nil + } + + return &schedulingpb.GetOperatorResponse{ + Header: s.header(), + RegionId: requestID, + Desc: []byte(r.Desc()), + Kind: []byte(r.Kind().String()), + Status: r.Status, + }, nil +} + +// AskBatchSplit implements gRPC SchedulingServer. 
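+// For every requested split it allocates a new region ID plus one peer ID per
+// existing peer (through the API server allocator), records the involved regions
+// so the merge checker leaves them alone for a while, and queues them for
+// priority checking.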
+func (s *Service) AskBatchSplit(ctx context.Context, request *schedulingpb.AskBatchSplitRequest) (*schedulingpb.AskBatchSplitResponse, error) { + c := s.GetCluster() + if c == nil { + return &schedulingpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil + } + + if request.GetRegion() == nil { + return &schedulingpb.AskBatchSplitResponse{ + Header: s.wrapErrorToHeader(schedulingpb.ErrorType_UNKNOWN, + "missing region for split"), + }, nil + } + + if c.persistConfig.IsSchedulingHalted() { + return nil, errs.ErrSchedulingIsHalted.FastGenByArgs() + } + if !c.persistConfig.IsTikvRegionSplitEnabled() { + return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() + } + reqRegion := request.GetRegion() + splitCount := request.GetSplitCount() + err := c.ValidRegion(reqRegion) + if err != nil { + return nil, err + } + splitIDs := make([]*pdpb.SplitID, 0, splitCount) + recordRegions := make([]uint64, 0, splitCount+1) + + for i := 0; i < int(splitCount); i++ { + newRegionID, err := c.AllocID() + if err != nil { + return nil, errs.ErrSchedulerNotFound.FastGenByArgs() + } + + peerIDs := make([]uint64, len(request.Region.Peers)) + for i := 0; i < len(peerIDs); i++ { + if peerIDs[i], err = c.AllocID(); err != nil { + return nil, err + } + } + + recordRegions = append(recordRegions, newRegionID) + splitIDs = append(splitIDs, &pdpb.SplitID{ + NewRegionId: newRegionID, + NewPeerIds: peerIDs, + }) + + log.Info("alloc ids for region split", zap.Uint64("region-id", newRegionID), zap.Uint64s("peer-ids", peerIDs)) + } + + recordRegions = append(recordRegions, reqRegion.GetId()) + if versioninfo.IsFeatureSupported(c.persistConfig.GetClusterVersion(), versioninfo.RegionMerge) { + // Disable merge the regions in a period of time. + c.GetCoordinator().GetMergeChecker().RecordRegionSplit(recordRegions) + } + + // If region splits during the scheduling process, regions with abnormal + // status may be left, and these regions need to be checked with higher + // priority. + c.GetCoordinator().GetCheckerController().AddSuspectRegions(recordRegions...) + + return &schedulingpb.AskBatchSplitResponse{ + Header: s.header(), + Ids: splitIDs, + }, nil +} + +// RegisterGRPCService registers the service to gRPC server. +func (s *Service) RegisterGRPCService(g *grpc.Server) { + schedulingpb.RegisterSchedulingServer(g, s) +} + +// RegisterRESTHandler registers the service to REST server. 
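+// The handler comes from the SetUpRestHandler hook above, which defaults to a
+// dummy handler answering 501 Not Implemented until a real REST service is
+// wired in.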
+func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { + handler, group := SetUpRestHandler(s) + apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) +} + +func (s *Service) errorHeader(err *schedulingpb.Error) *schedulingpb.ResponseHeader { + return &schedulingpb.ResponseHeader{ + ClusterId: s.clusterID, + Error: err, + } +} + +func (s *Service) notBootstrappedHeader() *schedulingpb.ResponseHeader { + return s.errorHeader(&schedulingpb.Error{ + Type: schedulingpb.ErrorType_NOT_BOOTSTRAPPED, + Message: "cluster is not initialized", + }) +} + +func (s *Service) header() *schedulingpb.ResponseHeader { + if s.clusterID == 0 { + return s.wrapErrorToHeader(schedulingpb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready") + } + return &schedulingpb.ResponseHeader{ClusterId: s.clusterID} +} + +func (s *Service) wrapErrorToHeader( + errorType schedulingpb.ErrorType, message string) *schedulingpb.ResponseHeader { + return s.errorHeader(&schedulingpb.Error{Type: errorType, Message: message}) +} diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 23791a145141..3ad7ab76dd4d 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -28,6 +28,8 @@ import ( // RegionStatisticType represents the type of the region's status. type RegionStatisticType uint32 +const emptyStatistic = RegionStatisticType(0) + // region status type const ( MissPeer RegionStatisticType = 1 << iota @@ -163,6 +165,9 @@ func (r *RegionStatistics) deleteOfflineEntry(deleteIndex RegionStatisticType, r // due to some special state types. func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool { regionID := region.GetID() + if !r.isObserved(regionID) { + return true + } if r.IsRegionStatsType(regionID, OversizedRegion) != region.IsOversized(int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxSize()), int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxKeys())) { return true @@ -171,6 +176,14 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool { region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys())) } +// isObserved returns whether the region is observed. And it also shows whether PD received heartbeat of this region. +func (r *RegionStatistics) isObserved(id uint64) bool { + r.RLock() + defer r.RUnlock() + _, ok := r.index[id] + return ok +} + // Observe records the current regions' status. func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo) { r.Lock() @@ -178,9 +191,15 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store // Region state. 
regionID := region.GetID() var ( +<<<<<<< HEAD peerTypeIndex RegionStatisticType offlinePeerTypeIndex RegionStatisticType deleteIndex RegionStatisticType +======= + desiredReplicas = r.conf.GetMaxReplicas() + desiredVoters = desiredReplicas + peerTypeIndex RegionStatisticType +>>>>>>> cce1464b1 (*: fix region stats check (#7748)) ) desiredReplicas := r.conf.GetMaxReplicas() desiredVoters := desiredReplicas @@ -263,6 +282,7 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store peerTypeIndex |= typ } } +<<<<<<< HEAD if isRemoving { r.offlineStats[OfflinePeer][regionID] = region @@ -277,8 +297,13 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store if oldIndex, ok := r.index[regionID]; ok { deleteIndex = oldIndex &^ peerTypeIndex +======= + // Remove the info if any of the conditions are not met any more. + if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic { + deleteIndex := oldIndex &^ peerTypeIndex + r.deleteEntry(deleteIndex, regionID) +>>>>>>> cce1464b1 (*: fix region stats check (#7748)) } - r.deleteEntry(deleteIndex, regionID) r.index[regionID] = peerTypeIndex } @@ -287,7 +312,10 @@ func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) { r.Lock() defer r.Unlock() if oldIndex, ok := r.index[regionID]; ok { - r.deleteEntry(oldIndex, regionID) + delete(r.index, regionID) + if oldIndex > emptyStatistic { + r.deleteEntry(oldIndex, regionID) + } } if oldIndex, ok := r.offlineIndex[regionID]; ok { r.deleteOfflineEntry(oldIndex, regionID) diff --git a/server/api/region_test.go b/server/api/region_test.go index 9093f71c5420..133acae14c9b 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -203,7 +203,22 @@ func (suite *regionTestSuite) TestRegionCheck() { r7 := make([]*histItem, 1) suite.NoError(tu.ReadGetJSON(re, testDialClient, url, &r7)) histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}} +<<<<<<< HEAD suite.Equal(histKeys, r7) +======= + re.Equal(histKeys, r7) + + // ref https://github.com/tikv/pd/issues/3558, we should change size to pass `NeedUpdate` for observing. + r = r.Clone(core.SetApproximateKeys(0)) + mustPutStore(re, suite.svr, 2, metapb.StoreState_Offline, metapb.NodeState_Removing, []*metapb.StoreLabel{}) + mustRegionHeartbeat(re, suite.svr, r) + url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "offline-peer") + r8 := &response.RegionsInfo{} + re.NoError(tu.ReadGetJSON(re, testDialClient, url, r8)) + r4.Adjust() + re.Equal(1, r8.Count) + re.Equal(r.GetID(), r8.Regions[0].ID) +>>>>>>> cce1464b1 (*: fix region stats check (#7748)) } func (suite *regionTestSuite) TestRegions() { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index e0a4e00be030..9e228061f99e 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1027,9 +1027,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - // Mark isNew if the region in cache does not have leader. - isNew, saveKV, saveCache, needSync := regionGuide(region, origin) - if !saveKV && !saveCache && !isNew { + saveKV, saveCache, needSync := regionGuide(region, origin) + if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. 
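+		// With this fix, RegionStatsNeedUpdate also reports true for a region that
+		// has never been observed, so the first heartbeat after a leader switch still
+		// refreshes the region statistics even when meta and leader are unchanged.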
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { @@ -1065,6 +1064,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { regionUpdateCacheEventCounter.Inc() } +<<<<<<< HEAD if hasRegionStats { c.regionStats.Observe(region, c.getRegionStoresLocked(region)) } @@ -1072,6 +1072,9 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { if !c.IsPrepared() && isNew { c.coordinator.prepareChecker.collect(region) } +======= + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats) +>>>>>>> cce1464b1 (*: fix region stats check (#7748)) if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index b39013d6dd6e..55331ea0635a 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -218,7 +218,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) continue } - _, saveKV, _, _ := regionGuide(region, origin) + saveKV, _, _ := regionGuide(region, origin) overlaps := bc.PutRegion(region) if hasBuckets { diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 9ef64ac91fb6..484b26ffe803 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/testutil" @@ -180,6 +181,99 @@ func TestDamagedRegion(t *testing.T) { re.Equal(uint64(1), rc.GetOperatorController().OperatorCount(operator.OpAdmin)) } +func TestRegionStatistics(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tc, err := tests.NewTestCluster(ctx, 2) + defer tc.Destroy() + re.NoError(err) + + err = tc.RunInitialServers() + re.NoError(err) + + leaderName := tc.WaitLeader() + leaderServer := tc.GetLeaderServer() + grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + clusterID := leaderServer.GetClusterID() + bootstrapCluster(re, clusterID, grpcPDClient) + rc := leaderServer.GetRaftCluster() + + region := &metapb.Region{ + Id: 10, + StartKey: []byte("abc"), + EndKey: []byte("xyz"), + Peers: []*metapb.Peer{ + {Id: 101, StoreId: 1}, + {Id: 102, StoreId: 2}, + {Id: 103, StoreId: 3}, + {Id: 104, StoreId: 4, Role: metapb.PeerRole_Learner}, + }, + } + + // To put region. 
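+	// The region carries a learner peer and approximate size 0; after each leader
+	// transfer below it is reloaded from the region syncer rather than from a
+	// heartbeat, so the learner-peer statistic has to be rebuilt by the next
+	// heartbeat instead of being carried over from the previous leader.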
+ regionInfo := core.NewRegionInfo(region, region.Peers[0], core.SetApproximateSize(0)) + err = tc.HandleRegionHeartbeat(regionInfo) + re.NoError(err) + regions := rc.GetRegionStatsByType(statistics.LearnerPeer) + re.Len(regions, 1) + + // wait for sync region + time.Sleep(1000 * time.Millisecond) + + leaderServer.ResignLeader() + newLeaderName := tc.WaitLeader() + re.NotEqual(newLeaderName, leaderName) + leaderServer = tc.GetLeaderServer() + rc = leaderServer.GetRaftCluster() + r := rc.GetRegion(region.Id) + re.NotNil(r) + re.True(r.LoadedFromSync()) + regions = rc.GetRegionStatsByType(statistics.LearnerPeer) + re.Empty(regions) + err = tc.HandleRegionHeartbeat(regionInfo) + re.NoError(err) + regions = rc.GetRegionStatsByType(statistics.LearnerPeer) + re.Len(regions, 1) + + leaderServer.ResignLeader() + newLeaderName = tc.WaitLeader() + re.Equal(newLeaderName, leaderName) + leaderServer = tc.GetLeaderServer() + rc = leaderServer.GetRaftCluster() + re.NotNil(r) + re.True(r.LoadedFromStorage() || r.LoadedFromSync()) + regions = rc.GetRegionStatsByType(statistics.LearnerPeer) + re.Empty(regions) + regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30)) + err = tc.HandleRegionHeartbeat(regionInfo) + re.NoError(err) + rc = leaderServer.GetRaftCluster() + r = rc.GetRegion(region.Id) + re.NotNil(r) + re.False(r.LoadedFromStorage() && r.LoadedFromSync()) + + leaderServer.ResignLeader() + newLeaderName = tc.WaitLeader() + re.NotEqual(newLeaderName, leaderName) + leaderServer.ResignLeader() + newLeaderName = tc.WaitLeader() + re.Equal(newLeaderName, leaderName) + leaderServer = tc.GetLeaderServer() + rc = leaderServer.GetRaftCluster() + r = rc.GetRegion(region.Id) + re.NotNil(r) + re.False(r.LoadedFromStorage() && r.LoadedFromSync()) + regions = rc.GetRegionStatsByType(statistics.LearnerPeer) + re.Empty(regions) + + regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30)) + err = tc.HandleRegionHeartbeat(regionInfo) + re.NoError(err) + regions = rc.GetRegionStatsByType(statistics.LearnerPeer) + re.Len(regions, 1) +} + func TestStaleRegion(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background())