Skip to content

Commit

Permalink
*: support MockAutoScaler and AWSAutoScaler (#40729)
Browse files Browse the repository at this point in the history
close #40747
  • Loading branch information
guo-shaoge authored Feb 1, 2023
1 parent 01b2310 commit d28a8d8
Show file tree
Hide file tree
Showing 16 changed files with 729 additions and 116 deletions.
1 change: 1 addition & 0 deletions config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//br/pkg/streamhelper/config",
"//parser/terror",
"//util/logutil",
"//util/tiflashcompute",
"//util/tikvutil",
"//util/versioninfo",
"@com_github_burntsushi_toml//:toml",
Expand Down
45 changes: 44 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
logbackupconf "github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/tiflashcompute"
"github.com/pingcap/tidb/util/tikvutil"
"github.com/pingcap/tidb/util/versioninfo"
tikvcfg "github.com/tikv/client-go/v2/config"
Expand Down Expand Up @@ -287,7 +288,14 @@ type Config struct {
Plugin Plugin `toml:"plugin" json:"plugin"`
MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"`

// These configs are related to disaggregated-tiflash mode.
DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"`
TiFlashComputeAutoScalerType string `toml:"autoscaler-type" json:"autoscaler-type"`
TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"`
IsTiFlashComputeFixedPool bool `toml:"is-tiflashcompute-fixed-pool" json:"is-tiflashcompute-fixed-pool"`
AutoScalerClusterID string `toml:"autoscaler-cluster-id" json:"autoscaler-cluster-id"`

// TiDBMaxReuseChunk indicates max cached chunk num
TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
// TiDBMaxReuseColumn indicates max cached column num
Expand Down Expand Up @@ -1000,6 +1008,10 @@ var defaultConf = Config{
EnableGlobalKill: true,
TrxSummary: DefaultTrxSummary(),
DisaggregatedTiFlash: false,
TiFlashComputeAutoScalerType: tiflashcompute.DefASStr,
TiFlashComputeAutoScalerAddr: tiflashcompute.DefAWSAutoScalerAddr,
IsTiFlashComputeFixedPool: false,
AutoScalerClusterID: "",
TiDBMaxReuseChunk: 64,
TiDBMaxReuseColumn: 256,
TiDBEnableExitCheck: false,
Expand Down Expand Up @@ -1031,6 +1043,26 @@ func StoreGlobalConfig(config *Config) {
tikvcfg.StoreGlobalConfig(&cfg)
}

// GetAutoScalerClusterID returns KeyspaceName or AutoScalerClusterID.
func GetAutoScalerClusterID() (string, error) {
c := GetGlobalConfig()
keyspaceName := c.KeyspaceName
clusterID := c.AutoScalerClusterID

if keyspaceName != "" && clusterID != "" {
return "", errors.Errorf("config.KeyspaceName(%s) and config.AutoScalerClusterID(%s) are not empty both", keyspaceName, clusterID)
}
if keyspaceName == "" && clusterID == "" {
return "", errors.Errorf("config.KeyspaceName and config.AutoScalerClusterID are both empty")
}

res := keyspaceName
if res == "" {
res = clusterID
}
return res, nil
}

// removedConfig contains items that are no longer supported.
// they might still be in the config struct to support import,
// but are not actively used.
Expand Down Expand Up @@ -1315,6 +1347,17 @@ func (c *Config) Valid() error {
return fmt.Errorf("stats-load-queue-size should be [%d, %d]", DefStatsLoadQueueSizeLimit, DefMaxOfStatsLoadQueueSizeLimit)
}

// Check tiflash_compute topo fetch is valid.
if c.DisaggregatedTiFlash {
if !tiflashcompute.IsValidAutoScalerConfig(c.TiFlashComputeAutoScalerType) {
return fmt.Errorf("invalid AutoScaler type, expect %s, %s or %s, got %s",
tiflashcompute.MockASStr, tiflashcompute.AWSASStr, tiflashcompute.GCPASStr, c.TiFlashComputeAutoScalerType)
}
if c.TiFlashComputeAutoScalerAddr == "" {
return fmt.Errorf("autoscaler-addr cannot be empty when disaggregated-tiflash mode is true")
}
}

// test log level
l := zap.NewAtomicLevel()
return l.UnmarshalText([]byte(c.Log.Level))
Expand Down
15 changes: 15 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ enable-enum-length-limit = true
# command can be forwarded to the right TiDB instance to execute.
enable-global-kill = true

# disaggregated-tiflash indicates whether TiDB is in disaggregated tiflash mode, if true, MPP will runs on tiflash_compute nodes.
disaggregated-tiflash = false

# autoscaler-type indicates which type of AutoScaler will be used. Possible values are: mock, aws, gcp.
# Only meaningful when disaggregated-tiflash is true.
autoscaler-type = "aws"

# autoscaler-addr is the host of AutoScaler, Only meaningful when disaggregated-tiflash is true.
# Only meaningful when disaggregated-tiflash is true.
autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081"

# autoscaler-cluster-id is the unique id for each TiDB cluster, which will used by AutoScaler.
# Only meaningful when disaggregated-tiflash is true.
autoscaler-cluster-id = ""

[log]
# Log level: debug, info, warn, error, fatal.
level = "info"
Expand Down
1 change: 1 addition & 0 deletions executor/tiflashtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_test(
"//testkit",
"//testkit/external",
"//util/israce",
"//util/tiflashcompute",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down
21 changes: 15 additions & 6 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/external"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/tiflashcompute"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
)
Expand Down Expand Up @@ -1258,24 +1259,32 @@ func TestDisaggregatedTiFlash(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
})
defer config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
})
err := tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.TestASStr, "", "", false)
require.NoError(t, err)

