diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index e10650704e3..e6c75a01674 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -18,6 +18,7 @@ import ( "context" "sync" "sync/atomic" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" @@ -30,6 +31,11 @@ import ( "go.uber.org/zap" ) +const ( + watchLoopUnhealthyTimeout = 60 * time.Second + detectHealthyInterval = 10 * time.Second +) + // GetLeader gets the corresponding leader from etcd by given leaderPath (as the key). func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) { leader := &pdpb.Member{} @@ -182,26 +188,86 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { if ls == nil { return } + + interval := detectHealthyInterval + unhealthyTimeout := watchLoopUnhealthyTimeout + failpoint.Inject("fastTick", func() { + unhealthyTimeout = 5 * time.Second + interval = 1 * time.Second + }) + ticker := time.NewTicker(interval) + defer ticker.Stop() + lastHealthyTime := time.Now() + watcher := clientv3.NewWatcher(ls.client) defer watcher.Close() - ctx, cancel := context.WithCancel(serverCtx) - defer cancel() - // The revision is the revision of last modification on this key. - // If the revision is compacted, will meet required revision has been compacted error. - // In this case, use the compact revision to re-watch the key. + var watchChanCancel context.CancelFunc + defer func() { + if watchChanCancel != nil { + watchChanCancel() + } + }() for { failpoint.Inject("delayWatcher", nil) - rch := watcher.Watch(ctx, ls.leaderKey, clientv3.WithRev(revision)) - for wresp := range rch { + if watchChanCancel != nil { + watchChanCancel() + } + // In order to prevent a watch stream being stuck in a partitioned node, + // make sure to wrap context with "WithRequireLeader". + watchChanCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx)) + watchChanCancel = cancel + + // When etcd is not available, the watcher.Watch will block, + // so we check the etcd availability first. + if !etcdutil.IsHealthy(serverCtx, ls.client) { + if time.Since(lastHealthyTime) > unhealthyTimeout { + log.Error("the connection of the leadership watcher is unhealthy", + zap.Int64("revision", revision), + zap.String("leader-key", ls.leaderKey), + zap.String("purpose", ls.purpose)) + return + } + select { + case <-serverCtx.Done(): + log.Info("server is closed, exit leader watch loop", + zap.String("leader-key", ls.leaderKey), + zap.String("purpose", ls.purpose)) + return + case <-ticker.C: + // continue to check the etcd availability + continue + } + } + + watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision)) + WatchChanLoop: + select { + case <-serverCtx.Done(): + log.Info("server is closed, exit leader watch loop", + zap.String("leader-key", ls.leaderKey), + zap.String("purpose", ls.purpose)) + return + case <-ticker.C: + if !etcdutil.IsHealthy(serverCtx, ls.client) { + if time.Since(lastHealthyTime) > unhealthyTimeout { + log.Error("the connection of the leadership watcher is unhealthy", + zap.Int64("revision", revision), + zap.String("leader-key", ls.leaderKey), + zap.String("purpose", ls.purpose)) + return + } + goto WatchChanLoop + } + case wresp := <-watchChan: // meet compacted error, use the compact revision. 
if wresp.CompactRevision != 0 { log.Warn("required revision has been compacted, use the compact revision", zap.Int64("required-revision", revision), zap.Int64("compact-revision", wresp.CompactRevision)) revision = wresp.CompactRevision - break - } - if wresp.Canceled { + lastHealthyTime = time.Now() + continue + } else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0 log.Error("leadership watcher is canceled with", zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), @@ -213,19 +279,16 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { for _, ev := range wresp.Events { if ev.Type == mvccpb.DELETE { log.Info("current leadership is deleted", + zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose)) return } } + revision = wresp.Header.Revision + 1 } - - select { - case <-ctx.Done(): - // server closed, return - return - default: - } + lastHealthyTime = time.Now() + goto WatchChanLoop // use goto to avoid creating a new watchChan } } diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index 2857c89a881..43dbb9d254b 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -19,8 +19,10 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/testutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" ) @@ -118,3 +120,87 @@ func TestLeadership(t *testing.T) { re.NoError(lease1.Close()) re.NoError(lease2.Close()) } + +func TestExitWatch(t *testing.T) { + re := require.New(t) + leaderKey := "/test_leader" + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/fastTick", "return(true)")) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) + // Case1: close the client before the watch loop starts + checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) { + re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`)) + client.Close() + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher")) + }) + // Case2: close the client when the watch loop is running + checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) { + // Wait for the watch loop to start + time.Sleep(500 * time.Millisecond) + client.Close() + }) + // Case3: delete the leader key + checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) { + leaderKey := leaderKey + _, err := client.Delete(context.Background(), leaderKey) + re.NoError(err) + }) + // Case4: close the server before the watch loop starts + checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) { + re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`)) + server.Close() + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher")) + }) + // Case5: close the server when the watch loop is running + checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) { + // Wait for the watch loop to start + time.Sleep(500 * time.Millisecond) + server.Close() + }) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) +} + +func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client)) { + re := 
require.New(t) + cfg := etcdutil.NewTestSingleConfig(t) + etcd, err := embed.StartEtcd(cfg) + defer func() { + etcd.Close() + }() + re.NoError(err) + + ep := cfg.LCUrls[0].String() + client1, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + re.NoError(err) + client2, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + re.NoError(err) + + <-etcd.Server.ReadyNotify() + + leadership1 := NewLeadership(client1, leaderKey, "test_leader_1") + leadership2 := NewLeadership(client2, leaderKey, "test_leader_2") + err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1") + re.NoError(err) + resp, err := client2.Get(context.Background(), leaderKey) + re.NoError(err) + done := make(chan struct{}) + go func() { + leadership2.Watch(context.Background(), resp.Header.Revision) + done <- struct{}{} + }() + + injectFunc(etcd, client2) + + testutil.Eventually(re, func() bool { + select { + case <-done: + return true + default: + return false + } + }) +} diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go index 1ec0e912fda..8870fb92938 100644 --- a/pkg/mcs/resource_manager/server/server.go +++ b/pkg/mcs/resource_manager/server/server.go @@ -270,7 +270,7 @@ func (s *Server) initClient() error { if err != nil { return err } - s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0]) + s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)) return err } diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go new file mode 100644 index 00000000000..21c25509a9d --- /dev/null +++ b/pkg/mcs/scheduling/server/server.go @@ -0,0 +1,500 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "net/url" + "os" + "os/signal" + "path" + "strconv" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/kvproto/pkg/diagnosticspb" + "github.com/pingcap/log" + "github.com/pingcap/sysutil" + "github.com/soheilhy/cmux" + "github.com/spf13/cobra" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/discovery" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/memberutil" + "github.com/tikv/pd/pkg/utils/metricutil" + "github.com/tikv/pd/pkg/versioninfo" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/pkg/types" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// Server is the scheduling server, and it implements bs.Server. +type Server struct { + diagnosticspb.DiagnosticsServer + // Server state. 0 is not running, 1 is running. 
+ isRunning int64 + // Server start timestamp + startTimestamp int64 + + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel func() + serverLoopWg sync.WaitGroup + + cfg *Config + clusterID uint64 + name string + listenURL *url.URL + + // for the primary election of scheduling + participant *member.Participant + etcdClient *clientv3.Client + httpClient *http.Client + + muxListener net.Listener + service *Service + + // Callback functions for different stages + // startCallbacks will be called after the server is started. + startCallbacks []func() + // primaryCallbacks will be called after the server becomes leader. + primaryCallbacks []func(context.Context) + + serviceRegister *discovery.ServiceRegister +} + +// Name returns the unique etcd name for this server in etcd cluster. +func (s *Server) Name() string { + return s.name +} + +// Context returns the context. +func (s *Server) Context() context.Context { + return s.ctx +} + +// GetAddr returns the server address. +func (s *Server) GetAddr() string { + return s.cfg.ListenAddr +} + +// Run runs the Scheduling server. +func (s *Server) Run() (err error) { + if err = s.initClient(); err != nil { + return err + } + if err = s.startServer(); err != nil { + return err + } + + s.startServerLoop() + + return nil +} + +func (s *Server) startServerLoop() { + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + s.serverLoopWg.Add(1) + go s.primaryElectionLoop() +} + +func (s *Server) primaryElectionLoop() { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + for { + if s.IsClosed() { + log.Info("server is closed, exit scheduling primary election loop") + return + } + + primary, checkAgain := s.participant.CheckLeader() + if checkAgain { + continue + } + if primary != nil { + log.Info("start to watch the primary", zap.Stringer("scheduling-primary", primary)) + // Watch will keep looping and never return unless the primary/leader has changed. + primary.Watch(s.serverLoopCtx) + log.Info("the scheduling primary has changed, try to re-campaign a primary") + } + + s.campaignLeader() + } +} + +func (s *Server) campaignLeader() { + log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name())) + if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err.Error() == errs.ErrEtcdTxnConflict.Error() { + log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully", + zap.String("campaign-scheduling-primary-name", s.participant.Name())) + } else { + log.Error("campaign scheduling primary meets error due to etcd error", + zap.String("campaign-scheduling-primary-name", s.participant.Name()), + errs.ZapError(err)) + } + return + } + + // Start keepalive the leadership and enable Scheduling service. + ctx, cancel := context.WithCancel(s.serverLoopCtx) + var resetLeaderOnce sync.Once + defer resetLeaderOnce.Do(func() { + cancel() + s.participant.ResetLeader() + }) + + // maintain the leadership, after this, Scheduling could be ready to provide service. 
+ s.participant.KeepLeader(ctx) + log.Info("campaign scheduling primary ok", zap.String("campaign-scheduling-primary-name", s.participant.Name())) + + log.Info("triggering the primary callback functions") + for _, cb := range s.primaryCallbacks { + cb(ctx) + } + + s.participant.EnableLeader() + log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name())) + + leaderTicker := time.NewTicker(utils.LeaderTickInterval) + defer leaderTicker.Stop() + + for { + select { + case <-leaderTicker.C: + if !s.participant.IsLeader() { + log.Info("no longer a primary/leader because lease has expired, the scheduling primary/leader will step down") + return + } + case <-ctx.Done(): + // Server is closed and it should return nil. + log.Info("server is closed") + return + } + } +} + +// Close closes the server. +func (s *Server) Close() { + if !atomic.CompareAndSwapInt64(&s.isRunning, 1, 0) { + // server is already closed + return + } + + log.Info("closing scheduling server ...") + s.serviceRegister.Deregister() + s.muxListener.Close() + s.serverLoopCancel() + s.serverLoopWg.Wait() + + if s.etcdClient != nil { + if err := s.etcdClient.Close(); err != nil { + log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) + } + } + + if s.httpClient != nil { + s.httpClient.CloseIdleConnections() + } + + log.Info("scheduling server is closed") +} + +// GetClient returns builtin etcd client. +func (s *Server) GetClient() *clientv3.Client { + return s.etcdClient +} + +// GetHTTPClient returns builtin http client. +func (s *Server) GetHTTPClient() *http.Client { + return s.httpClient +} + +// AddStartCallback adds a callback in the startServer phase. +func (s *Server) AddStartCallback(callbacks ...func()) { + s.startCallbacks = append(s.startCallbacks, callbacks...) +} + +// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. +func (s *Server) IsServing() bool { + return !s.IsClosed() && s.participant.IsLeader() +} + +// IsClosed checks if the server loop is closed +func (s *Server) IsClosed() bool { + return s != nil && atomic.LoadInt64(&s.isRunning) == 0 +} + +// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { + s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) +} + +func (s *Server) initClient() error { + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + return err + } + u, err := types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ",")) + if err != nil { + return err + } + s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)) + return err +} + +func (s *Server) startGRPCServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + gs := grpc.NewServer() + s.service.RegisterGRPCService(gs) + err := gs.Serve(l) + log.Info("gRPC server stop serving") + + // Attempt graceful stop (waits for pending RPCs), but force a stop if + // it doesn't happen in a reasonable amount of time. 
+ done := make(chan struct{}) + go func() { + defer logutil.LogPanic() + log.Info("try to gracefully stop the server now") + gs.GracefulStop() + close(done) + }() + timer := time.NewTimer(utils.DefaultGRPCGracefulStopTimeout) + defer timer.Stop() + select { + case <-done: + case <-timer.C: + log.Info("stopping grpc gracefully is taking longer than expected and force stopping now", zap.Duration("default", utils.DefaultGRPCGracefulStopTimeout)) + gs.Stop() + } + if s.IsClosed() { + log.Info("grpc server stopped") + } else { + log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err)) + } +} + +func (s *Server) startHTTPServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + handler, _ := SetUpRestHandler(s.service) + hs := &http.Server{ + Handler: handler, + ReadTimeout: 5 * time.Minute, + ReadHeaderTimeout: 5 * time.Second, + } + err := hs.Serve(l) + log.Info("http server stop serving") + + ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout) + defer cancel() + if err := hs.Shutdown(ctx); err != nil { + log.Error("http server shutdown encountered problem", errs.ZapError(err)) + } else { + log.Info("all http(s) requests finished") + } + if s.IsClosed() { + log.Info("http server stopped") + } else { + log.Fatal("http server stopped unexpectedly", errs.ZapError(err)) + } +} + +func (s *Server) startGRPCAndHTTPServers(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + mux := cmux.New(l) + grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) + httpL := mux.Match(cmux.Any()) + + s.serverLoopWg.Add(2) + go s.startGRPCServer(grpcL) + go s.startHTTPServer(httpL) + + if err := mux.Serve(); err != nil { + if s.IsClosed() { + log.Info("mux stop serving", errs.ZapError(err)) + } else { + log.Fatal("mux stop serving unexpectedly", errs.ZapError(err)) + } + } +} + +// GetLeaderListenUrls gets service endpoints from the leader in election group. 
+func (s *Server) GetLeaderListenUrls() []string { + return s.participant.GetLeaderListenUrls() +} + +func (s *Server) startServer() (err error) { + if s.clusterID, err = utils.InitClusterID(s.ctx, s.etcdClient); err != nil { + return err + } + log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID)) + // The independent Scheduling service still reuses PD version info since PD and Scheduling are just + // different service modes provided by the same pd-server binary + serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) + + uniqueName := s.cfg.ListenAddr + uniqueID := memberutil.GenerateUniqueID(uniqueName) + log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) + schedulingPrimaryPrefix := endpoint.SchedulingSvcRootPath(s.clusterID) + s.participant = member.NewParticipant(s.etcdClient) + s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)), + utils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", s.cfg.AdvertiseListenAddr) + + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + return err + } + s.listenURL, err = url.Parse(s.cfg.ListenAddr) + if err != nil { + return err + } + if tlsConfig != nil { + s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig) + } else { + s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host) + } + if err != nil { + return err + } + + s.serverLoopWg.Add(1) + go s.startGRPCAndHTTPServers(s.muxListener) + + // Run callbacks + log.Info("triggering the start callback functions") + for _, cb := range s.startCallbacks { + cb() + } + + // Server has started. + entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} + serializedEntry, err := entry.Serialize() + if err != nil { + return err + } + s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10), + utils.SchedulingServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds) + if err := s.serviceRegister.Register(); err != nil { + log.Error("failed to register the service", zap.String("service-name", utils.SchedulingServiceName), errs.ZapError(err)) + return err + } + atomic.StoreInt64(&s.isRunning, 1) + return nil +} + +// CreateServer creates the Server +func CreateServer(ctx context.Context, cfg *Config) *Server { + svr := &Server{ + DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), + startTimestamp: time.Now().Unix(), + cfg: cfg, + ctx: ctx, + } + return svr +} + +// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server +func CreateServerWrapper(cmd *cobra.Command, args []string) { + cmd.Flags().Parse(args) + cfg := NewConfig() + flagSet := cmd.Flags() + err := cfg.Parse(flagSet) + defer logutil.LogPanic() + + if err != nil { + cmd.Println(err) + return + } + + if printVersion, err := flagSet.GetBool("version"); err != nil { + cmd.Println(err) + return + } else if printVersion { + versioninfo.Print() + exit(0) + } + + // New zap logger + err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) + if err == nil { + log.ReplaceGlobals(cfg.Logger, cfg.LogProps) + } else { + log.Fatal("initialize logger error", errs.ZapError(err)) + } + // Flushing any buffered log entries + defer log.Sync() + + versioninfo.Log("Scheduling") + log.Info("Scheduling config", 
zap.Reflect("config", cfg)) + + grpcprometheus.EnableHandlingTimeHistogram() + metricutil.Push(&cfg.Metric) + + ctx, cancel := context.WithCancel(context.Background()) + svr := CreateServer(ctx, cfg) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Run(); err != nil { + log.Fatal("run server failed", errs.ZapError(err)) + } + + <-ctx.Done() + log.Info("got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func exit(code int) { + log.Sync() + os.Exit(code) +} diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index c94b5f00f78..8525af0fce6 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -380,7 +380,7 @@ func (s *Server) initClient() error { if err != nil { return err } - s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0]) + s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls) return err } diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 23fa0503726..59173222205 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -17,6 +17,7 @@ package etcdutil import ( "context" "crypto/tls" + "fmt" "math/rand" "net/http" "net/url" @@ -32,6 +33,7 @@ import ( "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" @@ -41,9 +43,6 @@ const ( // defaultEtcdClientTimeout is the default timeout for etcd client. defaultEtcdClientTimeout = 3 * time.Second - // defaultAutoSyncInterval is the interval to sync etcd cluster. - defaultAutoSyncInterval = 60 * time.Second - // defaultDialKeepAliveTime is the time after which client pings the server to see if transport is alive. defaultDialKeepAliveTime = 10 * time.Second @@ -61,6 +60,8 @@ const ( // DefaultSlowRequestTime 1s for the threshold for normal request, for those // longer then 1s, they are considered as slow requests. DefaultSlowRequestTime = time.Second + + healthyPath = "health" ) // CheckClusterID checks etcd cluster ID, returns an error if mismatch. @@ -144,6 +145,20 @@ func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clie return resp, nil } +// IsHealthy checks if the etcd is healthy. +func IsHealthy(ctx context.Context, client *clientv3.Client) bool { + timeout := DefaultRequestTimeout + failpoint.Inject("fastTick", func() { + timeout = 100 * time.Millisecond + }) + ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), timeout) + defer cancel() + _, err := client.Get(ctx, healthyPath) + // permission denied is OK since proposal goes through consensus to get it + // See: https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L124 + return err == nil || err == rpctypes.ErrPermissionDenied +} + // GetValue gets value with key from etcd. func GetValue(c *clientv3.Client, key string, opts ...clientv3.OpOption) ([]byte, error) { resp, err := get(c, key, opts...) 
@@ -196,82 +211,226 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID)) } -// CreateClientsWithMultiEndpoint creates etcd v3 client and http client. -func CreateClientsWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) { - client, err := createEtcdClientWithMultiEndpoint(tlsConfig, acUrls) - if err != nil { - return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() - } - httpClient := createHTTPClient(tlsConfig) - return client, httpClient, nil -} - -// CreateClients creates etcd v3 client and http client. -func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) { - client, err := CreateEtcdClient(tlsConfig, acUrls) - if err != nil { - return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() - } - httpClient := createHTTPClient(tlsConfig) - return client, httpClient, nil -} +const ( + // etcdServerOfflineTimeout is the timeout for an unhealthy etcd endpoint to be offline from healthy checker. + etcdServerOfflineTimeout = 30 * time.Minute + // etcdServerDisconnectedTimeout is the timeout for an unhealthy etcd endpoint to be disconnected from healthy checker. + etcdServerDisconnectedTimeout = 1 * time.Minute +) -// createEtcdClientWithMultiEndpoint creates etcd v3 client. -// Note: it will be used by micro service server and support multi etcd endpoints. -// FIXME: But it cannot switch etcd endpoints as soon as possible when one of endpoints is with io hang. -func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, error) { - if len(acUrls) == 0 { - return nil, errs.ErrNewEtcdClient.FastGenByArgs("no available etcd address") - } - endpoints := make([]string, 0, len(acUrls)) - for _, u := range acUrls { - endpoints = append(endpoints, u.String()) +func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, error) { + if len(endpoints) == 0 { + return nil, errs.ErrNewEtcdClient.FastGenByArgs("empty etcd endpoints") } lgc := zap.NewProductionConfig() lgc.Encoding = log.ZapEncodingName - autoSyncInterval := defaultAutoSyncInterval - dialKeepAliveTime := defaultDialKeepAliveTime - dialKeepAliveTimeout := defaultDialKeepAliveTimeout - failpoint.Inject("autoSyncInterval", func() { - autoSyncInterval = 10 * time.Millisecond - }) - failpoint.Inject("closeKeepAliveCheck", func() { - autoSyncInterval = 0 - dialKeepAliveTime = 0 - dialKeepAliveTimeout = 0 - }) client, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: defaultEtcdClientTimeout, - AutoSyncInterval: autoSyncInterval, TLS: tlsConfig, LogConfig: &lgc, - DialKeepAliveTime: dialKeepAliveTime, - DialKeepAliveTimeout: dialKeepAliveTimeout, + DialKeepAliveTime: defaultDialKeepAliveTime, + DialKeepAliveTimeout: defaultDialKeepAliveTimeout, }) - if err == nil { - log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints)) + return client, err +} + +// CreateEtcdClient creates etcd v3 client with detecting endpoints. +func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client, error) { + urls := make([]string, 0, len(acURLs)) + for _, u := range acURLs { + urls = append(urls, u.String()) } + client, err := newClient(tlsConfig, urls...) 
+ if err != nil { + return nil, err + } + + tickerInterval := defaultDialKeepAliveTime + failpoint.Inject("fastTick", func() { + tickerInterval = 100 * time.Millisecond + }) + failpoint.Inject("closeTick", func() { + failpoint.Return(client, err) + }) + + checker := &healthyChecker{ + tlsConfig: tlsConfig, + } + eps := syncUrls(client) + checker.update(eps) + + // Create a goroutine to check the health of etcd endpoints periodically. + go func(client *clientv3.Client) { + defer logutil.LogPanic() + ticker := time.NewTicker(tickerInterval) + defer ticker.Stop() + lastAvailable := time.Now() + for { + select { + case <-client.Ctx().Done(): + log.Info("[etcd client] etcd client is closed, exit health check goroutine") + checker.Range(func(key, value interface{}) bool { + client := value.(*healthyClient) + client.Close() + return true + }) + return + case <-ticker.C: + usedEps := client.Endpoints() + healthyEps := checker.patrol(client.Ctx()) + if len(healthyEps) == 0 { + // when all endpoints are unhealthy, try to reset the endpoints to refresh the connection + // rather than deleting them, so that the client is never left without any endpoint. + // Note: resetting the endpoints closes the subConns and then triggers a reconnect; + // otherwise, the subConns keep retrying in the gRPC layer with exponential backoff + // and cannot recover as soon as possible. + if time.Since(lastAvailable) > etcdServerDisconnectedTimeout { + log.Info("[etcd client] no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps)) + client.SetEndpoints([]string{}...) + client.SetEndpoints(usedEps...) + } + } else { + if !typeutil.AreStringSlicesEquivalent(healthyEps, usedEps) { + client.SetEndpoints(healthyEps...) + change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps)) + etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps))) + log.Info("[etcd client] update endpoints", zap.String("num-change", change), + zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints())) + } + lastAvailable = time.Now() + } + } + } + }(client) + + // Note: use another goroutine to update the endpoints to avoid blocking the health check in the first goroutine. + go func(client *clientv3.Client) { + defer logutil.LogPanic() + ticker := time.NewTicker(tickerInterval) + defer ticker.Stop() + for { + select { + case <-client.Ctx().Done(): + log.Info("[etcd client] etcd client is closed, exit update endpoint goroutine") + return + case <-ticker.C: + eps := syncUrls(client) + checker.update(eps) + } + } + }(client) + return client, err } -// CreateEtcdClient creates etcd v3 client. -// Note: it will be used by legacy pd-server, and only connect to leader only. 
-func CreateEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) { - lgc := zap.NewProductionConfig() - lgc.Encoding = log.ZapEncodingName - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{acURL.String()}, - DialTimeout: defaultEtcdClientTimeout, - TLS: tlsConfig, - LogConfig: &lgc, +type healthyClient struct { + *clientv3.Client + lastHealth time.Time +} + +type healthyChecker struct { + sync.Map // map[string]*healthyClient + tlsConfig *tls.Config +} + +func (checker *healthyChecker) patrol(ctx context.Context) []string { + // See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105-L145 + var wg sync.WaitGroup + count := 0 + checker.Range(func(key, value interface{}) bool { + count++ + return true + }) }) - if err == nil { - log.Info("create etcd v3 client", zap.String("endpoints", acURL.String())) + hch := make(chan string, count) + healthyList := make([]string, 0, count) + checker.Range(func(key, value interface{}) bool { + wg.Add(1) + go func(key, value interface{}) { + defer wg.Done() + defer logutil.LogPanic() + ep := key.(string) + client := value.(*healthyClient) + if IsHealthy(ctx, client.Client) { + hch <- ep + checker.Store(ep, &healthyClient{ + Client: client.Client, + lastHealth: time.Now(), + }) + return + } + }(key, value) + return true + }) + wg.Wait() + close(hch) + for h := range hch { + healthyList = append(healthyList, h) } - return client, err + return healthyList +} + +func (checker *healthyChecker) update(eps []string) { + for _, ep := range eps { + // Check if the client exists; if not, create one. If it exists, check whether it is offline or disconnected. + if client, ok := checker.Load(ep); ok { + lastHealthy := client.(*healthyClient).lastHealth + if time.Since(lastHealthy) > etcdServerOfflineTimeout { + log.Info("[etcd client] some etcd server may be offline", zap.String("endpoint", ep)) + checker.Delete(ep) + } + if time.Since(lastHealthy) > etcdServerDisconnectedTimeout { + // try to reset the client endpoint to trigger a reconnect + client.(*healthyClient).Client.SetEndpoints([]string{}...) + client.(*healthyClient).Client.SetEndpoints(ep) + } + continue + } + checker.addClient(ep, time.Now()) + } +} + +func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) { + client, err := newClient(checker.tlsConfig, ep) + if err != nil { + log.Error("[etcd client] failed to create etcd healthy client", zap.Error(err)) + return + } + checker.Store(ep, &healthyClient{ + Client: client, + lastHealth: lastHealth, + }) } +func syncUrls(client *clientv3.Client) []string { + // See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170-L183 + ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(client.Ctx()), DefaultRequestTimeout) + defer cancel() + mresp, err := client.MemberList(ctx) + if err != nil { + log.Error("[etcd client] failed to list members", errs.ZapError(err)) + return []string{} + } + var eps []string + for _, m := range mresp.Members { + if len(m.Name) != 0 && !m.IsLearner { + eps = append(eps, m.ClientURLs...) + } + } + return eps +} + +// CreateClients creates etcd v3 client and http client. 
+func CreateClients(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) { + client, err := CreateEtcdClient(tlsConfig, acUrls) + if err != nil { + return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() + } + httpClient := createHTTPClient(tlsConfig) + return client, httpClient, nil +} + +// createHTTPClient creates a http client with the given tls config. func createHTTPClient(tlsConfig *tls.Config) *http.Client { return &http.Client{ Transport: &http.Transport{ @@ -431,36 +590,39 @@ func NewLoopWatcher(ctx context.Context, wg *sync.WaitGroup, client *clientv3.Cl // StartWatchLoop starts a loop to watch the key. func (lw *LoopWatcher) StartWatchLoop() { - defer logutil.LogPanic() - defer lw.wg.Done() - - ctx, cancel := context.WithCancel(lw.ctx) - defer cancel() - watchStartRevision := lw.initFromEtcd(ctx) - - log.Info("start to watch loop", zap.String("name", lw.name), zap.String("key", lw.key)) - for { - select { - case <-ctx.Done(): - log.Info("server is closed, exit watch loop", zap.String("name", lw.name), zap.String("key", lw.key)) - return - default: - } - nextRevision, err := lw.watch(ctx, watchStartRevision) - if err != nil { - log.Error("watcher canceled unexpectedly and a new watcher will start after a while for watch loop", - zap.String("name", lw.name), - zap.String("key", lw.key), - zap.Int64("next-revision", nextRevision), - zap.Time("retry-at", time.Now().Add(lw.watchChangeRetryInterval)), - zap.Error(err)) - watchStartRevision = nextRevision - time.Sleep(lw.watchChangeRetryInterval) - failpoint.Inject("updateClient", func() { - lw.client = <-lw.updateClientCh - }) + lw.wg.Add(1) + go func() { + defer logutil.LogPanic() + defer lw.wg.Done() + + ctx, cancel := context.WithCancel(lw.ctx) + defer cancel() + watchStartRevision := lw.initFromEtcd(ctx) + + log.Info("start to watch loop", zap.String("name", lw.name), zap.String("key", lw.key)) + for { + select { + case <-ctx.Done(): + log.Info("server is closed, exit watch loop", zap.String("name", lw.name), zap.String("key", lw.key)) + return + default: + } + nextRevision, err := lw.watch(ctx, watchStartRevision) + if err != nil { + log.Error("watcher canceled unexpectedly and a new watcher will start after a while for watch loop", + zap.String("name", lw.name), + zap.String("key", lw.key), + zap.Int64("next-revision", nextRevision), + zap.Time("retry-at", time.Now().Add(lw.watchChangeRetryInterval)), + zap.Error(err)) + watchStartRevision = nextRevision + time.Sleep(lw.watchChangeRetryInterval) + failpoint.Inject("updateClient", func() { + lw.client = <-lw.updateClientCh + }) + } } - } + }() } func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 { @@ -508,15 +670,23 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 { func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) { watcher := clientv3.NewWatcher(lw.client) defer watcher.Close() - + var watchChanCancel context.CancelFunc + defer func() { + if watchChanCancel != nil { + watchChanCancel() + } + }() for { - WatchChan: + if watchChanCancel != nil { + watchChanCancel() + } // In order to prevent a watch stream being stuck in a partitioned node, // make sure to wrap context with "WithRequireLeader". 
- watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(ctx)) - defer watchChanCancel() + watchChanCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx)) + watchChanCancel = cancel opts := append(lw.opts, clientv3.WithRev(revision)) watchChan := watcher.Watch(watchChanCtx, lw.key, opts...) + WatchChanLoop: select { case <-ctx.Done(): return revision, nil @@ -526,16 +696,14 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision log.Warn("force load key failed in watch loop", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err)) } - watchChanCancel() - goto WatchChan + continue case wresp := <-watchChan: if wresp.CompactRevision != 0 { log.Warn("required revision has been compacted, use the compact revision in watch loop", zap.Int64("required-revision", revision), zap.Int64("compact-revision", wresp.CompactRevision)) revision = wresp.CompactRevision - watchChanCancel() - goto WatchChan + continue } else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0 log.Error("watcher is canceled in watch loop", zap.Int64("revision", revision), @@ -568,8 +736,8 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision zap.String("key", lw.key), zap.Error(err)) } revision = wresp.Header.Revision + 1 + goto WatchChanLoop // use goto to avoid creating a new watchChan } - watchChanCancel() } } diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 6bf63db79c9..5c7cc2ecaf4 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -19,6 +19,7 @@ import ( "crypto/tls" "fmt" "io" + "math/rand" "net" "strings" "sync" @@ -233,7 +234,7 @@ func TestInitClusterID(t *testing.T) { func TestEtcdClientSync(t *testing.T) { re := require.New(t) - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)")) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) // Start a etcd server. cfg1 := NewTestSingleConfig(t) @@ -244,10 +245,7 @@ func TestEtcdClientSync(t *testing.T) { re.NoError(err) // Create a etcd client with etcd1 as endpoint. - ep1 := cfg1.LCUrls[0].String() - urls, err := types.NewURLs([]string{ep1}) - re.NoError(err) - client1, err := createEtcdClientWithMultiEndpoint(nil, urls) + client1, err := CreateEtcdClient(nil, cfg1.LCUrls) defer func() { client1.Close() }() @@ -258,39 +256,26 @@ func TestEtcdClientSync(t *testing.T) { etcd2 := checkAddEtcdMember(t, cfg1, client1) defer etcd2.Close() checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) + testutil.Eventually(re, func() bool { + // wait for etcd client sync endpoints + return len(client1.Endpoints()) == 2 + }) // Remove the first member and close the etcd1. _, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) re.NoError(err) - time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2 etcd1.Close() // Check the client can get the new member with the new endpoints. - listResp3, err := ListEtcdMembers(client1) - re.NoError(err) - re.Len(listResp3.Members, 1) - re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID) - - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval")) -} - -func TestEtcdWithHangLeaderEnableCheck(t *testing.T) { - re := require.New(t) - var err error - // Test with enable check. 
- re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)")) - err = checkEtcdWithHangLeader(t) - re.NoError(err) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval")) + testutil.Eventually(re, func() bool { + // wait for etcd client sync endpoints + return len(client1.Endpoints()) == 1 + }) - // Test with disable check. - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck", "return(true)")) - err = checkEtcdWithHangLeader(t) - re.Error(err) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) } -func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) { +func TestEtcdScaleInAndOut(t *testing.T) { re := require.New(t) // Start a etcd server. cfg1 := NewTestSingleConfig(t) @@ -299,18 +284,15 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) { etcd1.Close() }() re.NoError(err) - ep1 := cfg1.LCUrls[0].String() <-etcd1.Server.ReadyNotify() // Create two etcd clients with etcd1 as endpoint. - urls, err := types.NewURLs([]string{ep1}) - re.NoError(err) - client1, err := createEtcdClient(nil, urls[0]) // execute member change operation with this client + client1, err := CreateEtcdClient(nil, cfg1.LCUrls) // execute member change operation with this client defer func() { client1.Close() }() re.NoError(err) - client2, err := createEtcdClient(nil, urls[0]) // check member change with this client + client2, err := CreateEtcdClient(nil, cfg1.LCUrls) // check member change with this client defer func() { client2.Close() }() @@ -329,6 +311,71 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) { checkMembers(re, client2, []*embed.Etcd{etcd2}) } +func TestRandomKillEtcd(t *testing.T) { + re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) + // Start a etcd server. + cfg1 := NewTestSingleConfig(t) + etcd1, err := embed.StartEtcd(cfg1) + re.NoError(err) + <-etcd1.Server.ReadyNotify() + client1, err := CreateEtcdClient(nil, cfg1.LCUrls) + re.NoError(err) + defer func() { + client1.Close() + }() + + etcd2 := checkAddEtcdMember(t, cfg1, client1) + cfg2 := etcd2.Config() + <-etcd2.Server.ReadyNotify() + + etcd3 := checkAddEtcdMember(t, &cfg2, client1) + <-etcd3.Server.ReadyNotify() + + time.Sleep(1 * time.Second) + re.Len(client1.Endpoints(), 3) + + // Randomly kill an etcd server and restart it + etcds := []*embed.Etcd{etcd1, etcd2, etcd3} + cfgs := []embed.Config{etcd1.Config(), etcd2.Config(), etcd3.Config()} + for i := 0; i < 10; i++ { + killIndex := rand.Intn(len(etcds)) + etcds[killIndex].Close() + testutil.Eventually(re, func() bool { + return IsHealthy(context.Background(), client1) + }) + etcd, err := embed.StartEtcd(&cfgs[killIndex]) + re.NoError(err) + <-etcd.Server.ReadyNotify() + etcds[killIndex] = etcd + testutil.Eventually(re, func() bool { + return IsHealthy(context.Background(), client1) + }) + } + for _, etcd := range etcds { + if etcd != nil { + etcd.Close() + } + } + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) +} + +func TestEtcdWithHangLeaderEnableCheck(t *testing.T) { + re := require.New(t) + var err error + // Test with enable check. 
+ re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) + err = checkEtcdWithHangLeader(t) + re.NoError(err) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) + + // Test with disable check. + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick", "return(true)")) + err = checkEtcdWithHangLeader(t) + re.Error(err) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick")) +} + func checkEtcdWithHangLeader(t *testing.T) error { re := require.New(t) // Start a etcd server. @@ -349,13 +396,13 @@ func checkEtcdWithHangLeader(t *testing.T) error { // Create a etcd client with etcd1 as endpoint. urls, err := types.NewURLs([]string{proxyAddr}) re.NoError(err) - client1, err := createEtcdClientWithMultiEndpoint(nil, urls) + client1, err := CreateEtcdClient(nil, urls) defer func() { client1.Close() }() re.NoError(err) - // Add a new member and set the client endpoints to etcd1 and etcd2. + // Add a new member etcd2 := checkAddEtcdMember(t, cfg1, client1) defer etcd2.Close() checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) @@ -363,7 +410,7 @@ func checkEtcdWithHangLeader(t *testing.T) error { // Hang the etcd1 and wait for the client to connect to etcd2. enableDiscard.Store(true) - time.Sleep(defaultDialKeepAliveTime + defaultDialKeepAliveTimeout*2) + time.Sleep(time.Second) _, err = EtcdKVGet(client1, "test/key1") return err } @@ -473,16 +520,14 @@ func TestLoopWatcherTestSuite(t *testing.T) { } func (suite *loopWatcherTestSuite) SetupSuite() { + var err error t := suite.T() suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cleans = make([]func(), 0) // Start a etcd server and create a client with etcd1 as endpoint. 
suite.config = NewTestSingleConfig(t) suite.startEtcd() - ep1 := suite.config.LCUrls[0].String() - urls, err := types.NewURLs([]string{ep1}) - suite.NoError(err) - suite.client, err = createEtcdClient(nil, urls[0]) + suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls) suite.NoError(err) suite.cleans = append(suite.cleans, func() { suite.client.Close() @@ -685,7 +730,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case2: close the etcd client and put a new value after watcher restarts suite.client.Close() - suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls) suite.NoError(err) watcher.updateClientCh <- suite.client suite.put("TestWatcherBreak", "2") @@ -693,7 +738,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case3: close the etcd client and put a new value before watcher restarts suite.client.Close() - suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls) suite.NoError(err) suite.put("TestWatcherBreak", "3") watcher.updateClientCh <- suite.client @@ -701,7 +746,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case4: close the etcd client and put a new value with compact suite.client.Close() - suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls) suite.NoError(err) suite.put("TestWatcherBreak", "4") resp, err := EtcdKVGet(suite.client, "TestWatcherBreak") diff --git a/pkg/utils/etcdutil/metrics.go b/pkg/utils/etcdutil/metrics.go new file mode 100644 index 00000000000..f78e0864ba2 --- /dev/null +++ b/pkg/utils/etcdutil/metrics.go @@ -0,0 +1,29 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdutil + +import "github.com/prometheus/client_golang/prometheus" + +var etcdStateGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "server", + Name: "etcd_client", + Help: "Etcd client states.", + }, []string{"type"}) + +func init() { + prometheus.MustRegister(etcdStateGauge) +} diff --git a/pkg/utils/typeutil/comparison.go b/pkg/utils/typeutil/comparison.go index 2c94ad94182..c976ac47102 100644 --- a/pkg/utils/typeutil/comparison.go +++ b/pkg/utils/typeutil/comparison.go @@ -16,6 +16,7 @@ package typeutil import ( "math" + "sort" "time" ) @@ -43,8 +44,9 @@ func MinDuration(a, b time.Duration) time.Duration { return b } -// StringsEqual checks if two string slices are equal. Empyt slice and nil are considered equal. -func StringsEqual(a, b []string) bool { +// AreStringSlicesEqual checks if two string slices are equal. Empty slice and nil are considered equal. +// It returns true if the slices are of the same length and all elements are identical in both slices; otherwise, it returns false. 
+func AreStringSlicesEqual(a, b []string) bool { if len(a) != len(b) { return false } @@ -56,6 +58,22 @@ func StringsEqual(a, b []string) bool { return true } +// AreStringSlicesEquivalent checks if two string slices are equivalent. +// If the slices are of the same length and contain the same elements (but possibly in different order), the function returns true. +func AreStringSlicesEquivalent(a, b []string) bool { + if len(a) != len(b) { + return false + } + sort.Strings(a) + sort.Strings(b) + for i, v := range a { + if v != b[i] { + return false + } + } + return true +} + // Float64Equal checks if two float64 are equal. func Float64Equal(a, b float64) bool { return math.Abs(a-b) <= 1e-6 diff --git a/server/server.go b/server/server.go index 8a512531d08..c7bdede4bf8 100644 --- a/server/server.go +++ b/server/server.go @@ -336,12 +336,12 @@ func (s *Server) startEtcd(ctx context.Context) error { } // start client - s.client, s.httpClient, err = startClient(s.cfg) + s.client, s.httpClient, err = s.startClient() if err != nil { return err } - s.electionClient, err = startElectionClient(s.cfg) + s.electionClient, err = s.startElectionClient() if err != nil { return err } @@ -368,29 +368,29 @@ func (s *Server) startEtcd(ctx context.Context) error { return nil } -func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) { - tlsConfig, err := cfg.Security.ToTLSConfig() +func (s *Server) startClient() (*clientv3.Client, *http.Client, error) { + tlsConfig, err := s.cfg.Security.ToTLSConfig() if err != nil { return nil, nil, err } - etcdCfg, err := cfg.GenEmbedEtcdConfig() + etcdCfg, err := s.cfg.GenEmbedEtcdConfig() if err != nil { return nil, nil, err } - return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0]) + return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls) } -func startElectionClient(cfg *config.Config) (*clientv3.Client, error) { - tlsConfig, err := cfg.Security.ToTLSConfig() +func (s *Server) startElectionClient() (*clientv3.Client, error) { + tlsConfig, err := s.cfg.Security.ToTLSConfig() if err != nil { return nil, err } - etcdCfg, err := cfg.GenEmbedEtcdConfig() + etcdCfg, err := s.cfg.GenEmbedEtcdConfig() if err != nil { return nil, err } - return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls[0]) + return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls) } // AddStartCallback adds a callback in the startServer phase. @@ -1022,14 +1022,14 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { len(defaultRule.StartKey) == 0 && len(defaultRule.EndKey) == 0) { return errors.New("cannot update MaxReplicas or LocationLabels when placement rules feature is enabled and not only default rule exists, please update rule instead") } - if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.StringsEqual(defaultRule.LocationLabels, []string(old.LocationLabels))) { + if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.AreStringSlicesEqual(defaultRule.LocationLabels, []string(old.LocationLabels))) { return errors.New("cannot to update replication config, the default rules do not consistent with replication config, please update rule instead") } return nil } - if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.StringsEqual(cfg.LocationLabels, old.LocationLabels)) { + if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.AreStringSlicesEqual(cfg.LocationLabels, old.LocationLabels)) { if err := CheckInDefaultRule(); err != nil { return err }