diff --git a/pkg/core/region.go b/pkg/core/region.go
index 6f2e52dfe22..35e71005535 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -95,6 +95,12 @@ func (r *RegionInfo) LoadedFromStorage() bool {
 	return r.source == Storage
 }
 
+// LoadedFromSync means this region's meta info loaded from region syncer.
+// Only used for test.
+func (r *RegionInfo) LoadedFromSync() bool {
+	return r.source == Sync
+}
+
 // NewRegionInfo creates RegionInfo with region's meta and leader peer.
 func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCreateOption) *RegionInfo {
 	regionInfo := &RegionInfo{
@@ -668,7 +674,7 @@ func (r *RegionInfo) isRegionRecreated() bool {
 
 // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
 // and new region information.
-type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
+type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
 
 // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
 // nil means do not print the log.
@@ -681,19 +687,15 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 	}
 	// Save to storage if meta is updated.
 	// Save to cache if meta or leader is updated, or contains any down/pending peer.
-	// Mark isNew if the region in cache does not have leader.
-	return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
+	return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
 		if origin == nil {
 			if log.GetLevel() <= zap.DebugLevel {
 				debug("insert new region",
 					zap.Uint64("region-id", region.GetID()),
 					logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
 			}
-			saveKV, saveCache, isNew = true, true, true
+			saveKV, saveCache = true, true
 		} else {
-			if origin.LoadedFromStorage() {
-				isNew = true
-			}
 			r := region.GetRegionEpoch()
 			o := origin.GetRegionEpoch()
 			if r.GetVersion() > o.GetVersion() {
@@ -719,9 +721,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 				saveKV, saveCache = true, true
 			}
 			if region.GetLeader().GetId() != origin.GetLeader().GetId() {
-				if origin.GetLeader().GetId() == 0 {
-					isNew = true
-				} else if log.GetLevel() <= zap.InfoLevel {
+				if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
 					info("leader changed",
 						zap.Uint64("region-id", region.GetID()),
 						zap.Uint64("from", origin.GetLeader().GetStoreId()),
diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go
index 1624697e893..f857970c71e 100644
--- a/pkg/core/region_test.go
+++ b/pkg/core/region_test.go
@@ -362,7 +362,7 @@ func TestNeedSync(t *testing.T) {
 	for _, testCase := range testCases {
 		regionA := region.Clone(testCase.optionsA...)
 		regionB := region.Clone(testCase.optionsB...)
-		_, _, _, needSync := RegionGuide(regionA, regionB)
+		_, _, needSync := RegionGuide(regionA, regionB)
 		re.Equal(testCase.needSync, needSync)
 	}
 }
diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go
index 23791a14514..2a66586c5df 100644
--- a/pkg/statistics/region_collection.go
+++ b/pkg/statistics/region_collection.go
@@ -28,6 +28,8 @@ import (
 // RegionStatisticType represents the type of the region's status.
 type RegionStatisticType uint32
 
+const emptyStatistic = RegionStatisticType(0)
+
 // region status type
 const (
 	MissPeer RegionStatisticType = 1 << iota
@@ -163,6 +165,9 @@ func (r *RegionStatistics) deleteOfflineEntry(deleteIndex RegionStatisticType, r
 // due to some special state types.
 func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
 	regionID := region.GetID()
+	if !r.isObserved(regionID) {
+		return true
+	}
 	if r.IsRegionStatsType(regionID, OversizedRegion) !=
 		region.IsOversized(int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxSize()), int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxKeys())) {
 		return true
@@ -171,6 +176,14 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
 		region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys()))
 }
 
+// isObserved returns whether the region is observed. And it also shows whether PD received heartbeat of this region.
+func (r *RegionStatistics) isObserved(id uint64) bool {
+	r.RLock()
+	defer r.RUnlock()
+	_, ok := r.index[id]
+	return ok
+}
+
 // Observe records the current regions' status.
 func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo) {
 	r.Lock()
@@ -275,10 +288,11 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
 	r.deleteOfflineEntry(deleteIndex, regionID)
 	r.offlineIndex[regionID] = offlinePeerTypeIndex
 
-	if oldIndex, ok := r.index[regionID]; ok {
-		deleteIndex = oldIndex &^ peerTypeIndex
+	// Remove the info if any of the conditions are not met any more.
+	if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic {
+		deleteIndex := oldIndex &^ peerTypeIndex
+		r.deleteEntry(deleteIndex, regionID)
 	}
-	r.deleteEntry(deleteIndex, regionID)
 	r.index[regionID] = peerTypeIndex
 }
 
@@ -287,7 +301,10 @@ func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) {
 	r.Lock()
 	defer r.Unlock()
 	if oldIndex, ok := r.index[regionID]; ok {
-		r.deleteEntry(oldIndex, regionID)
+		delete(r.index, regionID)
+		if oldIndex > emptyStatistic {
+			r.deleteEntry(oldIndex, regionID)
+		}
 	}
 	if oldIndex, ok := r.offlineIndex[regionID]; ok {
 		r.deleteOfflineeEntry(oldIndex, regionID)
diff --git a/server/api/region_test.go b/server/api/region_test.go
index 9093f71c542..75132e1416a 100644
--- a/server/api/region_test.go
+++ b/server/api/region_test.go
@@ -203,7 +203,7 @@ func (suite *regionTestSuite) TestRegionCheck() {
 	r7 := make([]*histItem, 1)
 	suite.NoError(tu.ReadGetJSON(re, testDialClient, url, &r7))
 	histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}}
-	suite.Equal(histKeys, r7)
+	re.Equal(histKeys, r7)
 }
 
 func (suite *regionTestSuite) TestRegions() {
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index df0115826c8..1637c2a8612 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -1027,9 +1027,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	hasRegionStats := c.regionStats != nil
 	// Save to storage if meta is updated, except for flashback.
 	// Save to cache if meta or leader is updated, or contains any down/pending peer.
-	// Mark isNew if the region in cache does not have leader.
-	isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
-	if !saveKV && !saveCache && !isNew {
+	saveKV, saveCache, needSync := regionGuide(region, origin)
+	if !saveKV && !saveCache {
 		// Due to some config changes need to update the region stats as well,
 		// so we do some extra checks here.
 		if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go
index b39013d6dd6..55331ea0635 100644
--- a/server/region_syncer/client.go
+++ b/server/region_syncer/client.go
@@ -218,7 +218,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 				log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
 				continue
 			}
-			_, saveKV, _, _ := regionGuide(region, origin)
+			saveKV, _, _ := regionGuide(region, origin)
 			overlaps := bc.PutRegion(region)
 
 			if hasBuckets {
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index 6e79995008f..3b8863a90b1 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -37,6 +37,7 @@ import (
 	"github.com/tikv/pd/pkg/mock/mockid"
 	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/schedule/schedulers"
+	"github.com/tikv/pd/pkg/statistics"
 	"github.com/tikv/pd/pkg/storage"
 	"github.com/tikv/pd/pkg/tso"
 	"github.com/tikv/pd/pkg/utils/testutil"
@@ -180,6 +181,99 @@ func TestDamagedRegion(t *testing.T) {
 	re.Equal(uint64(1), rc.GetOperatorController().OperatorCount(operator.OpAdmin))
 }
 
+func TestRegionStatistics(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	tc, err := tests.NewTestCluster(ctx, 2)
+	defer tc.Destroy()
+	re.NoError(err)
+
+	err = tc.RunInitialServers()
+	re.NoError(err)
+
+	leaderName := tc.WaitLeader()
+	leaderServer := tc.GetServer(leaderName)
+	grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
+	clusterID := leaderServer.GetClusterID()
+	bootstrapCluster(re, clusterID, grpcPDClient)
+	rc := leaderServer.GetRaftCluster()
+
+	region := &metapb.Region{
+		Id:       10,
+		StartKey: []byte("abc"),
+		EndKey:   []byte("xyz"),
+		Peers: []*metapb.Peer{
+			{Id: 101, StoreId: 1},
+			{Id: 102, StoreId: 2},
+			{Id: 103, StoreId: 3},
+			{Id: 104, StoreId: 4, Role: metapb.PeerRole_Learner},
+		},
+	}
+
+	// To put region.
+	regionInfo := core.NewRegionInfo(region, region.Peers[0], core.SetApproximateSize(0))
+	err = tc.HandleRegionHeartbeat(regionInfo)
+	re.NoError(err)
+	regions := rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Len(regions, 1)
+
+	// wait for sync region
+	time.Sleep(1000 * time.Millisecond)
+
+	leaderServer.ResignLeader()
+	newLeaderName := tc.WaitLeader()
+	re.NotEqual(newLeaderName, leaderName)
+	leaderServer = tc.GetServer(newLeaderName)
+	rc = leaderServer.GetRaftCluster()
+	r := rc.GetRegion(region.Id)
+	re.NotNil(r)
+	re.True(r.LoadedFromSync())
+	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Empty(regions)
+	err = tc.HandleRegionHeartbeat(regionInfo)
+	re.NoError(err)
+	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Len(regions, 1)
+
+	leaderServer.ResignLeader()
+	newLeaderName = tc.WaitLeader()
+	re.Equal(newLeaderName, leaderName)
+	leaderServer = tc.GetServer(newLeaderName)
+	rc = leaderServer.GetRaftCluster()
+	re.NotNil(r)
+	re.True(r.LoadedFromStorage() || r.LoadedFromSync())
+	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Empty(regions)
+	regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
+	err = tc.HandleRegionHeartbeat(regionInfo)
+	re.NoError(err)
+	rc = leaderServer.GetRaftCluster()
+	r = rc.GetRegion(region.Id)
+	re.NotNil(r)
+	re.False(r.LoadedFromStorage() && r.LoadedFromSync())
+
+	leaderServer.ResignLeader()
+	newLeaderName = tc.WaitLeader()
+	re.NotEqual(newLeaderName, leaderName)
+	leaderServer.ResignLeader()
+	newLeaderName = tc.WaitLeader()
+	re.Equal(newLeaderName, leaderName)
+	leaderServer = tc.GetServer(newLeaderName)
+	rc = leaderServer.GetRaftCluster()
+	r = rc.GetRegion(region.Id)
+	re.NotNil(r)
+	re.False(r.LoadedFromStorage() && r.LoadedFromSync())
+	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Empty(regions)
+
+	regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
+	err = tc.HandleRegionHeartbeat(regionInfo)
+	re.NoError(err)
+	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Len(regions, 1)
+}
+
 func TestStaleRegion(t *testing.T) {
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())