Skip to content

Commit

Permalink
Merge branch 'master' into rms2
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 21, 2023
2 parents a94eaab + 31bf4bc commit 241bb8a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
28 changes: 28 additions & 0 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ import (

const defaultConsumptionChanSize = 1024

const (
metricsCleanupInterval = time.Minute
metricsCleanupTimeout = 20 * time.Minute
)

// Manager is the manager of resource group.
type Manager struct {
sync.RWMutex
Expand All @@ -46,6 +51,8 @@ type Manager struct {
resourceGroupName string
*rmpb.Consumption
}
// record update time of each resource group
comsumptionRecord map[string]time.Time
}

// NewManager returns a new Manager.
Expand All @@ -56,6 +63,7 @@ func NewManager(srv bs.Server) *Manager {
resourceGroupName string
*rmpb.Consumption
}, defaultConsumptionChanSize),
comsumptionRecord: make(map[string]time.Time),
}
// The first initialization after the server is started.
srv.AddStartCallback(func() {
Expand Down Expand Up @@ -225,6 +233,8 @@ func (m *Manager) persistResourceGroupRunningState() {

// Receive the consumption and flush it to the metrics.
func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
ticker := time.NewTicker(metricsCleanupInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -273,6 +283,24 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
if consumption.KvWriteRpcCount != 0 {
writeRequestCountMetrics.Add(consumption.KvWriteRpcCount)
}

m.comsumptionRecord[name] = time.Now()

case <-ticker.C:
// Clean up the metrics that have not been updated for a long time.
for name, lastTime := range m.comsumptionRecord {
if time.Since(lastTime) > metricsCleanupTimeout {
readRequestUnitCost.DeleteLabelValues(name)
writeRequestUnitCost.DeleteLabelValues(name)
readByteCost.DeleteLabelValues(name)
writeByteCost.DeleteLabelValues(name)
kvCPUCost.DeleteLabelValues(name)
sqlCPUCost.DeleteLabelValues(name)
requestCount.DeleteLabelValues(name, readTypeLabel)
requestCount.DeleteLabelValues(name, writeTypeLabel)
delete(m.comsumptionRecord, name)
}
}
}
}
}
2 changes: 1 addition & 1 deletion server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
regionID := solver.region.GetID()
sourceID := solver.source.GetID()
targetID := solver.target.GetID()
log.Debug("", zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID))
log.Debug("candidate store", zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID))

if !solver.shouldBalance(s.GetName()) {
balanceRegionSkipCounter.Inc()
Expand Down

0 comments on commit 241bb8a

Please sign in to comment.