diff --git a/client/base_client.go b/client/base_client.go
index d0ecdb9d8b3d..b26ef654af7c 100755
--- a/client/base_client.go
+++ b/client/base_client.go
@@ -23,6 +23,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	"google.golang.org/grpc/connectivity"
+
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/pdpb"
@@ -64,6 +66,8 @@ type baseClient struct {
 
 	// Client option.
 	option *option
+
+	reConnectClientCh chan struct{}
 }
 
 // SecurityOption records options about tls
@@ -105,7 +109,8 @@ func (c *baseClient) init() error {
 	log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))
 	c.wg.Add(1)
-	go c.memberLoop()
+	//go c.memberLoop()
+	go c.reconnectLoop(c.ctx)
 	return nil
 }
 
@@ -146,8 +151,130 @@ func (c *baseClient) memberLoop() {
 	}
 }
 
+func (c *baseClient) reconnectLoop(ctx context.Context) {
+	// first time to connect to PD
+	if err := c.updateMember(); err != nil {
+		log.Error("[pd.reconnectLoop] failed updateMember", errs.ZapError(err))
+	}
+
+	lastConnect := time.Now()
+
+	for {
+		// wait for ready
+		if !c.WaitForConnected(ctx) {
+			log.Error("[pd.reconnectLoop] failed WaitForConnected")
+		}
+
+		target := lastConnect.Add(updateMemberTimeout)
+		if target.After(time.Now()) {
+			time.Sleep(target.Sub(time.Now()))
+		}
+		lastConnect = time.Now()
+		if err := c.updateMember(); err != nil {
+			log.Error("[pd.reconnectLoop] failed to reconnect to PD", errs.ZapError(err))
+		}
+		c.ScheduleReconnect()
+	}
+}
+
+// select WaitForConnected or wait for a new client
+func (c *baseClient) waitForReady(ctx context.Context) error {
+	log.Info("[pd.waitForReady] start wait for ready")
+	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
+	defer cancel()
+
+	ch := make(chan bool)
+
+	go func() {
+		ch <- c.WaitForConnected(ctx)
+	}()
+	go func() {
+		ch <- c.waitForNewClient()
+	}()
+
+	for {
+		select {
+		case r := <-ch:
+			if r {
+				log.Info("[pd.waitForReady] success")
+				return nil
+			}
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+func (c *baseClient) WaitForConnected(ctx context.Context) bool {
+	for _, u := range c.GetURLs() {
+		old, ok := c.clientConns.Load(u)
+		if !ok {
+			log.Error("[pd.WaitForConnected] failed to load old clientConn")
+			return false
+		}
+		cc := old.(*grpc.ClientConn)
+
+		ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
+		defer func() {
+			cancel()
+		}()
+
+		for {
+			s := cc.GetState()
+			if s == connectivity.Ready && c.checkLeaderMembers(ctx, cc) != nil {
+				log.Info("[pd.WaitForConnected] failed")
+				return false
+			}
+			if !cc.WaitForStateChange(ctx, s) {
+				// ctx got timeout or canceled.
+ log.Error("[pd.WaitForConnected] failed to connect to PD", errs.ZapError(ctx.Err())) + return false + } + } + } + return true +} + +func (c *baseClient) checkLeaderMembers(ctx context.Context, cc *grpc.ClientConn) error { + members, err := pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{}) + if err != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String()) + return errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause() + } + if members.GetHeader().GetError() != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", members.GetHeader().GetError().String(), cc.Target(), cc.GetState().String()) + return errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause() + } + + return nil +} + +func (c *baseClient) waitForNewClient() bool { + deadline := time.Now().Add(requestTimeout) + for { + select { + case <-time.After(deadline.Sub(time.Now())): + log.Error("[pd.waitForNewClient] failed to wait for new client") + return false + case <-c.reConnectClientCh: + log.Info("[pd.waitForNewClient] wait for new client success") + return true + } + } +} + +func (c *baseClient) ScheduleReconnect() { + select { + case c.reConnectClientCh <- struct{}{}: + log.Info("[pd.ScheduleReconnect] schedule reconnect") + default: + } +} + // ScheduleCheckLeader is used to check leader. func (c *baseClient) ScheduleCheckLeader() { + log.Info("[pd.ScheduleCheckLeader] schedule check leader") + c.waitForReady(c.ctx) select { case c.checkLeaderCh <- struct{}{}: default: @@ -273,6 +400,7 @@ func (c *baseClient) initClusterID() error { } func (c *baseClient) updateMember() error { + log.Info("[pd] update member") for i, u := range c.GetURLs() { if _, _err_ := failpoint.Eval(_curpkg_("skipFirstUpdateMember")); _err_ == nil { if i == 0 { diff --git a/client/client.go b/client/client.go index ed706e7aad0b..5eebf5c4df48 100755 --- a/client/client.go +++ b/client/client.go @@ -331,6 +331,7 @@ const ( defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst retryInterval = 500 * time.Millisecond maxRetryTimes = 6 + requestTimeout = 2 * time.Second ) // LeaderHealthCheckInterval might be changed in the unit to shorten the testing time. @@ -861,7 +862,8 @@ tsoBatchLoop: stream = nil // Because ScheduleCheckLeader is asynchronous, if the leader changes, we better call `updateMember` ASAP. if IsLeaderChange(err) { - if err := c.updateMember(); err != nil { + log.Info("[pd.handleDispatcher] leader may change soon", zap.String("dc-location", dc)) + if err := c.waitForReady(dispatcherCtx); err != nil { select { case <-dispatcherCtx.Done(): return @@ -953,6 +955,7 @@ func (c *client) tryConnect( // retry several times before falling back to the follower when the network problem happens for i := 0; i < maxRetryTimes; i++ { + log.Info("[pd] create tso stream with leader", zap.String("dc-location", dc)) c.ScheduleCheckLeader() cc, url = c.getAllocatorClientConnByDCLocation(dc) cctx, cancel := context.WithCancel(dispatcherCtx) @@ -1923,6 +1926,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e if err != nil || header.GetError() != nil { observer.Observe(time.Since(start).Seconds()) if err != nil { + log.Info("[pd] send request failed", zap.Error(err)) c.ScheduleCheckLeader() return errors.WithStack(err) }