diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 6bbdc178234..6a08756e85d 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -922,6 +922,9 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType } // collect request resource selected := gc.run.requestInProgress + failpoint.Inject("triggerUpdate", func() { + selected = true + }) switch gc.mode { case rmpb.GroupMode_RawMode: requests := make([]*rmpb.RawResourceItem, 0, len(requestResourceLimitTypeList)) @@ -973,13 +976,7 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType if !selected { return nil } - - deltaConsumption := &rmpb.Consumption{} - *deltaConsumption = *gc.run.consumption - sub(deltaConsumption, gc.run.lastRequestConsumption) - req.ConsumptionSinceLastRequest = deltaConsumption - - *gc.run.lastRequestConsumption = *gc.run.consumption + req.ConsumptionSinceLastRequest = updateDeltaConsumption(gc.run.lastRequestConsumption, gc.run.consumption) gc.run.lastRequestTime = time.Now() gc.run.requestInProgress = true return req @@ -1046,6 +1043,9 @@ func (gc *groupCostController) onRequestWait( gc.mu.Lock() sub(gc.mu.consumption, delta) gc.mu.Unlock() + failpoint.Inject("triggerUpdate", func() { + gc.lowRUNotifyChan <- struct{}{} + }) return nil, nil, err } gc.successfulRequestDuration.Observe(d.Seconds()) diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index e8b5eb1679d..711ed9f5956 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -231,6 +231,43 @@ func add(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) { custom1.KvWriteRpcCount += custom2.KvWriteRpcCount } +func updateDeltaConsumption(last *rmpb.Consumption, now *rmpb.Consumption) *rmpb.Consumption { + delta := &rmpb.Consumption{} + if now.RRU >= last.RRU { + delta.RRU = now.RRU - last.RRU + last.RRU = now.RRU + } + if now.WRU >= last.WRU { + delta.WRU = now.WRU - last.WRU + last.WRU = now.WRU + } + if now.ReadBytes >= last.ReadBytes { + delta.ReadBytes = now.ReadBytes - last.ReadBytes + last.ReadBytes = now.ReadBytes + } + if now.WriteBytes >= last.WriteBytes { + delta.WriteBytes = now.WriteBytes - last.WriteBytes + last.WriteBytes = now.WriteBytes + } + if now.TotalCpuTimeMs >= last.TotalCpuTimeMs { + delta.TotalCpuTimeMs = now.TotalCpuTimeMs - last.TotalCpuTimeMs + last.TotalCpuTimeMs = now.TotalCpuTimeMs + } + if now.SqlLayerCpuTimeMs >= last.SqlLayerCpuTimeMs { + delta.SqlLayerCpuTimeMs = now.SqlLayerCpuTimeMs - last.SqlLayerCpuTimeMs + last.SqlLayerCpuTimeMs = now.SqlLayerCpuTimeMs + } + if now.KvReadRpcCount >= last.KvReadRpcCount { + delta.KvReadRpcCount = now.KvReadRpcCount - last.KvReadRpcCount + last.KvReadRpcCount = now.KvReadRpcCount + } + if now.KvWriteRpcCount >= last.KvWriteRpcCount { + delta.KvWriteRpcCount = now.KvWriteRpcCount - last.KvWriteRpcCount + last.KvWriteRpcCount = now.KvWriteRpcCount + } + return delta +} + func sub(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) { if custom1 == nil || custom2 == nil { return diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go index a884b064292..c20b22e8f18 100644 --- a/pkg/mcs/resource_manager/server/manager.go +++ b/pkg/mcs/resource_manager/server/manager.go @@ -314,17 +314,17 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel) ) // RU info. - if consumption.RRU != 0 { + if consumption.RRU > 0 { rruMetrics.Observe(consumption.RRU) } - if consumption.WRU != 0 { + if consumption.WRU > 0 { wruMetrics.Observe(consumption.WRU) } // Byte info. - if consumption.ReadBytes != 0 { + if consumption.ReadBytes > 0 { readByteMetrics.Observe(consumption.ReadBytes) } - if consumption.WriteBytes != 0 { + if consumption.WriteBytes > 0 { writeByteMetrics.Observe(consumption.WriteBytes) } // CPU time info. @@ -336,10 +336,10 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { kvCPUMetrics.Observe(consumption.TotalCpuTimeMs - consumption.SqlLayerCpuTimeMs) } // RPC count info. - if consumption.KvReadRpcCount != 0 { + if consumption.KvReadRpcCount > 0 { readRequestCountMetrics.Add(consumption.KvReadRpcCount) } - if consumption.KvWriteRpcCount != 0 { + if consumption.KvWriteRpcCount > 0 { writeRequestCountMetrics.Add(consumption.KvWriteRpcCount) } diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 6dcc5dd6430..2ea81457329 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -360,6 +360,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { break } } + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) + tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} + wreq := tcs.makeWriteRequest() + _, _, err := controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, wreq) + re.Error(err) + time.Sleep(time.Millisecond * 200) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) controller.Stop() }