diff --git a/clientv3/client.go b/clientv3/client.go index 1d3b7e39912..51aba8b00f2 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -503,3 +503,11 @@ func toErr(ctx context.Context, err error) error { } return err } + +func canceledByCaller(stopCtx context.Context, err error) bool { + if stopCtx.Err() == nil || err == nil { + return false + } + + return err == context.Canceled || err == context.DeadlineExceeded +} diff --git a/clientv3/lease.go b/clientv3/lease.go index a6494ceee45..6e7ea1f82d0 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -69,7 +69,7 @@ const ( // NoLease is a lease ID for the absence of a lease. NoLease LeaseID = 0 - // retryConnWait is how long to wait before retrying on a lost leader + // retryConnWait is how long to wait before retrying request due to an error retryConnWait = 500 * time.Millisecond ) @@ -392,34 +392,45 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { l.mu.Unlock() }() - stream, serr := l.resetRecv() - for serr == nil { - resp, err := stream.Recv() - if err == nil { - l.recvKeepAlive(resp) - continue - } - err = toErr(l.stopCtx, err) - if err == rpctypes.ErrNoLeader { - l.closeRequireLeader() - select { - case <-time.After(retryConnWait): - case <-l.stopCtx.Done(): + for { + stream, err := l.resetRecv() + if err != nil { + if canceledByCaller(l.stopCtx, err) { return err } - } else if isHaltErr(l.stopCtx, err) { - return err + } else { + for { + resp, err := stream.Recv() + + if err != nil { + if canceledByCaller(l.stopCtx, err) { + return err + } + + if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader { + l.closeRequireLeader() + } + break + } + + l.recvKeepAlive(resp) + } + } + + select { + case <-time.After(retryConnWait): + continue + case <-l.stopCtx.Done(): + return l.stopCtx.Err() } - stream, serr = l.resetRecv() } - return serr } // resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { sctx, cancel := context.WithCancel(l.stopCtx) stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false)) - if err = toErr(sctx, err); err != nil { + if err != nil { cancel() return nil, err }