diff --git a/server/core/hot_region_storage.go b/server/core/hot_region_storage.go index f44c7ca36ba..9e83353e1aa 100644 --- a/server/core/hot_region_storage.go +++ b/server/core/hot_region_storage.go @@ -100,10 +100,30 @@ const ( defaultDeleteTime = 4 ) +// HotRegionType stands for hot type. +type HotRegionType uint32 + +// Flags for flow. +const ( + WriteType HotRegionType = iota + ReadType +) + // HotRegionTypes stands for hot type. var HotRegionTypes = []string{ - "read", - "write", + WriteType.String(), + ReadType.String(), +} + +// String return HotRegionType in string format. +func (h HotRegionType) String() string { + switch h { + case WriteType: + return "write" + case ReadType: + return "read" + } + return "unimplemented" } // NewHotRegionsStorage create storage to store hot regions info. @@ -223,14 +243,14 @@ func (h *HotRegionStorage) pullHotRegionInfo() error { if err != nil { return err } - if err := h.packHistoryHotRegions(historyHotReadRegions, HotRegionTypes[0]); err != nil { + if err := h.packHistoryHotRegions(historyHotReadRegions, ReadType.String()); err != nil { return err } historyHotWriteRegions, err := h.hotRegionStorageHandler.PackHistoryHotWriteRegions() if err != nil { return err } - err = h.packHistoryHotRegions(historyHotWriteRegions, HotRegionTypes[1]) + err = h.packHistoryHotRegions(historyHotWriteRegions, WriteType.String()) return err } @@ -248,7 +268,7 @@ func (h *HotRegionStorage) packHistoryHotRegions(historyHotRegions []HistoryHotR } historyHotRegions[i].StartKey = region.StartKey historyHotRegions[i].EndKey = region.EndKey - key := HotRegionStorePath(HotRegionTypes[0], historyHotRegions[i].UpdateTime, historyHotRegions[i].RegionID) + key := HotRegionStorePath(hotRegionType, historyHotRegions[i].UpdateTime, historyHotRegions[i].RegionID) h.batchHotInfo[key] = &historyHotRegions[i] } return nil diff --git a/server/core/hot_region_storage_test.go b/server/core/hot_region_storage_test.go index be5f1d5d7eb..4ef960af41a 100644 --- a/server/core/hot_region_storage_test.go +++ b/server/core/hot_region_storage_test.go @@ -102,25 +102,33 @@ func (t *testHotRegionStorage) TestHotRegionWrite(c *C) { UpdateTime: now.UnixNano() / int64(time.Millisecond), RegionID: 1, StoreID: 1, - HotRegionType: HotRegionTypes[0], + HotRegionType: ReadType.String(), }, { UpdateTime: now.Add(10*time.Second).UnixNano() / int64(time.Millisecond), RegionID: 2, StoreID: 1, - HotRegionType: HotRegionTypes[0], + HotRegionType: ReadType.String(), }, { UpdateTime: now.Add(20*time.Second).UnixNano() / int64(time.Millisecond), RegionID: 3, StoreID: 1, - HotRegionType: HotRegionTypes[0], + HotRegionType: ReadType.String(), }, } packHotRegionInfo.historyHotReads = hotRegionStorages + packHotRegionInfo.historyHotWrites = []HistoryHotRegion{ + { + UpdateTime: now.Add(30*time.Second).UnixNano() / int64(time.Millisecond), + RegionID: 4, + StoreID: 1, + HotRegionType: WriteType.String(), + }, + } store.pullHotRegionInfo() store.flush() - iter := store.NewIterator(HotRegionTypes, + iter := store.NewIterator([]string{ReadType.String()}, now.UnixNano()/int64(time.Millisecond), now.Add(40*time.Second).UnixNano()/int64(time.Millisecond)) index := 0 @@ -143,19 +151,19 @@ func (t *testHotRegionStorage) TestHotRegionDelete(c *C) { UpdateTime: deleteDate.UnixNano() / int64(time.Millisecond), RegionID: 1, StoreID: 1, - HotRegionType: HotRegionTypes[0], + HotRegionType: ReadType.String(), }, { UpdateTime: deleteDate.Add(10*time.Second).UnixNano() / int64(time.Millisecond), RegionID: 2, StoreID: 1, - HotRegionType: HotRegionTypes[0], + HotRegionType: ReadType.String(), }, { UpdateTime: time.Now().UnixNano() / int64(time.Millisecond), RegionID: 3, StoreID: 1, - HotRegionType: HotRegionTypes[0], + HotRegionType: ReadType.String(), }, } packHotRegionInfo.historyHotReads = hotRegionStorages diff --git a/server/handler.go b/server/handler.go index 3ca75604e93..ac46c74ecb6 100644 --- a/server/handler.go +++ b/server/handler.go @@ -939,39 +939,36 @@ func (h *Handler) IsLeader() bool { // PackHistoryHotReadRegions get read hot region info in HistoryHotRegion form. func (h *Handler) PackHistoryHotReadRegions() (historyHotRegions []core.HistoryHotRegion, err error) { - hotReadLeaderRegions := h.GetHotReadRegions().AsLeader - historyLeaderHotRegions, err := h.packHotRegions(hotReadLeaderRegions, true, core.HotRegionTypes[0]) - if err != nil { + hotReadRegions := h.GetHotReadRegions() + if hotReadRegions == nil { return } - hotReadPeerRegions := h.GetHotReadRegions().AsPeer - historyPeerHotRegions, err := h.packHotRegions(hotReadPeerRegions, false, core.HotRegionTypes[0]) + hotReadPeerRegions := hotReadRegions.AsPeer + historyPeerHotRegions, err := h.packHotRegions(hotReadPeerRegions, core.ReadType.String()) if err != nil { return } - historyHotRegions = append(historyHotRegions, historyLeaderHotRegions...) historyHotRegions = append(historyHotRegions, historyPeerHotRegions...) return } // PackHistoryHotWriteRegions get write hot region info in HistoryHotRegion from func (h *Handler) PackHistoryHotWriteRegions() (historyHotRegions []core.HistoryHotRegion, err error) { - hotWriteLeaderRegions := h.GetHotWriteRegions().AsLeader - historyLeaderHotRegions, err := h.packHotRegions(hotWriteLeaderRegions, true, core.HotRegionTypes[1]) - if err != nil { + hotWriteRegions := h.GetHotWriteRegions() + if hotWriteRegions == nil { return } - hotWritePeerRegions := h.GetHotWriteRegions().AsPeer - historyPeerHotRegions, err := h.packHotRegions(hotWritePeerRegions, false, core.HotRegionTypes[1]) + hotWritePeerRegions := hotWriteRegions.AsPeer + historyPeerHotRegions, err := h.packHotRegions(hotWritePeerRegions, core.WriteType.String()) if err != nil { return } - historyHotRegions = append(historyHotRegions, historyLeaderHotRegions...) + historyHotRegions = append(historyHotRegions, historyPeerHotRegions...) return } -func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, isLeader bool, hotRegionType string) (historyHotRegions []core.HistoryHotRegion, err error) { +func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotRegionType string) (historyHotRegions []core.HistoryHotRegion, err error) { c, err := h.GetRaftCluster() if err != nil { return nil, err @@ -979,14 +976,18 @@ func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, isLe for _, hotPeersStat := range hotPeersStat { stats := hotPeersStat.Stats for _, hotPeerStat := range stats { - region := c.GetRegion(hotPeerStat.RegionID).GetMeta() - region, err := encryption.EncryptRegion(region, h.s.encryptionKeyManager) + region := c.GetRegion(hotPeerStat.RegionID) + if region == nil { + continue + } + meta := region.GetMeta() + meta, err := encryption.EncryptRegion(meta, h.s.encryptionKeyManager) if err != nil { return nil, err } var peerID uint64 var isLearner bool - for _, peer := range region.Peers { + for _, peer := range meta.Peers { if peer.StoreId == hotPeerStat.StoreID { peerID = peer.Id isLearner = peer.Role == metapb.PeerRole_Learner @@ -998,15 +999,15 @@ func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, isLe RegionID: hotPeerStat.RegionID, StoreID: hotPeerStat.StoreID, PeerID: peerID, - IsLeader: isLeader, + IsLeader: meta.Id == region.GetLeader().Id, IsLearner: isLearner, HotDegree: int64(hotPeerStat.HotDegree), FlowBytes: hotPeerStat.ByteRate, KeyRate: hotPeerStat.KeyRate, QueryRate: hotPeerStat.QueryRate, - StartKey: region.StartKey, - EndKey: region.EndKey, - EncryptionMeta: region.EncryptionMeta, + StartKey: meta.StartKey, + EndKey: meta.EndKey, + EncryptionMeta: meta.EncryptionMeta, HotRegionType: hotRegionType, } historyHotRegions = append(historyHotRegions, stat) diff --git a/tests/server/core/hot_region_storage_test.go b/tests/server/core/hot_region_storage_test.go new file mode 100644 index 00000000000..58c8b1f71c9 --- /dev/null +++ b/tests/server/core/hot_region_storage_test.go @@ -0,0 +1,145 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core_test + +import ( + "context" + "testing" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/tikv/pd/server" + "github.com/tikv/pd/server/config" + "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/statistics" + "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/pdctl" +) + +func Test(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&hotRegionHistorySuite{}) + +type hotRegionHistorySuite struct{} + +func (s *hotRegionHistorySuite) SetUpSuite(c *C) { + server.EnableZap = true +} + +func (s *hotRegionHistorySuite) TestHotRegionStorage(c *C) { + statistics.Denoising = false + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 1, + func(cfg *config.Config, serverName string) { + cfg.Schedule.HotRegionCacheHitsThreshold = 0 + cfg.Schedule.HotRegionsWriteInterval.Duration = 1000 * time.Millisecond + cfg.Schedule.HotRegionsResevervedDays = 1 + }, + ) + c.Assert(err, IsNil) + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + + leaderServer := cluster.GetServer(cluster.GetLeader()) + c.Assert(leaderServer.BootstrapCluster(), IsNil) + for _, store := range stores { + pdctl.MustPutStore(c, leaderServer.GetServer(), store) + } + defer cluster.Destroy() + startTime := time.Now().UnixNano() / int64(time.Millisecond) + pdctl.MustPutRegion(c, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(statistics.WriteReportInterval)) + pdctl.MustPutRegion(c, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), core.SetReportInterval(statistics.WriteReportInterval)) + pdctl.MustPutRegion(c, cluster, 3, 1, []byte("e"), []byte("f")) + pdctl.MustPutRegion(c, cluster, 4, 2, []byte("g"), []byte("h")) + storeStatss := []*pdpb.StoreStats{ + { + StoreId: 1, + Interval: &pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: statistics.ReadReportInterval}, + PeerStats: []*pdpb.PeerStat{ + { + RegionId: 3, + ReadBytes: 9000000000, + }, + }, + }, + { + StoreId: 2, + Interval: &pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: statistics.ReadReportInterval}, + PeerStats: []*pdpb.PeerStat{ + { + RegionId: 4, + ReadBytes: 9000000000, + }, + }, + }, + } + for _, storeStats := range storeStatss { + leaderServer.GetRaftCluster().HandleStoreHeartbeat(storeStats) + } + // wait hot scheduler starts + time.Sleep(5000 * time.Millisecond) + endTime := time.Now().UnixNano() / int64(time.Millisecond) + storage := leaderServer.GetServer().GetHistoryHotRegionStorage() + iter := storage.NewIterator([]string{core.WriteType.String()}, startTime, endTime) + next, err := iter.Next() + c.Assert(next, NotNil) + c.Assert(err, IsNil) + c.Assert(next.RegionID, Equals, uint64(1)) + c.Assert(next.StoreID, Equals, uint64(1)) + c.Assert(next.HotRegionType, Equals, core.WriteType.String()) + next, err = iter.Next() + c.Assert(next, NotNil) + c.Assert(err, IsNil) + c.Assert(next.RegionID, Equals, uint64(2)) + c.Assert(next.StoreID, Equals, uint64(2)) + c.Assert(next.HotRegionType, Equals, core.WriteType.String()) + next, err = iter.Next() + c.Assert(next, IsNil) + c.Assert(err, IsNil) + iter = storage.NewIterator([]string{core.ReadType.String()}, startTime, endTime) + next, err = iter.Next() + c.Assert(next, NotNil) + c.Assert(err, IsNil) + c.Assert(next.RegionID, Equals, uint64(3)) + c.Assert(next.StoreID, Equals, uint64(1)) + c.Assert(next.HotRegionType, Equals, core.ReadType.String()) + next, err = iter.Next() + c.Assert(next, NotNil) + c.Assert(err, IsNil) + c.Assert(next.RegionID, Equals, uint64(4)) + c.Assert(next.StoreID, Equals, uint64(2)) + c.Assert(next.HotRegionType, Equals, core.ReadType.String()) + next, err = iter.Next() + c.Assert(next, IsNil) + c.Assert(err, IsNil) +}