Skip to content

Commit 3e00797

Browse files
authored
Merge branch 'master' into fk-fix-br0
2 parents 3f708ef + d28a8d8 commit 3e00797

19 files changed

+777
-118
lines changed

config/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
"//br/pkg/streamhelper/config",
1414
"//parser/terror",
1515
"//util/logutil",
16+
"//util/tiflashcompute",
1617
"//util/tikvutil",
1718
"//util/versioninfo",
1819
"@com_github_burntsushi_toml//:toml",

config/config.go

+44-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
logbackupconf "github.com/pingcap/tidb/br/pkg/streamhelper/config"
3636
"github.com/pingcap/tidb/parser/terror"
3737
"github.com/pingcap/tidb/util/logutil"
38+
"github.com/pingcap/tidb/util/tiflashcompute"
3839
"github.com/pingcap/tidb/util/tikvutil"
3940
"github.com/pingcap/tidb/util/versioninfo"
4041
tikvcfg "github.com/tikv/client-go/v2/config"
@@ -287,7 +288,14 @@ type Config struct {
287288
Plugin Plugin `toml:"plugin" json:"plugin"`
288289
MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"`
289290
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
290-
DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"`
291+
292+
// These configs are related to disaggregated-tiflash mode.
293+
DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"`
294+
TiFlashComputeAutoScalerType string `toml:"autoscaler-type" json:"autoscaler-type"`
295+
TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"`
296+
IsTiFlashComputeFixedPool bool `toml:"is-tiflashcompute-fixed-pool" json:"is-tiflashcompute-fixed-pool"`
297+
AutoScalerClusterID string `toml:"autoscaler-cluster-id" json:"autoscaler-cluster-id"`
298+
291299
// TiDBMaxReuseChunk indicates max cached chunk num
292300
TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
293301
// TiDBMaxReuseColumn indicates max cached column num
@@ -1000,6 +1008,10 @@ var defaultConf = Config{
10001008
EnableGlobalKill: true,
10011009
TrxSummary: DefaultTrxSummary(),
10021010
DisaggregatedTiFlash: false,
1011+
TiFlashComputeAutoScalerType: tiflashcompute.DefASStr,
1012+
TiFlashComputeAutoScalerAddr: tiflashcompute.DefAWSAutoScalerAddr,
1013+
IsTiFlashComputeFixedPool: false,
1014+
AutoScalerClusterID: "",
10031015
TiDBMaxReuseChunk: 64,
10041016
TiDBMaxReuseColumn: 256,
10051017
TiDBEnableExitCheck: false,
@@ -1031,6 +1043,26 @@ func StoreGlobalConfig(config *Config) {
10311043
tikvcfg.StoreGlobalConfig(&cfg)
10321044
}
10331045

1046+
// GetAutoScalerClusterID returns KeyspaceName or AutoScalerClusterID.
1047+
func GetAutoScalerClusterID() (string, error) {
1048+
c := GetGlobalConfig()
1049+
keyspaceName := c.KeyspaceName
1050+
clusterID := c.AutoScalerClusterID
1051+
1052+
if keyspaceName != "" && clusterID != "" {
1053+
return "", errors.Errorf("config.KeyspaceName(%s) and config.AutoScalerClusterID(%s) are not empty both", keyspaceName, clusterID)
1054+
}
1055+
if keyspaceName == "" && clusterID == "" {
1056+
return "", errors.Errorf("config.KeyspaceName and config.AutoScalerClusterID are both empty")
1057+
}
1058+
1059+
res := keyspaceName
1060+
if res == "" {
1061+
res = clusterID
1062+
}
1063+
return res, nil
1064+
}
1065+
10341066
// removedConfig contains items that are no longer supported.
10351067
// they might still be in the config struct to support import,
10361068
// but are not actively used.
@@ -1315,6 +1347,17 @@ func (c *Config) Valid() error {
13151347
return fmt.Errorf("stats-load-queue-size should be [%d, %d]", DefStatsLoadQueueSizeLimit, DefMaxOfStatsLoadQueueSizeLimit)
13161348
}
13171349

1350+
// Check tiflash_compute topo fetch is valid.
1351+
if c.DisaggregatedTiFlash {
1352+
if !tiflashcompute.IsValidAutoScalerConfig(c.TiFlashComputeAutoScalerType) {
1353+
return fmt.Errorf("invalid AutoScaler type, expect %s, %s or %s, got %s",
1354+
tiflashcompute.MockASStr, tiflashcompute.AWSASStr, tiflashcompute.GCPASStr, c.TiFlashComputeAutoScalerType)
1355+
}
1356+
if c.TiFlashComputeAutoScalerAddr == "" {
1357+
return fmt.Errorf("autoscaler-addr cannot be empty when disaggregated-tiflash mode is true")
1358+
}
1359+
}
1360+
13181361
// test log level
13191362
l := zap.NewAtomicLevel()
13201363
return l.UnmarshalText([]byte(c.Log.Level))

config/config.toml.example

+15
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,21 @@ enable-enum-length-limit = true
114114
# command can be forwarded to the right TiDB instance to execute.
115115
enable-global-kill = true
116116

117+
# disaggregated-tiflash indicates whether TiDB is in disaggregated tiflash mode, if true, MPP will runs on tiflash_compute nodes.
118+
disaggregated-tiflash = false
119+
120+
# autoscaler-type indicates which type of AutoScaler will be used. Possible values are: mock, aws, gcp.
121+
# Only meaningful when disaggregated-tiflash is true.
122+
autoscaler-type = "aws"
123+
124+
# autoscaler-addr is the host of AutoScaler, Only meaningful when disaggregated-tiflash is true.
125+
# Only meaningful when disaggregated-tiflash is true.
126+
autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081"
127+
128+
# autoscaler-cluster-id is the unique id for each TiDB cluster, which will used by AutoScaler.
129+
# Only meaningful when disaggregated-tiflash is true.
130+
autoscaler-cluster-id = ""
131+
117132
[log]
118133
# Log level: debug, info, warn, error, fatal.
119134
level = "info"

executor/admin.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ func (e *CleanupIndexExec) getIdxColTypes() []*types.FieldType {
551551
}
552552
e.idxColFieldTypes = make([]*types.FieldType, 0, len(e.columns))
553553
for _, col := range e.columns {
554-
e.idxColFieldTypes = append(e.idxColFieldTypes, &col.FieldType)
554+
e.idxColFieldTypes = append(e.idxColFieldTypes, col.FieldType.ArrayType())
555555
}
556556
return e.idxColFieldTypes
557557
}

executor/admin_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,51 @@ func TestAdminRecoverIndex(t *testing.T) {
304304
tk.MustExec("admin check table admin_test")
305305
}
306306

307+
func TestAdminCleanupMVIndex(t *testing.T) {
308+
store, domain := testkit.CreateMockStoreAndDomain(t)
309+
310+
tk := testkit.NewTestKit(t, store)
311+
tk.MustExec("use test")
312+
tk.MustExec("drop table if exists t")
313+
tk.MustExec("create table t(pk int primary key, a json, index idx((cast(a as signed array))))")
314+
tk.MustExec("insert into t values (0, '[0,1,2]')")
315+
tk.MustExec("insert into t values (1, '[1,2,3]')")
316+
tk.MustExec("insert into t values (2, '[2,3,4]')")
317+
tk.MustExec("insert into t values (3, '[3,4,5]')")
318+
tk.MustExec("insert into t values (4, '[4,5,6]')")
319+
tk.MustExec("admin check table t")
320+
321+
// Make some corrupted index. Build the index information.
322+
ctx := mock.NewContext()
323+
ctx.Store = store
324+
is := domain.InfoSchema()
325+
dbName := model.NewCIStr("test")
326+
tblName := model.NewCIStr("t")
327+
tbl, err := is.TableByName(dbName, tblName)
328+
require.NoError(t, err)
329+
tblInfo := tbl.Meta()
330+
idxInfo := tblInfo.Indices[0]
331+
tk.Session().GetSessionVars().IndexLookupSize = 3
332+
tk.Session().GetSessionVars().MaxChunkSize = 3
333+
334+
cpIdx := idxInfo.Clone()
335+
cpIdx.MVIndex = false
336+
indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, cpIdx)
337+
338+
txn, err := store.Begin()
339+
require.NoError(t, err)
340+
_, err = indexOpr.Create(ctx, txn, types.MakeDatums(9), kv.IntHandle(9), nil)
341+
require.NoError(t, err)
342+
err = txn.Commit(context.Background())
343+
require.NoError(t, err)
344+
err = tk.ExecToErr("admin check table t")
345+
require.Error(t, err)
346+
347+
r := tk.MustQuery("admin cleanup index t idx")
348+
r.Check(testkit.Rows("1"))
349+
tk.MustExec("admin check table t")
350+
}
351+
307352
func TestClusteredIndexAdminRecoverIndex(t *testing.T) {
308353
store, domain := testkit.CreateMockStoreAndDomain(t)
309354

executor/tiflashtest/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ go_test(
2222
"//testkit",
2323
"//testkit/external",
2424
"//util/israce",
25+
"//util/tiflashcompute",
2526
"@com_github_pingcap_errors//:errors",
2627
"@com_github_pingcap_failpoint//:failpoint",
2728
"@com_github_pingcap_kvproto//pkg/metapb",

executor/tiflashtest/tiflash_test.go

+15-6
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/pingcap/tidb/testkit"
3838
"github.com/pingcap/tidb/testkit/external"
3939
"github.com/pingcap/tidb/util/israce"
40+
"github.com/pingcap/tidb/util/tiflashcompute"
4041
"github.com/stretchr/testify/require"
4142
"github.com/tikv/client-go/v2/testutils"
4243
)
@@ -1258,24 +1259,32 @@ func TestDisaggregatedTiFlash(t *testing.T) {
12581259
config.UpdateGlobal(func(conf *config.Config) {
12591260
conf.DisaggregatedTiFlash = true
12601261
})
1262+
defer config.UpdateGlobal(func(conf *config.Config) {
1263+
conf.DisaggregatedTiFlash = false
1264+
})
1265+
err := tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.TestASStr, "", "", false)
1266+
require.NoError(t, err)
1267+
12611268
store := testkit.CreateMockStore(t, withMockTiFlash(2))
12621269
tk := testkit.NewTestKit(t, store)
12631270
tk.MustExec("use test")
12641271
tk.MustExec("drop table if exists t")
12651272
tk.MustExec("create table t(c1 int)")
12661273
tk.MustExec("alter table t set tiflash replica 1")
12671274
tb := external.GetTableByName(t, tk, "test", "t")
1268-
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1275+
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
12691276
require.NoError(t, err)
12701277
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
12711278

12721279
err = tk.ExecToErr("select * from t;")
1273-
require.Contains(t, err.Error(), "tiflash_compute node is unavailable")
1280+
// Expect error, because TestAutoScaler return empty topo.
1281+
require.Contains(t, err.Error(), "Cannot find proper topo from AutoScaler")
12741282

1275-
config.UpdateGlobal(func(conf *config.Config) {
1276-
conf.DisaggregatedTiFlash = false
1277-
})
1278-
tk.MustQuery("select * from t;").Check(testkit.Rows())
1283+
err = tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.AWSASStr, "", "", false)
1284+
require.NoError(t, err)
1285+
err = tk.ExecToErr("select * from t;")
1286+
// Expect error, because AWSAutoScaler is not setup, so http request will fail.
1287+
require.Contains(t, err.Error(), "[util:1815]Internal : get tiflash_compute topology failed")
12791288
}
12801289

