
Commit

clientv3: Introduce custom retry interceptor based on go-grpc-middleware/retry
jpbetz authored and gyuho committed May 21, 2018
1 parent 059ad01 commit efdc55f
Showing 15 changed files with 487 additions and 113 deletions.
47 changes: 39 additions & 8 deletions clientv3/client.go
@@ -30,6 +30,7 @@ import (
"github.com/coreos/etcd/clientv3/balancer/picker"
"github.com/coreos/etcd/clientv3/balancer/resolver/endpoint"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils"
"go.uber.org/zap"

"google.golang.org/grpc"
@@ -45,13 +46,15 @@ var (
ErrOldCluster = errors.New("etcdclient: old cluster version")

roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
logger *zap.Logger
)

func init() {
logger = zap.NewNop() // zap.NewExample()
balancer.RegisterBuilder(balancer.Config{
Policy: picker.RoundrobinBalanced,
Name: roundRobinBalancerName,
Logger: zap.NewNop(), // zap.NewExample(),
Logger: logger,
})
}

@@ -263,6 +266,18 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []
opts = append(opts, grpc.WithInsecure())
}

// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
// once it is available.
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
opts = append(opts,
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
// Streams that are safe to retry are enabled individually.
grpc.WithStreamInterceptor(c.streamClientInterceptor(logger, withMax(0), rrBackoff)),
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(logger, withMax(defaultUnaryMaxRetries), rrBackoff)),
)

return opts, nil
}
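
As a rough illustration of what interceptor-based retry looks like, the sketch below re-invokes a unary call while the server reports codes.Unavailable, sleeping according to a backoffFunc between attempts. All names in it (retrysketch, retryUnaryInterceptor, maxRetries) are invented for this example; the commit's actual streamClientInterceptor and unaryClientInterceptor are defined in a file not shown in this excerpt and handle more cases than this.

package retrysketch

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// backoffFunc reports how long to wait before the given retry attempt.
type backoffFunc func(attempt uint) time.Duration

// retryUnaryInterceptor retries a unary RPC up to maxRetries extra times when
// the server reports codes.Unavailable, sleeping per backoff between attempts.
// With maxRetries == 0 the call is made exactly once, i.e. retry is disabled.
func retryUnaryInterceptor(maxRetries uint, backoff backoffFunc) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{},
		cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		var err error
		for attempt := uint(0); attempt <= maxRetries; attempt++ {
			if attempt > 0 {
				select {
				case <-time.After(backoff(attempt)):
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			if err = invoker(ctx, method, req, reply, cc, opts...); err == nil {
				return nil
			}
			if status.Code(err) != codes.Unavailable {
				return err // only retry errors that clearly mean "try another endpoint"
			}
		}
		return err
	}
}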

@@ -386,14 +401,14 @@ func newClient(cfg *Config) (*Client, error) {

ctx, cancel := context.WithCancel(baseCtx)
client := &Client{
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.Mutex),
callOpts: defaultCallOpts,
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.Mutex),
}

if cfg.Username != "" && cfg.Password != "" {
client.Username = cfg.Username
client.Password = cfg.Password
@@ -461,6 +476,22 @@ func newClient(cfg *Config) (*Client, error) {
return client, nil
}

// roundRobinQuorumBackoff retries against quorum between each backoff.
// This is intended for use with a round robin load balancer.
func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc {
return func(attempt uint) time.Duration {
// after each round robin across quorum, backoff for our wait between duration
n := uint(len(c.Endpoints()))
quorum := (n/2 + 1)
if attempt%quorum == 0 {
logger.Info("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
return backoffutils.JitterUp(waitBetween, jitterFraction)
}
logger.Info("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
return 0
}
}
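
To make the schedule concrete: with five endpoints the quorum is 5/2 + 1 = 3, so the function above returns a jittered wait on attempts 0, 3, 6, ... and zero for every other attempt, letting the balancer move straight on to the next endpoint. The snippet below (illustrative only, not part of the commit; the 25ms figure is defaultBackoffWaitBetween from options.go, and jitter is omitted) just evaluates that arithmetic:

package main

import (
	"fmt"
	"time"
)

func main() {
	endpoints := 5
	quorum := uint(endpoints/2 + 1) // 3
	waitBetween := 25 * time.Millisecond
	for attempt := uint(0); attempt < 7; attempt++ {
		wait := time.Duration(0)
		if attempt%quorum == 0 {
			wait = waitBetween // jitter omitted for clarity
		}
		fmt.Printf("attempt %d: wait %v\n", attempt, wait)
	}
}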

func (c *Client) checkVersion() (err error) {
var wg sync.WaitGroup
errc := make(chan error, len(c.cfg.Endpoints))
7 changes: 3 additions & 4 deletions clientv3/integration/dial_test.go
@@ -83,7 +83,9 @@ func TestDialTLSNoConfig(t *testing.T) {

// TODO: this should not be required when we set grpc.WithBlock()
if c != nil {
_, err = c.KV.Get(context.Background(), "/")
ctx, cancel := context.WithTimeout(context.Background(), integration.RequestWaitTimeout)
_, err = c.KV.Get(ctx, "/")
cancel()
}
if !isClientTimeout(err) {
t.Fatalf("expected dial timeout error, got %v", err)
@@ -157,9 +159,6 @@ func TestSwitchSetEndpoints(t *testing.T) {

cli.SetEndpoints(eps...)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, cli)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if _, err := cli.Get(ctx, "foo"); err != nil {
17 changes: 5 additions & 12 deletions clientv3/integration/kv_test.go
@@ -438,15 +438,12 @@ func TestKVGetErrConnClosed(t *testing.T) {

cli := clus.Client(0)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, cli)

donec := make(chan struct{})
go func() {
defer close(donec)
_, err := cli.Get(context.TODO(), "foo")
if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
t.Fatalf("expected %v, %v or server unavailable, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
}
}()

@@ -689,8 +686,6 @@ func TestKVGetRetry(t *testing.T) {

donec := make(chan struct{})
go func() {
// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, kv)
// Get will fail, but reconnect will trigger
gresp, gerr := kv.Get(ctx, "foo")
if gerr != nil {
@@ -741,8 +736,6 @@ func TestKVPutFailGetRetry(t *testing.T) {

donec := make(chan struct{})
go func() {
// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, kv)
// Get will fail, but reconnect will trigger
gresp, gerr := kv.Get(context.TODO(), "foo")
if gerr != nil {
@@ -800,7 +793,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) {
// this Get fails and triggers an asynchronous connection retry
_, err := cli.Get(ctx, "abc")
cancel()
if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
if err != nil && !(isCanceled(err) || isClientTimeout(err)) {
t.Fatal(err)
}
}
@@ -822,15 +815,15 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
// grpc finds out the original connection is down due to the member shutdown.
_, err := cli.Get(ctx, "abc")
cancel()
if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
if err != nil && !(isCanceled(err) || isClientTimeout(err)) {
t.Fatal(err)
}

ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
// this Put fails and triggers an asynchronous connection retry
_, err = cli.Put(ctx, "abc", "123")
cancel()
if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
if err != nil && !(isCanceled(err) || isClientTimeout(err) || isUnavailable(err)) {
t.Fatal(err)
}
}
10 changes: 7 additions & 3 deletions clientv3/integration/lease_test.go
@@ -145,6 +145,10 @@ func TestLeaseKeepAlive(t *testing.T) {
t.Errorf("chan is closed, want not closed")
}

if kresp == nil {
t.Fatalf("unexpected null response")
}

if kresp.ID != resp.ID {
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
}
@@ -292,7 +296,7 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
go func() {
defer close(donec)
_, err := cli.Grant(context.TODO(), 5)
if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled && !isServerUnavailable(err) {
if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled {
// grpc.ErrClientConnClosing if grpc-go balancer calls 'Get' after client.Close.
// context.Canceled if grpc-go balancer calls 'Get' with an inflight client.Close.
t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
@@ -324,7 +328,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {

donec := make(chan struct{})
go func() {
if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
@@ -356,7 +360,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {

donec := make(chan struct{})
go func() {
if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
13 changes: 2 additions & 11 deletions clientv3/integration/leasing_test.go
@@ -869,9 +869,6 @@ func TestLeasingTxnCancel(t *testing.T) {
}
clus.Members[0].Stop(t)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, clus.Client(1))

// wait for leader election, if any
if _, err = clus.Client(1).Get(context.TODO(), "abc"); err != nil {
t.Fatal(err)
@@ -1536,9 +1533,6 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) {
}
}

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, lkv)

lresp, lerr := lkv.Get(context.TODO(), "k")
if lerr != nil {
t.Fatal(lerr)
@@ -1820,9 +1814,6 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) {
// lkv shouldn't need to call out to server for updated leased keys
clus.Members[0].Stop(t)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, clus.Client(1))

for i := 0; i < n; i++ {
k := fmt.Sprintf("tree/%d", i)
lkvResp, err := lkv.Get(context.TODO(), k)
@@ -1994,7 +1985,7 @@ func TestLeasingSessionExpireCancel(t *testing.T) {

select {
case err := <-errc:
if !(err == ctx.Err() || isServerUnavailable(err)) {
if err != ctx.Err() {
t.Errorf("#%d: expected %v of server unavailable, got %v", i, ctx.Err(), err)
}
case <-time.After(5 * time.Second):
@@ -2025,7 +2016,7 @@ func waitForExpireAck(t *testing.T, kv clientv3.KV) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
_, err := kv.Get(ctx, "abc")
cancel()
if err == ctx.Err() || isServerUnavailable(err) {
if err == ctx.Err() {
return
} else if err != nil {
t.Logf("current error: %v", err)
1 change: 0 additions & 1 deletion clientv3/integration/maintenance_test.go
@@ -157,7 +157,6 @@ func TestMaintenanceSnapshotErrorInflight(t *testing.T) {
clus.Members[0].Restart(t)

cli := clus.RandClient()
integration.WaitClientV3(t, cli)
// reading snapshot with canceled context should error out
ctx, cancel := context.WithCancel(context.Background())
rc1, err := cli.Snapshot(ctx)
3 changes: 0 additions & 3 deletions clientv3/integration/network_partition_test.go
@@ -186,9 +186,6 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T
// isolate leader
clus.Members[lead].InjectPartition(t, clus.Members[(lead+1)%3], clus.Members[(lead+2)%3])

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, cli)

// expects balancer to round robin to leader within two attempts
for i := 0; i < 2; i++ {
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
18 changes: 8 additions & 10 deletions clientv3/integration/server_shutdown_test.go
@@ -17,7 +17,6 @@ package integration
import (
"bytes"
"context"
"reflect"
"strings"
"testing"
"time"
@@ -352,11 +351,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
}
cancel()
if err != nil {
if linearizable && isServerUnavailable(err) {
t.Logf("TODO: FIX THIS after balancer rewrite! %v %v", reflect.TypeOf(err), err)
} else {
t.Fatalf("expected linearizable=true and a server unavailable error, but got linearizable=%t and '%v'", linearizable, err)
}
t.Fatalf("unexpected error: %v", err)
}
}()

@@ -402,19 +397,22 @@ func isClientTimeout(err error) bool {
return code == codes.DeadlineExceeded || ev.Message() == transport.ErrConnClosing.Desc
}

func isServerUnavailable(err error) bool {
func isCanceled(err error) bool {
if err == nil {
return false
}
if err == context.Canceled {
return true
}
ev, ok := status.FromError(err)
if !ok {
return false
}
code := ev.Code()
return code == codes.Unavailable
return code == codes.Canceled
}

func isCanceled(err error) bool {
func isUnavailable(err error) bool {
if err == nil {
return false
}
@@ -426,5 +424,5 @@ func isCanceled(err error) bool {
return false
}
code := ev.Code()
return code == codes.Canceled
return code == codes.Unavailable
}
3 changes: 0 additions & 3 deletions clientv3/integration/txn_test.go
@@ -79,9 +79,6 @@ func TestTxnWriteFail(t *testing.T) {
t.Fatalf("timed out waiting for txn fail")
case <-txnc:
}
// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, kv)

// and ensure the put didn't take
gresp, gerr := clus.Client(1).Get(context.TODO(), "foo")
if gerr != nil {
2 changes: 1 addition & 1 deletion clientv3/lease.go
@@ -466,7 +466,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...)
if err != nil {
cancel()
return nil, err
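
The lease keepalive stream is a bidirectional client stream, so interceptor retries are explicitly disabled for it with withMax(0); reconnection is left to the lessor's own keepalive loop, which re-opens the stream when it breaks. The following is only a rough sketch of that reconnect-loop pattern, with placeholder names (openStream standing in for something like resetRecv, handle for the per-response bookkeeping), not the actual code in lease.go:

package leasesketch

import (
	"context"
	"time"
)

// keepAliveStream is a stand-in for the generated LeaseKeepAlive client stream.
type keepAliveStream interface {
	Recv() (interface{}, error)
}

// runKeepAlive keeps re-opening the stream until ctx is cancelled: on any
// receive error it drops the stream and opens a fresh one after a pause.
func runKeepAlive(ctx context.Context,
	openStream func(ctx context.Context) (keepAliveStream, error),
	handle func(resp interface{})) error {
	for {
		if stream, err := openStream(ctx); err == nil {
			for {
				resp, rerr := stream.Recv()
				if rerr != nil {
					break // stream broke; fall through and reconnect
				}
				handle(resp)
			}
		}
		select {
		case <-time.After(500 * time.Millisecond): // placeholder retry wait before re-opening
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}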
2 changes: 1 addition & 1 deletion clientv3/maintenance.go
@@ -188,7 +188,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
}

func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, m.callOpts...)
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
if err != nil {
return nil, toErr(ctx, err)
}
17 changes: 17 additions & 0 deletions clientv3/options.go
@@ -16,6 +16,7 @@ package clientv3

import (
"math"
"time"

"google.golang.org/grpc"
)
@@ -37,6 +38,22 @@ var (
// because range response can easily exceed request send limits
// Default to math.MaxInt32; writes exceeding server-side send limit fails anyway
defaultMaxCallRecvMsgSize = grpc.MaxCallRecvMsgSize(math.MaxInt32)

// client-side non-streaming retry limit, only applied to requests where the server responds with
// an error code clearly indicating it was unable to process the request, such as codes.Unavailable.
// If set to 0, retry is disabled.
defaultUnaryMaxRetries uint = 100

// client-side streaming retry limit, only applied to requests where the server responds with
// an error code clearly indicating it was unable to process the request, such as codes.Unavailable.
// If set to 0, retry is disabled.
defaultStreamMaxRetries uint = ^uint(0) // max uint

// client-side retry backoff wait between requests.
defaultBackoffWaitBetween = 25 * time.Millisecond

// client-side retry backoff default jitter fraction.
defaultBackoffJitterFraction = 0.10
)
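
With these defaults a retried unary call can back off up to 100 times, waiting roughly 25ms, jittered by about ±10%, each time it has cycled through a quorum of endpoints. The helper below is a minimal sketch of that jitter step; it is assumed to mirror what backoffutils.JitterUp does, but the exact distribution is an assumption, not something shown in this commit.

package optionsketch

import (
	"math/rand"
	"time"
)

// jitterUp scales waitBetween by a random factor in [1-fraction, 1+fraction];
// e.g. 25ms with fraction 0.10 becomes a value between 22.5ms and 27.5ms.
func jitterUp(waitBetween time.Duration, fraction float64) time.Duration {
	multiplier := 1 + fraction*(rand.Float64()*2-1)
	return time.Duration(float64(waitBetween) * multiplier)
}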

// defaultCallOpts defines a list of default "gRPC.CallOption".