Skip to content

Commit

Permalink
resource_control: load resource unit config from local pd client (#43097
Browse files Browse the repository at this point in the history
)

ref #38825
  • Loading branch information
glorv authored Apr 19, 2023
1 parent 5e76871 commit fc5392e
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 123 deletions.
3 changes: 3 additions & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ go_library(
"@com_github_tikv_client_go_v2//txnkv/txnsnapshot",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//resource_group/controller",
"@com_github_twmb_murmur3//:murmur3",
"@com_sourcegraph_sourcegraph_appdash//:appdash",
"@com_sourcegraph_sourcegraph_appdash//opentracing",
Expand Down Expand Up @@ -458,6 +459,8 @@ go_test(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//resource_group/controller",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
Expand Down
171 changes: 69 additions & 102 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,73 @@ package executor

import (
"context"
"strconv"
"strings"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
rmclient "github.com/tikv/pd/client/resource_group/controller"
)

// workloadBaseRUCostMap contains the base resource cost rate per 1 kv cpu within 1 second,
// the data is calculated from benchmark result, these data might not be very accurate,
// but is enough here because the maximum RU capacity is depended on both the cluster and
// the workload.
var workloadBaseRUCostMap = map[ast.CalibrateResourceType]*baseResourceCost{
ast.TPCC: {
tidbCPU: 0.6,
kvCPU: 0.15,
readBytes: units.MiB / 2,
writeBytes: units.MiB,
readReqCount: 300,
writeReqCount: 1750,
},
ast.OLTPREADWRITE: {
tidbCPU: 1.25,
kvCPU: 0.35,
readBytes: units.MiB * 4.25,
writeBytes: units.MiB / 3,
readReqCount: 1600,
writeReqCount: 1400,
},
ast.OLTPREADONLY: {
tidbCPU: 2,
kvCPU: 0.52,
readBytes: units.MiB * 28,
writeBytes: 0,
readReqCount: 4500,
writeReqCount: 0,
},
ast.OLTPWRITEONLY: {
tidbCPU: 1,
kvCPU: 0,
readBytes: 0,
writeBytes: units.MiB,
readReqCount: 0,
writeReqCount: 3550,
},
var (
// workloadBaseRUCostMap maps each CALIBRATE RESOURCE workload type to its base
// resource cost rate per 1 kv cpu within 1 second.
// The data is calculated from benchmark results and might not be very accurate,
// but it is good enough here because the maximum RU capacity depends on both
// the cluster and the workload.
workloadBaseRUCostMap = map[ast.CalibrateResourceType]*baseResourceCost{
ast.TPCC: {
tidbCPU: 0.6,
kvCPU: 0.15,
readBytes: units.MiB / 2,
writeBytes: units.MiB,
readReqCount: 300,
writeReqCount: 1750,
},
ast.OLTPREADWRITE: {
tidbCPU: 1.25,
kvCPU: 0.35,
readBytes: units.MiB * 4.25,
writeBytes: units.MiB / 3,
readReqCount: 1600,
writeReqCount: 1400,
},
ast.OLTPREADONLY: {
tidbCPU: 2,
kvCPU: 0.52,
readBytes: units.MiB * 28,
writeBytes: 0,
readReqCount: 4500,
writeReqCount: 0,
},
ast.OLTPWRITEONLY: {
tidbCPU: 1,
kvCPU: 0,
readBytes: 0,
writeBytes: units.MiB,
readReqCount: 0,
writeReqCount: 3550,
},
}

// resourceGroupCtl is the pd client ResourceGroupsController whose config
// supplies the request-unit cost factors for calibration.
// It is nil until SetResourceGroupController is called; while it is nil,
// calibrateResourceExec.Next returns a "not initialized" error.
resourceGroupCtl *rmclient.ResourceGroupsController
)

// SetResourceGroupController stores an already initialized ResourceGroupsController
// in the package-level resourceGroupCtl variable so that CALIBRATE RESOURCE can
// read the request-unit config from it.
// The assignment is a plain write with no synchronization.
func SetResourceGroupController(rc *rmclient.ResourceGroupsController) {
resourceGroupCtl = rc
}

// the resource cost rate of a specified workload per 1 tikv cpu
// GetResourceGroupController returns the ResourceGroupsController currently held
// in the package-level resourceGroupCtl variable. The result is nil if
// SetResourceGroupController has not been called with a non-nil controller.
func GetResourceGroupController() *rmclient.ResourceGroupsController {
return resourceGroupCtl
}

// the resource cost rate of a specified workload per 1 tikv cpu.
type baseResourceCost struct {
// the average tikv cpu time, this is used to calculate whether tikv cpu
// or tidb cpu is the performance bottle neck.
Expand Down Expand Up @@ -97,15 +113,16 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro
}
e.done = true

exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)

if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
// first fetch the ru settings config.
ruCfg, err := getRUSettings(ctx, exec)
if err != nil {
return err
if resourceGroupCtl == nil {
return errors.New("resource group controller is not initialized")
}

exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
if err != nil {
return err
Expand All @@ -127,68 +144,18 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro
if totalTiDBCPU/baseCost.tidbCPU < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbCPU
}
ruPerKVCPU := ruCfg.readBaseCost*float64(baseCost.readReqCount) +
ruCfg.readCostCPU*baseCost.kvCPU +
ruCfg.readCostPerByte*float64(baseCost.readBytes) +
ruCfg.writeBaseCost*float64(baseCost.writeReqCount) +
ruCfg.writeCostPerByte*float64(baseCost.writeBytes)
ruCfg := resourceGroupCtl.GetConfig()
ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) +
float64(ruCfg.CPUMsCost)*baseCost.kvCPU +
float64(ruCfg.ReadBytesCost)*float64(baseCost.readBytes) +
float64(ruCfg.WriteBaseCost)*float64(baseCost.writeReqCount) +
float64(ruCfg.WriteBytesCost)*float64(baseCost.writeBytes)
quota := totalKVCPUQuota * ruPerKVCPU
req.AppendUint64(0, uint64(quota))

return nil
}

// ruConfig holds the request-unit cost factors that getRUSettings parses from
// the PD `controller.request-unit.*` config items. A field whose config item is
// absent from the result keeps its zero value.
type ruConfig struct {
// readBaseCost is filled from "read-base-cost".
readBaseCost float64
// writeBaseCost is filled from "write-base-cost".
writeBaseCost float64
// readCostCPU is filled from "read-cpu-ms-cost".
readCostCPU float64
// readCostPerByte is filled from "read-cost-per-byte".
readCostPerByte float64
// writeCostPerByte is filled from "write-cost-per-byte".
writeCostPerByte float64
}

// getRUSettings reads the PD request-unit cost settings by running a
// SHOW CONFIG statement in the current session and converting every returned
// value to a float64.
//
// It returns an error when the statement fails, when no rows come back, or
// when any returned value cannot be parsed as a float. Config items with an
// unrecognized name are parsed but otherwise ignored.
func getRUSettings(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (*ruConfig, error) {
	const (
		showConfigSQL = "SHOW CONFIG WHERE TYPE = 'pd' AND name like 'controller.request-unit.%'"
		itemPrefix    = "controller.request-unit."
	)

	execOpts := []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}
	rows, fields, err := exec.ExecRestrictedSQL(ctx, execOpts, showConfigSQL)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if len(rows) == 0 {
		return nil, errors.New("PD request-unit config not found")
	}

	// Locate the "name" and "value" columns; both positions stay 0 when the
	// corresponding column is not present in the result fields.
	nameCol, valueCol := 0, 0
	for pos, field := range fields {
		if col := field.ColumnAsName.L; col == "name" {
			nameCol = pos
		} else if col == "value" {
			valueCol = pos
		}
	}

	settings := new(ruConfig)
	for _, row := range rows {
		// The value is parsed before the name is inspected, so a non-numeric
		// value fails the whole call even for an unrecognized item.
		cost, parseErr := strconv.ParseFloat(row.GetString(valueCol), 64)
		if parseErr != nil {
			return nil, errors.Trace(parseErr)
		}

		switch strings.TrimPrefix(row.GetString(nameCol), itemPrefix) {
		case "read-base-cost":
			settings.readBaseCost = cost
		case "read-cost-per-byte":
			settings.readCostPerByte = cost
		case "read-cpu-ms-cost":
			settings.readCostCPU = cost
		case "write-base-cost":
			settings.writeBaseCost = cost
		case "write-cost-per-byte":
			settings.writeCostPerByte = cost
		}
	}

	return settings, nil
}

func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota")
Expand Down
72 changes: 51 additions & 21 deletions executor/calibrate_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,60 @@ package executor_test

import (
"context"
"encoding/json"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
rmclient "github.com/tikv/pd/client/resource_group/controller"
)

func TestCalibrateResource(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

var confItems [][]types.Datum
var confErr error
var confFunc executor.TestShowClusterConfigFunc = func() ([][]types.Datum, error) {
return confItems, confErr
}
tk.Session().SetValue(executor.TestShowClusterConfigKey, confFunc)
strs2Items := func(strs ...string) []types.Datum {
items := make([]types.Datum, 0, len(strs))
for _, s := range strs {
items = append(items, types.NewStringDatum(s))
}
return items
}

// empty request-unit config error
// first test resource_control flag
tk.MustExec("SET GLOBAL tidb_enable_resource_control='OFF';")
rs, err := tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "PD request-unit config not found")
require.ErrorContains(t, err, "Resource control feature is disabled")

confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "controller.request-unit.read-base-cost", "0.25"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "controller.request-unit.read-cost-per-byte", "0.0000152587890625"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "controller.request-unit.read-cpu-ms-cost", "0.3333333333333333"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "controller.request-unit.write-base-cost", "1"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "controller.request-unit.write-cost-per-byte", "0.0009765625"))
tk.MustExec("SET GLOBAL tidb_enable_resource_control='ON';")

