Skip to content

Commit

Permalink
Merge pull request #7023 from heyitsanthony/lease-freeze
Browse files Browse the repository at this point in the history
clientv3: fix lease "freezing" on unhealthy cluster
  • Loading branch information
Anthony Romano authored Dec 16, 2016
2 parents d9e928d + a375e91 commit 531c306
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 21 deletions.
56 changes: 56 additions & 0 deletions clientv3/integration/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package integration
import (
"reflect"
"sort"
"sync"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
Expand Down Expand Up @@ -574,3 +576,57 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
t.Fatalf("expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err)
}
}

// TestV3LeaseFailureOverlap issues Grant and Keepalive requests to a cluster
// before, during, and after quorum loss to confirm Grant/Keepalive tolerates
// transient cluster failure.
//
// Failures detected inside worker goroutines are reported with t.Error /
// t.Errorf rather than t.Fatal / t.Fatalf: the testing package only permits
// FailNow (and therefore Fatal) on the goroutine running the test function.
func TestV3LeaseFailureOverlap(t *testing.T) {
	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
	defer clus.Terminate(t)

	numReqs := 5
	cli := clus.Client(0)

	// updown brings up a session and tears it down again. It returns the
	// error from session creation, if any. If closing the session takes
	// longer than 15 seconds, the test is marked failed and nil is returned;
	// i only labels the timeout message with the request batch.
	updown := func(i int) error {
		sess, err := concurrency.NewSession(cli)
		if err != nil {
			return err
		}
		ch := make(chan struct{})
		go func() {
			defer close(ch)
			sess.Close()
		}()
		select {
		case <-ch:
		case <-time.After(time.Minute / 4):
			// Runs on a worker goroutine, so Fatalf must not be used here.
			t.Errorf("timeout %d", i)
		}
		return nil
	}

	var wg sync.WaitGroup
	// mkReqs launches numReqs concurrent updown calls tagged with batch n.
	// ErrTimeoutDueToConnectionLost is tolerated; any other error fails the
	// test.
	mkReqs := func(n int) {
		wg.Add(numReqs)
		for i := 0; i < numReqs; i++ {
			go func() {
				defer wg.Done()
				err := updown(n)
				if err == nil || err == rpctypes.ErrTimeoutDueToConnectionLost {
					return
				}
				// Runs on a worker goroutine, so Fatal must not be used here.
				t.Error(err)
			}()
		}
	}

	mkReqs(1)
	clus.Members[1].Stop(t)
	mkReqs(2)
	time.Sleep(time.Second)
	mkReqs(3)
	clus.Members[1].Restart(t)
	mkReqs(4)
	// Wait for every batch so all failures are recorded before the test returns.
	wg.Wait()
}
3 changes: 0 additions & 3 deletions clientv3/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
if isHaltErr(cctx, err) {
return nil, toErr(cctx, err)
}
if nerr := l.newStream(); nerr != nil {
return nil, nerr
}
}
}

Expand Down
39 changes: 21 additions & 18 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ var (
ErrGRPCPermissionNotGranted = grpc.Errorf(codes.FailedPrecondition, "etcdserver: permission is not granted to the role")
ErrGRPCAuthNotEnabled = grpc.Errorf(codes.FailedPrecondition, "etcdserver: authentication is not enabled")

ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
ErrGRPCStopped = grpc.Errorf(codes.Unavailable, "etcdserver: server stopped")
ErrGRPCTimeout = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out")
ErrGRPCTimeoutDueToLeaderFail = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure")
ErrGRPCUnhealthy = grpc.Errorf(codes.Unavailable, "etcdserver: unhealthy cluster")
ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
ErrGRPCStopped = grpc.Errorf(codes.Unavailable, "etcdserver: server stopped")
ErrGRPCTimeout = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out")
ErrGRPCTimeoutDueToLeaderFail = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure")
ErrGRPCTimeoutDueToConnectionLost = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost")
ErrGRPCUnhealthy = grpc.Errorf(codes.Unavailable, "etcdserver: unhealthy cluster")

errStringToError = map[string]error{
grpc.ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
Expand Down Expand Up @@ -91,12 +92,13 @@ var (
grpc.ErrorDesc(ErrGRPCPermissionNotGranted): ErrGRPCPermissionNotGranted,
grpc.ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled,

grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
grpc.ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
grpc.ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout,
grpc.ErrorDesc(ErrGRPCTimeoutDueToLeaderFail): ErrGRPCTimeoutDueToLeaderFail,
grpc.ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy,
grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
grpc.ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
grpc.ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout,
grpc.ErrorDesc(ErrGRPCTimeoutDueToLeaderFail): ErrGRPCTimeoutDueToLeaderFail,
grpc.ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost,
grpc.ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy,
}

// client-side error
Expand Down Expand Up @@ -131,12 +133,13 @@ var (
ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled)

ErrNoLeader = Error(ErrGRPCNoLeader)
ErrNotCapable = Error(ErrGRPCNotCapable)
ErrStopped = Error(ErrGRPCStopped)
ErrTimeout = Error(ErrGRPCTimeout)
ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
ErrUnhealthy = Error(ErrGRPCUnhealthy)
ErrNoLeader = Error(ErrGRPCNoLeader)
ErrNotCapable = Error(ErrGRPCNotCapable)
ErrStopped = Error(ErrGRPCStopped)
ErrTimeout = Error(ErrGRPCTimeout)
ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
ErrUnhealthy = Error(ErrGRPCUnhealthy)
)

// EtcdError defines gRPC server errors.
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func togRPCError(err error) error {
return rpctypes.ErrGRPCTimeout
case etcdserver.ErrTimeoutDueToLeaderFail:
return rpctypes.ErrGRPCTimeoutDueToLeaderFail
case etcdserver.ErrTimeoutDueToConnectionLost:
return rpctypes.ErrGRPCTimeoutDueToConnectionLost
case etcdserver.ErrUnhealthy:
return rpctypes.ErrGRPCUnhealthy

Expand Down

0 comments on commit 531c306

Please sign in to comment.