checker: use ttl cache #8505

Merged · 3 commits · Aug 8, 2024

Changes from all commits:
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
@@ -324,7 +324,7 @@ func (s *Service) AskBatchSplit(_ context.Context, request *schedulingpb.AskBatc
// If region splits during the scheduling process, regions with abnormal
// status may be left, and these regions need to be checked with higher
// priority.
- c.GetCoordinator().GetCheckerController().AddPendingProcessedRegions(recordRegions...)
+ c.GetCoordinator().GetCheckerController().AddPendingProcessedRegions(false, recordRegions...)

return &schedulingpb.AskBatchSplitResponse{
Header: s.header(),
2 changes: 1 addition & 1 deletion pkg/mock/mockcluster/mockcluster.go
@@ -845,7 +845,7 @@ func (mc *Cluster) SetStoreLabel(storeID uint64, labels map[string]string) {
}

// AddPendingProcessedRegions mock method
- func (mc *Cluster) AddPendingProcessedRegions(ids ...uint64) {
+ func (mc *Cluster) AddPendingProcessedRegions(_ bool, ids ...uint64) {
for _, id := range ids {
mc.pendingProcessedRegions[id] = struct{}{}
}
19 changes: 9 additions & 10 deletions pkg/schedule/checker/checker_controller.go
@@ -68,7 +68,7 @@ type Controller struct {
mergeChecker *MergeChecker
jointStateChecker *JointStateChecker
priorityInspector *PriorityInspector
- pendingProcessedRegions cache.Cache
+ pendingProcessedRegions *cache.TTLUint64
suspectKeyRanges *cache.TTLString // suspect key-range regions that may need fix

// duration is the duration of the last patrol round.
@@ -88,7 +88,7 @@ type Controller struct {

// NewController create a new Controller.
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
- pendingProcessedRegions := cache.NewDefaultCache(DefaultPendingRegionCacheSize)
+ pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute)
Review thread on this change:

Contributor: sure, previous suspectRegions is *cache.TTLUint64

Contributor: Do we need to add a test for this issue?

Member (author): It's not just a problem with suspectRegions; the waiting list previously had the same problem.

Contributor: ok, then do we need to add a test for this issue?

Member: why 3 mins?

Member (author): The original setting is 3 min.

return &Controller{
ctx: ctx,
cluster: cluster,
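The review question above ("why 3 mins?") concerns the second duration argument. Here is a minimal, self-contained sketch of how the new cache behaves, using only identifiers that appear in this diff (`NewIDTTL`, `Put`, `GetAllID`); the import path and the reading of the first argument as a GC interval are assumptions, not confirmed by this PR:

```go
package main

import (
	"context"
	"time"

	"github.com/tikv/pd/pkg/cache" // assumed import path for the pkg/cache package
)

func main() {
	ctx := context.Background()
	// Same call as the diff: sweep expired entries every minute (assumed GC
	// interval) and let each entry live for 3 minutes (the old setting).
	pending := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute)

	pending.Put(42, nil)   // region 42 becomes pending
	_ = pending.GetAllID() // IDs younger than 3 minutes are returned

	// Unlike the previous fixed-size cache, stale IDs now age out on their
	// own instead of evicting newer entries once a size cap is reached.
}
```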
@@ -311,7 +311,7 @@ func (c *Controller) tryAddOperators(region *core.RegionInfo) {
c.opController.AddWaitingOperator(ops...)
c.RemovePendingProcessedRegion(id)
} else {
- c.AddPendingProcessedRegions(id)
+ c.AddPendingProcessedRegions(true, id)
}
}

@@ -327,16 +327,15 @@ func (c *Controller) GetRuleChecker() *RuleChecker {

// GetPendingProcessedRegions returns the pending processed regions in the cache.
func (c *Controller) GetPendingProcessedRegions() []uint64 {
- pendingRegions := make([]uint64, 0)
- for _, item := range c.pendingProcessedRegions.Elems() {
- 	pendingRegions = append(pendingRegions, item.Key)
- }
- return pendingRegions
+ return c.pendingProcessedRegions.GetAllID()
}

// AddPendingProcessedRegions adds the pending processed region into the cache.
- func (c *Controller) AddPendingProcessedRegions(ids ...uint64) {
+ func (c *Controller) AddPendingProcessedRegions(needCheckLen bool, ids ...uint64) {
for _, id := range ids {
+ if needCheckLen && c.pendingProcessedRegions.Len() > DefaultPendingRegionCacheSize {
+ 	return
+ }
c.pendingProcessedRegions.Put(id, nil)
}
}
@@ -385,7 +384,7 @@ func (c *Controller) CheckSuspectRanges() {
if lastRegion.GetEndKey() != nil && bytes.Compare(lastRegion.GetEndKey(), keyRange[1]) < 0 {
c.AddSuspectKeyRange(lastRegion.GetEndKey(), keyRange[1])
}
- c.AddPendingProcessedRegions(regionIDList...)
+ c.AddPendingProcessedRegions(false, regionIDList...)
}
}
}
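The `needCheckLen` flag threaded through the signatures above is what bounds the cache on the checker's own retry path while leaving the split/scatter/accelerate callers lossless. A self-contained toy model of that contract (the real constant and cache type live in this repo; everything below is illustrative only):

```go
package main

import "fmt"

// Toy cap; the real DefaultPendingRegionCacheSize is defined in this package.
const defaultPendingRegionCacheSize = 3

type controller struct{ pending map[uint64]struct{} }

// Mirrors the loop in checker_controller.go: the length check runs before
// each insert, and only when needCheckLen is true.
func (c *controller) addPendingProcessedRegions(needCheckLen bool, ids ...uint64) {
	for _, id := range ids {
		if needCheckLen && len(c.pending) > defaultPendingRegionCacheSize {
			return
		}
		c.pending[id] = struct{}{}
	}
}

func main() {
	c := &controller{pending: map[uint64]struct{}{}}
	c.addPendingProcessedRegions(true, 1, 2, 3, 4, 5, 6) // bounded path
	fmt.Println(len(c.pending))                          // 4: strict ">" lets it reach cap+1
	c.addPendingProcessedRegions(false, 10, 11, 12)      // unbounded path
	fmt.Println(len(c.pending))                          // 7
}
```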
4 changes: 2 additions & 2 deletions pkg/schedule/checker/replica_checker.go
@@ -44,11 +44,11 @@ type ReplicaChecker struct {
PauseController
cluster sche.CheckerCluster
conf config.CheckerConfigProvider
- pendingProcessedRegions cache.Cache
+ pendingProcessedRegions *cache.TTLUint64
}

// NewReplicaChecker creates a replica checker.
- func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, pendingProcessedRegions cache.Cache) *ReplicaChecker {
+ func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, pendingProcessedRegions *cache.TTLUint64) *ReplicaChecker {
return &ReplicaChecker{
cluster: cluster,
conf: conf,
20 changes: 10 additions & 10 deletions pkg/schedule/checker/replica_checker_test.go
@@ -51,7 +51,7 @@ func (suite *replicaCheckerTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster = mockcluster.NewCluster(suite.ctx, cfg)
suite.cluster.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
- suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetCheckerConfig(), cache.NewDefaultCache(10))
+ suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
stats := &pdpb.StoreStats{
Capacity: 100,
Available: 100,
@@ -213,7 +213,7 @@ func (suite *replicaCheckerTestSuite) TestBasic() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetMaxSnapshotCount(2)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
- rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
+ rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

// Add stores 1,2,3,4.
tc.AddRegionStore(1, 4)
@@ -290,7 +290,7 @@ func (suite *replicaCheckerTestSuite) TestLostStore() {
tc.AddRegionStore(1, 1)
tc.AddRegionStore(2, 1)

- rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
+ rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

// now region peer in store 1,2,3.but we just have store 1,2
// This happens only in recovering the PD tc
@@ -309,7 +309,7 @@ func (suite *replicaCheckerTestSuite) TestOffline() {
tc.SetMaxReplicas(3)
tc.SetLocationLabels([]string{"zone", "rack", "host"})

- rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
+ rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(2, 2, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(3, 3, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"})
@@ -361,7 +361,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore() {
tc.SetMaxReplicas(3)
tc.SetLocationLabels([]string{"zone", "rack", "host"})

- rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
+ rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 9, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(2, 8, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
@@ -441,7 +441,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore2() {
tc.SetMaxReplicas(5)
tc.SetLocationLabels([]string{"zone", "host"})

- rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
+ rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h2"})
@@ -470,7 +470,7 @@ func (suite *replicaCheckerTestSuite) TestStorageThreshold() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetLocationLabels([]string{"zone"})
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
- rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
+ rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
tc.UpdateStorageRatio(1, 0.5, 0.5)
@@ -506,7 +506,7 @@ func (suite *replicaCheckerTestSuite) TestOpts() {
opt := mockconfig.NewTestOptions()
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
- rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
+ rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddRegionStore(1, 100)
tc.AddRegionStore(2, 100)
@@ -539,7 +539,7 @@ func (suite *replicaCheckerTestSuite) TestFixDownPeer() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetLocationLabels([]string{"zone"})
- rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
+ rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
@@ -571,7 +571,7 @@ func (suite *replicaCheckerTestSuite) TestFixOfflinePeer() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetLocationLabels([]string{"zone"})
- rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
+ rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
4 changes: 2 additions & 2 deletions pkg/schedule/checker/rule_checker.go
@@ -52,14 +52,14 @@ type RuleChecker struct {
PauseController
cluster sche.CheckerCluster
ruleManager *placement.RuleManager
- pendingProcessedRegions cache.Cache
+ pendingProcessedRegions *cache.TTLUint64
pendingList cache.Cache
switchWitnessCache *cache.TTLUint64
record *recorder
}

// NewRuleChecker creates a checker instance.
- func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, pendingProcessedRegions cache.Cache) *RuleChecker {
+ func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, pendingProcessedRegions *cache.TTLUint64) *RuleChecker {
return &RuleChecker{
cluster: cluster,
ruleManager: ruleManager,
4 changes: 2 additions & 2 deletions pkg/schedule/checker/rule_checker_test.go
@@ -62,7 +62,7 @@ func (suite *ruleCheckerTestSuite) SetupTest() {
suite.cluster.SetEnableWitness(true)
suite.cluster.SetEnableUseJointConsensus(false)
suite.ruleManager = suite.cluster.RuleManager
- suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10))
+ suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
}

func (suite *ruleCheckerTestSuite) TearDownTest() {
@@ -1955,7 +1955,7 @@ func (suite *ruleCheckerTestAdvancedSuite) SetupTest() {
suite.cluster.SetEnableWitness(true)
suite.cluster.SetEnableUseJointConsensus(true)
suite.ruleManager = suite.cluster.RuleManager
- suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10))
+ suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
}

func (suite *ruleCheckerTestAdvancedSuite) TearDownTest() {
4 changes: 2 additions & 2 deletions pkg/schedule/handler/handler.go
@@ -1124,7 +1124,7 @@ func (h *Handler) AccelerateRegionsScheduleInRange(rawStartKey, rawEndKey string
for _, region := range regions {
regionsIDList = append(regionsIDList, region.GetID())
}
- co.GetCheckerController().AddPendingProcessedRegions(regionsIDList...)
+ co.GetCheckerController().AddPendingProcessedRegions(false, regionsIDList...)
}
return nil
}
@@ -1151,7 +1151,7 @@ func (h *Handler) AccelerateRegionsScheduleInRanges(startKeys [][]byte, endKeys
for _, region := range regions {
regionsIDList = append(regionsIDList, region.GetID())
}
- co.GetCheckerController().AddPendingProcessedRegions(regionsIDList...)
+ co.GetCheckerController().AddPendingProcessedRegions(false, regionsIDList...)
}
return nil
}
6 changes: 3 additions & 3 deletions pkg/schedule/scatter/region_scatterer.go
@@ -125,12 +125,12 @@ type RegionScatterer struct {
ordinaryEngine engineContext
specialEngines sync.Map
opController *operator.Controller
- addSuspectRegions func(regionIDs ...uint64)
+ addSuspectRegions func(bool, ...uint64)
}

// NewRegionScatterer creates a region scatterer.
// RegionScatter is used for the `Lightning`, it will scatter the specified regions before import data.
- func NewRegionScatterer(ctx context.Context, cluster sche.SharedCluster, opController *operator.Controller, addSuspectRegions func(regionIDs ...uint64)) *RegionScatterer {
+ func NewRegionScatterer(ctx context.Context, cluster sche.SharedCluster, opController *operator.Controller, addSuspectRegions func(bool, ...uint64)) *RegionScatterer {
return &RegionScatterer{
ctx: ctx,
name: regionScatterName,
@@ -275,7 +275,7 @@ func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, fa
// in a group level instead of cluster level.
func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string, skipStoreLimit bool) (*operator.Operator, error) {
if !filter.IsRegionReplicated(r.cluster, region) {
- r.addSuspectRegions(region.GetID())
+ r.addSuspectRegions(false, region.GetID())
scatterSkipNotReplicatedCounter.Inc()
log.Warn("region not replicated during scatter", zap.Uint64("region-id", region.GetID()))
return nil, errors.Errorf("region %d is not fully replicated", region.GetID())
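The splitter below gets the same treatment. For both, the callback is expected to be the checker controller's `AddPendingProcessedRegions`, which is why the function types had to change in lockstep with the method signature. A wiring sketch — hypothetical, since the actual call site is not part of this diff:

```go
// Hypothetical wiring; names follow this diff, the call site is assumed.
checkers := coordinator.GetCheckerController()
scatterer := scatter.NewRegionScatterer(ctx, cluster, opController,
	checkers.AddPendingProcessedRegions) // method value matches func(bool, ...uint64)
```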
6 changes: 3 additions & 3 deletions pkg/schedule/splitter/region_splitter.go
@@ -58,11 +58,11 @@ func NewSplitRegionsHandler(cluster sche.ClusterInformer, oc *operator.Controlle
type RegionSplitter struct {
cluster sche.ClusterInformer
handler SplitRegionsHandler
- addSuspectRegions func(ids ...uint64)
+ addSuspectRegions func(bool, ...uint64)
}

// NewRegionSplitter return a region splitter
- func NewRegionSplitter(cluster sche.ClusterInformer, handler SplitRegionsHandler, addSuspectRegions func(ids ...uint64)) *RegionSplitter {
+ func NewRegionSplitter(cluster sche.ClusterInformer, handler SplitRegionsHandler, addSuspectRegions func(bool, ...uint64)) *RegionSplitter {
return &RegionSplitter{
cluster: cluster,
handler: handler,
@@ -173,7 +173,7 @@ func (r *RegionSplitter) groupKeysByRegion(keys [][]byte) map[uint64]*regionGrou

func (r *RegionSplitter) checkRegionValid(region *core.RegionInfo) bool {
if !filter.IsRegionReplicated(r.cluster, region) {
- r.addSuspectRegions(region.GetID())
+ r.addSuspectRegions(false, region.GetID())
return false
}
if region.GetLeader() == nil {
2 changes: 1 addition & 1 deletion server/cluster/cluster_worker.go
@@ -165,7 +165,7 @@ func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*
// If region splits during the scheduling process, regions with abnormal
// status may be left, and these regions need to be checked with higher
// priority.
- c.AddPendingProcessedRegions(recordRegions...)
+ c.AddPendingProcessedRegions(false, recordRegions...)

resp := &pdpb.AskBatchSplitResponse{Ids: splitIDs}

4 changes: 2 additions & 2 deletions server/cluster/scheduling_controller.go
@@ -404,10 +404,10 @@ func (sc *schedulingController) PauseOrResumeChecker(name string, t int64) error
}

// AddPendingProcessedRegions adds regions to suspect list.
- func (sc *schedulingController) AddPendingProcessedRegions(regionIDs ...uint64) {
+ func (sc *schedulingController) AddPendingProcessedRegions(needCheckLen bool, regionIDs ...uint64) {
sc.mu.RLock()
defer sc.mu.RUnlock()
- sc.coordinator.GetCheckerController().AddPendingProcessedRegions(regionIDs...)
+ sc.coordinator.GetCheckerController().AddPendingProcessedRegions(needCheckLen, regionIDs...)
}

// GetPendingProcessedRegions gets all suspect regions.