// the resource group controller is not initialized yet, so an error is expected.
rs, err = tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "resource group controller is not initialized")

oldResourceCtl := executor.GetResourceGroupController()
defer func() {
executor.SetResourceGroupController(oldResourceCtl)
}()

mockPrivider := &mockResourceGroupProvider{
cfg: rmclient.ControllerConfig{
RequestUnit: rmclient.RequestUnitConfig{
ReadBaseCost: 0.25,
ReadCostPerByte: 0.0000152587890625,
WriteBaseCost: 1.0,
WriteCostPerByte: 0.0009765625,
CPUMsCost: 0.3333333333333333,
},
},
}
resourceCtl, err := rmclient.NewResourceGroupController(context.Background(), 1, mockPrivider, nil)
require.NoError(t, err)
executor.SetResourceGroupController(resourceCtl)

// empty metrics error
rs, err = tk.Exec("CALIBRATE RESOURCE")
Expand Down Expand Up @@ -107,3 +121,19 @@ func TestCalibrateResource(t *testing.T) {
}
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("38094"))
}

// mockResourceGroupProvider is a test double for rmclient.ResourceGroupProvider.
// It embeds the interface and overrides only LoadGlobalConfig, which serves cfg.
// NOTE(review): the embedded interface is never assigned in this test, so any
// other provider method would presumably hit a nil interface — confirm that the
// controller only calls LoadGlobalConfig here.
type mockResourceGroupProvider struct {
rmclient.ResourceGroupProvider
// cfg is the controller config returned, JSON-encoded, by LoadGlobalConfig.
cfg rmclient.ControllerConfig
}

