Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: use another etcd client for election #6409

Merged
merged 2 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,9 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value
return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
}

// CreateClientsWithMultiEndpoint creates etcd v3 client and http client.
func CreateClientsWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
client, err := createEtcdClientWithMultiEndpoint(tlsConfig, acUrls)
if err != nil {
return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
httpClient := createHTTPClient(tlsConfig)
return client, httpClient, nil
}

// CreateClients creates etcd v3 client and http client.
func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) {
client, err := createEtcdClient(tlsConfig, acUrls)
client, err := CreateEtcdClient(tlsConfig, acUrls)
if err != nil {
return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
Expand Down Expand Up @@ -255,9 +245,9 @@ func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL)
return client, err
}

// createEtcdClient creates etcd v3 client.
// CreateEtcdClient creates etcd v3 client.
// Note: it will be used by legacy pd-server, and only connect to leader only.
func createEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
func CreateEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
lgc := zap.NewProductionConfig()
lgc.Encoding = log.ZapEncodingName
client, err := clientv3.New(clientv3.Config{
Expand Down
12 changes: 6 additions & 6 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,12 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
// Create two etcd clients with etcd1 as endpoint.
urls, err := types.NewURLs([]string{ep1})
re.NoError(err)
client1, err := createEtcdClient(nil, urls[0]) // execute member change operation with this client
client1, err := CreateEtcdClient(nil, urls[0]) // execute member change operation with this client
defer func() {
client1.Close()
}()
re.NoError(err)
client2, err := createEtcdClient(nil, urls[0]) // check member change with this client
client2, err := CreateEtcdClient(nil, urls[0]) // check member change with this client
defer func() {
client2.Close()
}()
Expand Down Expand Up @@ -482,7 +482,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() {
ep1 := suite.config.LCUrls[0].String()
urls, err := types.NewURLs([]string{ep1})
suite.NoError(err)
suite.client, err = createEtcdClient(nil, urls[0])
suite.client, err = CreateEtcdClient(nil, urls[0])
suite.NoError(err)
suite.cleans = append(suite.cleans, func() {
suite.client.Close()
Expand Down Expand Up @@ -685,23 +685,23 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {

// Case2: close the etcd client and put a new value after watcher restarts
suite.client.Close()
suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0])
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
suite.NoError(err)
watcher.updateClientCh <- suite.client
suite.put("TestWatcherBreak", "2")
checkCache("2")

// Case3: close the etcd client and put a new value before watcher restarts
suite.client.Close()
suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0])
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
suite.NoError(err)
suite.put("TestWatcherBreak", "3")
watcher.updateClientCh <- suite.client
checkCache("3")

// Case4: close the etcd client and put a new value with compact
suite.client.Close()
suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0])
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
suite.NoError(err)
suite.put("TestWatcherBreak", "4")
resp, err := EtcdKVGet(suite.client, "TestWatcherBreak")
Expand Down
27 changes: 26 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ type Server struct {
member *member.EmbeddedEtcdMember
// etcd client
client *clientv3.Client
// electionClient is used for leader election.
electionClient *clientv3.Client
// http client
httpClient *http.Client
clusterID uint64 // pd cluster id.
Expand Down Expand Up @@ -335,6 +337,11 @@ func (s *Server) startEtcd(ctx context.Context) error {
return err
}

s.electionClient, err = startElectionClient(s.cfg)
if err != nil {
return err
}

// update advertise peer urls.
etcdMembers, err := etcdutil.ListEtcdMembers(s.client)
if err != nil {
Expand All @@ -353,7 +360,7 @@ func (s *Server) startEtcd(ctx context.Context) error {
failpoint.Inject("memberNil", func() {
time.Sleep(1500 * time.Millisecond)
})
s.member = member.NewMember(etcd, s.client, etcdServerID)
s.member = member.NewMember(etcd, s.electionClient, etcdServerID)
return nil
}

Expand All @@ -369,6 +376,19 @@ func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) {
return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0])
}

func startElectionClient(cfg *config.Config) (*clientv3.Client, error) {
tlsConfig, err := cfg.Security.ToTLSConfig()
if err != nil {
return nil, err
}
etcdCfg, err := cfg.GenEmbedEtcdConfig()
if err != nil {
return nil, err
}

return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls[0])
}

// AddStartCallback adds a callback in the startServer phase.
func (s *Server) AddStartCallback(callbacks ...func()) {
s.startCallbacks = append(s.startCallbacks, callbacks...)
Expand Down Expand Up @@ -484,6 +504,11 @@ func (s *Server) Close() {
log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err))
}
}
if s.electionClient != nil {
if err := s.electionClient.Close(); err != nil {
log.Error("close election client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err))
}
}

if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
Expand Down