Skip to content

Commit 6717f3f

Browse files
ti-chi-botHunDunDM
andauthoredNov 30, 2021
cluster: fix the bug that region statistics are not updated after flow-round-by-digit change (#4304) (#4306)
* cluster: fix the bug that region statistics are not updated after flow-round-by-digit change close #4295 Signed-off-by: HunDunDM <hundundm@gmail.com> * refine some code Signed-off-by: HunDunDM <hundundm@gmail.com> * address comment Signed-off-by: HunDunDM <hundundm@gmail.com> Co-authored-by: HunDunDM <hundundm@gmail.com>
1 parent c135477 commit 6717f3f

File tree

4 files changed

+93
-6
lines changed

4 files changed

+93
-6
lines changed
 

‎server/core/region.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,8 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
550550
// Once flow has changed, will update the cache.
551551
// Because keys and bytes are strongly related, only bytes are judged.
552552
if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
553-
region.GetRoundBytesRead() != origin.GetRoundBytesRead() {
553+
region.GetRoundBytesRead() != origin.GetRoundBytesRead() ||
554+
region.flowRoundDivisor < origin.flowRoundDivisor {
554555
saveCache, needSync = true, true
555556
}
556557

‎server/core/region_option.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption {
3838

3939
// WithFlowRoundByDigit set the digit, which use to round to the nearest number
4040
func WithFlowRoundByDigit(digit int) RegionCreateOption {
41+
flowRoundDivisor := uint64(math.Pow10(digit))
4142
return func(region *RegionInfo) {
42-
region.flowRoundDivisor = uint64(math.Pow10(digit))
43+
region.flowRoundDivisor = flowRoundDivisor
4344
}
4445
}
4546

‎server/core/region_test.go

+85
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,91 @@ func (s *testRegionInfoSuite) TestRegionWriteRate(c *C) {
204204
}
205205
}
206206

207+
var _ = Suite(&testRegionGuideSuite{})
208+
209+
type testRegionGuideSuite struct {
210+
RegionGuide RegionGuideFunc
211+
}
212+
213+
func (s *testRegionGuideSuite) SetUpSuite(c *C) {
214+
s.RegionGuide = GenerateRegionGuideFunc(false)
215+
}
216+
217+
func (s *testRegionGuideSuite) TestNeedSync(c *C) {
218+
meta := &metapb.Region{
219+
Id: 1000,
220+
StartKey: []byte("a"),
221+
EndKey: []byte("z"),
222+
RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100},
223+
Peers: []*metapb.Peer{
224+
{Id: 11, StoreId: 1, Role: metapb.PeerRole_Voter},
225+
{Id: 12, StoreId: 1, Role: metapb.PeerRole_Voter},
226+
{Id: 13, StoreId: 1, Role: metapb.PeerRole_Voter},
227+
},
228+
}
229+
region := NewRegionInfo(meta, meta.Peers[0])
230+
231+
testcases := []struct {
232+
optionsA []RegionCreateOption
233+
optionsB []RegionCreateOption
234+
needSync bool
235+
}{
236+
{
237+
optionsB: []RegionCreateOption{WithLeader(nil)},
238+
needSync: true,
239+
},
240+
{
241+
optionsB: []RegionCreateOption{WithLeader(meta.Peers[1])},
242+
needSync: true,
243+
},
244+
{
245+
optionsB: []RegionCreateOption{WithPendingPeers(meta.Peers[1:2])},
246+
needSync: true,
247+
},
248+
{
249+
optionsB: []RegionCreateOption{WithDownPeers([]*pdpb.PeerStats{{Peer: meta.Peers[1], DownSeconds: 600}})},
250+
needSync: true,
251+
},
252+
{
253+
optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)},
254+
optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(2)},
255+
needSync: true,
256+
},
257+
{
258+
optionsA: []RegionCreateOption{SetWrittenBytes(250), WithFlowRoundByDigit(2)},
259+
optionsB: []RegionCreateOption{SetWrittenBytes(349), WithFlowRoundByDigit(2)},
260+
needSync: false,
261+
},
262+
{
263+
optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(4)},
264+
optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(4)},
265+
needSync: false,
266+
},
267+
{
268+
optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(4)},
269+
optionsB: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)},
270+
needSync: true,
271+
},
272+
{
273+
optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)},
274+
optionsB: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)},
275+
needSync: false,
276+
},
277+
{
278+
optionsA: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)},
279+
optionsB: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)},
280+
needSync: true,
281+
},
282+
}
283+
284+
for _, t := range testcases {
285+
regionA := region.Clone(t.optionsA...)
286+
regionB := region.Clone(t.optionsB...)
287+
_, _, _, needSync := s.RegionGuide(regionA, regionB)
288+
c.Assert(needSync, Equals, t.needSync)
289+
}
290+
}
291+
207292
var _ = Suite(&testRegionMapSuite{})
208293

209294
type testRegionMapSuite struct{}

‎server/grpc_service.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -501,9 +501,9 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {
501501

502502
// RegionHeartbeat implements gRPC PDServer.
503503
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
504-
server := &heartbeatServer{stream: stream}
505-
FlowRoundByDigit := s.persistOptions.GetPDServerConfig().FlowRoundByDigit
506504
var (
505+
server = &heartbeatServer{stream: stream}
506+
flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit)
507507
forwardStream pdpb.PD_RegionHeartbeatClient
508508
cancel context.CancelFunc
509509
lastForwardedHost string
@@ -585,11 +585,11 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
585585
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc()
586586
s.hbStreams.BindStream(storeID, server)
587587
// refresh FlowRoundByDigit
588-
FlowRoundByDigit = s.persistOptions.GetPDServerConfig().FlowRoundByDigit
588+
flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit)
589589
lastBind = time.Now()
590590
}
591591

592-
region := core.RegionFromHeartbeat(request, core.WithFlowRoundByDigit(FlowRoundByDigit))
592+
region := core.RegionFromHeartbeat(request, flowRoundOption)
593593
if region.GetLeader() == nil {
594594
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
595595
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()

0 commit comments

Comments
 (0)
Please sign in to comment.