Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_control: load resource unit config from local pd client #43097

Merged
merged 6 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ go_library(
"@com_github_tikv_client_go_v2//txnkv/txnsnapshot",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//resource_group/controller",
"@com_github_twmb_murmur3//:murmur3",
"@com_sourcegraph_sourcegraph_appdash//:appdash",
"@com_sourcegraph_sourcegraph_appdash//opentracing",
Expand Down Expand Up @@ -458,6 +459,8 @@ go_test(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//resource_group/controller",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
Expand Down
169 changes: 68 additions & 101 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,70 @@ package executor

import (
"context"
"strconv"
"strings"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
rmclient "github.com/tikv/pd/client/resource_group/controller"
)

// workloadBaseRUCostMap contains the base resource cost rate per 1 kv cpu within 1 second,
// the data is calculated from benchmark result, these data might not be very accurate,
// but is enough here because the maximum RU capacity is depended on both the cluster and
// the workload.
var workloadBaseRUCostMap = map[ast.CalibrateResourceType]*baseResourceCost{
ast.TPCC: {
tidbCPU: 0.6,
kvCPU: 0.15,
readBytes: units.MiB / 2,
writeBytes: units.MiB,
readReqCount: 300,
writeReqCount: 1750,
},
ast.OLTPREADWRITE: {
tidbCPU: 1.25,
kvCPU: 0.35,
readBytes: units.MiB * 4.25,
writeBytes: units.MiB / 3,
readReqCount: 1600,
writeReqCount: 1400,
},
ast.OLTPREADONLY: {
tidbCPU: 2,
kvCPU: 0.52,
readBytes: units.MiB * 28,
writeBytes: 0,
readReqCount: 4500,
writeReqCount: 0,
},
ast.OLTPWRITEONLY: {
tidbCPU: 1,
kvCPU: 0,
readBytes: 0,
writeBytes: units.MiB,
readReqCount: 0,
writeReqCount: 3550,
},
var (
// workloadBaseRUCostMap contains the base resource cost rate per 1 kv cpu within 1 second,
// the data is calculated from benchmark result, these data might not be very accurate,
// but is enough here because the maximum RU capacity is depended on both the cluster and
// the workload.
workloadBaseRUCostMap = map[ast.CalibrateResourceType]*baseResourceCost{
ast.TPCC: {
tidbCPU: 0.6,
kvCPU: 0.15,
readBytes: units.MiB / 2,
writeBytes: units.MiB,
readReqCount: 300,
writeReqCount: 1750,
},
ast.OLTPREADWRITE: {
tidbCPU: 1.25,
kvCPU: 0.35,
readBytes: units.MiB * 4.25,
writeBytes: units.MiB / 3,
readReqCount: 1600,
writeReqCount: 1400,
},
ast.OLTPREADONLY: {
tidbCPU: 2,
kvCPU: 0.52,
readBytes: units.MiB * 28,
writeBytes: 0,
readReqCount: 4500,
writeReqCount: 0,
},
ast.OLTPWRITEONLY: {
tidbCPU: 1,
kvCPU: 0,
readBytes: 0,
writeBytes: units.MiB,
readReqCount: 0,
writeReqCount: 3550,
},
}

// resourceGrooupCtl is the ResourceGroupController in pd client
resourceGrooupCtl *rmclient.ResourceGroupsController
)

// SetResourceGroupController set a inited ResourceGroupsController for calibrate usage.
func SetResourceGroupController(rc *rmclient.ResourceGroupsController) {
resourceGrooupCtl = rc
}

// GetResourceGroupController returns the ResourceGroupsController.
func GetResourceGroupController() *rmclient.ResourceGroupsController {
return resourceGrooupCtl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra o

}

// the resource cost rate of a specified workload per 1 tikv cpu
Expand Down Expand Up @@ -97,15 +113,16 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro
}
e.done = true

exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)

if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
// first fetch the ru settings config.
ruCfg, err := getRUSettings(ctx, exec)
if err != nil {
return err
if resourceGrooupCtl == nil {
return errors.New("resource group controller is not initialized.")
}

exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
if err != nil {
return err
Expand All @@ -127,68 +144,18 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro
if totalTiDBCPU/baseCost.tidbCPU < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbCPU
}
ruPerKVCPU := ruCfg.readBaseCost*float64(baseCost.readReqCount) +
ruCfg.readCostCPU*baseCost.kvCPU +
ruCfg.readCostPerByte*float64(baseCost.readBytes) +
ruCfg.writeBaseCost*float64(baseCost.writeReqCount) +
ruCfg.writeCostPerByte*float64(baseCost.writeBytes)
ruCfg := resourceGrooupCtl.GetConfig()
ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) +
float64(ruCfg.CPUMsCost)*baseCost.kvCPU +
float64(ruCfg.ReadBytesCost)*float64(baseCost.readBytes) +
float64(ruCfg.WriteBaseCost)*float64(baseCost.writeReqCount) +
float64(ruCfg.WriteBytesCost)*float64(baseCost.writeBytes)
quota := totalKVCPUQuota * ruPerKVCPU
req.AppendUint64(0, uint64(quota))

return nil
}

