Skip to content

Commit

Permalink
config:fix bug hot region configuration cant be change unless we do a…
Browse files Browse the repository at this point in the history
… reboot (tikv#4539)

* fix bug of hot_region_history

Signed-off-by: qidi1 <1083369179@qq.com>

* config:make config can be change
ref #pingcap/tidb/issues/25281

Signed-off-by: qidi1 <1083369179@qq.com>

* server:address comment and add unit test

Signed-off-by: qidi1 <1083369179@qq.com>

Co-authored-by: IcePigZDB <icepigzdb@gmail.com>

* server:data race

Signed-off-by: qidi1 <1083369179@qq.com>

* core:add integration test for config change

Signed-off-by: qidi1 <1083369179@qq.com>

* core:remove config change unit test

Signed-off-by: qidi1 <1083369179@qq.com>

* core:add getCurReservedDays and getCurInterval

Signed-off-by: qidi1 <1083369179@qq.com>

Co-authored-by: IcePigZDB <icepigzdb@gmail.com>
Co-authored-by: ShuNing <nolouch@gmail.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
4 people committed Jan 11, 2022
1 parent f83641d commit b7c9498
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 44 deletions.
9 changes: 3 additions & 6 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ type ScheduleConfig struct {
HotRegionsWriteInterval typeutil.Duration `toml:"hot-regions-write-interval" json:"hot-regions-write-interval"`

// The day of hot regions data to be reserved. 0 means close.
HotRegionsReservedDays int64 `toml:"hot-regions-reserved-days" json:"hot-regions-reserved-days"`
HotRegionsReservedDays uint64 `toml:"hot-regions-reserved-days" json:"hot-regions-reserved-days"`
}

// Clone returns a cloned scheduling configuration.
Expand Down Expand Up @@ -805,6 +805,7 @@ func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
adjustDuration(&c.SplitMergeInterval, defaultSplitMergeInterval)
adjustDuration(&c.PatrolRegionInterval, defaultPatrolRegionInterval)
adjustDuration(&c.MaxStoreDownTime, defaultMaxStoreDownTime)
adjustDuration(&c.HotRegionsWriteInterval, defaultHotRegionsWriteInterval)
if !meta.IsDefined("leader-schedule-limit") {
adjustUint64(&c.LeaderScheduleLimit, defaultLeaderScheduleLimit)
}
Expand Down Expand Up @@ -868,12 +869,8 @@ func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
c.StoreLimit = make(map[uint64]StoreLimitConfig)
}

if !meta.IsDefined("hot-regions-write-interval") {
adjustDuration(&c.HotRegionsWriteInterval, defaultHotRegionsWriteInterval)
}

if !meta.IsDefined("hot-regions-reserved-days") {
adjustInt64(&c.HotRegionsReservedDays, defaultHotRegionsReservedDays)
adjustUint64(&c.HotRegionsReservedDays, defaultHotRegionsReservedDays)
}

return c.Validate()
Expand Down
4 changes: 2 additions & 2 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,13 @@ hot-regions-write-interval= "30m"
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)
c.Assert(cfg.Schedule.HotRegionsWriteInterval.Duration, Equals, 30*time.Minute)
c.Assert(cfg.Schedule.HotRegionsReservedDays, Equals, int64(30))
c.Assert(cfg.Schedule.HotRegionsReservedDays, Equals, uint64(30))
// Verify default value
cfg = NewConfig()
err = cfg.Adjust(nil, false)
c.Assert(err, IsNil)
c.Assert(cfg.Schedule.HotRegionsWriteInterval.Duration, Equals, 10*time.Minute)
c.Assert(cfg.Schedule.HotRegionsReservedDays, Equals, int64(7))
c.Assert(cfg.Schedule.HotRegionsReservedDays, Equals, uint64(7))
}

