Skip to content

Commit

Permalink
clientv3: Do no stop keep alive loop by server side errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Iwasaki Yudai committed May 5, 2017
1 parent 7f05e22 commit 4fa2607
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 19 deletions.
13 changes: 13 additions & 0 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,16 @@ func toErr(ctx context.Context, err error) error {
}
return err
}

func canceledByCaller(ctx context.Context, err error) bool {
if ctx.Err() == nil {
return false
}
if err == nil {
return false
}
if err != context.Canceled && err != context.DeadlineExceeded {
return false
}
return true
}
50 changes: 31 additions & 19 deletions clientv3/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ const (
// NoLease is a lease ID for the absence of a lease.
NoLease LeaseID = 0

// retryConnWait is how long to wait before retrying on a lost leader
// retryConnWait is how long to wait before retrying request due to an error
retryConnWait = 500 * time.Millisecond
)

Expand Down Expand Up @@ -392,34 +392,46 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
l.mu.Unlock()
}()

stream, serr := l.resetRecv()
for serr == nil {
resp, err := stream.Recv()
if err == nil {
l.recvKeepAlive(resp)
continue
}
err = toErr(l.stopCtx, err)
if err == rpctypes.ErrNoLeader {
l.closeRequireLeader()
select {
case <-time.After(retryConnWait):
case <-l.stopCtx.Done():
for {
stream, err := l.resetRecv()
if err != nil {
if canceledByCaller(l.stopCtx, err) {
return err
}
} else if isHaltErr(l.stopCtx, err) {
return err
} else {
for {
resp, err := stream.Recv()

if err != nil {
if canceledByCaller(l.stopCtx, err) {
return err
}

err = toErr(l.stopCtx, err)
if err == rpctypes.ErrNoLeader {
l.closeRequireLeader()
}
break
}

l.recvKeepAlive(resp)
}
}

select {
case <-time.After(retryConnWait):
continue
case <-l.stopCtx.Done():
return l.stopCtx.Err()
}
stream, serr = l.resetRecv()
}
return serr
}

// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
if err = toErr(sctx, err); err != nil {
if err != nil {
cancel()
return nil, err
}
Expand Down

0 comments on commit 4fa2607

Please sign in to comment.