diff --git a/pkg/mcs/resource_manager/server/grpc_service.go b/pkg/mcs/resource_manager/server/grpc_service.go index a6b1c8f830f..c2b9625ec12 100644 --- a/pkg/mcs/resource_manager/server/grpc_service.go +++ b/pkg/mcs/resource_manager/server/grpc_service.go @@ -145,11 +145,18 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu targetPeriodMs := request.GetTargetRequestPeriodMs() resps := &rmpb.TokenBucketsResponse{} for _, req := range request.Requests { - rg := s.manager.GetMutableResourceGroup(req.ResourceGroupName) + resourceGroupName := req.GetResourceGroupName() + // Get the resource group from manager to acquire token buckets. + rg := s.manager.GetMutableResourceGroup(resourceGroupName) if rg == nil { - log.Warn("resource group not found", zap.String("resource-group", req.ResourceGroupName)) + log.Warn("resource group not found", zap.String("resource-group", resourceGroupName)) continue } + // Send the consumption to update the metrics. + s.manager.consumptionDispatcher <- struct { + resourceGroupName string + *rmpb.Consumption + }{resourceGroupName, req.GetConsumptionSinceLastRequest()} now := time.Now() resp := &rmpb.TokenBucketResponse{ ResourceGroupName: rg.Name, @@ -167,10 +174,10 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) } case rmpb.GroupMode_RawMode: - log.Warn("not supports the resource type", zap.String("resource-group", req.ResourceGroupName), zap.String("mode", rmpb.GroupMode_name[int32(rmpb.GroupMode_RawMode)])) + log.Warn("not supports the resource type", zap.String("resource-group", resourceGroupName), zap.String("mode", rmpb.GroupMode_name[int32(rmpb.GroupMode_RawMode)])) continue } - log.Debug("finish token request from", zap.String("resource group", req.ResourceGroupName)) + log.Debug("finish token request from", zap.String("resource group", resourceGroupName)) resps.Responses = append(resps.Responses, resp) } 
stream.Send(resps) diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go index ccb22c6ab06..21d69449185 100644 --- a/pkg/mcs/resource_manager/server/manager.go +++ b/pkg/mcs/resource_manager/server/manager.go @@ -15,6 +15,7 @@ package server import ( + "context" "sort" "sync" @@ -27,11 +28,19 @@ import ( "go.uber.org/zap" ) +const defaultConsumptionChanSize = 1024 + // Manager is the manager of resource group. type Manager struct { sync.RWMutex groups map[string]*ResourceGroup storage func() storage.Storage + // consumptionChan is used to send the consumption + // info to the background metrics flusher. + consumptionDispatcher chan struct { + resourceGroupName string + *rmpb.Consumption + } } // NewManager returns a new Manager. @@ -39,8 +48,13 @@ func NewManager(srv *server.Server) *Manager { m := &Manager{ groups: make(map[string]*ResourceGroup), storage: srv.GetStorage, + consumptionDispatcher: make(chan struct { + resourceGroupName string + *rmpb.Consumption + }, defaultConsumptionChanSize), } srv.AddStartCallback(m.Init) + go m.backgroundMetricsFlush(srv.Context()) return m } @@ -145,3 +159,55 @@ func (m *Manager) GetResourceGroupList() []*ResourceGroup { }) return res } + +// Receive the consumption and flush it to the metrics. 
+func (m *Manager) backgroundMetricsFlush(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case consumptionInfo := <-m.consumptionDispatcher: + consumption := consumptionInfo.Consumption + if consumption == nil { + continue + } + var ( + name = consumptionInfo.resourceGroupName + rruMetrics = readRequestUnitCost.WithLabelValues(name) + wruMetrics = writeRequestUnitCost.WithLabelValues(name) + readByteMetrics = readByteCost.WithLabelValues(name) + writeByteMetrics = writeByteCost.WithLabelValues(name) + kvCPUMetrics = kvCPUCost.WithLabelValues(name) + sqlCPUMetrics = sqlCPUCost.WithLabelValues(name) + readRequestCountMetrics = requestCount.WithLabelValues(name, readTypeLabel) + writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel) + ) + // RU info. + if consumption.RRU != 0 { + rruMetrics.Observe(consumption.RRU) + } + if consumption.WRU != 0 { + wruMetrics.Observe(consumption.WRU) + } + // Byte info. + if consumption.ReadBytes != 0 { + readByteMetrics.Observe(consumption.ReadBytes) + } + if consumption.WriteBytes != 0 { + writeByteMetrics.Observe(consumption.WriteBytes) + } + // CPU time info. + if consumption.SqlLayerCpuTimeMs != 0 { + sqlCPUMetrics.Observe(consumption.SqlLayerCpuTimeMs) + kvCPUMetrics.Observe(consumption.TotalCpuTimeMs - consumption.SqlLayerCpuTimeMs) + } + // RPC count info. + if consumption.KvReadRpcCount != 0 { + readRequestCountMetrics.Add(consumption.KvReadRpcCount) + } + if consumption.KvWriteRpcCount != 0 { + writeRequestCountMetrics.Add(consumption.KvWriteRpcCount) + } + } + } +} diff --git a/pkg/mcs/resource_manager/server/metrics.go b/pkg/mcs/resource_manager/server/metrics.go index 24bb8431587..4551a3f54b5 100644 --- a/pkg/mcs/resource_manager/server/metrics.go +++ b/pkg/mcs/resource_manager/server/metrics.go @@ -1,4 +1,4 @@ -// Copyright 2022 TiKV Project Authors. +// Copyright 2023 TiKV Project Authors. 
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -14,4 +14,87 @@
 
 package server
 
-// TODO: add metrics
+import (
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+	namespace              = "resource_manager"
+	ruSubsystem            = "resource_unit"
+	resourceSubsystem      = "resource"
+	resourceGroupNameLabel = "name"
+	typeLabel              = "type"
+	readTypeLabel          = "read"
+	writeTypeLabel         = "write"
+)
+
+var (
+	// RU cost metrics: histograms of the read/write request units, labeled by resource group name.
+	readRequestUnitCost = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: ruSubsystem,
+			Name:      "read_request_unit",
+			Help:      "Bucketed histogram of the read request unit cost for all resource groups.",
+			Buckets:   prometheus.ExponentialBuckets(1, 10, 5), // 1 ~ 10000 (5 buckets). NOTE(review): count must be 6 to reach 100000 - confirm intent.
+		}, []string{resourceGroupNameLabel})
+	writeRequestUnitCost = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: ruSubsystem,
+			Name:      "write_request_unit",
+			Help:      "Bucketed histogram of the write request unit cost for all resource groups.",
+			Buckets:   prometheus.ExponentialBuckets(3, 10, 5), // 3 ~ 30000 (5 buckets). NOTE(review): count must be 6 to reach 300000 - confirm intent.
+		}, []string{resourceGroupNameLabel})
+
+	// Resource cost metrics: byte, CPU time and request count, labeled by resource group name.
+	readByteCost = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: resourceSubsystem,
+			Name:      "read_byte",
+			Help:      "Bucketed histogram of the read byte cost for all resource groups.",
+			Buckets:   prometheus.ExponentialBuckets(1, 8, 12), // 1 ~ 8^11 (= 2^33)
+		}, []string{resourceGroupNameLabel})
+	writeByteCost = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: resourceSubsystem,
+			Name:      "write_byte",
+			Help:      "Bucketed histogram of the write byte cost for all resource groups.",
+			Buckets:   prometheus.ExponentialBuckets(1, 8, 12), // 1 ~ 8^11 (= 2^33)
+		}, []string{resourceGroupNameLabel})
+	kvCPUCost = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: resourceSubsystem,
+			Name:      "kv_cpu_time_ms",
+			Help:      "Bucketed histogram of the KV CPU time cost in milliseconds for all resource groups.",
+			Buckets:   prometheus.ExponentialBuckets(1, 10, 3), // 1 ~ 100 (3 buckets). NOTE(review): count must be 4 to reach 1000 - confirm intent.
+		}, []string{resourceGroupNameLabel})
+	sqlCPUCost = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: resourceSubsystem,
+			Name:      "sql_cpu_time_ms",
+			Help:      "Bucketed histogram of the SQL CPU time cost in milliseconds for all resource groups.",
+			Buckets:   prometheus.ExponentialBuckets(1, 10, 3), // 1 ~ 100 (3 buckets). NOTE(review): count must be 4 to reach 1000 - confirm intent.
+		}, []string{resourceGroupNameLabel})
+	requestCount = prometheus.NewGaugeVec( // NOTE(review): the visible caller only uses Add on this gauge - a CounterVec may fit better; confirm.
+		prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: resourceSubsystem,
+			Name:      "request_count",
+			Help:      "The number of read/write requests for all resource groups.",
+		}, []string{resourceGroupNameLabel, typeLabel})
+)
+
+func init() { // Register every collector declared above; MustRegister panics if a registration fails.
+	prometheus.MustRegister(readRequestUnitCost)
+	prometheus.MustRegister(writeRequestUnitCost)
+	prometheus.MustRegister(readByteCost)
+	prometheus.MustRegister(writeByteCost)
+	prometheus.MustRegister(kvCPUCost)
+	prometheus.MustRegister(sqlCPUCost)
+	prometheus.MustRegister(requestCount)
+}