diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 32f32bd6087..deee3b17a3a 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -196,19 +196,9 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID)) } -// CreateClientsWithMultiEndpoint creates etcd v3 client and http client. -func CreateClientsWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) { - client, err := createEtcdClientWithMultiEndpoint(tlsConfig, acUrls) - if err != nil { - return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() - } - httpClient := createHTTPClient(tlsConfig) - return client, httpClient, nil -} - // CreateClients creates etcd v3 client and http client. func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) { - client, err := createEtcdClient(tlsConfig, acUrls) + client, err := CreateEtcdClient(tlsConfig, acUrls) if err != nil { return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() } @@ -255,9 +245,9 @@ func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) return client, err } -// createEtcdClient creates etcd v3 client. +// CreateEtcdClient creates etcd v3 client. // Note: it will be used by legacy pd-server, and only connect to leader only. -func createEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) { +func CreateEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) { lgc := zap.NewProductionConfig() lgc.Encoding = log.ZapEncodingName client, err := clientv3.New(clientv3.Config{ diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 6bf63db79c9..4b7e2d9957b 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -305,12 +305,12 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) { // Create two etcd clients with etcd1 as endpoint. urls, err := types.NewURLs([]string{ep1}) re.NoError(err) - client1, err := createEtcdClient(nil, urls[0]) // execute member change operation with this client + client1, err := CreateEtcdClient(nil, urls[0]) // execute member change operation with this client defer func() { client1.Close() }() re.NoError(err) - client2, err := createEtcdClient(nil, urls[0]) // check member change with this client + client2, err := CreateEtcdClient(nil, urls[0]) // check member change with this client defer func() { client2.Close() }() @@ -482,7 +482,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() { ep1 := suite.config.LCUrls[0].String() urls, err := types.NewURLs([]string{ep1}) suite.NoError(err) - suite.client, err = createEtcdClient(nil, urls[0]) + suite.client, err = CreateEtcdClient(nil, urls[0]) suite.NoError(err) suite.cleans = append(suite.cleans, func() { suite.client.Close() @@ -685,7 +685,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case2: close the etcd client and put a new value after watcher restarts suite.client.Close() - suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0]) suite.NoError(err) watcher.updateClientCh <- suite.client suite.put("TestWatcherBreak", "2") @@ -693,7 +693,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case3: close the etcd client and put a new value before watcher restarts suite.client.Close() - suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0]) suite.NoError(err) suite.put("TestWatcherBreak", "3") watcher.updateClientCh <- suite.client @@ -701,7 +701,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case4: close the etcd client and put a new value with compact suite.client.Close() - suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0]) suite.NoError(err) suite.put("TestWatcherBreak", "4") resp, err := EtcdKVGet(suite.client, "TestWatcherBreak") diff --git a/server/server.go b/server/server.go index e61f3c6ad58..4e49faa6b24 100644 --- a/server/server.go +++ b/server/server.go @@ -145,6 +145,8 @@ type Server struct { member *member.EmbeddedEtcdMember // etcd client client *clientv3.Client + // electionClient is used for leader election. + electionClient *clientv3.Client // http client httpClient *http.Client clusterID uint64 // pd cluster id. @@ -335,6 +337,11 @@ func (s *Server) startEtcd(ctx context.Context) error { return err } + s.electionClient, err = startElectionClient(s.cfg) + if err != nil { + return err + } + // update advertise peer urls. etcdMembers, err := etcdutil.ListEtcdMembers(s.client) if err != nil { @@ -353,7 +360,7 @@ func (s *Server) startEtcd(ctx context.Context) error { failpoint.Inject("memberNil", func() { time.Sleep(1500 * time.Millisecond) }) - s.member = member.NewMember(etcd, s.client, etcdServerID) + s.member = member.NewMember(etcd, s.electionClient, etcdServerID) return nil } @@ -369,6 +376,19 @@ func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) { return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0]) } +func startElectionClient(cfg *config.Config) (*clientv3.Client, error) { + tlsConfig, err := cfg.Security.ToTLSConfig() + if err != nil { + return nil, err + } + etcdCfg, err := cfg.GenEmbedEtcdConfig() + if err != nil { + return nil, err + } + + return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls[0]) +} + // AddStartCallback adds a callback in the startServer phase. func (s *Server) AddStartCallback(callbacks ...func()) { s.startCallbacks = append(s.startCallbacks, callbacks...) @@ -484,6 +504,11 @@ func (s *Server) Close() { log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) } } + if s.electionClient != nil { + if err := s.electionClient.Close(); err != nil { + log.Error("close election client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) + } + } if s.httpClient != nil { s.httpClient.CloseIdleConnections()