Skip to content

Commit

Permalink
scheduler: cache statistics to avoid frequent copy (#5765)
Browse files Browse the repository at this point in the history
ref #5692

Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 authored Dec 15, 2022
1 parent 1b9fc70 commit 26289e1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
28 changes: 19 additions & 9 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"go.uber.org/zap"
)

var statisticsInterval = time.Second

type baseHotScheduler struct {
*BaseScheduler
// store information, including pending Influence by resource type
Expand All @@ -52,9 +54,11 @@ type baseHotScheduler struct {
// regionPendings stores regionID -> pendingInfluence
// this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't
// be selected if its owner region is tracked in this attribute.
regionPendings map[uint64]*pendingInfluence
types []statistics.RWType
r *rand.Rand
regionPendings map[uint64]*pendingInfluence
types []statistics.RWType
r *rand.Rand
updateReadTime time.Time
updateWriteTime time.Time
}

func newBaseHotScheduler(opController *schedule.OperatorController) *baseHotScheduler {
Expand Down Expand Up @@ -91,14 +95,20 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sched
switch rw {
case statistics.Read:
// update read statistics
regionRead := cluster.RegionReadStats()
prepare(regionRead, core.LeaderKind)
prepare(regionRead, core.RegionKind)
if time.Since(h.updateReadTime) >= statisticsInterval {
regionRead := cluster.RegionReadStats()
prepare(regionRead, core.LeaderKind)
prepare(regionRead, core.RegionKind)
h.updateReadTime = time.Now()
}
case statistics.Write:
// update write statistics
regionWrite := cluster.RegionWriteStats()
prepare(regionWrite, core.LeaderKind)
prepare(regionWrite, core.RegionKind)
if time.Since(h.updateWriteTime) >= statisticsInterval {
regionWrite := cluster.RegionWriteStats()
prepare(regionWrite, core.LeaderKind)
prepare(regionWrite, core.RegionKind)
h.updateWriteTime = time.Now()
}
}
}

Expand Down
17 changes: 17 additions & 0 deletions server/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()

opt.SetPlacementRuleEnabled(false)
Expand Down Expand Up @@ -387,6 +388,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
Expand Down Expand Up @@ -598,6 +600,7 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -634,6 +637,7 @@ func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -693,6 +697,7 @@ func TestHotWriteRegionScheduleUnhealthyStore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -741,6 +746,7 @@ func TestHotWriteRegionScheduleCheckHot(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -776,6 +782,7 @@ func TestHotWriteRegionScheduleWithLeader(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{statistics.KeyPriority, statistics.BytePriority}
Expand Down Expand Up @@ -838,6 +845,7 @@ func TestHotWriteRegionScheduleWithPendingInfluence(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -929,6 +937,7 @@ func TestHotWriteRegionScheduleWithRuleEnabled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
tc.SetEnablePlacementRules(true)
Expand Down Expand Up @@ -1136,6 +1145,7 @@ func TestHotReadRegionScheduleWithQuery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Read.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -1171,6 +1181,7 @@ func TestHotReadRegionScheduleWithKeyRate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Read.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -1349,6 +1360,7 @@ func TestHotReadWithEvictLeaderScheduler(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Read.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -1820,6 +1832,7 @@ func TestInfluenceByRWType(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -1939,6 +1952,7 @@ func TestHotScheduleWithPriority(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -2045,6 +2059,7 @@ func TestHotScheduleWithStddev(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -2105,6 +2120,7 @@ func TestHotWriteLeaderScheduleWithPriority(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down Expand Up @@ -2148,6 +2164,7 @@ func TestCompatibility(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
statisticsInterval = 0
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
Expand Down

0 comments on commit 26289e1

Please sign in to comment.