store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int)")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

err = tk.ExecToErr("select * from t;")
require.Contains(t, err.Error(), "tiflash_compute node is unavailable")
// Expect error, because TestAutoScaler return empty topo.
require.Contains(t, err.Error(), "Cannot find proper topo from AutoScaler")

config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
})
tk.MustQuery("select * from t;").Check(testkit.Rows())
err = tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.AWSASStr, "", "", false)
require.NoError(t, err)
err = tk.ExecToErr("select * from t;")
// Expect error, because AWSAutoScaler is not setup, so http request will fail.
require.Contains(t, err.Error(), "[util:1815]Internal : get tiflash_compute topology failed")
}

func TestDisaggregatedTiFlashQuery(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ type mppAddr struct {
addr string
}

var _ kv.MPPTaskMeta = &mppAddr{}

func (m *mppAddr) GetAddress() string {
return m.addr
}
Expand Down
8 changes: 0 additions & 8 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3339,14 +3339,6 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, err
}

if config.GetGlobalConfig().DisaggregatedTiFlash {
// Invalid client-go tiflash_compute store cache if necessary.
err = dom.WatchTiFlashComputeNodeChange()
if err != nil {
return nil, err
}
}

if err = extensionimpl.Bootstrap(context.Background(), dom); err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//util/mathutil",
"//util/memory",
"//util/paging",
"//util/tiflashcompute",
"//util/trxevents",
"@com_github_dgraph_io_ristretto//:ristretto",
"@com_github_gogo_protobuf//proto",
Expand All @@ -44,6 +45,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_pingcap_log//:log",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_stathat_consistent//:consistent",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//metrics",
Expand Down Expand Up @@ -77,6 +79,7 @@ go_test(
"//kv",
"//store/driver/backoff",
"//testkit/testsetup",
"//util/logutil",
"//util/paging",
"//util/trxevents",
"@com_github_pingcap_errors//:errors",
Expand All @@ -89,5 +92,6 @@ go_test(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
],
)
Loading

0 comments on commit d28a8d8

Please sign in to comment.