
Commit

core:solve bug in hot_region_storage (#4121)
* cluster:add hot_region_storage

Signed-off-by: qidi1 <1083369179@qq.com>

* change cluster part

Signed-off-by: qidi1 <1083369179@qq.com>

* change isLeader to bool

Signed-off-by: qidi1 <1083369179@qq.com>

* remove cluster code that is not needed

Signed-off-by: qidi1 <1083369179@qq.com>

* add wg

Signed-off-by: qidi1 <1083369179@qq.com>

* move hot_region_storage to core

Signed-off-by: qidi1 <1083369179@qq.com>

* cluster:merge from remote

Signed-off-by: qidi1 <1083369179@qq.com>

* cluster:change 1000 to time.Millisecond

Signed-off-by: qidi1 <1083369179@qq.com>

* cluster:change 12 to default, move out of function

Signed-off-by: qidi1 <1083369179@qq.com>

* cluster:format check

Signed-off-by: qidi1 <1083369179@qq.com>

* cluster:add mock for peer id

Signed-off-by: qidi1 <1083369179@qq.com>

* cluster:fix bug where every region was seen as read; AsPeer includes AsLeader, so remove AsLeader

Signed-off-by: qidi1 <1083369179@qq.com>

* cluster:add test for hot region storage

Signed-off-by: qidi1 <1083369179@qq.com>

* cluster:add test in tests/server/core

Signed-off-by: qidi1 <1083369179@qq.com>

* cluster:remove some unused code

Signed-off-by: qidi1 <1083369179@qq.com>

* core:change core.HotRegionType from string to enum

Signed-off-by: qidi1 <1083369179@qq.com>

* Update hot_region_storage_test.go

Signed-off-by: nolouch <nolouch@gmail.com>

* Update handler.go

Signed-off-by: qidi1 <1083369179@qq.com>
Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: ShuNing <nolouch@gmail.com>
qidi1 and nolouch authored Sep 27, 2021
1 parent 4ca73fb commit 455a2f9
Showing 4 changed files with 206 additions and 32 deletions.
30 changes: 25 additions & 5 deletions server/core/hot_region_storage.go
@@ -100,10 +100,30 @@ const (
defaultDeleteTime = 4
)

// HotRegionType stands for hot type.
type HotRegionType uint32

// Flags for flow.
const (
WriteType HotRegionType = iota
ReadType
)

// HotRegionTypes stands for hot type.
var HotRegionTypes = []string{
"read",
"write",
WriteType.String(),
ReadType.String(),
}

// String return HotRegionType in string format.
func (h HotRegionType) String() string {
switch h {
case WriteType:
return "write"
case ReadType:
return "read"
}
return "unimplemented"
}

// NewHotRegionsStorage create storage to store hot regions info.
@@ -223,14 +243,14 @@ func (h *HotRegionStorage) pullHotRegionInfo() error {
if err != nil {
return err
}
if err := h.packHistoryHotRegions(historyHotReadRegions, HotRegionTypes[0]); err != nil {
if err := h.packHistoryHotRegions(historyHotReadRegions, ReadType.String()); err != nil {
return err
}
historyHotWriteRegions, err := h.hotRegionStorageHandler.PackHistoryHotWriteRegions()
if err != nil {
return err
}
err = h.packHistoryHotRegions(historyHotWriteRegions, HotRegionTypes[1])
err = h.packHistoryHotRegions(historyHotWriteRegions, WriteType.String())
return err
}

@@ -248,7 +268,7 @@ func (h *HotRegionStorage) packHistoryHotRegions(historyHotRegions []HistoryHotR
}
historyHotRegions[i].StartKey = region.StartKey
historyHotRegions[i].EndKey = region.EndKey
key := HotRegionStorePath(HotRegionTypes[0], historyHotRegions[i].UpdateTime, historyHotRegions[i].RegionID)
key := HotRegionStorePath(hotRegionType, historyHotRegions[i].UpdateTime, historyHotRegions[i].RegionID)
h.batchHotInfo[key] = &historyHotRegions[i]
}
return nil
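Below is a minimal, self-contained sketch of the new HotRegionType enum and the corrected key construction, for readers skimming the diff. Only the enum and its String method mirror the code above; hotRegionStorePath is a simplified, hypothetical stand-in for the real key helper, and the timestamps in main are made up.

package main

import "fmt"

// HotRegionType mirrors the enum introduced in the diff above.
type HotRegionType uint32

const (
	WriteType HotRegionType = iota
	ReadType
)

// String converts the enum to the string stored with each record.
func (h HotRegionType) String() string {
	switch h {
	case WriteType:
		return "write"
	case ReadType:
		return "read"
	}
	return "unimplemented"
}

// hotRegionStorePath is a simplified, hypothetical stand-in for the real key helper.
// The bug fixed above was that the key was always built with HotRegionTypes[0], so
// write records were filed under the read prefix; the fix passes the caller's
// hotRegionType through instead.
func hotRegionStorePath(hotRegionType string, updateTime int64, regionID uint64) string {
	return fmt.Sprintf("hot_region/%s/%d/%d", hotRegionType, updateTime, regionID)
}

func main() {
	fmt.Println(hotRegionStorePath(WriteType.String(), 1632700000000, 1)) // hot_region/write/...
	fmt.Println(hotRegionStorePath(ReadType.String(), 1632700000000, 3))  // hot_region/read/...
}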
22 changes: 15 additions & 7 deletions server/core/hot_region_storage_test.go
@@ -102,25 +102,33 @@ func (t *testHotRegionStorage) TestHotRegionWrite(c *C) {
UpdateTime: now.UnixNano() / int64(time.Millisecond),
RegionID: 1,
StoreID: 1,
HotRegionType: HotRegionTypes[0],
HotRegionType: ReadType.String(),
},
{
UpdateTime: now.Add(10*time.Second).UnixNano() / int64(time.Millisecond),
RegionID: 2,
StoreID: 1,
HotRegionType: HotRegionTypes[0],
HotRegionType: ReadType.String(),
},
{
UpdateTime: now.Add(20*time.Second).UnixNano() / int64(time.Millisecond),
RegionID: 3,
StoreID: 1,
HotRegionType: HotRegionTypes[0],
HotRegionType: ReadType.String(),
},
}
packHotRegionInfo.historyHotReads = hotRegionStorages
packHotRegionInfo.historyHotWrites = []HistoryHotRegion{
{
UpdateTime: now.Add(30*time.Second).UnixNano() / int64(time.Millisecond),
RegionID: 4,
StoreID: 1,
HotRegionType: WriteType.String(),
},
}
store.pullHotRegionInfo()
store.flush()
iter := store.NewIterator(HotRegionTypes,
iter := store.NewIterator([]string{ReadType.String()},
now.UnixNano()/int64(time.Millisecond),
now.Add(40*time.Second).UnixNano()/int64(time.Millisecond))
index := 0
@@ -143,19 +151,19 @@ func (t *testHotRegionStorage) TestHotRegionDelete(c *C) {
UpdateTime: deleteDate.UnixNano() / int64(time.Millisecond),
RegionID: 1,
StoreID: 1,
HotRegionType: HotRegionTypes[0],
HotRegionType: ReadType.String(),
},
{
UpdateTime: deleteDate.Add(10*time.Second).UnixNano() / int64(time.Millisecond),
RegionID: 2,
StoreID: 1,
HotRegionType: HotRegionTypes[0],
HotRegionType: ReadType.String(),
},
{
UpdateTime: time.Now().UnixNano() / int64(time.Millisecond),
RegionID: 3,
StoreID: 1,
HotRegionType: HotRegionTypes[0],
HotRegionType: ReadType.String(),
},
}
packHotRegionInfo.historyHotReads = hotRegionStorages
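A short usage sketch of the behaviour the updated test asserts, assuming a *HotRegionStorage named store populated with both read and write records as above; startTime and endTime stand in for the millisecond bounds used in the test. Requesting only ReadType must not return the write record for region 4.

iter := store.NewIterator([]string{ReadType.String()}, startTime, endTime)
for {
	r, err := iter.Next()
	c.Assert(err, IsNil)
	if r == nil {
		break // no more records in the requested time range
	}
	// With the fix in place, nothing stored as a write record comes back here.
	c.Assert(r.HotRegionType, Equals, ReadType.String())
}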
41 changes: 21 additions & 20 deletions server/handler.go
@@ -939,54 +939,55 @@ func (h *Handler) IsLeader() bool {

// PackHistoryHotReadRegions get read hot region info in HistoryHotRegion form.
func (h *Handler) PackHistoryHotReadRegions() (historyHotRegions []core.HistoryHotRegion, err error) {
hotReadLeaderRegions := h.GetHotReadRegions().AsLeader
historyLeaderHotRegions, err := h.packHotRegions(hotReadLeaderRegions, true, core.HotRegionTypes[0])
if err != nil {
hotReadRegions := h.GetHotReadRegions()
if hotReadRegions == nil {
return
}
hotReadPeerRegions := h.GetHotReadRegions().AsPeer
historyPeerHotRegions, err := h.packHotRegions(hotReadPeerRegions, false, core.HotRegionTypes[0])
hotReadPeerRegions := hotReadRegions.AsPeer
historyPeerHotRegions, err := h.packHotRegions(hotReadPeerRegions, core.ReadType.String())
if err != nil {
return
}
historyHotRegions = append(historyHotRegions, historyLeaderHotRegions...)
historyHotRegions = append(historyHotRegions, historyPeerHotRegions...)
return
}

// PackHistoryHotWriteRegions get write hot region info in HistoryHotRegion from
func (h *Handler) PackHistoryHotWriteRegions() (historyHotRegions []core.HistoryHotRegion, err error) {
hotWriteLeaderRegions := h.GetHotWriteRegions().AsLeader
historyLeaderHotRegions, err := h.packHotRegions(hotWriteLeaderRegions, true, core.HotRegionTypes[1])
if err != nil {
hotWriteRegions := h.GetHotWriteRegions()
if hotWriteRegions == nil {
return
}
hotWritePeerRegions := h.GetHotWriteRegions().AsPeer
historyPeerHotRegions, err := h.packHotRegions(hotWritePeerRegions, false, core.HotRegionTypes[1])
hotWritePeerRegions := hotWriteRegions.AsPeer
historyPeerHotRegions, err := h.packHotRegions(hotWritePeerRegions, core.WriteType.String())
if err != nil {
return
}
historyHotRegions = append(historyHotRegions, historyLeaderHotRegions...)

historyHotRegions = append(historyHotRegions, historyPeerHotRegions...)
return
}

func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, isLeader bool, hotRegionType string) (historyHotRegions []core.HistoryHotRegion, err error) {
func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotRegionType string) (historyHotRegions []core.HistoryHotRegion, err error) {
c, err := h.GetRaftCluster()
if err != nil {
return nil, err
}
for _, hotPeersStat := range hotPeersStat {
stats := hotPeersStat.Stats
for _, hotPeerStat := range stats {
region := c.GetRegion(hotPeerStat.RegionID).GetMeta()
region, err := encryption.EncryptRegion(region, h.s.encryptionKeyManager)
region := c.GetRegion(hotPeerStat.RegionID)
if region == nil {
continue
}
meta := region.GetMeta()
meta, err := encryption.EncryptRegion(meta, h.s.encryptionKeyManager)
if err != nil {
return nil, err
}
var peerID uint64
var isLearner bool
for _, peer := range region.Peers {
for _, peer := range meta.Peers {
if peer.StoreId == hotPeerStat.StoreID {
peerID = peer.Id
isLearner = peer.Role == metapb.PeerRole_Learner
@@ -998,15 +999,15 @@ func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, isLe
RegionID: hotPeerStat.RegionID,
StoreID: hotPeerStat.StoreID,
PeerID: peerID,
IsLeader: isLeader,
IsLeader: meta.Id == region.GetLeader().Id,
IsLearner: isLearner,
HotDegree: int64(hotPeerStat.HotDegree),
FlowBytes: hotPeerStat.ByteRate,
KeyRate: hotPeerStat.KeyRate,
QueryRate: hotPeerStat.QueryRate,
StartKey: region.StartKey,
EndKey: region.EndKey,
EncryptionMeta: region.EncryptionMeta,
StartKey: meta.StartKey,
EndKey: meta.EndKey,
EncryptionMeta: meta.EncryptionMeta,
HotRegionType: hotRegionType,
}
historyHotRegions = append(historyHotRegions, stat)
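The handler change above amounts to two things: the separate leader/peer packing is gone (AsPeer already contains the leader's peer, which is why AsLeader was dropped), and nil results are now guarded before use. A stripped-down sketch of the defensive packing loop follows, with names simplified and the HistoryHotRegion construction elided; c stands for the raft cluster and stats for the hot peer stats, as in the diff.

for _, hotPeerStat := range stats {
	region := c.GetRegion(hotPeerStat.RegionID)
	if region == nil {
		// The region may have been merged or removed since the stat was collected,
		// so skip it instead of dereferencing a nil pointer.
		continue
	}
	meta := region.GetMeta()
	// ... encrypt meta, find the peer on hotPeerStat.StoreID, and build the
	// core.HistoryHotRegion entry as in the diff above ...
	_ = meta
}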
145 changes: 145 additions & 0 deletions tests/server/core/hot_region_storage_test.go
@@ -0,0 +1,145 @@
// Copyright 2021 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core_test

import (
"context"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/pdctl"
)

func Test(t *testing.T) {
TestingT(t)
}

var _ = Suite(&hotRegionHistorySuite{})

type hotRegionHistorySuite struct{}

func (s *hotRegionHistorySuite) SetUpSuite(c *C) {
server.EnableZap = true
}

func (s *hotRegionHistorySuite) TestHotRegionStorage(c *C) {
statistics.Denoising = false
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 1,
func(cfg *config.Config, serverName string) {
cfg.Schedule.HotRegionCacheHitsThreshold = 0
cfg.Schedule.HotRegionsWriteInterval.Duration = 1000 * time.Millisecond
cfg.Schedule.HotRegionsResevervedDays = 1
},
)
c.Assert(err, IsNil)
err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
stores := []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 2,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
}

leaderServer := cluster.GetServer(cluster.GetLeader())
c.Assert(leaderServer.BootstrapCluster(), IsNil)
for _, store := range stores {
pdctl.MustPutStore(c, leaderServer.GetServer(), store)
}
defer cluster.Destroy()
startTime := time.Now().UnixNano() / int64(time.Millisecond)
pdctl.MustPutRegion(c, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(statistics.WriteReportInterval))
pdctl.MustPutRegion(c, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), core.SetReportInterval(statistics.WriteReportInterval))
pdctl.MustPutRegion(c, cluster, 3, 1, []byte("e"), []byte("f"))
pdctl.MustPutRegion(c, cluster, 4, 2, []byte("g"), []byte("h"))
storeStatss := []*pdpb.StoreStats{
{
StoreId: 1,
Interval: &pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: statistics.ReadReportInterval},
PeerStats: []*pdpb.PeerStat{
{
RegionId: 3,
ReadBytes: 9000000000,
},
},
},
{
StoreId: 2,
Interval: &pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: statistics.ReadReportInterval},
PeerStats: []*pdpb.PeerStat{
{
RegionId: 4,
ReadBytes: 9000000000,
},
},
},
}
for _, storeStats := range storeStatss {
leaderServer.GetRaftCluster().HandleStoreHeartbeat(storeStats)
}
// wait hot scheduler starts
time.Sleep(5000 * time.Millisecond)
endTime := time.Now().UnixNano() / int64(time.Millisecond)
storage := leaderServer.GetServer().GetHistoryHotRegionStorage()
iter := storage.NewIterator([]string{core.WriteType.String()}, startTime, endTime)
next, err := iter.Next()
c.Assert(next, NotNil)
c.Assert(err, IsNil)
c.Assert(next.RegionID, Equals, uint64(1))
c.Assert(next.StoreID, Equals, uint64(1))
c.Assert(next.HotRegionType, Equals, core.WriteType.String())
next, err = iter.Next()
c.Assert(next, NotNil)
c.Assert(err, IsNil)
c.Assert(next.RegionID, Equals, uint64(2))
c.Assert(next.StoreID, Equals, uint64(2))
c.Assert(next.HotRegionType, Equals, core.WriteType.String())
next, err = iter.Next()
c.Assert(next, IsNil)
c.Assert(err, IsNil)
iter = storage.NewIterator([]string{core.ReadType.String()}, startTime, endTime)
next, err = iter.Next()
c.Assert(next, NotNil)
c.Assert(err, IsNil)
c.Assert(next.RegionID, Equals, uint64(3))
c.Assert(next.StoreID, Equals, uint64(1))
c.Assert(next.HotRegionType, Equals, core.ReadType.String())
next, err = iter.Next()
c.Assert(next, NotNil)
c.Assert(err, IsNil)
c.Assert(next.RegionID, Equals, uint64(4))
c.Assert(next.StoreID, Equals, uint64(2))
c.Assert(next.HotRegionType, Equals, core.ReadType.String())
next, err = iter.Next()
c.Assert(next, IsNil)
c.Assert(err, IsNil)
}
