From 73b6edd2b1ecdff086ef84d793939e8758d666b3 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Tue, 23 Mar 2021 17:59:37 +0800 Subject: [PATCH 1/3] cherry pick #3506 to release-4.0 Signed-off-by: ti-srebot --- server/core/region_option.go | 7 ++ server/statistics/hot_peer_cache_test.go | 129 +++++++++++++++++++++++ server/statistics/queue.go | 54 ++++++++++ server/statistics/queue_test.go | 28 +++++ 4 files changed, 218 insertions(+) create mode 100644 server/statistics/queue.go create mode 100644 server/statistics/queue_test.go diff --git a/server/core/region_option.go b/server/core/region_option.go index e84eefc7d56..1292ec89965 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -269,3 +269,10 @@ func WithReplacePeerStore(oldStoreID, newStoreID uint64) RegionCreateOption { } } } + +// WithInterval sets the interval +func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption { + return func(region *RegionInfo) { + region.interval = interval + } +} diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index 64e066302ce..ec06c7f0d2b 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -15,9 +15,15 @@ package statistics import ( "math/rand" +<<<<<<< HEAD +======= + "testing" + "time" +>>>>>>> 610c05b7... movingaverage: support concurrency safe queue in AvgOverTime (#3506) . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/server/core" ) @@ -213,3 +219,126 @@ func newPeers(n int, pid genID, sid genID) []*metapb.Peer { } return peers } +<<<<<<< HEAD +======= + +func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) { + cache := NewHotStoresStats(ReadFlow) + + // skip interval=0 + newItem := &HotPeerStat{needDelete: false, thresholds: [2]float64{0.0, 0.0}} + newItem = cache.updateHotPeerStat(newItem, nil, 0, 0, 0) + c.Check(newItem, IsNil) + + // new peer, interval is larger than report interval, but no hot + newItem = &HotPeerStat{needDelete: false, thresholds: [2]float64{1.0, 1.0}} + newItem = cache.updateHotPeerStat(newItem, nil, 0, 0, 60*time.Second) + c.Check(newItem, IsNil) + + // new peer, interval is less than report interval + newItem = &HotPeerStat{needDelete: false, thresholds: [2]float64{0.0, 0.0}} + newItem = cache.updateHotPeerStat(newItem, nil, 60, 60, 30*time.Second) + c.Check(newItem, NotNil) + c.Check(newItem.HotDegree, Equals, 0) + c.Check(newItem.AntiCount, Equals, 0) + // sum of interval is less than report interval + oldItem := newItem + newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 10*time.Second) + c.Check(newItem.HotDegree, Equals, 0) + c.Check(newItem.AntiCount, Equals, 0) + // sum of interval is larger than report interval, and hot + oldItem = newItem + newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 30*time.Second) + c.Check(newItem.HotDegree, Equals, 1) + c.Check(newItem.AntiCount, Equals, 2) + // sum of interval is less than report interval + oldItem = newItem + newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 10*time.Second) + c.Check(newItem.HotDegree, Equals, 1) + c.Check(newItem.AntiCount, Equals, 2) + // sum of interval is larger than report interval, and hot + oldItem = newItem + newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 50*time.Second) + c.Check(newItem.HotDegree, Equals, 2) + c.Check(newItem.AntiCount, Equals, 2) + // sum of interval is larger than report interval, and cold + oldItem = newItem + newItem.thresholds = [2]float64{10.0, 10.0} + newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 60*time.Second) + c.Check(newItem.HotDegree, Equals, 1) + c.Check(newItem.AntiCount, Equals, 1) + // sum of interval is larger than report interval, and cold + oldItem = newItem + newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 60*time.Second) + c.Check(newItem.HotDegree, Equals, 0) + c.Check(newItem.AntiCount, Equals, 0) + c.Check(newItem.needDelete, Equals, true) +} + +func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) { + byteRate := minHotThresholds[ReadFlow][byteDim] * 2 + expectThreshold := byteRate * HotThresholdRatio + t.testMetrics(c, 120., byteRate, expectThreshold) + t.testMetrics(c, 60., byteRate, expectThreshold) + t.testMetrics(c, 30., byteRate, expectThreshold) + t.testMetrics(c, 17., byteRate, expectThreshold) + t.testMetrics(c, 1., byteRate, expectThreshold) +} +func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold float64) { + cache := NewHotStoresStats(ReadFlow) + minThresholds := minHotThresholds[cache.kind] + storeID := uint64(1) + c.Assert(byteRate, GreaterEqual, minThresholds[byteDim]) + for i := uint64(1); i < TopNN+10; i++ { + var oldItem *HotPeerStat + for { + thresholds := cache.calcHotThresholds(storeID) + newItem := &HotPeerStat{ + StoreID: storeID, + RegionID: i, + needDelete: false, + thresholds: thresholds, + ByteRate: byteRate, + KeyRate: 0, + } + oldItem = cache.getOldHotPeerStat(i, storeID) + if oldItem != nil && oldItem.rollingByteRate.isHot(thresholds) == true { + break + } + item := cache.updateHotPeerStat(newItem, oldItem, byteRate*interval, 0, time.Duration(interval)*time.Second) + cache.Update(item) + } + thresholds := cache.calcHotThresholds(storeID) + if i < TopNN { + c.Assert(thresholds[byteDim], Equals, minThresholds[byteDim]) + } else { + c.Assert(thresholds[byteDim], Equals, expectThreshold) + } + } +} + +func BenchmarkCheckRegionFlow(b *testing.B) { + cache := NewHotStoresStats(ReadFlow) + region := core.NewRegionInfo(&metapb.Region{ + Id: 1, + Peers: []*metapb.Peer{ + {Id: 101, StoreId: 1}, + {Id: 102, StoreId: 2}, + {Id: 103, StoreId: 3}, + }, + }, + &metapb.Peer{Id: 101, StoreId: 1}, + ) + newRegion := region.Clone( + core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}), + core.SetReadBytes(30000*10), + core.SetReadKeys(300000*10)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + rets := cache.CheckRegionFlow(newRegion) + for _, ret := range rets { + cache.Update(ret) + } + } +} +>>>>>>> 610c05b7... movingaverage: support concurrency safe queue in AvgOverTime (#3506) diff --git a/server/statistics/queue.go b/server/statistics/queue.go new file mode 100644 index 00000000000..04a1a159195 --- /dev/null +++ b/server/statistics/queue.go @@ -0,0 +1,54 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package movingaverage + +import ( + "sync" + + "github.com/phf/go-queue/queue" +) + +// SafeQueue is a concurrency safe queue +type SafeQueue struct { + mu sync.Mutex + que *queue.Queue +} + +// NewSafeQueue return a SafeQueue +func NewSafeQueue() *SafeQueue { + sq := &SafeQueue{} + sq.que = queue.New() + return sq +} + +// Init implement init +func (sq *SafeQueue) Init() { + sq.mu.Lock() + defer sq.mu.Unlock() + sq.que.Init() +} + +// PushBack implement PushBack +func (sq *SafeQueue) PushBack(v interface{}) { + sq.mu.Lock() + defer sq.mu.Unlock() + sq.que.PushBack(v) +} + +// PopFront implement PopFront +func (sq *SafeQueue) PopFront() interface{} { + sq.mu.Lock() + defer sq.mu.Unlock() + return sq.que.PopFront() +} diff --git a/server/statistics/queue_test.go b/server/statistics/queue_test.go new file mode 100644 index 00000000000..84b51488f9f --- /dev/null +++ b/server/statistics/queue_test.go @@ -0,0 +1,28 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package movingaverage + +import ( + . "github.com/pingcap/check" +) + +func (t *testMovingAvg) TestQueue(c *C) { + sq := NewSafeQueue() + sq.PushBack(1) + sq.PushBack(2) + v1 := sq.PopFront() + v2 := sq.PopFront() + c.Assert(1, Equals, v1.(int)) + c.Assert(2, Equals, v2.(int)) +} From b0df7c1d4f4bfa649438f41277933b1d5680d6ba Mon Sep 17 00:00:00 2001 From: yisaer Date: Thu, 25 Mar 2021 18:25:00 +0800 Subject: [PATCH 2/3] fix conflict Signed-off-by: yisaer --- server/statistics/avg_over_time.go | 8 +- server/statistics/hot_peer_cache_test.go | 129 ----------------------- server/statistics/queue.go | 2 +- server/statistics/queue_test.go | 2 +- 4 files changed, 5 insertions(+), 136 deletions(-) diff --git a/server/statistics/avg_over_time.go b/server/statistics/avg_over_time.go index cb58e445a49..f3838f539e6 100644 --- a/server/statistics/avg_over_time.go +++ b/server/statistics/avg_over_time.go @@ -15,8 +15,6 @@ package statistics import ( "time" - - "github.com/phf/go-queue/queue" ) const ( @@ -35,7 +33,7 @@ type deltaWithInterval struct { // stores recent changes that happened in the last avgInterval, // then calculates the change rate by (sum of changes) / (sum of intervals). type AvgOverTime struct { - que *queue.Queue + que *SafeQueue deltaSum float64 intervalSum time.Duration avgInterval time.Duration @@ -44,7 +42,7 @@ type AvgOverTime struct { // NewAvgOverTime returns an AvgOverTime with given interval. func NewAvgOverTime(interval time.Duration) *AvgOverTime { return &AvgOverTime{ - que: queue.New(), + que: NewSafeQueue(), deltaSum: 0, intervalSum: 0, avgInterval: interval, @@ -58,7 +56,7 @@ func (aot *AvgOverTime) Get() float64 { // Clear clears the AvgOverTime. func (aot *AvgOverTime) Clear() { - aot.que = queue.New() + aot.que = NewSafeQueue() aot.intervalSum = 0 aot.deltaSum = 0 } diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index ec06c7f0d2b..64e066302ce 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -15,15 +15,9 @@ package statistics import ( "math/rand" -<<<<<<< HEAD -======= - "testing" - "time" ->>>>>>> 610c05b7... movingaverage: support concurrency safe queue in AvgOverTime (#3506) . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/server/core" ) @@ -219,126 +213,3 @@ func newPeers(n int, pid genID, sid genID) []*metapb.Peer { } return peers } -<<<<<<< HEAD -======= - -func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) { - cache := NewHotStoresStats(ReadFlow) - - // skip interval=0 - newItem := &HotPeerStat{needDelete: false, thresholds: [2]float64{0.0, 0.0}} - newItem = cache.updateHotPeerStat(newItem, nil, 0, 0, 0) - c.Check(newItem, IsNil) - - // new peer, interval is larger than report interval, but no hot - newItem = &HotPeerStat{needDelete: false, thresholds: [2]float64{1.0, 1.0}} - newItem = cache.updateHotPeerStat(newItem, nil, 0, 0, 60*time.Second) - c.Check(newItem, IsNil) - - // new peer, interval is less than report interval - newItem = &HotPeerStat{needDelete: false, thresholds: [2]float64{0.0, 0.0}} - newItem = cache.updateHotPeerStat(newItem, nil, 60, 60, 30*time.Second) - c.Check(newItem, NotNil) - c.Check(newItem.HotDegree, Equals, 0) - c.Check(newItem.AntiCount, Equals, 0) - // sum of interval is less than report interval - oldItem := newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 10*time.Second) - c.Check(newItem.HotDegree, Equals, 0) - c.Check(newItem.AntiCount, Equals, 0) - // sum of interval is larger than report interval, and hot - oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 30*time.Second) - c.Check(newItem.HotDegree, Equals, 1) - c.Check(newItem.AntiCount, Equals, 2) - // sum of interval is less than report interval - oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 10*time.Second) - c.Check(newItem.HotDegree, Equals, 1) - c.Check(newItem.AntiCount, Equals, 2) - // sum of interval is larger than report interval, and hot - oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 50*time.Second) - c.Check(newItem.HotDegree, Equals, 2) - c.Check(newItem.AntiCount, Equals, 2) - // sum of interval is larger than report interval, and cold - oldItem = newItem - newItem.thresholds = [2]float64{10.0, 10.0} - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 60*time.Second) - c.Check(newItem.HotDegree, Equals, 1) - c.Check(newItem.AntiCount, Equals, 1) - // sum of interval is larger than report interval, and cold - oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 60*time.Second) - c.Check(newItem.HotDegree, Equals, 0) - c.Check(newItem.AntiCount, Equals, 0) - c.Check(newItem.needDelete, Equals, true) -} - -func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) { - byteRate := minHotThresholds[ReadFlow][byteDim] * 2 - expectThreshold := byteRate * HotThresholdRatio - t.testMetrics(c, 120., byteRate, expectThreshold) - t.testMetrics(c, 60., byteRate, expectThreshold) - t.testMetrics(c, 30., byteRate, expectThreshold) - t.testMetrics(c, 17., byteRate, expectThreshold) - t.testMetrics(c, 1., byteRate, expectThreshold) -} -func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold float64) { - cache := NewHotStoresStats(ReadFlow) - minThresholds := minHotThresholds[cache.kind] - storeID := uint64(1) - c.Assert(byteRate, GreaterEqual, minThresholds[byteDim]) - for i := uint64(1); i < TopNN+10; i++ { - var oldItem *HotPeerStat - for { - thresholds := cache.calcHotThresholds(storeID) - newItem := &HotPeerStat{ - StoreID: storeID, - RegionID: i, - needDelete: false, - thresholds: thresholds, - ByteRate: byteRate, - KeyRate: 0, - } - oldItem = cache.getOldHotPeerStat(i, storeID) - if oldItem != nil && oldItem.rollingByteRate.isHot(thresholds) == true { - break - } - item := cache.updateHotPeerStat(newItem, oldItem, byteRate*interval, 0, time.Duration(interval)*time.Second) - cache.Update(item) - } - thresholds := cache.calcHotThresholds(storeID) - if i < TopNN { - c.Assert(thresholds[byteDim], Equals, minThresholds[byteDim]) - } else { - c.Assert(thresholds[byteDim], Equals, expectThreshold) - } - } -} - -func BenchmarkCheckRegionFlow(b *testing.B) { - cache := NewHotStoresStats(ReadFlow) - region := core.NewRegionInfo(&metapb.Region{ - Id: 1, - Peers: []*metapb.Peer{ - {Id: 101, StoreId: 1}, - {Id: 102, StoreId: 2}, - {Id: 103, StoreId: 3}, - }, - }, - &metapb.Peer{Id: 101, StoreId: 1}, - ) - newRegion := region.Clone( - core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}), - core.SetReadBytes(30000*10), - core.SetReadKeys(300000*10)) - b.ResetTimer() - for i := 0; i < b.N; i++ { - rets := cache.CheckRegionFlow(newRegion) - for _, ret := range rets { - cache.Update(ret) - } - } -} ->>>>>>> 610c05b7... movingaverage: support concurrency safe queue in AvgOverTime (#3506) diff --git a/server/statistics/queue.go b/server/statistics/queue.go index 04a1a159195..f61fa023e8a 100644 --- a/server/statistics/queue.go +++ b/server/statistics/queue.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package movingaverage +package statistics import ( "sync" diff --git a/server/statistics/queue_test.go b/server/statistics/queue_test.go index 84b51488f9f..455f1002ce9 100644 --- a/server/statistics/queue_test.go +++ b/server/statistics/queue_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package movingaverage +package statistics import ( . "github.com/pingcap/check" From cc0f4ac841bcc4f8e4957eecf108dd44c4fd170b Mon Sep 17 00:00:00 2001 From: yisaer Date: Thu, 25 Mar 2021 22:27:50 +0800 Subject: [PATCH 3/3] use init Signed-off-by: yisaer --- server/statistics/avg_over_time.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/statistics/avg_over_time.go b/server/statistics/avg_over_time.go index f3838f539e6..894c2502daa 100644 --- a/server/statistics/avg_over_time.go +++ b/server/statistics/avg_over_time.go @@ -56,7 +56,7 @@ func (aot *AvgOverTime) Get() float64 { // Clear clears the AvgOverTime. func (aot *AvgOverTime) Clear() { - aot.que = NewSafeQueue() + aot.que.Init() aot.intervalSum = 0 aot.deltaSum = 0 }