Skip to content

Commit

Permalink
improve region heartbeat (#1489)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored and nolouch committed Apr 2, 2019
1 parent 5d370e8 commit 656ed4b
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 35 deletions.
44 changes: 17 additions & 27 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,7 @@ func (s *StoresInfo) TakeStore(storeID uint64) *StoreInfo {
func (s *StoresInfo) SetStore(store *StoreInfo) {
s.stores[store.GetID()] = store
store.GetRollingStoreStats().Observe(store.GetStoreStats())
s.updateTotalBytesReadRate()
s.updateTotalBytesWriteRate()
s.updateTotalBytesRate()
}

// BlockStore blocks a StoreInfo with storeID.
Expand Down Expand Up @@ -611,31 +610,26 @@ func (s *StoresInfo) UpdateStoreStatusLocked(storeID uint64, leaderCount int, re
}
}

func (s *StoresInfo) updateTotalBytesWriteRate() {
func (s *StoresInfo) updateTotalBytesRate() {
var totalBytesWirteRate float64
var totalBytesReadRate float64
var writeRate, readRate float64
for _, s := range s.stores {
if s.IsUp() {
totalBytesWirteRate += s.GetRollingStoreStats().GetBytesWriteRate()
writeRate, readRate = s.GetRollingStoreStats().GetBytesRate()
totalBytesWirteRate += writeRate
totalBytesReadRate += readRate
}
}
s.bytesWriteRate = totalBytesWirteRate
s.bytesReadRate = totalBytesReadRate
}

// TotalBytesWriteRate returns the total written bytes rate of all StoreInfo.
func (s *StoresInfo) TotalBytesWriteRate() float64 {
return s.bytesWriteRate
}

func (s *StoresInfo) updateTotalBytesReadRate() {
var totalBytesReadRate float64
for _, s := range s.stores {
if s.IsUp() {
totalBytesReadRate += s.GetRollingStoreStats().GetBytesReadRate()
}
}
s.bytesReadRate = totalBytesReadRate
}

// TotalBytesReadRate returns the total read bytes rate of all StoreInfo.
func (s *StoresInfo) TotalBytesReadRate() float64 {
return s.bytesReadRate
Expand All @@ -645,7 +639,8 @@ func (s *StoresInfo) TotalBytesReadRate() float64 {
func (s *StoresInfo) GetStoresBytesWriteStat() map[uint64]uint64 {
res := make(map[uint64]uint64, len(s.stores))
for _, s := range s.stores {
res[s.GetID()] = uint64(s.GetRollingStoreStats().GetBytesWriteRate())
writeRate, _ := s.GetRollingStoreStats().GetBytesRate()
res[s.GetID()] = uint64(writeRate)
}
return res
}
Expand All @@ -654,7 +649,8 @@ func (s *StoresInfo) GetStoresBytesWriteStat() map[uint64]uint64 {
func (s *StoresInfo) GetStoresBytesReadStat() map[uint64]uint64 {
res := make(map[uint64]uint64, len(s.stores))
for _, s := range s.stores {
res[s.GetID()] = uint64(s.GetRollingStoreStats().GetBytesReadRate())
_, readRate := s.GetRollingStoreStats().GetBytesRate()
res[s.GetID()] = uint64(readRate)
}
return res
}
Expand Down Expand Up @@ -699,7 +695,8 @@ func newRollingStoreStats() *RollingStoreStats {

// Observe records current statistics.
func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats) {
interval := stats.GetInterval().GetEndTimestamp() - stats.GetInterval().GetStartTimestamp()
statInterval := stats.GetInterval()
interval := statInterval.GetEndTimestamp() - statInterval.GetStartTimestamp()
if interval == 0 {
return
}
Expand All @@ -711,18 +708,11 @@ func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats) {
r.keysReadRate.Add(float64(stats.KeysRead / interval))
}

// GetBytesWriteRate returns the bytes write rate.
func (r *RollingStoreStats) GetBytesWriteRate() float64 {
r.RLock()
defer r.RUnlock()
return r.bytesWriteRate.Median()
}

// GetBytesReadRate returns the bytes read rate.
func (r *RollingStoreStats) GetBytesReadRate() float64 {
// GetBytesRate returns the bytes write rate and the bytes read rate.
func (r *RollingStoreStats) GetBytesRate() (writeRate float64, readRate float64) {
r.RLock()
defer r.RUnlock()
return r.bytesReadRate.Median()
return r.bytesWriteRate.Median(), r.bytesReadRate.Median()
}

// GetKeysWriteRate returns the keys write rate.
Expand Down
19 changes: 15 additions & 4 deletions table/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,24 @@ func (k Key) TableID() int64 {
return tableID
}

// IsMeta returns if the key is a meta key.
func (k Key) IsMeta() bool {
// MetaOrTable checks if the key is a meta key or table key.
// If the key is a meta key, it returns true and 0.
// If the key is a table key, it returns false and table ID.
// Otherwise, it returns false and 0.
func (k Key) MetaOrTable() (bool, int64) {
_, key, err := DecodeBytes(k)
if err != nil {
return false
return false, 0
}
return bytes.HasPrefix(key, metaPrefix)
if bytes.HasPrefix(key, metaPrefix) {
return true, 0
}
if bytes.HasPrefix(key, tablePrefix) {
key = key[len(tablePrefix):]
_, tableID, _ := DecodeInt(key)
return false, tableID
}
return false, 0
}

var pads = make([]byte, encGroupSize)
Expand Down
3 changes: 1 addition & 2 deletions table/namespace_classifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ func (c *tableNamespaceClassifier) GetRegionNamespace(regionInfo *core.RegionInf
c.RLock()
defer c.RUnlock()

isMeta := Key(regionInfo.GetStartKey()).IsMeta()
tableID := Key(regionInfo.GetStartKey()).TableID()
isMeta, tableID := Key(regionInfo.GetStartKey()).MetaOrTable()
if tableID == 0 && !isMeta {
return namespace.DefaultNamespace
}
Expand Down
5 changes: 3 additions & 2 deletions table/namespace_classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ func (s *testTableNamespaceSuite) TestTableNameSpaceGetRegionNamespace(c *C) {
if !t.endcoded {
startKey, endKey = EncodeBytes(startKey), EncodeBytes(endKey)
}
c.Assert(startKey.TableID(), Equals, t.tableID)
c.Assert(startKey.IsMeta(), Equals, t.isMeta)
isMeta, tableID := startKey.MetaOrTable()
c.Assert(tableID, Equals, t.tableID)
c.Assert(isMeta, Equals, t.isMeta)

region := core.NewRegionInfo(&metapb.Region{
StartKey: startKey,
Expand Down

0 comments on commit 656ed4b

Please sign in to comment.