Skip to content

Commit

Permalink
restore: auto tune concurrency configuration when using new mode rest…
Browse files Browse the repository at this point in the history
…oration (#50877)

ref #50701
  • Loading branch information
3pointer authored Feb 29, 2024
1 parent 27ce02a commit b6e955b
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 80 deletions.
6 changes: 5 additions & 1 deletion br/pkg/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "config",
srcs = ["ebs.go"],
srcs = [
"ebs.go",
"kv.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/config",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/metautil",
"//br/pkg/storage",
"@com_github_docker_go_units//:go-units",
"@com_github_masterminds_semver//:semver",
"@com_github_pingcap_errors//:errors",
"@io_k8s_api//core/v1:core",
Expand Down
58 changes: 58 additions & 0 deletions br/pkg/config/kv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.
package config

import (
"encoding/json"

"github.com/docker/go-units"
)

// ConfigTerm pairs a configuration value with a flag telling whether that
// value was explicitly changed. T is restricted to uint or uint64.
type ConfigTerm[T uint | uint64] struct {
// Value is the configuration value itself.
Value T
// Modified marks the value as explicitly set (by the user, judging from the
// log message in ProcessTiKVConfigs) rather than left at its default.
Modified bool
}

// KVConfig groups the restore settings that can be derived from the TiKV
// configuration: import concurrency and the region merge thresholds.
type KVConfig struct {
// ImportGoroutines is the number of goroutines used for import.
ImportGoroutines ConfigTerm[uint]
// MergeRegionSize is the region size threshold, in bytes.
MergeRegionSize ConfigTerm[uint64]
// MergeRegionKeyCount is the region key-count threshold.
MergeRegionKeyCount ConfigTerm[uint64]
}

// ParseImportThreadsFromConfig extracts `import.num-threads` from a
// JSON-encoded TiKV configuration document.
//
// It returns the decoded thread count. When resp cannot be unmarshalled the
// result is 0 together with the unmarshal error. A document that does not
// contain the field yields 0 and a nil error.
func ParseImportThreadsFromConfig(resp []byte) (uint, error) {
	// Only the single field of interest is declared; every other key in the
	// document is ignored by the decoder.
	var payload struct {
		Import struct {
			Threads uint `json:"num-threads"`
		} `json:"import"`
	}
	if err := json.Unmarshal(resp, &payload); err != nil {
		return 0, err
	}
	return payload.Import.Threads, nil
}

// ParseMergeRegionSizeFromConfig extracts `coprocessor.region-split-size` and
// `coprocessor.region-split-keys` from a JSON-encoded TiKV configuration
// document.
//
// The split size is a human-readable string in the document (for example
// "1MiB") and is converted to a byte count with units.RAMInBytes. The results
// are returned as (size in bytes, key count, error). If the document cannot be
// unmarshalled or the size string cannot be parsed, both numbers are 0 and the
// error is returned as is.
func ParseMergeRegionSizeFromConfig(resp []byte) (uint64, uint64, error) {
	// Only the coprocessor fields of interest are declared; every other key
	// in the document is ignored by the decoder.
	var payload struct {
		Cop struct {
			RegionSplitSize string `json:"region-split-size"`
			RegionSplitKeys uint64 `json:"region-split-keys"`
		} `json:"coprocessor"`
	}
	if err := json.Unmarshal(resp, &payload); err != nil {
		return 0, 0, err
	}

	sizeBytes, err := units.RAMInBytes(payload.Cop.RegionSplitSize)
	if err != nil {
		return 0, 0, err
	}
	return uint64(sizeBytes), payload.Cop.RegionSplitKeys, nil
}
2 changes: 2 additions & 0 deletions br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/conn",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/config",
"//br/pkg/conn/util",
"//br/pkg/errors",
"//br/pkg/glue",
Expand Down Expand Up @@ -47,6 +48,7 @@ go_test(
flaky = True,
shard_count = 7,
deps = [
"//br/pkg/config",
"//br/pkg/conn/util",
"//br/pkg/pdutil",
"//br/pkg/utils",
Expand Down
74 changes: 47 additions & 27 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package conn
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
Expand All @@ -20,6 +20,7 @@ import (
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
kvconfig "github.com/pingcap/tidb/br/pkg/config"
"github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
Expand Down Expand Up @@ -48,6 +49,11 @@ const (

// DefaultMergeRegionKeyCount is the default region key count, 960000.
DefaultMergeRegionKeyCount uint64 = 960000

// DefaultImportNumGoroutines is the default number of threads for import.
// It uses 128 as the default value, which is 8 times the default value of tidb.
// We think this is proper for IO-bound cases.
DefaultImportNumGoroutines uint = 128
)

type VersionCheckerType int
Expand Down Expand Up @@ -290,42 +296,56 @@ func (mgr *Mgr) GetTS(ctx context.Context) (uint64, error) {
return oracle.ComposeTS(p, l), nil
}

// GetMergeRegionSizeAndCount returns the tikv config
// `coprocessor.region-split-size` and `coprocessor.region-split-key`.
// returns the default config when failed.
func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Client) (uint64, uint64) {
regionSplitSize := DefaultMergeRegionSizeBytes
regionSplitKeys := DefaultMergeRegionKeyCount
type coprocessor struct {
RegionSplitKeys uint64 `json:"region-split-keys"`
RegionSplitSize string `json:"region-split-size"`
// ProcessTiKVConfigs handles the tikv config for region split size, region split keys, and import goroutines in place.
// It retrieves the config from all alive tikv stores and writes the resulting values back into cfg.
// If retrieving the config fails, it only logs a warning and returns nothing to the caller.
func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig, client *http.Client) {
mergeRegionSize := cfg.MergeRegionSize
mergeRegionKeyCount := cfg.MergeRegionKeyCount
importGoroutines := cfg.ImportGoroutines

if mergeRegionSize.Modified && mergeRegionKeyCount.Modified && importGoroutines.Modified {
log.Info("no need to retrieve the config from tikv if user has set the config")
return
}

type config struct {
Cop coprocessor `json:"coprocessor"`
}
err := mgr.GetConfigFromTiKV(ctx, client, func(resp *http.Response) error {
c := &config{}
e := json.NewDecoder(resp.Body).Decode(c)
if e != nil {
return e
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
rs, e := units.RAMInBytes(c.Cop.RegionSplitSize)
if e != nil {
return e
if !mergeRegionSize.Modified || !mergeRegionKeyCount.Modified {
size, keys, e := kvconfig.ParseMergeRegionSizeFromConfig(respBytes)
if e != nil {
log.Warn("Failed to parse region split size and keys from config", logutil.ShortError(e))
return e
}
if mergeRegionKeyCount.Value == DefaultMergeRegionKeyCount || keys < mergeRegionKeyCount.Value {
mergeRegionSize.Value = size
mergeRegionKeyCount.Value = keys
}
}
urs := uint64(rs)
if regionSplitSize == DefaultMergeRegionSizeBytes || urs < regionSplitSize {
regionSplitSize = urs
regionSplitKeys = c.Cop.RegionSplitKeys
if !importGoroutines.Modified {
threads, e := kvconfig.ParseImportThreadsFromConfig(respBytes)
if e != nil {
log.Warn("Failed to parse import num-threads from config", logutil.ShortError(e))
return e
}
// We use 8 times the default value because it's an IO-bound case.
if importGoroutines.Value == DefaultImportNumGoroutines || (threads > 0 && threads*8 < importGoroutines.Value) {
importGoroutines.Value = threads * 8
}
}
// replace the value
cfg.MergeRegionSize = mergeRegionSize
cfg.MergeRegionKeyCount = mergeRegionKeyCount
cfg.ImportGoroutines = importGoroutines
return nil
})

if err != nil {
log.Warn("meet error when getting config from TiKV; using default", logutil.ShortError(err))
return DefaultMergeRegionSizeBytes, DefaultMergeRegionKeyCount
log.Warn("Failed to get config from TiKV; using default", logutil.ShortError(err))
}
return regionSplitSize, regionSplitKeys
}

// GetConfigFromTiKV get configs from all alive tikv stores.
Expand Down
48 changes: 31 additions & 17 deletions br/pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
kvconfig "github.com/pingcap/tidb/br/pkg/config"
"github.com/pingcap/tidb/br/pkg/conn/util"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/utils"
Expand Down Expand Up @@ -269,10 +270,11 @@ func TestGetConnOnCanceledContext(t *testing.T) {

func TestGetMergeRegionSizeAndCount(t *testing.T) {
cases := []struct {
stores []*metapb.Store
content []string
regionSplitSize uint64
regionSplitKeys uint64
stores []*metapb.Store
content []string
importNumGoroutines uint
regionSplitSize uint64
regionSplitKeys uint64
}{
{
stores: []*metapb.Store{
Expand All @@ -289,8 +291,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
},
content: []string{""},
// no tikv detected in this case
regionSplitSize: DefaultMergeRegionSizeBytes,
regionSplitKeys: DefaultMergeRegionKeyCount,
importNumGoroutines: DefaultImportNumGoroutines,
regionSplitSize: DefaultMergeRegionSizeBytes,
regionSplitKeys: DefaultMergeRegionKeyCount,
},
{
stores: []*metapb.Store{
Expand Down Expand Up @@ -321,8 +324,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
"",
},
// no tikv detected in this case
regionSplitSize: DefaultMergeRegionSizeBytes,
regionSplitKeys: DefaultMergeRegionKeyCount,
importNumGoroutines: DefaultImportNumGoroutines,
regionSplitSize: DefaultMergeRegionSizeBytes,
regionSplitKeys: DefaultMergeRegionKeyCount,
},
{
stores: []*metapb.Store{
Expand All @@ -338,8 +342,10 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
},
},
content: []string{
"{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 1, \"region-split-size\": \"1MiB\"}}",
"{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 1, \"region-split-size\": \"1MiB\"}, \"import\": {\"num-threads\": 6}}",
},
// the number of import goroutines is 8 times import.num-threads.
importNumGoroutines: 48,
// one tikv detected in this case we are not update default size and keys because they are too small.
regionSplitSize: 1 * units.MiB,
regionSplitKeys: 1,
Expand All @@ -358,8 +364,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
},
},
content: []string{
"{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}}",
"{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, \"import\": {\"num-threads\": 128}}",
},
importNumGoroutines: 1024,
// one tikv detected in this case and we update with new size and keys.
regionSplitSize: 1 * units.GiB,
regionSplitKeys: 10000000,
Expand Down Expand Up @@ -388,12 +395,13 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
},
},
content: []string{
"{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}}",
"{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 12000000, \"region-split-size\": \"900MiB\"}}",
"{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, \"import\": {\"num-threads\": 128}}",
"{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 12000000, \"region-split-size\": \"900MiB\"}, \"import\": {\"num-threads\": 12}}",
},
// two tikv detected in this case and we choose the small one.
regionSplitSize: 900 * units.MiB,
regionSplitKeys: 12000000,
importNumGoroutines: 96,
regionSplitSize: 1 * units.GiB,
regionSplitKeys: 10000000,
},
}

Expand All @@ -420,9 +428,15 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
httpCli := mockServer.Client()
mgr := &Mgr{PdController: &pdutil.PdController{}}
mgr.PdController.SetPDClient(pdCli)
rs, rk := mgr.GetMergeRegionSizeAndCount(ctx, httpCli)
require.Equal(t, ca.regionSplitSize, rs)
require.Equal(t, ca.regionSplitKeys, rk)
kvConfigs := &kvconfig.KVConfig{
ImportGoroutines: kvconfig.ConfigTerm[uint]{Value: DefaultImportNumGoroutines, Modified: false},
MergeRegionSize: kvconfig.ConfigTerm[uint64]{Value: DefaultMergeRegionSizeBytes, Modified: false},
MergeRegionKeyCount: kvconfig.ConfigTerm[uint64]{Value: DefaultMergeRegionKeyCount, Modified: false},
}
mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli)
require.EqualValues(t, ca.regionSplitSize, kvConfigs.MergeRegionSize.Value)
require.EqualValues(t, ca.regionSplitKeys, kvConfigs.MergeRegionKeyCount.Value)
require.EqualValues(t, ca.importNumGoroutines, kvConfigs.ImportGoroutines.Value)
mockServer.Close()
}
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/rtree/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ func (rs rangesMarshaler) MarshalLogObject(encoder zapcore.ObjectEncoder) error
encoder.AddInt("totalFiles", totalFile)
encoder.AddUint64("totalKVs", totalKV)
encoder.AddUint64("totalBytes", totalBytes)
encoder.AddUint64("totalSize", totalBytes)
encoder.AddUint64("totalSize", totalSize)
return nil
}
2 changes: 1 addition & 1 deletion br/pkg/rtree/logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestLogRanges(t *testing.T) {
ranges := make([]rtree.Range, cs.count)
for j := 0; j < cs.count; j++ {
ranges[j] = *newRange([]byte(fmt.Sprintf("%d", j)), []byte(fmt.Sprintf("%d", j+1)))
ranges[j].Files = append(ranges[j].Files, &backuppb.File{TotalKvs: uint64(j), TotalBytes: uint64(j)})
ranges[j].Files = append(ranges[j].Files, &backuppb.File{TotalKvs: uint64(j), TotalBytes: uint64(j), Size_: uint64(j)})
}
out, err := encoder.EncodeEntry(zapcore.Entry{}, []zap.Field{rtree.ZapRanges(ranges)})
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ go_test(
flaky = True,
shard_count = 22,
deps = [
"//br/pkg/config",
"//br/pkg/conn",
"//br/pkg/errors",
"//br/pkg/metautil",
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/task/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
kvconfig "github.com/pingcap/tidb/br/pkg/config"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/config"
Expand Down Expand Up @@ -208,8 +209,8 @@ func expectedDefaultRestoreConfig() RestoreConfig {
Config: defaultConfig,
RestoreCommonConfig: RestoreCommonConfig{Online: false,
Granularity: "fine-grained",
MergeSmallRegionSizeBytes: 0x6000000,
MergeSmallRegionKeyCount: 0xea600,
MergeSmallRegionSizeBytes: kvconfig.ConfigTerm[uint64]{Value: 0x6000000},
MergeSmallRegionKeyCount: kvconfig.ConfigTerm[uint64]{Value: 0xea600},
WithSysTable: true,
ResetSysUsers: []string{"cloud_admin", "root"}},
NoSchema: false,
Expand Down
Loading

0 comments on commit b6e955b

Please sign in to comment.