Skip to content

Commit

Permalink
add log
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Aug 15, 2023
1 parent 064efcc commit 82dd922
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 2 deletions.
139 changes: 138 additions & 1 deletion client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc/connectivity"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand Down Expand Up @@ -64,6 +66,10 @@ type baseClient struct {

// Client option.
option *option

reConnectClientCh chan struct{}
readyMu sync.RWMutex
ready atomic.Value
}

// SecurityOption records options about tls
Expand Down Expand Up @@ -105,7 +111,8 @@ func (c *baseClient) init() error {
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

c.wg.Add(1)
go c.memberLoop()
//go c.memberLoop()
go c.reconnectLoop(c.ctx)
return nil
}

Expand Down Expand Up @@ -146,8 +153,137 @@ func (c *baseClient) memberLoop() {
}
}

func (c *baseClient) reconnectLoop(ctx context.Context) {
// first time to connect to PD
if err := c.updateMember(); err != nil {
log.Error("[pd.reconnectLoop] failed updateMember", errs.ZapError(err))
}

lastConnect := time.Now()

for {
// wait for conn ready
if c.waitForLeaderReady(ctx) != nil {
log.Error("[pd.reconnectLoop] failed WaitForConnected")
}

target := lastConnect.Add(updateMemberTimeout)
if target.After(time.Now()) {
time.Sleep(target.Sub(time.Now()))
}
lastConnect = time.Now()
if err := c.updateMember(); err != nil {
c.readyMu.Lock()
c.ready.Store(false)
c.readyMu.Unlock()
log.Error("[pd.reconnectLoop] failed to reconnect to PD", errs.ZapError(err))
c.ScheduleReconnect()
} else {
c.readyMu.Lock()
c.ready.Store(true)
c.readyMu.Unlock()
log.Info("[pd.reconnectLoop] success to reconnect to PD")
}

}
}

func (c *baseClient) waitForReady(ctx context.Context) error {
log.Info("[pd.waitForReady] start wait for ready")
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

ch := make(chan bool)

go func() {
ch <- c.WaitForConnected(ctx)
}()
go func() {
ch <- c.waitForNewClient()
}()

for {
select {
case r := <-ch:
if r {
log.Info("[pd.waitForReady] success")
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}

// waitForLeaderReady waits for the leader to be ready.
func (c *baseClient) waitForLeaderReady(ctx context.Context) error {
old, ok := c.clientConns.Load(c.GetLeaderAddr())
if !ok {
return errors.New("no leader")
}
cc := old.(*grpc.ClientConn)

ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
for {
s := cc.GetState()
if s == connectivity.Ready {
return nil
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
return ctx.Err()
}
}
}

func (c *baseClient) WaitForConnected(ctx context.Context) bool {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer func() {
log.Info("[pd.WaitForConnected] cancel exit")
cancel()
}()

lastConnect := time.Now()
for {
target := lastConnect.Add(retryInterval)
if target.After(time.Now()) {
time.Sleep(target.Sub(time.Now()))
}
lastConnect = time.Now()

c.readyMu.RLock()
if ready := c.ready.Load(); ready != nil && ready.(bool) {
log.Info("[pd.WaitForConnected] success")
return true
}
c.readyMu.RUnlock()
}
}

func (c *baseClient) waitForNewClient() bool {
deadline := time.Now().Add(requestTimeout)
for {
select {
case <-time.After(deadline.Sub(time.Now())):
log.Error("[pd.waitForNewClient] failed to wait for new client")
return false
case <-c.reConnectClientCh:
log.Info("[pd.waitForNewClient] wait for new client success")
return true
}
}
}

func (c *baseClient) ScheduleReconnect() {
c.reConnectClientCh <- struct{}{}
log.Info("[pd.ScheduleReconnect] schedule reconnect")
}

// ScheduleCheckLeader is used to check leader.
func (c *baseClient) ScheduleCheckLeader() {
log.Info("[pd.ScheduleCheckLeader] schedule check leader")
c.waitForReady(c.ctx)
select {
case c.checkLeaderCh <- struct{}{}:
default:
Expand Down Expand Up @@ -273,6 +409,7 @@ func (c *baseClient) initClusterID() error {
}

func (c *baseClient) updateMember() error {
log.Info("[pd] update member")
for i, u := range c.GetURLs() {
if _, _err_ := failpoint.Eval(_curpkg_("skipFirstUpdateMember")); _err_ == nil {
if i == 0 {
Expand Down
6 changes: 5 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ const (
defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
retryInterval = 500 * time.Millisecond
maxRetryTimes = 6
requestTimeout = 2 * time.Second
)

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
Expand Down Expand Up @@ -861,7 +862,8 @@ tsoBatchLoop:
stream = nil
// Because ScheduleCheckLeader is asynchronous, if the leader changes, we better call `updateMember` ASAP.
if IsLeaderChange(err) {
if err := c.updateMember(); err != nil {
log.Info("[pd.handleDispatcher] leader may change soon", zap.String("dc-location", dc))
if err := c.waitForReady(dispatcherCtx); err != nil {
select {
case <-dispatcherCtx.Done():
return
Expand Down Expand Up @@ -953,6 +955,7 @@ func (c *client) tryConnect(
// retry several times before falling back to the follower when the network problem happens

for i := 0; i < maxRetryTimes; i++ {
log.Info("[pd] create tso stream with leader", zap.String("dc-location", dc))
c.ScheduleCheckLeader()
cc, url = c.getAllocatorClientConnByDCLocation(dc)
cctx, cancel := context.WithCancel(dispatcherCtx)
Expand Down Expand Up @@ -1923,6 +1926,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
if err != nil || header.GetError() != nil {
observer.Observe(time.Since(start).Seconds())
if err != nil {
log.Info("[pd] send request failed", zap.Error(err))
c.ScheduleCheckLeader()
return errors.WithStack(err)
}
Expand Down

0 comments on commit 82dd922

Please sign in to comment.