diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go
index c926ae898ff..13bd5b7b0e1 100644
--- a/pkg/mcs/resource_manager/server/manager.go
+++ b/pkg/mcs/resource_manager/server/manager.go
@@ -34,6 +34,11 @@ import (
 
 const defaultConsumptionChanSize = 1024
 
+const (
+	metricsCleanupInterval = time.Minute
+	metricsCleanupTimeout  = 20 * time.Minute
+)
+
 // Manager is the manager of resource group.
 type Manager struct {
 	sync.RWMutex
@@ -46,6 +51,8 @@ type Manager struct {
 		resourceGroupName string
 		*rmpb.Consumption
 	}
+	// consumptionRecord tracks the last consumption update time of each resource group.
+	consumptionRecord map[string]time.Time
 }
 
 // NewManager returns a new Manager.
@@ -56,6 +63,7 @@ func NewManager(srv bs.Server) *Manager {
 			resourceGroupName string
 			*rmpb.Consumption
 		}, defaultConsumptionChanSize),
+		consumptionRecord: make(map[string]time.Time),
 	}
 	// The first initialization after the server is started.
 	srv.AddStartCallback(func() {
@@ -225,6 +233,8 @@ func (m *Manager) persistResourceGroupRunningState() {
 
 // Receive the consumption and flush it to the metrics.
 func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
+	ticker := time.NewTicker(metricsCleanupInterval)
+	defer ticker.Stop()
 	for {
 		select {
 		case <-ctx.Done():
@@ -273,6 +283,24 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
 			if consumption.KvWriteRpcCount != 0 {
 				writeRequestCountMetrics.Add(consumption.KvWriteRpcCount)
 			}
+
+			m.consumptionRecord[name] = time.Now()
+
+		case <-ticker.C:
+			// Clean up the metrics that have not been updated for a long time.
+			for name, lastTime := range m.consumptionRecord {
+				if time.Since(lastTime) > metricsCleanupTimeout {
+					readRequestUnitCost.DeleteLabelValues(name)
+					writeRequestUnitCost.DeleteLabelValues(name)
+					readByteCost.DeleteLabelValues(name)
+					writeByteCost.DeleteLabelValues(name)
+					kvCPUCost.DeleteLabelValues(name)
+					sqlCPUCost.DeleteLabelValues(name)
+					requestCount.DeleteLabelValues(name, readTypeLabel)
+					requestCount.DeleteLabelValues(name, writeTypeLabel)
+					delete(m.consumptionRecord, name)
+				}
+			}
 		}
 	}
 }
diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go
index 0c8e224685f..549563dbf72 100644
--- a/server/schedulers/balance_region.go
+++ b/server/schedulers/balance_region.go
@@ -256,7 +256,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
 	regionID := solver.region.GetID()
 	sourceID := solver.source.GetID()
 	targetID := solver.target.GetID()
-	log.Debug("", zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID))
+	log.Debug("candidate store", zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID))
 	if !solver.shouldBalance(s.GetName()) {
 		balanceRegionSkipCounter.Inc()