type ruConfig struct {
readBaseCost float64
writeBaseCost float64
readCostCPU float64
readCostPerByte float64
writeCostPerByte float64
}

func getRUSettings(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (*ruConfig, error) {
rows, fields, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "SHOW CONFIG WHERE TYPE = 'pd' AND name like 'controller.request-unit.%'")
if err != nil {
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, errors.New("PD request-unit config not found")
}
var nameIdx, valueIdx int
for i, f := range fields {
switch f.ColumnAsName.L {
case "name":
nameIdx = i
case "value":
valueIdx = i
}
}

cfg := &ruConfig{}
for _, row := range rows {
val, err := strconv.ParseFloat(row.GetString(valueIdx), 64)
if err != nil {
return nil, errors.Trace(err)
}
name, _ := strings.CutPrefix(row.GetString(nameIdx), "controller.request-unit.")

switch name {
case "read-base-cost":
cfg.readBaseCost = val
case "read-cost-per-byte":
cfg.readCostPerByte = val
case "read-cpu-ms-cost":
cfg.readCostCPU = val
case "write-base-cost":
cfg.writeBaseCost = val
case "write-cost-per-byte":
cfg.writeCostPerByte = val
}
}

return cfg, nil
}

func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota")
Expand Down
72 changes: 51 additions & 21 deletions executor/calibrate_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,60 @@ package executor_test

import (
"context"
"encoding/json"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
rmclient "github.com/tikv/pd/client/resource_group/controller"
)

func TestCalibrateResource(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

var confItems [][]types.Datum
var confErr error
var confFunc executor.TestShowClusterConfigFunc = func() ([][]types.Datum, error) {
return confItems, confErr
}
tk.Session().SetValue(executor.TestShowClusterConfigKey, confFunc)
strs2Items := func(strs ...string) []types.Datum {
items := make([]types.Datum, 0, len(strs))
for _, s := range strs {
items = append(items, types.NewStringDatum(s))
}
return items
}

// empty request-unit config error
// first test resource_control flag
tk.MustExec("SET GLOBAL tidb_enable_resource_control='OFF';")
rs, err := tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "PD request-unit config not found")
require.ErrorContains(t, err, "Resource control feature is disabled")

confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "controller.request-unit.read-base-cost", "0.25"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "controller.request-unit.read-cost-per-byte", "0.0000152587890625"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "controller.request-unit.read-cpu-ms-cost", "0.3333333333333333"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "controller.request-unit.write-base-cost", "1"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "controller.request-unit.write-cost-per-byte", "0.0009765625"))
tk.MustExec("SET GLOBAL tidb_enable_resource_control='ON';")

// resource group controller is not inited.
rs, err = tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "resource group controller is not initialized.")

oldResourceCtl := executor.GetResourceGroupController()
defer func() {
executor.SetResourceGroupController(oldResourceCtl)
}()

mockPrivider := &mockResourceGroupProvider{
cfg: rmclient.ControllerConfig{
RequestUnit: rmclient.RequestUnitConfig{
ReadBaseCost: 0.25,
ReadCostPerByte: 0.0000152587890625,
WriteBaseCost: 1.0,
WriteCostPerByte: 0.0009765625,
CPUMsCost: 0.3333333333333333,
},
},
}
resource_ctl, err := rmclient.NewResourceGroupController(context.Background(), 1, mockPrivider, nil)
require.NoError(t, err)
executor.SetResourceGroupController(resource_ctl)

// empty metrics error
rs, err = tk.Exec("CALIBRATE RESOURCE")
Expand Down Expand Up @@ -107,3 +121,19 @@ func TestCalibrateResource(t *testing.T) {
}
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("38094"))
}

type mockResourceGroupProvider struct {
rmclient.ResourceGroupProvider
cfg rmclient.ControllerConfig
}

func (p *mockResourceGroupProvider) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error) {
if configPath != "resource_group/controller" {
return nil, 0, errors.New("unsupported configPath")
}
payload, _ := json.Marshal(&p.cfg)
item := pd.GlobalConfigItem{
PayLoad: payload,
}
return []pd.GlobalConfigItem{item}, 0, nil
}
1 change: 1 addition & 0 deletions store/driver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/store/driver",
visibility = ["//visibility:public"],
deps = [
"//executor",
"//executor/importer",
"//kv",
"//sessionctx/variable",
Expand Down
2 changes: 2 additions & 0 deletions store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -109,6 +110,7 @@ func TrySetupGlobalResourceController(ctx context.Context, serverID uint64, s kv
if err != nil {
return err
}
executor.SetResourceGroupController(control)
tikv.SetResourceControlInterceptor(control)
control.Start(ctx)
return nil
Expand Down