func (s *testConfigSuite) TestConfigClone(c *C) {
Expand Down
10 changes: 10 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,16 @@ func (o *PersistOptions) GetSchedulers() SchedulerConfigs {
return o.GetScheduleConfig().Schedulers
}

// GetHotRegionsWriteInterval gets interval for PD to store Hot Region information.
func (o *PersistOptions) GetHotRegionsWriteInterval() time.Duration {
return o.GetScheduleConfig().HotRegionsWriteInterval.Duration
}

// GetHotRegionsReservedDays gets days hot region information is kept.
func (o *PersistOptions) GetHotRegionsReservedDays() uint64 {
return o.GetScheduleConfig().HotRegionsReservedDays
}

// AddSchedulerCfg adds the scheduler configurations.
func (o *PersistOptions) AddSchedulerCfg(tp string, args []string) {
v := o.GetScheduleConfig().Clone()
Expand Down
84 changes: 68 additions & 16 deletions server/core/hot_region_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/encryptionkm"
"github.com/tikv/pd/server/kv"
"go.uber.org/zap"
)

// HotRegionStorage is used to storage hot region info,
Expand All @@ -43,14 +44,15 @@ import (
type HotRegionStorage struct {
*kv.LeveldbKV
encryptionKeyManager *encryptionkm.KeyManager
mu sync.RWMutex
hotRegionLoopWg sync.WaitGroup
batchHotInfo map[string]*HistoryHotRegion
remianedDays int64
pullInterval time.Duration
hotRegionInfoCtx context.Context
hotRegionInfoCancel context.CancelFunc
hotRegionStorageHandler HotRegionStorageHandler

curReservedDays uint64
curInterval time.Duration
mu sync.RWMutex
}

// HistoryHotRegions wraps historyHotRegion
Expand Down Expand Up @@ -90,6 +92,10 @@ type HotRegionStorageHandler interface {
PackHistoryHotWriteRegions() ([]HistoryHotRegion, error)
// IsLeader return true means this server is leader.
IsLeader() bool
// GetHotRegionWriteInterval gets interval for PD to store Hot Region information..
GetHotRegionsWriteInterval() time.Duration
// GetHotRegionsReservedDays gets days hot region information is kept.
GetHotRegionsReservedDays() uint64
}

const (
Expand Down Expand Up @@ -129,8 +135,6 @@ func NewHotRegionsStorage(
path string,
encryptionKeyManager *encryptionkm.KeyManager,
hotRegionStorageHandler HotRegionStorageHandler,
remianedDays int64,
pullInterval time.Duration,
) (*HotRegionStorage, error) {
levelDB, err := kv.NewLeveldbKV(path)
if err != nil {
Expand All @@ -141,17 +145,15 @@ func NewHotRegionsStorage(
LeveldbKV: levelDB,
encryptionKeyManager: encryptionKeyManager,
batchHotInfo: make(map[string]*HistoryHotRegion),
remianedDays: remianedDays,
pullInterval: pullInterval,
hotRegionInfoCtx: hotRegionInfoCtx,
hotRegionInfoCancel: hotRegionInfoCancle,
hotRegionStorageHandler: hotRegionStorageHandler,
curReservedDays: hotRegionStorageHandler.GetHotRegionsReservedDays(),
curInterval: hotRegionStorageHandler.GetHotRegionsWriteInterval(),
}
if remianedDays > 0 {
h.hotRegionLoopWg.Add(2)
go h.backgroundFlush()
go h.backgroundDelete()
}
h.hotRegionLoopWg.Add(2)
go h.backgroundFlush()
go h.backgroundDelete()
return &h, nil
}

Expand All @@ -173,11 +175,18 @@ func (h *HotRegionStorage) backgroundDelete() {
for {
select {
case <-ticker.C:
h.updateReservedDays()
curReservedDays := h.getCurReservedDays()
if isFirst {
ticker.Reset(24 * time.Hour)
isFirst = false
}
h.delete()
if curReservedDays == 0 {
log.Warn(`hot region reserved days is 0, if previous reserved days is non 0,
there may be residual hot regions, you can remove it manually, [pd-dir]/data/hot-region.`)
continue
}
h.delete(int(curReservedDays))
case <-h.hotRegionInfoCtx.Done():
return
}
Expand All @@ -186,14 +195,21 @@ func (h *HotRegionStorage) backgroundDelete() {

// Write hot_region info into db in the background.
func (h *HotRegionStorage) backgroundFlush() {
ticker := time.NewTicker(h.pullInterval)
interval := h.getCurInterval()
ticker := time.NewTicker(interval)
defer func() {
ticker.Stop()
h.hotRegionLoopWg.Done()
}()
for {
select {
case <-ticker.C:
h.updateInterval()
h.updateReservedDays()
ticker.Reset(h.getCurInterval())
if h.getCurReservedDays() == 0 {
continue
}
if h.hotRegionStorageHandler.IsLeader() {
if err := h.pullHotRegionInfo(); err != nil {
log.Error("get hot_region stat meet error", errs.ZapError(err))
Expand Down Expand Up @@ -270,6 +286,42 @@ func (h *HotRegionStorage) packHistoryHotRegions(historyHotRegions []HistoryHotR
return nil
}

func (h *HotRegionStorage) updateInterval() {
h.mu.Lock()
defer h.mu.Unlock()
interval := h.hotRegionStorageHandler.GetHotRegionsWriteInterval()
if interval != h.curInterval {
log.Info("hot region write interval changed",
zap.Duration("previous-interval", h.curInterval),
zap.Duration("new-interval", interval))
h.curInterval = interval
}
}

func (h *HotRegionStorage) getCurInterval() time.Duration {
h.mu.RLock()
defer h.mu.RUnlock()
return h.curInterval
}

func (h *HotRegionStorage) updateReservedDays() {
h.mu.Lock()
defer h.mu.Unlock()
reservedDays := h.hotRegionStorageHandler.GetHotRegionsReservedDays()
if reservedDays != h.curReservedDays {
log.Info("hot region reserved days changed",
zap.Uint64("previous-reserved-days", h.curReservedDays),
zap.Uint64("new-reserved-days", reservedDays))
h.curReservedDays = reservedDays
}
}

func (h *HotRegionStorage) getCurReservedDays() uint64 {
h.mu.RLock()
defer h.mu.RUnlock()
return h.curReservedDays
}

func (h *HotRegionStorage) flush() error {
h.mu.Lock()
defer h.mu.Unlock()
Expand All @@ -288,14 +340,14 @@ func (h *HotRegionStorage) flush() error {
return nil
}

func (h *HotRegionStorage) delete() error {
func (h *HotRegionStorage) delete(reservedDays int) error {
h.mu.Lock()
defer h.mu.Unlock()
db := h.LeveldbKV
batch := new(leveldb.Batch)
for _, hotRegionType := range HotRegionTypes {
startKey := HotRegionStorePath(hotRegionType, 0, 0)
endTime := time.Now().AddDate(0, 0, 0-int(h.remianedDays)).UnixNano() / int64(time.Millisecond)
endTime := time.Now().AddDate(0, 0, 0-reservedDays).UnixNano() / int64(time.Millisecond)
endKey := HotRegionStorePath(hotRegionType, endTime, math.MaxInt64)
iter := db.NewIterator(&util.Range{
Start: []byte(startKey), Limit: []byte(endKey)}, nil)
Expand Down
54 changes: 39 additions & 15 deletions server/core/hot_region_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,22 @@ type MockPackHotRegionInfo struct {
isLeader bool
historyHotReads []HistoryHotRegion
historyHotWrites []HistoryHotRegion
reservedDays uint64
pullInterval time.Duration
}

// PackHistoryHotWriteRegions get read hot region info in HistoryHotRegion from.
func (m *MockPackHotRegionInfo) PackHistoryHotReadRegions() ([]HistoryHotRegion, error) {
return m.historyHotReads, nil
result := make([]HistoryHotRegion, len(m.historyHotReads))
copy(result, m.historyHotReads)
return result, nil
}

// PackHistoryHotWriteRegions get write hot region info in HistoryHotRegion form.
func (m *MockPackHotRegionInfo) PackHistoryHotWriteRegions() ([]HistoryHotRegion, error) {
return m.historyHotWrites, nil
result := make([]HistoryHotRegion, len(m.historyHotWrites))
copy(result, m.historyHotWrites)
return result, nil
}

// IsLeader return isLeader.
Expand Down Expand Up @@ -75,13 +81,29 @@ func (m *MockPackHotRegionInfo) GenHistoryHotRegions(num int, updateTime time.Ti
}
}

func (m *MockPackHotRegionInfo) GetHotRegionsReservedDays() uint64 {
return m.reservedDays
}

func (m *MockPackHotRegionInfo) SetHotRegionsReservedDays(reservedDays uint64) {
m.reservedDays = reservedDays
}

func (m *MockPackHotRegionInfo) GetHotRegionsWriteInterval() time.Duration {
return m.pullInterval
}

func (m *MockPackHotRegionInfo) SetHotRegionsWriteInterval(interval time.Duration) {
m.pullInterval = interval
}

// ClearHotRegion delete all region cached.
func (m *MockPackHotRegionInfo) ClearHotRegion() {
m.historyHotReads = make([]HistoryHotRegion, 0)
m.historyHotWrites = make([]HistoryHotRegion, 0)
}

var _ = Suite(&testHotRegionStorage{})
var _ = SerialSuites(&testHotRegionStorage{})

type testHotRegionStorage struct {
ctx context.Context
Expand Down Expand Up @@ -157,11 +179,11 @@ func (t *testHotRegionStorage) TestHotRegionWrite(c *C) {
}

func (t *testHotRegionStorage) TestHotRegionDelete(c *C) {
defaultReaminDay := 7
defaultRemainDay := 7
defaultDelteData := 30
deleteDate := time.Now().AddDate(0, 0, 0)
packHotRegionInfo := &MockPackHotRegionInfo{}
store, clean, err := newTestHotRegionStorage(10*time.Minute, int64(defaultReaminDay), packHotRegionInfo)
store, clean, err := newTestHotRegionStorage(10*time.Minute, uint64(defaultRemainDay), packHotRegionInfo)
c.Assert(err, IsNil)
defer clean()
historyHotRegions := make([]HistoryHotRegion, 0)
Expand All @@ -177,14 +199,14 @@ func (t *testHotRegionStorage) TestHotRegionDelete(c *C) {
packHotRegionInfo.historyHotReads = historyHotRegions
store.pullHotRegionInfo()
store.flush()
store.delete()
store.delete(defaultRemainDay)
iter := store.NewIterator(HotRegionTypes,
deleteDate.UnixNano()/int64(time.Millisecond),
time.Now().UnixNano()/int64(time.Millisecond))
num := 0
for next, err := iter.Next(); next != nil && err == nil; next, err = iter.Next() {
num++
c.Assert(reflect.DeepEqual(next, &historyHotRegions[defaultReaminDay-num]), IsTrue)
c.Assert(reflect.DeepEqual(next, &historyHotRegions[defaultRemainDay-num]), IsTrue)
}
}

Expand All @@ -202,10 +224,10 @@ func BenchmarkInsert(b *testing.B) {
b.StopTimer()
}

func BenchmarkInsertAfterMonth(b *testing.B) {
func BenchmarkInsertAfterManyDays(b *testing.B) {
defaultInsertDay := 30
packHotRegionInfo := &MockPackHotRegionInfo{}
regionStorage, clear, err := newTestHotRegionStorage(10*time.Hour, int64(defaultInsertDay), packHotRegionInfo)
regionStorage, clear, err := newTestHotRegionStorage(10*time.Hour, uint64(defaultInsertDay), packHotRegionInfo)
defer clear()
if err != nil {
b.Fatal(err)
Expand All @@ -220,17 +242,17 @@ func BenchmarkInsertAfterMonth(b *testing.B) {

func BenchmarkDelete(b *testing.B) {
defaultInsertDay := 7
defaultReaminDay := 7
defaultRemainDay := 7
packHotRegionInfo := &MockPackHotRegionInfo{}
regionStorage, clear, err := newTestHotRegionStorage(10*time.Hour, int64(defaultReaminDay), packHotRegionInfo)
regionStorage, clear, err := newTestHotRegionStorage(10*time.Hour, uint64(defaultRemainDay), packHotRegionInfo)
defer clear()
if err != nil {
b.Fatal(err)
}
deleteTime := time.Now().AddDate(0, 0, -14)
newTestHotRegions(regionStorage, packHotRegionInfo, 144*defaultInsertDay, 1000, deleteTime)
b.ResetTimer()
regionStorage.delete()
regionStorage.delete(defaultRemainDay)
b.StopTimer()
}

Expand Down Expand Up @@ -269,18 +291,20 @@ func newTestHotRegions(storage *HotRegionStorage, mock *MockPackHotRegionInfo, c
}

func newTestHotRegionStorage(pullInterval time.Duration,
remianedDays int64,
packHotRegionInfo HotRegionStorageHandler) (
reservedDays uint64,
packHotRegionInfo *MockPackHotRegionInfo) (
hotRegionStorage *HotRegionStorage,
clear func(), err error) {
writePath := "./tmp"
ctx := context.Background()
if err != nil {
return nil, nil, err
}
packHotRegionInfo.pullInterval = pullInterval
packHotRegionInfo.reservedDays = reservedDays
// delete data in between today and tomrrow
hotRegionStorage, err = NewHotRegionsStorage(ctx,
writePath, nil, packHotRegionInfo, remianedDays, pullInterval)
writePath, nil, packHotRegionInfo)
if err != nil {
return nil, nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@ func (h *Handler) GetHotReadRegions() *statistics.StoreHotPeersInfos {
return c.GetHotReadRegions()
}

// GetHotRegionsWriteInterval gets interval for PD to store Hot Region information..
func (h *Handler) GetHotRegionsWriteInterval() time.Duration {
return h.opt.GetHotRegionsWriteInterval()
}

// GetHotRegionsReservedDays gets days hot region information is kept.
func (h *Handler) GetHotRegionsReservedDays() uint64 {
return h.opt.GetHotRegionsReservedDays()
}

// GetStoresLoads gets all hot write stores stats.
func (h *Handler) GetStoresLoads() map[uint64][]float64 {
rc := h.s.GetRaftCluster()
Expand Down
Loading

0 comments on commit b7c9498

Please sign in to comment.