From e1af99b0b22d3428a733fb4fa393b9e70ffa4d23 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 5 May 2023 14:28:53 +0800 Subject: [PATCH 1/2] use another etcd client for election Signed-off-by: Ryan Leung --- pkg/mcs/resource_manager/server/server.go | 2 +- pkg/mcs/tso/server/server.go | 2 +- pkg/utils/etcdutil/etcdutil.go | 20 +++++++------------- server/server.go | 17 ++++++++++++----- 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go index 5e89bda1120..d1dd5f476b8 100644 --- a/pkg/mcs/resource_manager/server/server.go +++ b/pkg/mcs/resource_manager/server/server.go @@ -264,7 +264,7 @@ func (s *Server) initClient() error { if err != nil { return err } - s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0]) + s.etcdClient, _, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0]) return err } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 66a22d98c26..060b832f52f 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -380,7 +380,7 @@ func (s *Server) initClient() error { if err != nil { return err } - s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0]) + s.etcdClient, _, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0]) return err } diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 32f32bd6087..c7f1ee7cd8b 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -196,24 +196,18 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID)) } -// CreateClientsWithMultiEndpoint creates etcd v3 client and http client. -func CreateClientsWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) { - client, err := createEtcdClientWithMultiEndpoint(tlsConfig, acUrls) - if err != nil { - return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() - } - httpClient := createHTTPClient(tlsConfig) - return client, httpClient, nil -} - // CreateClients creates etcd v3 client and http client. -func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) { +func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *clientv3.Client, *http.Client, error) { client, err := createEtcdClient(tlsConfig, acUrls) if err != nil { - return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() + return nil, nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() + } + electionClient, err := createEtcdClient(tlsConfig, acUrls) + if err != nil { + return nil, nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() } httpClient := createHTTPClient(tlsConfig) - return client, httpClient, nil + return electionClient, client, httpClient, nil } // createEtcdClientWithMultiEndpoint creates etcd v3 client. diff --git a/server/server.go b/server/server.go index e61f3c6ad58..1ff9f7ee1c4 100644 --- a/server/server.go +++ b/server/server.go @@ -145,6 +145,8 @@ type Server struct { member *member.EmbeddedEtcdMember // etcd client client *clientv3.Client + // electionClient is used for leader election. + electionClient *clientv3.Client // http client httpClient *http.Client clusterID uint64 // pd cluster id. @@ -330,7 +332,7 @@ func (s *Server) startEtcd(ctx context.Context) error { } // start client - s.client, s.httpClient, err = startClient(s.cfg) + s.client, s.electionClient, s.httpClient, err = startClient(s.cfg) if err != nil { return err } @@ -353,18 +355,18 @@ func (s *Server) startEtcd(ctx context.Context) error { failpoint.Inject("memberNil", func() { time.Sleep(1500 * time.Millisecond) }) - s.member = member.NewMember(etcd, s.client, etcdServerID) + s.member = member.NewMember(etcd, s.electionClient, etcdServerID) return nil } -func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) { +func startClient(cfg *config.Config) (*clientv3.Client, *clientv3.Client, *http.Client, error) { tlsConfig, err := cfg.Security.ToTLSConfig() if err != nil { - return nil, nil, err + return nil, nil, nil, err } etcdCfg, err := cfg.GenEmbedEtcdConfig() if err != nil { - return nil, nil, err + return nil, nil, nil, err } return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0]) } @@ -484,6 +486,11 @@ func (s *Server) Close() { log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) } } + if s.electionClient != nil { + if err := s.electionClient.Close(); err != nil { + log.Error("close election client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) + } + } if s.httpClient != nil { s.httpClient.CloseIdleConnections() From 343bda6c1c4e7487657688bfbd318b414c48c24b Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 12 May 2023 14:37:38 +0800 Subject: [PATCH 2/2] address comments Signed-off-by: Ryan Leung --- pkg/mcs/resource_manager/server/server.go | 2 +- pkg/mcs/tso/server/server.go | 2 +- pkg/utils/etcdutil/etcdutil.go | 16 ++++++-------- pkg/utils/etcdutil/etcdutil_test.go | 12 +++++------ server/server.go | 26 +++++++++++++++++++---- 5 files changed, 36 insertions(+), 22 deletions(-) diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go index d1dd5f476b8..5e89bda1120 100644 --- a/pkg/mcs/resource_manager/server/server.go +++ b/pkg/mcs/resource_manager/server/server.go @@ -264,7 +264,7 @@ func (s *Server) initClient() error { if err != nil { return err } - s.etcdClient, _, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0]) + s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0]) return err } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 060b832f52f..66a22d98c26 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -380,7 +380,7 @@ func (s *Server) initClient() error { if err != nil { return err } - s.etcdClient, _, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0]) + s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0]) return err } diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index c7f1ee7cd8b..deee3b17a3a 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -197,17 +197,13 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value } // CreateClients creates etcd v3 client and http client. -func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *clientv3.Client, *http.Client, error) { - client, err := createEtcdClient(tlsConfig, acUrls) +func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) { + client, err := CreateEtcdClient(tlsConfig, acUrls) if err != nil { - return nil, nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() - } - electionClient, err := createEtcdClient(tlsConfig, acUrls) - if err != nil { - return nil, nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() + return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() } httpClient := createHTTPClient(tlsConfig) - return electionClient, client, httpClient, nil + return client, httpClient, nil } // createEtcdClientWithMultiEndpoint creates etcd v3 client. @@ -249,9 +245,9 @@ func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) return client, err } -// createEtcdClient creates etcd v3 client. +// CreateEtcdClient creates etcd v3 client. // Note: it will be used by legacy pd-server, and only connect to leader only. -func createEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) { +func CreateEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) { lgc := zap.NewProductionConfig() lgc.Encoding = log.ZapEncodingName client, err := clientv3.New(clientv3.Config{ diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 6bf63db79c9..4b7e2d9957b 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -305,12 +305,12 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) { // Create two etcd clients with etcd1 as endpoint. urls, err := types.NewURLs([]string{ep1}) re.NoError(err) - client1, err := createEtcdClient(nil, urls[0]) // execute member change operation with this client + client1, err := CreateEtcdClient(nil, urls[0]) // execute member change operation with this client defer func() { client1.Close() }() re.NoError(err) - client2, err := createEtcdClient(nil, urls[0]) // check member change with this client + client2, err := CreateEtcdClient(nil, urls[0]) // check member change with this client defer func() { client2.Close() }() @@ -482,7 +482,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() { ep1 := suite.config.LCUrls[0].String() urls, err := types.NewURLs([]string{ep1}) suite.NoError(err) - suite.client, err = createEtcdClient(nil, urls[0]) + suite.client, err = CreateEtcdClient(nil, urls[0]) suite.NoError(err) suite.cleans = append(suite.cleans, func() { suite.client.Close() @@ -685,7 +685,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case2: close the etcd client and put a new value after watcher restarts suite.client.Close() - suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0]) suite.NoError(err) watcher.updateClientCh <- suite.client suite.put("TestWatcherBreak", "2") @@ -693,7 +693,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case3: close the etcd client and put a new value before watcher restarts suite.client.Close() - suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0]) suite.NoError(err) suite.put("TestWatcherBreak", "3") watcher.updateClientCh <- suite.client @@ -701,7 +701,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case4: close the etcd client and put a new value with compact suite.client.Close() - suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0]) suite.NoError(err) suite.put("TestWatcherBreak", "4") resp, err := EtcdKVGet(suite.client, "TestWatcherBreak") diff --git a/server/server.go b/server/server.go index 1ff9f7ee1c4..4e49faa6b24 100644 --- a/server/server.go +++ b/server/server.go @@ -332,7 +332,12 @@ func (s *Server) startEtcd(ctx context.Context) error { } // start client - s.client, s.electionClient, s.httpClient, err = startClient(s.cfg) + s.client, s.httpClient, err = startClient(s.cfg) + if err != nil { + return err + } + + s.electionClient, err = startElectionClient(s.cfg) if err != nil { return err } @@ -359,18 +364,31 @@ func (s *Server) startEtcd(ctx context.Context) error { return nil } -func startClient(cfg *config.Config) (*clientv3.Client, *clientv3.Client, *http.Client, error) { +func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) { tlsConfig, err := cfg.Security.ToTLSConfig() if err != nil { - return nil, nil, nil, err + return nil, nil, err } etcdCfg, err := cfg.GenEmbedEtcdConfig() if err != nil { - return nil, nil, nil, err + return nil, nil, err } return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0]) } +func startElectionClient(cfg *config.Config) (*clientv3.Client, error) { + tlsConfig, err := cfg.Security.ToTLSConfig() + if err != nil { + return nil, err + } + etcdCfg, err := cfg.GenEmbedEtcdConfig() + if err != nil { + return nil, err + } + + return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls[0]) +} + // AddStartCallback adds a callback in the startServer phase. func (s *Server) AddStartCallback(callbacks ...func()) { s.startCallbacks = append(s.startCallbacks, callbacks...)