Skip to content

Commit d6ca02a

Browse files
authored
Merge branch 'master' into switcher_autoscaler
2 parents cd657f8 + d0d321f commit d6ca02a

File tree

10 files changed

+219
-66
lines changed

10 files changed

+219
-66
lines changed

ddl/attributes_sql_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ PARTITION BY RANGE (c) (
269269
func TestFlashbackTable(t *testing.T) {
270270
store, dom := testkit.CreateMockStoreAndDomain(t)
271271

272-
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
272+
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
273273
require.NoError(t, err)
274274
tk := testkit.NewTestKit(t, store)
275275
tk.MustExec("use test")
@@ -327,7 +327,7 @@ PARTITION BY RANGE (c) (
327327
func TestDropTable(t *testing.T) {
328328
store, dom := testkit.CreateMockStoreAndDomain(t)
329329

330-
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
330+
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
331331
require.NoError(t, err)
332332
tk := testkit.NewTestKit(t, store)
333333
tk.MustExec("use test")
@@ -380,7 +380,7 @@ PARTITION BY RANGE (c) (
380380
func TestCreateWithSameName(t *testing.T) {
381381
store, dom := testkit.CreateMockStoreAndDomain(t)
382382

383-
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
383+
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
384384
require.NoError(t, err)
385385
tk := testkit.NewTestKit(t, store)
386386
tk.MustExec("use test")
@@ -444,7 +444,7 @@ PARTITION BY RANGE (c) (
444444
func TestPartition(t *testing.T) {
445445
store, dom := testkit.CreateMockStoreAndDomain(t)
446446

447-
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
447+
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
448448
require.NoError(t, err)
449449
tk := testkit.NewTestKit(t, store)
450450
tk.MustExec("use test")
@@ -504,7 +504,7 @@ PARTITION BY RANGE (c) (
504504
func TestDropSchema(t *testing.T) {
505505
store, dom := testkit.CreateMockStoreAndDomain(t)
506506

507-
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
507+
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
508508
require.NoError(t, err)
509509
tk := testkit.NewTestKit(t, store)
510510
tk.MustExec("use test")
@@ -530,7 +530,7 @@ PARTITION BY RANGE (c) (
530530
func TestDefaultKeyword(t *testing.T) {
531531
store, dom := testkit.CreateMockStoreAndDomain(t)
532532

533-
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
533+
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
534534
require.NoError(t, err)
535535
tk := testkit.NewTestKit(t, store)
536536
tk.MustExec("use test")

ddl/main_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func TestMain(m *testing.M) {
5252
conf.Experimental.AllowsExpressionIndex = true
5353
})
5454

55-
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
55+
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
5656
if err != nil {
5757
_, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
5858
os.Exit(1)

domain/db_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func TestNormalSessionPool(t *testing.T) {
7373
domain, err := session.BootstrapSession(store)
7474
require.NoError(t, err)
7575
defer domain.Close()
76-
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
76+
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
7777
require.NoError(t, err1)
7878
conf := config.GetGlobalConfig()
7979
conf.Socket = ""
@@ -107,7 +107,7 @@ func TestAbnormalSessionPool(t *testing.T) {
107107
domain, err := session.BootstrapSession(store)
108108
require.NoError(t, err)
109109
defer domain.Close()
110-
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
110+
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
111111
require.NoError(t, err1)
112112
conf := config.GetGlobalConfig()
113113
conf.Socket = ""

domain/domain.go

+53-34
Original file line numberDiff line numberDiff line change
@@ -98,20 +98,26 @@ func NewMockDomain() *Domain {
9898
// Domain represents a storage space. Different domains can use the same database name.
9999
// Multiple domains can be used in parallel without synchronization.
100100
type Domain struct {
101-
store kv.Storage
102-
infoCache *infoschema.InfoCache
103-
privHandle *privileges.Handle
104-
bindHandle atomic.Pointer[bindinfo.BindHandle]
105-
statsHandle unsafe.Pointer
106-
statsLease time.Duration
107-
ddl ddl.DDL
108-
info *infosync.InfoSyncer
109-
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
110-
m sync.Mutex
111-
SchemaValidator SchemaValidator
112-
sysSessionPool *sessionPool
113-
exit chan struct{}
114-
etcdClient *clientv3.Client
101+
store kv.Storage
102+
infoCache *infoschema.InfoCache
103+
privHandle *privileges.Handle
104+
bindHandle atomic.Pointer[bindinfo.BindHandle]
105+
statsHandle unsafe.Pointer
106+
statsLease time.Duration
107+
ddl ddl.DDL
108+
info *infosync.InfoSyncer
109+
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
110+
m sync.Mutex
111+
SchemaValidator SchemaValidator
112+
sysSessionPool *sessionPool
113+
exit chan struct{}
114+
// `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace.
115+
etcdClient *clientv3.Client
116+
// `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace.
117+
// It is only used in storeMinStartTS and RemoveMinStartTS now.
118+
// It must be used when the etcd path isn't needed to separate by keyspace.
119+
// See keyspace RFC: https://github.com/pingcap/tidb/pull/39685
120+
unprefixedEtcdCli *clientv3.Client
115121
sysVarCache sysVarCache // replaces GlobalVariableCache
116122
slowQuery *topNSlowQueries
117123
expensiveQueryHandle *expensivequery.Handle
@@ -881,6 +887,10 @@ func (do *Domain) Close() {
881887
terror.Log(errors.Trace(do.etcdClient.Close()))
882888
}
883889

890+
if do.unprefixedEtcdCli != nil {
891+
terror.Log(errors.Trace(do.unprefixedEtcdCli.Close()))
892+
}
893+
884894
do.slowQuery.Close()
885895
if do.cancel != nil {
886896
do.cancel()
@@ -927,6 +937,27 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
927937

928938
const serverIDForStandalone = 1 // serverID for standalone deployment.
929939

940+
func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) {
941+
cfg := config.GetGlobalConfig()
942+
etcdLogCfg := zap.NewProductionConfig()
943+
etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
944+
cli, err := clientv3.New(clientv3.Config{
945+
LogConfig: &etcdLogCfg,
946+
Endpoints: addrs,
947+
AutoSyncInterval: 30 * time.Second,
948+
DialTimeout: 5 * time.Second,
949+
DialOptions: []grpc.DialOption{
950+
grpc.WithBackoffMaxDelay(time.Second * 3),
951+
grpc.WithKeepaliveParams(keepalive.ClientParameters{
952+
Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second,
953+
Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
954+
}),
955+
},
956+
TLS: ebd.TLSConfig(),
957+
})
958+
return cli, err
959+
}
960+
930961
// Init initializes a domain.
931962
func (do *Domain) Init(
932963
ddlLease time.Duration,
@@ -942,32 +973,20 @@ func (do *Domain) Init(
942973
return err
943974
}
944975
if addrs != nil {
945-
cfg := config.GetGlobalConfig()
946-
// silence etcd warn log, when domain closed, it won't randomly print warn log
947-
// see details at the issue https://github.com/pingcap/tidb/issues/15479
948-
etcdLogCfg := zap.NewProductionConfig()
949-
etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
950-
cli, err := clientv3.New(clientv3.Config{
951-
LogConfig: &etcdLogCfg,
952-
Endpoints: addrs,
953-
AutoSyncInterval: 30 * time.Second,
954-
DialTimeout: 5 * time.Second,
955-
DialOptions: []grpc.DialOption{
956-
grpc.WithBackoffMaxDelay(time.Second * 3),
957-
grpc.WithKeepaliveParams(keepalive.ClientParameters{
958-
Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second,
959-
Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
960-
}),
961-
},
962-
TLS: ebd.TLSConfig(),
963-
})
976+
cli, err := newEtcdCli(addrs, ebd)
964977
if err != nil {
965978
return errors.Trace(err)
966979
}
967980

968981
etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec()))
969982

970983
do.etcdClient = cli
984+
985+
unprefixedEtcdCli, err := newEtcdCli(addrs, ebd)
986+
if err != nil {
987+
return errors.Trace(err)
988+
}
989+
do.unprefixedEtcdCli = unprefixedEtcdCli
971990
}
972991
}
973992

@@ -1029,7 +1048,7 @@ func (do *Domain) Init(
10291048

10301049
// step 1: prepare the info/schema syncer which domain reload needed.
10311050
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
1032-
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, skipRegisterToDashboard)
1051+
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, skipRegisterToDashboard)
10331052
if err != nil {
10341053
return err
10351054
}

domain/infosync/info.go

+22-15
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,18 @@ var ErrPrometheusAddrIsNotSet = dbterror.ClassDomain.NewStd(errno.ErrPrometheusA
9595

9696
// InfoSyncer stores server info to etcd when the tidb-server starts and delete when tidb-server shuts down.
9797
type InfoSyncer struct {
98-
etcdCli *clientv3.Client
99-
info *ServerInfo
100-
serverInfoPath string
101-
minStartTS uint64
102-
minStartTSPath string
103-
managerMu struct {
98+
// `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace.
99+
etcdCli *clientv3.Client
100+
// `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace.
101+
// It is only used in storeMinStartTS and RemoveMinStartTS now.
102+
// It must be used when the etcd path isn't needed to separate by keyspace.
103+
// See keyspace RFC: https://github.com/pingcap/tidb/pull/39685
104+
unprefixedEtcdCli *clientv3.Client
105+
info *ServerInfo
106+
serverInfoPath string
107+
minStartTS uint64
108+
minStartTSPath string
109+
managerMu struct {
104110
mu sync.RWMutex
105111
util2.SessionManager
106112
}
@@ -180,12 +186,13 @@ func setGlobalInfoSyncer(is *InfoSyncer) {
180186
}
181187

182188
// GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing.
183-
func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) {
189+
func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, unprefixedEtcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) {
184190
is := &InfoSyncer{
185-
etcdCli: etcdCli,
186-
info: getServerInfo(id, serverIDGetter),
187-
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
188-
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
191+
etcdCli: etcdCli,
192+
unprefixedEtcdCli: unprefixedEtcdCli,
193+
info: getServerInfo(id, serverIDGetter),
194+
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
195+
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
189196
}
190197
err := is.init(ctx, skipRegisterToDashBoard)
191198
if err != nil {
@@ -721,20 +728,20 @@ func (is *InfoSyncer) GetMinStartTS() uint64 {
721728

722729
// storeMinStartTS stores self server min start timestamp to etcd.
723730
func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error {
724-
if is.etcdCli == nil {
731+
if is.unprefixedEtcdCli == nil {
725732
return nil
726733
}
727-
return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath,
734+
return util.PutKVToEtcd(ctx, is.unprefixedEtcdCli, keyOpDefaultRetryCnt, is.minStartTSPath,
728735
strconv.FormatUint(is.minStartTS, 10),
729736
clientv3.WithLease(is.session.Lease()))
730737
}
731738

732739
// RemoveMinStartTS removes self server min start timestamp from etcd.
733740
func (is *InfoSyncer) RemoveMinStartTS() {
734-
if is.etcdCli == nil {
741+
if is.unprefixedEtcdCli == nil {
735742
return
736743
}
737-
err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
744+
err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.unprefixedEtcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
738745
if err != nil {
739746
logutil.BgLogger().Error("remove minStartTS failed", zap.Error(err))
740747
}

domain/infosync/info_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestTopology(t *testing.T) {
6767
require.NoError(t, err)
6868
}()
6969

70-
info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, false)
70+
info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, false)
7171
require.NoError(t, err)
7272

7373
err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
@@ -152,7 +152,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
152152
}
153153

154154
func TestPutBundlesRetry(t *testing.T) {
155-
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, false)
155+
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, false)
156156
require.NoError(t, err)
157157

158158
bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
@@ -216,7 +216,7 @@ func TestPutBundlesRetry(t *testing.T) {
216216

217217
func TestTiFlashManager(t *testing.T) {
218218
ctx := context.Background()
219-
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, false)
219+
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, false)
220220
tiflash := NewMockTiFlash()
221221
SetMockTiFlash(tiflash)
222222

expression/multi_valued_index_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func TestMultiValuedIndexDDL(t *testing.T) {
4646
tk.MustExec("drop table t")
4747
tk.MustGetErrCode("CREATE TABLE t(x INT, KEY k ((1 AND CAST(JSON_ARRAY(x) AS UNSIGNED ARRAY))));", errno.ErrNotSupportedYet)
4848
tk.MustGetErrCode("CREATE TABLE t1 (f1 json, key mvi((cast(cast(f1 as unsigned array) as unsigned array))));", errno.ErrNotSupportedYet)
49+
tk.MustGetErrCode("CREATE TABLE t1 (f1 json, primary key mvi((cast(cast(f1 as unsigned array) as unsigned array))));", errno.ErrNotSupportedYet)
4950
tk.MustGetErrCode("CREATE TABLE t1 (f1 json, key mvi((cast(f1->>'$[*]' as unsigned array))));", errno.ErrInvalidTypeForJSON)
5051
tk.MustGetErrCode("CREATE TABLE t1 (f1 json, key mvi((cast(f1->'$[*]' as year array))));", errno.ErrNotSupportedYet)
5152
tk.MustGetErrCode("CREATE TABLE t1 (f1 json, key mvi((cast(f1->'$[*]' as json array))));", errno.ErrNotSupportedYet)
@@ -75,6 +76,9 @@ func TestMultiValuedIndexDDL(t *testing.T) {
7576
tk.MustExec("drop table t")
7677
tk.MustExec("set names gbk")
7778
tk.MustExec("create table t(a json, b int, index idx3(b, (cast(a as char(10) array))));")
79+
80+
tk.MustExec("CREATE TABLE users (id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, doc JSON);")
81+
tk.MustExecToErr("CREATE TABLE t (id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, doc JSON, FOREIGN KEY fk_user_id ((cast(doc->'$[*]' as signed array))) REFERENCES users(id));")
7882
}
7983

8084
func TestMultiValuedIndexDML(t *testing.T) {

server/stat_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestUptime(t *testing.T) {
4646
}()
4747
require.NoError(t, err)
4848

49-
_, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
49+
_, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
5050
require.NoError(t, err)
5151

5252
tidbdrv := NewTiDBDriver(store)

store/driver/tikv_driver.go

-4
Original file line numberDiff line numberDiff line change
@@ -196,10 +196,6 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (kv.Storage,
196196
if err != nil {
197197
return nil, errors.Trace(err)
198198
}
199-
// If there's setting keyspace-name, then skipped GC worker logic.
200-
// It needs a group of special tidb nodes to execute GC worker logic.
201-
// TODO: remove this restriction while merged keyspace GC worker logic.
202-
disableGC = true
203199
}
204200

205201
codec := pdClient.GetCodec()

0 commit comments

Comments
 (0)