diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go
index bf519b71bec..ae3d62e1731 100644
--- a/server/schedule/region_scatterer.go
+++ b/server/schedule/region_scatterer.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"strconv"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -86,26 +87,6 @@ func (s *selectedStores) GetGroupDistribution(group string) (map[uint64]uint64,
 	return s.getDistributionByGroupLocked(group)
 }
 
-// TotalCountByStore counts the total count by store
-func (s *selectedStores) TotalCountByStore(storeID uint64) uint64 {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
-	groups := s.groupDistribution.GetAllID()
-	totalCount := uint64(0)
-	for _, group := range groups {
-		storeDistribution, ok := s.getDistributionByGroupLocked(group)
-		if !ok {
-			continue
-		}
-		count, ok := storeDistribution[storeID]
-		if !ok {
-			continue
-		}
-		totalCount += count
-	}
-	return totalCount
-}
-
 // getDistributionByGroupLocked should be called with lock
 func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]uint64, bool) {
 	if result, ok := s.groupDistribution.Get(group); ok {
@@ -309,6 +290,10 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 	selectedStores := make(map[uint64]struct{}, len(region.GetPeers()))         // selected StoreID set
 	leaderCandidateStores := make([]uint64, 0, len(region.GetPeers()))          // StoreID allowed to become Leader
 	scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
+		filterLen := len(context.filters) + 2
+		filters := make([]filter.Filter, filterLen)
+		copy(filters, context.filters)
+		filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores)
 		for _, peer := range peers {
 			if _, ok := selectedStores[peer.GetStoreId()]; ok {
 				if allowLeader(oldFit, peer) {
@@ -317,9 +302,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 				// It is both sourcePeer and targetPeer itself, no need to select.
 				continue
 			}
+			sourceStore := r.cluster.GetStore(peer.GetStoreId())
+			if sourceStore == nil {
+				log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
+				continue
+			}
+			filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore)
 			for {
-				candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
-				newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
+				newPeer := r.selectNewPeer(context, group, peer, filters)
 				targetPeers[newPeer.GetStoreId()] = newPeer
 				selectedStores[newPeer.GetStoreId()] = struct{}{}
 				// If the selected peer is a peer other than origin peer in this region,
@@ -340,7 +330,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 	// FIXME: target leader only considers the ordinary stores, maybe we need to consider the
 	// special engine stores if the engine supports to become a leader. But now there is only
 	// one engine, tiflash, which does not support the leader, so don't consider it for now.
-	targetLeader := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
+	targetLeader, leaderStorePickedCount := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
 	if targetLeader == 0 {
 		scatterCounter.WithLabelValues("no-leader", "").Inc()
 		return nil
 	}
@@ -373,6 +363,8 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 	if op != nil {
 		scatterCounter.WithLabelValues("success", "").Inc()
 		r.Put(targetPeers, targetLeader, group)
+		op.AdditionalInfos["group"] = group
+		op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10)
 		op.SetPriorityLevel(core.HighPriority)
 	}
 	return op
@@ -404,24 +396,18 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.
 	return region.GetLeader().GetStoreId() == targetLeader
 }
 
-func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 {
-	sourceStore := r.cluster.GetStore(sourceStoreID)
-	if sourceStore == nil {
-		log.Error("failed to get the store", zap.Uint64("store-id", sourceStoreID), errs.ZapError(errs.ErrGetSourceStore))
-		return nil
-	}
-	filters := []filter.Filter{
-		filter.NewExcludedFilter(r.name, nil, selectedStores),
-	}
-	scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore)
-	filters = append(filters, context.filters...)
-	filters = append(filters, scoreGuard)
+// selectNewPeer returns the new peer whose store has the fewest picked count.
+// It keeps the origin peer if the origin store's picked count equals the fewest picked count.
+// It can be divided into three steps:
+// 1. find the max picked count and the min picked count.
+// 2. if the max picked count equals the min picked count, every store has been picked the same number of times, so return the origin peer.
+// 3. otherwise, select the store whose picked count is the minimum and which passes all filters.
+func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer {
 	stores := r.cluster.GetStores()
-	candidates := make([]uint64, 0)
 	maxStoreTotalCount := uint64(0)
 	minStoreTotalCount := uint64(math.MaxUint64)
-	for _, store := range r.cluster.GetStores() {
-		count := context.selectedPeer.TotalCountByStore(store.GetID())
+	for _, store := range stores {
+		count := context.selectedPeer.Get(store.GetID(), group)
 		if count > maxStoreTotalCount {
 			maxStoreTotalCount = count
 		}
@@ -429,42 +415,33 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreI
 			minStoreTotalCount = count
 		}
 	}
+
+	var newPeer *metapb.Peer
+	minCount := uint64(math.MaxUint64)
+	originStorePickedCount := uint64(math.MaxUint64)
 	for _, store := range stores {
-		storeCount := context.selectedPeer.TotalCountByStore(store.GetID())
+		storeCount := context.selectedPeer.Get(store.GetID(), group)
+		if store.GetID() == peer.GetStoreId() {
+			originStorePickedCount = storeCount
+		}
 		// If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate.
 		// If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store
 		// could be selected as candidate.
 		if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
 			if filter.Target(r.cluster.GetOpts(), store, filters) {
-				candidates = append(candidates, store.GetID())
+				if storeCount < minCount {
+					minCount = storeCount
+					newPeer = &metapb.Peer{
+						StoreId: store.GetID(),
+						Role:    peer.GetRole(),
+					}
+				}
 			}
 		}
 	}
-	return candidates
-}
-
-func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceStoreID uint64, candidates []uint64, context engineContext) *metapb.Peer {
-	if len(candidates) < 1 {
+	if originStorePickedCount <= minCount {
 		return peer
 	}
-	var newPeer *metapb.Peer
-	minCount := uint64(math.MaxUint64)
-	for _, storeID := range candidates {
-		count := context.selectedPeer.Get(storeID, group)
-		if count < minCount {
-			minCount = count
-			newPeer = &metapb.Peer{
-				StoreId: storeID,
-				Role:    peer.GetRole(),
-			}
-		}
-	}
-	// if the source store have the least count, we don't need to scatter this peer
-	for _, storeID := range candidates {
-		if storeID == sourceStoreID && context.selectedPeer.Get(sourceStoreID, group) <= minCount {
-			return peer
-		}
-	}
 	if newPeer == nil {
 		return peer
 	}
@@ -473,11 +450,12 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto
 
 // selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by
 // the existed peers store depended on the leader counts in the group level. Please use this func before scatter spacial engines.
-func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) uint64 {
+func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo,
+	leaderCandidateStores []uint64, context engineContext) (leaderID uint64, leaderStorePickedCount uint64) {
 	sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId())
 	if sourceStore == nil {
 		log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
-		return 0
+		return 0, 0
 	}
 	minStoreGroupLeader := uint64(math.MaxUint64)
 	id := uint64(0)
@@ -492,7 +470,7 @@ func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.
 			id = storeID
 		}
 	}
-	return id
+	return id, minStoreGroupLeader
 }
 
 // Put put the final distribution in the context no matter the operator was created
diff --git a/server/schedule/region_scatterer_test.go b/server/schedule/region_scatterer_test.go
index 36307741e23..b3f6ec8c5ff 100644
--- a/server/schedule/region_scatterer_test.go
+++ b/server/schedule/region_scatterer_test.go
@@ -18,7 +18,6 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"math/rand"
 	"strconv"
 	"testing"
 	"time"
@@ -485,47 +484,11 @@ func (s *testScatterRegionSuite) TestSelectedStoreGC(c *C) {
 	c.Assert(ok, IsFalse)
 }
 
-// TestRegionFromDifferentGroups test the multi regions. each region have its own group.
-// After scatter, the distribution for the whole cluster should be well.
-func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	opt := config.NewTestOptions()
-	tc := mockcluster.NewCluster(ctx, opt)
-	stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
-	oc := NewOperatorController(ctx, tc, stream)
-	// Add 6 stores.
-	storeCount := 6
-	for i := uint64(1); i <= uint64(storeCount); i++ {
-		tc.AddRegionStore(i, 0)
-	}
-	scatterer := NewRegionScatterer(ctx, tc, oc)
-	regionCount := 50
-	for i := 1; i <= regionCount; i++ {
-		p := rand.Perm(storeCount)
-		scatterer.scatterRegion(tc.AddLeaderRegion(uint64(i), uint64(p[0])+1, uint64(p[1])+1, uint64(p[2])+1), fmt.Sprintf("t%d", i))
-	}
-	check := func(ss *selectedStores) {
-		max := uint64(0)
-		min := uint64(math.MaxUint64)
-		for i := uint64(1); i <= uint64(storeCount); i++ {
-			count := ss.TotalCountByStore(i)
-			if count > max {
-				max = count
-			}
-			if count < min {
-				min = count
-			}
-		}
-		c.Assert(max-min, LessEqual, uint64(2))
-	}
-	check(scatterer.ordinaryEngine.selectedPeer)
-}
-
 func TestRegionHasLearner(t *testing.T) {
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+	group := "group"
 	opt := config.NewTestOptions()
 	tc := mockcluster.NewCluster(ctx, opt)
 	stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
@@ -568,14 +531,14 @@ func TestRegionHasLearner(t *testing.T) {
 	scatterer := NewRegionScatterer(ctx, tc, oc)
 	regionCount := 50
 	for i := 1; i <= regionCount; i++ {
-		_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), "group")
+		_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), group)
 		re.NoError(err)
 	}
 	check := func(ss *selectedStores) {
 		max := uint64(0)
 		min := uint64(math.MaxUint64)
 		for i := uint64(1); i <= max; i++ {
-			count := ss.TotalCountByStore(i)
+			count := ss.Get(i, group)
 			if count > max {
 				max = count
 			}
@@ -590,7 +553,7 @@ func TestRegionHasLearner(t *testing.T) {
 		max := uint64(0)
 		min := uint64(math.MaxUint64)
 		for i := uint64(1); i <= voterCount; i++ {
-			count := ss.TotalCountByStore(i)
+			count := ss.Get(i, group)
 			if count > max {
 				max = count
 			}
@@ -601,7 +564,7 @@ func TestRegionHasLearner(t *testing.T) {
 		re.LessOrEqual(max-2, uint64(regionCount)/voterCount)
 		re.LessOrEqual(min-1, uint64(regionCount)/voterCount)
 		for i := voterCount + 1; i <= storeCount; i++ {
-			count := ss.TotalCountByStore(i)
+			count := ss.Get(i, group)
 			re.LessOrEqual(count, uint64(0))
 		}
 	}
@@ -641,6 +604,9 @@ func (s *testScatterRegionSuite) TestSelectedStores(c *C) {
 		region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
 		op := scatterer.scatterRegion(region, group)
 		c.Assert(isPeerCountChanged(op), IsFalse)
+		if op != nil {
+			c.Assert(group, Equals, op.AdditionalInfos["group"])
+		}
 	}
 }
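
The patch above replaces selectCandidates/selectStore with a single selectNewPeer. The standalone sketch below is not part of the patch; the helper name chooseStore and the pickCount map are hypothetical. It only illustrates the selection rule the new function implements: prefer the store with the fewest picks for the group, skip the most-picked stores, and keep the origin store when it is already among the least picked.

// Standalone sketch, not part of the patch: a minimal illustration of the
// least-picked selection rule used by selectNewPeer. The helper name
// chooseStore and the pickCount map are hypothetical.
package main

import (
	"fmt"
	"math"
)

// chooseStore picks the store for a peer. pickCount maps store ID to the number
// of peers already placed there for this group; origin is the peer's current store.
func chooseStore(pickCount map[uint64]uint64, origin uint64) uint64 {
	maxCount, minCount := uint64(0), uint64(math.MaxUint64)
	for _, c := range pickCount {
		if c > maxCount {
			maxCount = c
		}
		if c < minCount {
			minCount = c
		}
	}
	// Step 2 of the rule: every store has the same picked count, keep the origin store.
	if maxCount == minCount {
		return origin
	}
	// Step 3: among stores that are not the most picked, take the least picked one.
	best, bestCount := origin, uint64(math.MaxUint64)
	for id, c := range pickCount {
		if c < maxCount && c < bestCount {
			best, bestCount = id, c
		}
	}
	// Keep the origin store when it is already among the least picked.
	if pickCount[origin] <= bestCount {
		return origin
	}
	return best
}

func main() {
	counts := map[uint64]uint64{1: 3, 2: 1, 3: 2}
	fmt.Println(chooseStore(counts, 1)) // 2: store 2 has the fewest picks
	fmt.Println(chooseStore(counts, 2)) // 2: origin already has the fewest picks
}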