Skip to content

Commit

Permalink
clientv3: Do no stop keep alive loop by server side errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Iwasaki Yudai committed May 8, 2017
1 parent 7f05e22 commit aa85b0c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 19 deletions.
8 changes: 8 additions & 0 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,11 @@ func toErr(ctx context.Context, err error) error {
}
return err
}

func canceledByCaller(stopCtx context.Context, err error) bool {
if stopCtx.Err() == nil || err == nil {
return false
}

return err == context.Canceled || err == context.DeadlineExceeded
}
49 changes: 30 additions & 19 deletions clientv3/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ const (
// NoLease is a lease ID for the absence of a lease.
NoLease LeaseID = 0

// retryConnWait is how long to wait before retrying on a lost leader
// retryConnWait is how long to wait before retrying request due to an error
retryConnWait = 500 * time.Millisecond
)

Expand Down Expand Up @@ -392,34 +392,45 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
l.mu.Unlock()
}()

stream, serr := l.resetRecv()
for serr == nil {
resp, err := stream.Recv()
if err == nil {
l.recvKeepAlive(resp)
continue
}
err = toErr(l.stopCtx, err)
if err == rpctypes.ErrNoLeader {
l.closeRequireLeader()
select {
case <-time.After(retryConnWait):
case <-l.stopCtx.Done():
for {
stream, err := l.resetRecv()
if err != nil {
if canceledByCaller(l.stopCtx, err) {
return err
}
} else if isHaltErr(l.stopCtx, err) {
return err
} else {
for {
resp, err := stream.Recv()

if err != nil {
if canceledByCaller(l.stopCtx, err) {
return err
}

if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
l.closeRequireLeader()
}
break
}

l.recvKeepAlive(resp)
}
}

select {
case <-time.After(retryConnWait):
continue
case <-l.stopCtx.Done():
return l.stopCtx.Err()
}
stream, serr = l.resetRecv()
}
return serr
}

// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
if err = toErr(sctx, err); err != nil {
if err != nil {
cancel()
return nil, err
}
Expand Down

0 comments on commit aa85b0c

Please sign in to comment.