12811290
func TestDisaggregatedTiFlashQuery(t *testing.T) {

planner/core/fragment.go

+2
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ type mppAddr struct {
136136
addr string
137137
}
138138

139+
var _ kv.MPPTaskMeta = &mppAddr{}
140+
139141
func (m *mppAddr) GetAddress() string {
140142
return m.addr
141143
}

session/session.go

-8
Original file line numberDiff line numberDiff line change
@@ -3339,14 +3339,6 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
33393339
return nil, err
33403340
}
33413341

3342-
if config.GetGlobalConfig().DisaggregatedTiFlash {
3343-
// Invalid client-go tiflash_compute store cache if necessary.
3344-
err = dom.WatchTiFlashComputeNodeChange()
3345-
if err != nil {
3346-
return nil, err
3347-
}
3348-
}
3349-
33503342
if err = extensionimpl.Bootstrap(context.Background(), dom); err != nil {
33513343
return nil, err
33523344
}

store/copr/BUILD.bazel

+4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ go_library(
3131
"//util/mathutil",
3232
"//util/memory",
3333
"//util/paging",
34+
"//util/tiflashcompute",
3435
"//util/trxevents",
3536
"@com_github_dgraph_io_ristretto//:ristretto",
3637
"@com_github_gogo_protobuf//proto",
@@ -44,6 +45,7 @@ go_library(
4445
"@com_github_pingcap_kvproto//pkg/mpp",
4546
"@com_github_pingcap_log//:log",
4647
"@com_github_pingcap_tipb//go-tipb",
48+
"@com_github_stathat_consistent//:consistent",
4749
"@com_github_tikv_client_go_v2//config",
4850
"@com_github_tikv_client_go_v2//error",
4951
"@com_github_tikv_client_go_v2//metrics",
@@ -77,6 +79,7 @@ go_test(
7779
"//kv",
7880
"//store/driver/backoff",
7981
"//testkit/testsetup",
82+
"//util/logutil",
8083
"//util/paging",
8184
"//util/trxevents",
8285
"@com_github_pingcap_errors//:errors",
@@ -89,5 +92,6 @@ go_test(
8992
"@com_github_tikv_client_go_v2//tikv",
9093
"@com_github_tikv_client_go_v2//tikvrpc",
9194
"@org_uber_go_goleak//:goleak",
95+
"@org_uber_go_zap//:zap",
9296
],
9397
)

0 commit comments

Comments
 (0)