From efdc55fd3d86d3e85255962033e2b529bbba657d Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Fri, 11 May 2018 12:53:20 -0700 Subject: [PATCH] clientv3: Introduce custom retry interceptor based on go-grpc-middleware/retry --- clientv3/client.go | 47 ++- clientv3/integration/dial_test.go | 7 +- clientv3/integration/kv_test.go | 17 +- clientv3/integration/lease_test.go | 10 +- clientv3/integration/leasing_test.go | 13 +- clientv3/integration/maintenance_test.go | 1 - .../integration/network_partition_test.go | 3 - clientv3/integration/server_shutdown_test.go | 18 +- clientv3/integration/txn_test.go | 3 - clientv3/lease.go | 2 +- clientv3/maintenance.go | 2 +- clientv3/options.go | 17 + clientv3/retry.go | 98 +++-- clientv3/retry_interceptor.go | 355 ++++++++++++++++++ integration/v3_alarm_test.go | 7 +- 15 files changed, 487 insertions(+), 113 deletions(-) create mode 100644 clientv3/retry_interceptor.go diff --git a/clientv3/client.go b/clientv3/client.go index 2a4a6d04f953..f60bc00eb735 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -30,6 +30,7 @@ import ( "github.com/coreos/etcd/clientv3/balancer/picker" "github.com/coreos/etcd/clientv3/balancer/resolver/endpoint" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + "github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils" "go.uber.org/zap" "google.golang.org/grpc" @@ -45,13 +46,15 @@ var ( ErrOldCluster = errors.New("etcdclient: old cluster version") roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String()) + logger *zap.Logger ) func init() { + logger = zap.NewNop() // zap.NewExample() balancer.RegisterBuilder(balancer.Config{ Policy: picker.RoundrobinBalanced, Name: roundRobinBalancerName, - Logger: zap.NewNop(), // zap.NewExample(), + Logger: logger, }) } @@ -263,6 +266,18 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts [] opts = append(opts, grpc.WithInsecure()) } + // Interceptor retry and backoff. + // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with + // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy + // once it is available. + rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction)) + opts = append(opts, + // Disable stream retry by default since go-grpc-middleware/retry does not support client streams. + // Streams that are safe to retry are enabled individually. + grpc.WithStreamInterceptor(c.streamClientInterceptor(logger, withMax(0), rrBackoff)), + grpc.WithUnaryInterceptor(c.unaryClientInterceptor(logger, withMax(defaultUnaryMaxRetries), rrBackoff)), + ) + return opts, nil } @@ -386,14 +401,14 @@ func newClient(cfg *Config) (*Client, error) { ctx, cancel := context.WithCancel(baseCtx) client := &Client{ - conn: nil, - cfg: *cfg, - creds: creds, - ctx: ctx, - cancel: cancel, - mu: new(sync.Mutex), - callOpts: defaultCallOpts, + conn: nil, + cfg: *cfg, + creds: creds, + ctx: ctx, + cancel: cancel, + mu: new(sync.Mutex), } + if cfg.Username != "" && cfg.Password != "" { client.Username = cfg.Username client.Password = cfg.Password @@ -461,6 +476,22 @@ func newClient(cfg *Config) (*Client, error) { return client, nil } +// roundRobinQuorumBackoff retries against quorum between each backoff. +// This is intended for use with a round robin load balancer. 
+func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc { + return func(attempt uint) time.Duration { + // after each round robin across quorum, backoff for our wait between duration + n := uint(len(c.Endpoints())) + quorum := (n/2 + 1) + if attempt%quorum == 0 { + logger.Info("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction)) + return backoffutils.JitterUp(waitBetween, jitterFraction) + } + logger.Info("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum)) + return 0 + } +} + func (c *Client) checkVersion() (err error) { var wg sync.WaitGroup errc := make(chan error, len(c.cfg.Endpoints)) diff --git a/clientv3/integration/dial_test.go b/clientv3/integration/dial_test.go index 587bcd565770..574f224492f5 100644 --- a/clientv3/integration/dial_test.go +++ b/clientv3/integration/dial_test.go @@ -83,7 +83,9 @@ func TestDialTLSNoConfig(t *testing.T) { // TODO: this should not be required when we set grpc.WithBlock() if c != nil { - _, err = c.KV.Get(context.Background(), "/") + ctx, cancel := context.WithTimeout(context.Background(), integration.RequestWaitTimeout) + _, err = c.KV.Get(ctx, "/") + cancel() } if !isClientTimeout(err) { t.Fatalf("expected dial timeout error, got %v", err) @@ -157,9 +159,6 @@ func TestSwitchSetEndpoints(t *testing.T) { cli.SetEndpoints(eps...) - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, cli) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if _, err := cli.Get(ctx, "foo"); err != nil { diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 994effa6fd60..394a8242b9d9 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -438,15 +438,12 @@ func TestKVGetErrConnClosed(t *testing.T) { cli := clus.Client(0) - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, cli) - donec := make(chan struct{}) go func() { defer close(donec) _, err := cli.Get(context.TODO(), "foo") - if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) { - t.Fatalf("expected %v, %v or server unavailable, got %v", context.Canceled, grpc.ErrClientConnClosing, err) + if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err) } }() @@ -689,8 +686,6 @@ func TestKVGetRetry(t *testing.T) { donec := make(chan struct{}) go func() { - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, kv) // Get will fail, but reconnect will trigger gresp, gerr := kv.Get(ctx, "foo") if gerr != nil { @@ -741,8 +736,6 @@ func TestKVPutFailGetRetry(t *testing.T) { donec := make(chan struct{}) go func() { - // TODO: Remove wait once the new grpc load balancer provides retry. 
- integration.WaitClientV3(t, kv) // Get will fail, but reconnect will trigger gresp, gerr := kv.Get(context.TODO(), "foo") if gerr != nil { @@ -800,7 +793,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) { // this Get fails and triggers an asynchronous connection retry _, err := cli.Get(ctx, "abc") cancel() - if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) { + if err != nil && !(isCanceled(err) || isClientTimeout(err)) { t.Fatal(err) } } @@ -822,7 +815,7 @@ func TestKVPutStoppedServerAndClose(t *testing.T) { // grpc finds out the original connection is down due to the member shutdown. _, err := cli.Get(ctx, "abc") cancel() - if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) { + if err != nil && !(isCanceled(err) || isClientTimeout(err)) { t.Fatal(err) } @@ -830,7 +823,7 @@ func TestKVPutStoppedServerAndClose(t *testing.T) { // this Put fails and triggers an asynchronous connection retry _, err = cli.Put(ctx, "abc", "123") cancel() - if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) { + if err != nil && !(isCanceled(err) || isClientTimeout(err) || isUnavailable(err)) { t.Fatal(err) } } diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index 4329f7ac91c0..75a0987c52e5 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -145,6 +145,10 @@ func TestLeaseKeepAlive(t *testing.T) { t.Errorf("chan is closed, want not closed") } + if kresp == nil { + t.Fatalf("unexpected null response") + } + if kresp.ID != resp.ID { t.Errorf("ID = %x, want %x", kresp.ID, resp.ID) } @@ -292,7 +296,7 @@ func TestLeaseGrantErrConnClosed(t *testing.T) { go func() { defer close(donec) _, err := cli.Grant(context.TODO(), 5) - if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled && !isServerUnavailable(err) { + if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled { // grpc.ErrClientConnClosing if grpc-go balancer calls 'Get' after client.Close. // context.Canceled if grpc-go balancer calls 'Get' with an inflight client.Close. 
t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err) @@ -324,7 +328,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { - if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) { + if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing { t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err) } close(donec) @@ -356,7 +360,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { - if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) { + if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing { t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err) } close(donec) diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go index ff82ae4b3e94..7c3dcc3f1796 100644 --- a/clientv3/integration/leasing_test.go +++ b/clientv3/integration/leasing_test.go @@ -869,9 +869,6 @@ func TestLeasingTxnCancel(t *testing.T) { } clus.Members[0].Stop(t) - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, clus.Client(1)) - // wait for leader election, if any if _, err = clus.Client(1).Get(context.TODO(), "abc"); err != nil { t.Fatal(err) @@ -1536,9 +1533,6 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) { } } - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, lkv) - lresp, lerr := lkv.Get(context.TODO(), "k") if lerr != nil { t.Fatal(lerr) @@ -1820,9 +1814,6 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) { // lkv shouldn't need to call out to server for updated leased keys clus.Members[0].Stop(t) - // TODO: Remove wait once the new grpc load balancer provides retry. 
- integration.WaitClientV3(t, clus.Client(1)) - for i := 0; i < n; i++ { k := fmt.Sprintf("tree/%d", i) lkvResp, err := lkv.Get(context.TODO(), k) @@ -1994,7 +1985,7 @@ func TestLeasingSessionExpireCancel(t *testing.T) { select { case err := <-errc: - if !(err == ctx.Err() || isServerUnavailable(err)) { + if err != ctx.Err() { t.Errorf("#%d: expected %v of server unavailable, got %v", i, ctx.Err(), err) } case <-time.After(5 * time.Second): @@ -2025,7 +2016,7 @@ func waitForExpireAck(t *testing.T, kv clientv3.KV) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second) _, err := kv.Get(ctx, "abc") cancel() - if err == ctx.Err() || isServerUnavailable(err) { + if err == ctx.Err() { return } else if err != nil { t.Logf("current error: %v", err) diff --git a/clientv3/integration/maintenance_test.go b/clientv3/integration/maintenance_test.go index 0609c1c812bd..b4b73614a36c 100644 --- a/clientv3/integration/maintenance_test.go +++ b/clientv3/integration/maintenance_test.go @@ -157,7 +157,6 @@ func TestMaintenanceSnapshotErrorInflight(t *testing.T) { clus.Members[0].Restart(t) cli := clus.RandClient() - integration.WaitClientV3(t, cli) // reading snapshot with canceled context should error out ctx, cancel := context.WithCancel(context.Background()) rc1, err := cli.Snapshot(ctx) diff --git a/clientv3/integration/network_partition_test.go b/clientv3/integration/network_partition_test.go index 41310f75400d..f65807430eff 100644 --- a/clientv3/integration/network_partition_test.go +++ b/clientv3/integration/network_partition_test.go @@ -186,9 +186,6 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T // isolate leader clus.Members[lead].InjectPartition(t, clus.Members[(lead+1)%3], clus.Members[(lead+2)%3]) - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, cli) - // expects balancer to round robin to leader within two attempts for i := 0; i < 2; i++ { ctx, cancel := context.WithTimeout(context.TODO(), timeout) diff --git a/clientv3/integration/server_shutdown_test.go b/clientv3/integration/server_shutdown_test.go index d1e5507cc5c8..1f889edac03b 100644 --- a/clientv3/integration/server_shutdown_test.go +++ b/clientv3/integration/server_shutdown_test.go @@ -17,7 +17,6 @@ package integration import ( "bytes" "context" - "reflect" "strings" "testing" "time" @@ -352,11 +351,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl } cancel() if err != nil { - if linearizable && isServerUnavailable(err) { - t.Logf("TODO: FIX THIS after balancer rewrite! 
%v %v", reflect.TypeOf(err), err) - } else { - t.Fatalf("expected linearizable=true and a server unavailable error, but got linearizable=%t and '%v'", linearizable, err) - } + t.Fatalf("unexpected error: %v", err) } }() @@ -402,19 +397,22 @@ func isClientTimeout(err error) bool { return code == codes.DeadlineExceeded || ev.Message() == transport.ErrConnClosing.Desc } -func isServerUnavailable(err error) bool { +func isCanceled(err error) bool { if err == nil { return false } + if err == context.Canceled { + return true + } ev, ok := status.FromError(err) if !ok { return false } code := ev.Code() - return code == codes.Unavailable + return code == codes.Canceled } -func isCanceled(err error) bool { +func isUnavailable(err error) bool { if err == nil { return false } @@ -426,5 +424,5 @@ func isCanceled(err error) bool { return false } code := ev.Code() - return code == codes.Canceled + return code == codes.Unavailable } diff --git a/clientv3/integration/txn_test.go b/clientv3/integration/txn_test.go index 672bf79032a3..ca49ec075d84 100644 --- a/clientv3/integration/txn_test.go +++ b/clientv3/integration/txn_test.go @@ -79,9 +79,6 @@ func TestTxnWriteFail(t *testing.T) { t.Fatalf("timed out waiting for txn fail") case <-txnc: } - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, kv) - // and ensure the put didn't take gresp, gerr := clus.Client(1).Get(context.TODO(), "foo") if gerr != nil { diff --git a/clientv3/lease.go b/clientv3/lease.go index 4097b3afa2ad..3d5ff4f72268 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -466,7 +466,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { // resetRecv opens a new lease stream and starts sending keep alive requests. func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { sctx, cancel := context.WithCancel(l.stopCtx) - stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...) + stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...) if err != nil { cancel() return nil, err diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index a6a2e7dec677..f814874f2cb4 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -188,7 +188,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (* } func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { - ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, m.callOpts...) + ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...) if err != nil { return nil, toErr(ctx, err) } diff --git a/clientv3/options.go b/clientv3/options.go index a98b19f9b4b1..e158be60ea35 100644 --- a/clientv3/options.go +++ b/clientv3/options.go @@ -16,6 +16,7 @@ package clientv3 import ( "math" + "time" "google.golang.org/grpc" ) @@ -37,6 +38,22 @@ var ( // because range response can easily exceed request send limits // Default to math.MaxInt32; writes exceeding server-side send limit fails anyway defaultMaxCallRecvMsgSize = grpc.MaxCallRecvMsgSize(math.MaxInt32) + + // client-side non-streaming retry limit, only applied to requests where server responds with + // a error code clearly indicating it was unable to process the request such as codes.Unavailable. + // If set to 0, retry is disabled. 
+ defaultUnaryMaxRetries uint = 100 + + // client-side streaming retry limit, only applied to requests where server responds with + // a error code clearly indicating it was unable to process the request such as codes.Unavailable. + // If set to 0, retry is disabled. + defaultStreamMaxRetries uint = ^uint(0) // max uint + + // client-side retry backoff wait between requests. + defaultBackoffWaitBetween = 25 * time.Millisecond + + // client-side retry backoff default jitter fraction. + defaultBackoffJitterFraction = 0.10 ) // defaultCallOpts defines a list of default "gRPC.CallOption". diff --git a/clientv3/retry.go b/clientv3/retry.go index 6c7fcfcf6b64..66ec99dfe6ed 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -32,6 +32,17 @@ const ( nonRepeatable ) +func (rp retryPolicy) String() string { + switch rp { + case repeatable: + return "repeatable" + case nonRepeatable: + return "nonRepeatable" + default: + return "UNKNOWN" + } +} + type rpcFunc func(ctx context.Context) error type retryRPCFunc func(context.Context, rpcFunc, retryPolicy) error type retryStopErrFunc func(error) bool @@ -78,8 +89,6 @@ func isNonRepeatableStopError(err error) bool { return desc != "there is no address available" && desc != "there is no connection available" } -// TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? -/* func (c *Client) newRetryWrapper() retryRPCFunc { return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error { var isStop retryStopErrFunc @@ -90,21 +99,14 @@ func (c *Client) newRetryWrapper() retryRPCFunc { isStop = isNonRepeatableStopError } for { - if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil { - return err - } - pinned := c.balancer.Pinned() err := f(rpcCtx) if err == nil { return nil } - lg.Lvl(4).Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned) + lg.Lvl(4).Infof("clientv3/retry: error %q", err.Error()) if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) { - // mark this before endpoint switch is triggered - c.balancer.HostPortError(pinned, err) - c.balancer.Next() - lg.Lvl(4).Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error()) + lg.Lvl(4).Infof("clientv3/retry: retrying due to error %q", err.Error()) } if isStop(err) { @@ -112,23 +114,21 @@ func (c *Client) newRetryWrapper() retryRPCFunc { } } } -}*/ +} -/* func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc { return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error { for { - pinned := c.balancer.Pinned() err := retryf(rpcCtx, f, rp) if err == nil { return nil } - lg.Lvl(4).Infof("clientv3/auth-retry: error %q on pinned endpoint %q", err.Error(), pinned) + lg.Lvl(4).Infof("clientv3/auth-retry: error %q", err.Error()) // always stop retry on etcd errors other than invalid auth token if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken { gterr := c.getToken(rpcCtx) if gterr != nil { - lg.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q) on pinned endpoint %q", err.Error(), gterr.Error(), pinned) + lg.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q)", err.Error(), gterr.Error()) return err // return the original error for simplicity } continue @@ -136,7 +136,7 @@ func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc { return err } } -}*/ +} type retryKVClient struct { kc pb.KVClient @@ -145,16 +145,14 @@ type retryKVClient 
struct { // RetryKVClient implements a KVClient. func RetryKVClient(c *Client) pb.KVClient { - return pb.NewKVClient(c.conn) - // TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? - /*return &retryKVClient{ + return &retryKVClient{ kc: pb.NewKVClient(c.conn), retryf: c.newAuthRetryWrapper(c.newRetryWrapper()), - }*/ + } } func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) { err = rkv.retryf(ctx, func(rctx context.Context) error { - resp, err = rkv.kc.Range(rctx, in, opts...) + resp, err = rkv.kc.Range(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -200,17 +198,15 @@ type retryLeaseClient struct { // RetryLeaseClient implements a LeaseClient. func RetryLeaseClient(c *Client) pb.LeaseClient { - return pb.NewLeaseClient(c.conn) - // TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? - /*return &retryLeaseClient{ + return &retryLeaseClient{ lc: pb.NewLeaseClient(c.conn), retryf: c.newAuthRetryWrapper(c.newRetryWrapper()), - }*/ + } } func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) { err = rlc.retryf(ctx, func(rctx context.Context) error { - resp, err = rlc.lc.LeaseTimeToLive(rctx, in, opts...) + resp, err = rlc.lc.LeaseTimeToLive(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -218,7 +214,7 @@ func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTi func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (resp *pb.LeaseLeasesResponse, err error) { err = rlc.retryf(ctx, func(rctx context.Context) error { - resp, err = rlc.lc.LeaseLeases(rctx, in, opts...) + resp, err = rlc.lc.LeaseLeases(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -226,7 +222,7 @@ func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeases func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) { err = rlc.retryf(ctx, func(rctx context.Context) error { - resp, err = rlc.lc.LeaseGrant(rctx, in, opts...) + resp, err = rlc.lc.LeaseGrant(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -235,7 +231,7 @@ func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRe func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) { err = rlc.retryf(ctx, func(rctx context.Context) error { - resp, err = rlc.lc.LeaseRevoke(rctx, in, opts...) + resp, err = rlc.lc.LeaseRevoke(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -243,7 +239,7 @@ func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevoke func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (stream pb.Lease_LeaseKeepAliveClient, err error) { err = rlc.retryf(ctx, func(rctx context.Context) error { - stream, err = rlc.lc.LeaseKeepAlive(rctx, opts...) + stream, err = rlc.lc.LeaseKeepAlive(rctx, append(opts, withRetryPolicy(repeatable))...) 
return err }, repeatable) return stream, err @@ -256,17 +252,15 @@ type retryClusterClient struct { // RetryClusterClient implements a ClusterClient. func RetryClusterClient(c *Client) pb.ClusterClient { - return pb.NewClusterClient(c.conn) - // TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? - /*return &retryClusterClient{ + return &retryClusterClient{ cc: pb.NewClusterClient(c.conn), retryf: c.newRetryWrapper(), - }*/ + } } func (rcc *retryClusterClient) MemberList(ctx context.Context, in *pb.MemberListRequest, opts ...grpc.CallOption) (resp *pb.MemberListResponse, err error) { err = rcc.retryf(ctx, func(rctx context.Context) error { - resp, err = rcc.cc.MemberList(rctx, in, opts...) + resp, err = rcc.cc.MemberList(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -303,17 +297,15 @@ type retryMaintenanceClient struct { // RetryMaintenanceClient implements a Maintenance. func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient { - return pb.NewMaintenanceClient(conn) - // TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? - /*return &retryMaintenanceClient{ + return &retryMaintenanceClient{ mc: pb.NewMaintenanceClient(conn), retryf: c.newRetryWrapper(), - }*/ + } } func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmRequest, opts ...grpc.CallOption) (resp *pb.AlarmResponse, err error) { err = rmc.retryf(ctx, func(rctx context.Context) error { - resp, err = rmc.mc.Alarm(rctx, in, opts...) + resp, err = rmc.mc.Alarm(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -321,7 +313,7 @@ func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmReques func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (resp *pb.StatusResponse, err error) { err = rmc.retryf(ctx, func(rctx context.Context) error { - resp, err = rmc.mc.Status(rctx, in, opts...) + resp, err = rmc.mc.Status(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -329,7 +321,7 @@ func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequ func (rmc *retryMaintenanceClient) Hash(ctx context.Context, in *pb.HashRequest, opts ...grpc.CallOption) (resp *pb.HashResponse, err error) { err = rmc.retryf(ctx, func(rctx context.Context) error { - resp, err = rmc.mc.Hash(rctx, in, opts...) + resp, err = rmc.mc.Hash(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -337,7 +329,7 @@ func (rmc *retryMaintenanceClient) Hash(ctx context.Context, in *pb.HashRequest, func (rmc *retryMaintenanceClient) HashKV(ctx context.Context, in *pb.HashKVRequest, opts ...grpc.CallOption) (resp *pb.HashKVResponse, err error) { err = rmc.retryf(ctx, func(rctx context.Context) error { - resp, err = rmc.mc.HashKV(rctx, in, opts...) + resp, err = rmc.mc.HashKV(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -345,7 +337,7 @@ func (rmc *retryMaintenanceClient) HashKV(ctx context.Context, in *pb.HashKVRequ func (rmc *retryMaintenanceClient) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (stream pb.Maintenance_SnapshotClient, err error) { err = rmc.retryf(ctx, func(rctx context.Context) error { - stream, err = rmc.mc.Snapshot(rctx, in, opts...) 
+ stream, err = rmc.mc.Snapshot(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return stream, err @@ -353,7 +345,7 @@ func (rmc *retryMaintenanceClient) Snapshot(ctx context.Context, in *pb.Snapshot func (rmc *retryMaintenanceClient) MoveLeader(ctx context.Context, in *pb.MoveLeaderRequest, opts ...grpc.CallOption) (resp *pb.MoveLeaderResponse, err error) { err = rmc.retryf(ctx, func(rctx context.Context) error { - resp, err = rmc.mc.MoveLeader(rctx, in, opts...) + resp, err = rmc.mc.MoveLeader(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -374,17 +366,15 @@ type retryAuthClient struct { // RetryAuthClient implements a AuthClient. func RetryAuthClient(c *Client) pb.AuthClient { - return pb.NewAuthClient(c.conn) - // TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? - /*return &retryAuthClient{ + return &retryAuthClient{ ac: pb.NewAuthClient(c.conn), retryf: c.newRetryWrapper(), - }*/ + } } func (rac *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (resp *pb.AuthUserListResponse, err error) { err = rac.retryf(ctx, func(rctx context.Context) error { - resp, err = rac.ac.UserList(rctx, in, opts...) + resp, err = rac.ac.UserList(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -392,7 +382,7 @@ func (rac *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListReq func (rac *retryAuthClient) UserGet(ctx context.Context, in *pb.AuthUserGetRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGetResponse, err error) { err = rac.retryf(ctx, func(rctx context.Context) error { - resp, err = rac.ac.UserGet(rctx, in, opts...) + resp, err = rac.ac.UserGet(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -400,7 +390,7 @@ func (rac *retryAuthClient) UserGet(ctx context.Context, in *pb.AuthUserGetReque func (rac *retryAuthClient) RoleGet(ctx context.Context, in *pb.AuthRoleGetRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGetResponse, err error) { err = rac.retryf(ctx, func(rctx context.Context) error { - resp, err = rac.ac.RoleGet(rctx, in, opts...) + resp, err = rac.ac.RoleGet(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err @@ -408,7 +398,7 @@ func (rac *retryAuthClient) RoleGet(ctx context.Context, in *pb.AuthRoleGetReque func (rac *retryAuthClient) RoleList(ctx context.Context, in *pb.AuthRoleListRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleListResponse, err error) { err = rac.retryf(ctx, func(rctx context.Context) error { - resp, err = rac.ac.RoleList(rctx, in, opts...) + resp, err = rac.ac.RoleList(rctx, in, append(opts, withRetryPolicy(repeatable))...) return err }, repeatable) return resp, err diff --git a/clientv3/retry_interceptor.go b/clientv3/retry_interceptor.go new file mode 100644 index 000000000000..399a3009213f --- /dev/null +++ b/clientv3/retry_interceptor.go @@ -0,0 +1,355 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Based on github.com/grpc-ecosystem/go-grpc-middleware/retry, but modified to support the more +// fine grained error checking required by write-at-most-once retry semantics of etcd. + +package clientv3 + +import ( + "context" + "io" + "sync" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" +) + +// unaryClientInterceptor returns a new retrying unary client interceptor. +// +// The default configuration of the interceptor is to not retry *at all*. This behaviour can be +// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions). +func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor { + intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + grpcOpts, retryOpts := filterCallOptions(opts) + callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) + // short circuit for simplicity, and avoiding allocations. + if callOpts.max == 0 { + return invoker(ctx, method, req, reply, cc, grpcOpts...) + } + var lastErr error + for attempt := uint(0); attempt < callOpts.max; attempt++ { + if err := waitRetryBackoff(attempt, ctx, callOpts); err != nil { + return err + } + lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...) + logger.Info("retry unary intercept", zap.Uint("attempt", attempt), zap.Error(lastErr)) + if lastErr == nil { + return nil + } + if isContextError(lastErr) { + if ctx.Err() != nil { + // its the context deadline or cancellation. + return lastErr + } else { + // its the callCtx deadline or cancellation, in which case try again. + continue + } + } + if !isRetriable(lastErr, callOpts) { + return lastErr + } + } + return lastErr + } +} + +// streamClientInterceptor returns a new retrying stream client interceptor for server side streaming calls. +// +// The default configuration of the interceptor is to not retry *at all*. This behaviour can be +// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions). +// +// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs +// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams, +// BidiStreams), the retry interceptor will fail the call. +func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor { + intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + grpcOpts, retryOpts := filterCallOptions(opts) + callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) + // short circuit for simplicity, and avoiding allocations. 
+ if callOpts.max == 0 { + return streamer(ctx, desc, cc, method, grpcOpts...) + } + if desc.ClientStreams { + return nil, grpc.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()") + } + newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...) + logger.Info("retry stream intercept", zap.Error(err)) + if err != nil { + // TODO(mwitkow): Maybe dial and transport errors should be retriable? + return nil, err + } + retryingStreamer := &serverStreamingRetryingStream{ + ClientStream: newStreamer, + callOpts: callOpts, + ctx: ctx, + streamerCall: func(ctx context.Context) (grpc.ClientStream, error) { + return streamer(ctx, desc, cc, method, grpcOpts...) + }, + } + return retryingStreamer, nil + } +} + +// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a +// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish +// a new ClientStream according to the retry policy. +type serverStreamingRetryingStream struct { + grpc.ClientStream + bufferedSends []interface{} // single messsage that the client can sen + receivedGood bool // indicates whether any prior receives were successful + wasClosedSend bool // indicates that CloseSend was closed + ctx context.Context + callOpts *options + streamerCall func(ctx context.Context) (grpc.ClientStream, error) + mu sync.RWMutex +} + +func (s *serverStreamingRetryingStream) setStream(clientStream grpc.ClientStream) { + s.mu.Lock() + s.ClientStream = clientStream + s.mu.Unlock() +} + +func (s *serverStreamingRetryingStream) getStream() grpc.ClientStream { + s.mu.RLock() + defer s.mu.RUnlock() + return s.ClientStream +} + +func (s *serverStreamingRetryingStream) SendMsg(m interface{}) error { + s.mu.Lock() + s.bufferedSends = append(s.bufferedSends, m) + s.mu.Unlock() + return s.getStream().SendMsg(m) +} + +func (s *serverStreamingRetryingStream) CloseSend() error { + s.mu.Lock() + s.wasClosedSend = true + s.mu.Unlock() + return s.getStream().CloseSend() +} + +func (s *serverStreamingRetryingStream) Header() (metadata.MD, error) { + return s.getStream().Header() +} + +func (s *serverStreamingRetryingStream) Trailer() metadata.MD { + return s.getStream().Trailer() +} + +func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error { + attemptRetry, lastErr := s.receiveMsgAndIndicateRetry(m) + if !attemptRetry { + return lastErr // success or hard failure + } + // We start off from attempt 1, because zeroth was already made on normal SendMsg(). + for attempt := uint(1); attempt < s.callOpts.max; attempt++ { + if err := waitRetryBackoff(attempt, s.ctx, s.callOpts); err != nil { + return err + } + newStream, err := s.reestablishStreamAndResendBuffer(s.ctx) + if err != nil { + // TODO(mwitkow): Maybe dial and transport errors should be retriable? 
+ return err + } + s.setStream(newStream) + attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m) + //fmt.Printf("Received message and indicate: %v %v\n", attemptRetry, lastErr) + if !attemptRetry { + return lastErr + } + } + return lastErr +} + +func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}) (bool, error) { + s.mu.RLock() + wasGood := s.receivedGood + s.mu.RUnlock() + err := s.getStream().RecvMsg(m) + if err == nil || err == io.EOF { + s.mu.Lock() + s.receivedGood = true + s.mu.Unlock() + return false, err + } else if wasGood { + // previous RecvMsg in the stream succeeded, no retry logic should interfere + return false, err + } + if isContextError(err) { + if s.ctx.Err() != nil { + return false, err + } else { + // its the callCtx deadline or cancellation, in which case try again. + return true, err + } + } + return isRetriable(err, s.callOpts), err + +} + +func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) { + s.mu.RLock() + bufferedSends := s.bufferedSends + s.mu.RUnlock() + newStream, err := s.streamerCall(callCtx) + if err != nil { + return nil, err + } + for _, msg := range bufferedSends { + if err := newStream.SendMsg(msg); err != nil { + return nil, err + } + } + if err := newStream.CloseSend(); err != nil { + return nil, err + } + return newStream, nil +} + +func waitRetryBackoff(attempt uint, ctx context.Context, callOpts *options) error { + var waitTime time.Duration = 0 + if attempt > 0 { + waitTime = callOpts.backoffFunc(attempt) + } + if waitTime > 0 { + timer := time.NewTimer(waitTime) + select { + case <-ctx.Done(): + timer.Stop() + return contextErrToGrpcErr(ctx.Err()) + case <-timer.C: + } + } + return nil +} + +func isRetriable(err error, callOpts *options) bool { + if isContextError(err) { + return false + } + switch callOpts.retryPolicy { + case repeatable: + return !isRepeatableStopError(err) + case nonRepeatable: + return !isNonRepeatableStopError(err) + default: + logger.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String())) + return false + } +} + +func isContextError(err error) bool { + return grpc.Code(err) == codes.DeadlineExceeded || grpc.Code(err) == codes.Canceled +} + +func contextErrToGrpcErr(err error) error { + switch err { + case context.DeadlineExceeded: + return grpc.Errorf(codes.DeadlineExceeded, err.Error()) + case context.Canceled: + return grpc.Errorf(codes.Canceled, err.Error()) + default: + return grpc.Errorf(codes.Unknown, err.Error()) + } +} + +var ( + defaultOptions = &options{ + retryPolicy: nonRepeatable, + max: 0, // disabed + backoffFunc: backoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10), + } +) + +// backoffFunc denotes a family of functions that control the backoff duration between call retries. +// +// They are called with an identifier of the attempt, and should return a time the system client should +// hold off for. If the time returned is longer than the `context.Context.Deadline` of the request +// the deadline of the request takes precedence and the wait will be interrupted before proceeding +// with the next iteration. +type backoffFunc func(attempt uint) time.Duration + +// withRetryPolicy sets the retry policy of this call. +func withRetryPolicy(rp retryPolicy) retryOption { + return retryOption{applyFunc: func(o *options) { + o.retryPolicy = rp + }} +} + +// withMax sets the maximum number of retries on this call, or this interceptor. 
+func withMax(maxRetries uint) retryOption { + return retryOption{applyFunc: func(o *options) { + o.max = maxRetries + }} +} + +// WithBackoff sets the `BackoffFunc `used to control time between retries. +func withBackoff(bf backoffFunc) retryOption { + return retryOption{applyFunc: func(o *options) { + o.backoffFunc = bf + }} +} + +type options struct { + retryPolicy retryPolicy + max uint + backoffFunc backoffFunc +} + +// retryOption is a grpc.CallOption that is local to clientv3's retry interceptor. +type retryOption struct { + grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic. + applyFunc func(opt *options) +} + +func reuseOrNewWithCallOptions(opt *options, retryOptions []retryOption) *options { + if len(retryOptions) == 0 { + return opt + } + optCopy := &options{} + *optCopy = *opt + for _, f := range retryOptions { + f.applyFunc(optCopy) + } + return optCopy +} + +func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []retryOption) { + for _, opt := range callOptions { + if co, ok := opt.(retryOption); ok { + retryOptions = append(retryOptions, co) + } else { + grpcOptions = append(grpcOptions, opt) + } + } + return grpcOptions, retryOptions +} + +// BackoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment). +// +// For example waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms. +func backoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) backoffFunc { + return func(attempt uint) time.Duration { + return backoffutils.JitterUp(waitBetween, jitterFraction) + } +} diff --git a/integration/v3_alarm_test.go b/integration/v3_alarm_test.go index 0486ead80752..e66cf0bab163 100644 --- a/integration/v3_alarm_test.go +++ b/integration/v3_alarm_test.go @@ -88,13 +88,16 @@ func TestV3StorageQuotaApply(t *testing.T) { } } + ctx, close := context.WithTimeout(context.TODO(), RequestWaitTimeout) + defer close() + // small quota machine should reject put - if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { + if _, err := kvc0.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { t.Fatalf("past-quota instance should reject put") } // large quota machine should reject put - if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { + if _, err := kvc1.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { t.Fatalf("past-quota instance should reject put") }
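
The retry schedule introduced here can be hard to see from the diff alone. The following self-contained Go sketch (not part of the patch) approximates what (*Client).roundRobinQuorumBackoff produces when combined with the defaults added to clientv3/options.go (25ms wait, 10% jitter): retries wait 0 while the client round robins across endpoints, and a jittered wait is inserted once per quorum of attempts. The jitterUp helper stands in for backoffutils.JitterUp, and the three-endpoint cluster size, quorumBackoff name, and printed attempt count are illustrative only.

// Illustrative sketch of the quorum round robin backoff schedule; assumes a
// fixed endpoint count instead of a live *clientv3.Client.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitterUp stands in for backoffutils.JitterUp used by the patch:
// waitBetween adjusted by up to +/- jitterFraction.
func jitterUp(waitBetween time.Duration, jitterFraction float64) time.Duration {
	multiplier := jitterFraction * (rand.Float64()*2 - 1)
	return time.Duration(float64(waitBetween) * (1 + multiplier))
}

// quorumBackoff mirrors the logic of (*Client).roundRobinQuorumBackoff:
// return 0 while round robining across endpoints, and a jittered wait
// once per quorum of attempts.
func quorumBackoff(endpoints int, waitBetween time.Duration, jitterFraction float64) func(attempt uint) time.Duration {
	quorum := uint(endpoints/2 + 1)
	return func(attempt uint) time.Duration {
		if attempt%quorum == 0 {
			return jitterUp(waitBetween, jitterFraction)
		}
		return 0
	}
}

func main() {
	// Defaults from clientv3/options.go in this patch: 25ms wait, 0.10 jitter.
	backoff := quorumBackoff(3, 25*time.Millisecond, 0.10)
	// waitRetryBackoff in the interceptor never sleeps on attempt 0, so start at 1.
	for attempt := uint(1); attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoff(attempt))
	}
}

For a 3 member cluster (quorum of 2) this waits roughly every second attempt, which is the "retries against quorum between each backoff" behavior described in the client.go comment.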