// LoadGlobalConfig returns the mock's controller config as a single
// GlobalConfigItem whose PayLoad is the JSON encoding of p.cfg.
//
// Only the "resource_group/controller" configPath is supported; any other path
// yields an "unsupported configPath" error. ctx and names are ignored. The
// returned revision is always 0.
//
// A JSON encoding failure is returned to the caller instead of being silently
// dropped, so that a bad mock config cannot masquerade as an empty payload.
func (p *mockResourceGroupProvider) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error) {
	if configPath != "resource_group/controller" {
		return nil, 0, errors.New("unsupported configPath")
	}
	payload, err := json.Marshal(&p.cfg)
	if err != nil {
		return nil, 0, errors.Trace(err)
	}
	return []pd.GlobalConfigItem{{PayLoad: payload}}, 0, nil
}
1 change: 1 addition & 0 deletions store/driver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/store/driver",
visibility = ["//visibility:public"],
deps = [
"//executor",
"//executor/importer",
"//kv",
"//sessionctx/variable",
Expand Down
2 changes: 2 additions & 0 deletions store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -109,6 +110,7 @@ func TrySetupGlobalResourceController(ctx context.Context, serverID uint64, s kv
if err != nil {
return err
}
executor.SetResourceGroupController(control)
tikv.SetResourceControlInterceptor(control)
control.Start(ctx)
return nil
Expand Down

0 comments on commit fc5392e

Please sign in to comment.