diff --git a/client/client.go b/client/client.go index 4123c8168e6..b3885c53791 100644 --- a/client/client.go +++ b/client/client.go @@ -314,8 +314,6 @@ func NewClientWithContext(ctx context.Context, svrAddrs []string, security Secur // NewClientWithKeyspace creates a client with context and the specified keyspace id. func NewClientWithKeyspace(ctx context.Context, keyspaceID uint32, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { - log.Info("[pd] create pd client with endpoints and keyspace", zap.Strings("pd-address", svrAddrs), zap.Uint32("keyspace-id", keyspaceID)) - tlsCfg := &tlsutil.TLSConfig{ CAPath: security.CAPath, CertPath: security.CertPath, @@ -351,6 +349,49 @@ func NewClientWithKeyspace(ctx context.Context, keyspaceID uint32, svrAddrs []st return c, nil } +// NewClientWithKeyspaceName creates a client with context and the specified keyspace name. +func NewClientWithKeyspaceName(ctx context.Context, keyspace string, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { + log.Info("[pd] create pd client with endpoints and keyspace", zap.Strings("pd-address", svrAddrs), zap.String("keyspace", keyspace)) + + tlsCfg := &tlsutil.TLSConfig{ + CAPath: security.CAPath, + CertPath: security.CertPath, + KeyPath: security.KeyPath, + + SSLCABytes: security.SSLCABytes, + SSLCertBytes: security.SSLCertBytes, + SSLKEYBytes: security.SSLKEYBytes, + } + + clientCtx, clientCancel := context.WithCancel(ctx) + c := &client{ + updateTokenConnectionCh: make(chan struct{}, 1), + ctx: clientCtx, + cancel: clientCancel, + svrUrls: addrsToUrls(svrAddrs), + tlsCfg: tlsCfg, + option: newOption(), + } + + // Inject the client options. + for _, opt := range opts { + opt(c) + } + + c.pdSvcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, c.setServiceMode, c.svrUrls, c.tlsCfg, c.option) + if err := c.setup(); err != nil { + c.cancel() + return nil, err + } + keyspaceMeta, err := c.LoadKeyspace(context.TODO(), keyspace) + // Here we ignore ENTRY_NOT_FOUND error and it will set the keyspaceID to 0. + if err != nil && !strings.Contains(err.Error(), "ENTRY_NOT_FOUND") { + return nil, err + } + c.keyspaceID = keyspaceMeta.GetId() + return c, nil +} + func (c *client) setup() error { // Init the client base. if err := c.pdSvcDiscovery.Init(); err != nil { diff --git a/server/keyspace_service.go b/server/keyspace_service.go index 64e646119ed..0ecfe45c1d7 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -55,10 +55,6 @@ func (s *KeyspaceServer) LoadKeyspace(_ context.Context, request *keyspacepb.Loa if err := s.validateRequest(request.GetHeader()); err != nil { return nil, err } - rc := s.GetRaftCluster() - if rc == nil { - return &keyspacepb.LoadKeyspaceResponse{Header: s.notBootstrappedHeader()}, nil - } manager := s.GetKeyspaceManager() meta, err := manager.LoadKeyspace(request.GetName()) @@ -77,10 +73,6 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques if err := s.validateRequest(request.GetHeader()); err != nil { return err } - rc := s.GetRaftCluster() - if rc == nil { - return stream.Send(&keyspacepb.WatchKeyspacesResponse{Header: s.notBootstrappedHeader()}) - } ctx, cancel := context.WithCancel(s.Context()) defer cancel() @@ -161,10 +153,7 @@ func (s *KeyspaceServer) UpdateKeyspaceState(_ context.Context, request *keyspac if err := s.validateRequest(request.GetHeader()); err != nil { return nil, err } - rc := s.GetRaftCluster() - if rc == nil { - return &keyspacepb.UpdateKeyspaceStateResponse{Header: s.notBootstrappedHeader()}, nil - } + manager := s.GetKeyspaceManager() meta, err := manager.UpdateKeyspaceStateByID(request.GetId(), request.GetState(), time.Now().Unix()) if err != nil { diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index da23d2fe7f2..f9c47aa56f0 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -27,7 +27,6 @@ import ( bs "github.com/tikv/pd/pkg/basicserver" rm "github.com/tikv/pd/pkg/mcs/resource_manager/server" tso "github.com/tikv/pd/pkg/mcs/tso/server" - "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/testutil" ) @@ -51,14 +50,7 @@ func InitLogger(cfg *tso.Config) (err error) { // SetupClientWithKeyspace creates a TSO client for test. func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client { - cli, err := pd.NewClientWithKeyspace(ctx, utils.DefaultKeyspaceID, endpoints, pd.SecurityOption{}, opts...) - re.NoError(err) - return cli -} - -// SetupClient creates a TSO client for test. -func SetupClient(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client { - cli, err := pd.NewClientWithContext(ctx, endpoints, pd.SecurityOption{}, opts...) + cli, err := pd.NewClientWithKeyspaceName(ctx, "", endpoints, pd.SecurityOption{}, opts...) re.NoError(err) return cli } diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 3537e13fd1b..da23ab1d1eb 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -174,8 +174,8 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) { re.NoError(err) leaderName := cluster.WaitLeader() pdLeader := cluster.GetServer(leaderName) - backendEndpoints := pdLeader.GetAddr() re.NoError(pdLeader.BootstrapCluster()) + backendEndpoints := pdLeader.GetAddr() client := pdLeader.GetEtcdClient() if isAPIServiceMode { re.Equal(0, getEtcdTimestampKeyNum(re, client))