clientv3: Add retry for round robin load balancer based on grpc-middleware's retry interceptor #16

Merged
47 changes: 39 additions & 8 deletions clientv3/client.go
@@ -30,6 +30,7 @@ import (
"github.com/coreos/etcd/clientv3/balancer/picker"
"github.com/coreos/etcd/clientv3/balancer/resolver/endpoint"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils"
"go.uber.org/zap"

"google.golang.org/grpc"
@@ -45,13 +46,15 @@ var (
ErrOldCluster = errors.New("etcdclient: old cluster version")

roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
logger *zap.Logger
)

func init() {
logger = zap.NewNop() // zap.NewExample()
balancer.RegisterBuilder(balancer.Config{
Policy: picker.RoundrobinBalanced,
Name: roundRobinBalancerName,
Logger: zap.NewNop(), // zap.NewExample(),
Logger: logger,
})
}

@@ -263,6 +266,18 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []
opts = append(opts, grpc.WithInsecure())
}

// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
// once it is available.
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
opts = append(opts,
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
// Streams that are safe to retry are enabled individually.
grpc.WithStreamInterceptor(c.streamClientInterceptor(logger, withMax(0), rrBackoff)),
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(logger, withMax(defaultUnaryMaxRetries), rrBackoff)),
)

return opts, nil
}

@@ -386,14 +401,14 @@ func newClient(cfg *Config) (*Client, error) {

ctx, cancel := context.WithCancel(baseCtx)
client := &Client{
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.Mutex),
callOpts: defaultCallOpts,
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.Mutex),
}

if cfg.Username != "" && cfg.Password != "" {
client.Username = cfg.Username
client.Password = cfg.Password
@@ -461,6 +476,22 @@ func newClient(cfg *Config) (*Client, error) {
return client, nil
}

// roundRobinQuorumBackoff backs off once per full round-robin pass across a quorum of endpoints.
// It is intended for use with a round robin load balancer.
func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc {
return func(attempt uint) time.Duration {
// After each full round robin across the quorum, back off for the wait-between duration.
n := uint(len(c.Endpoints()))
quorum := (n/2 + 1)
if attempt%quorum == 0 {
logger.Info("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
return backoffutils.JitterUp(waitBetween, jitterFraction)
}
logger.Info("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
return 0
}
}

func (c *Client) checkVersion() (err error) {
var wg sync.WaitGroup
errc := make(chan error, len(c.cfg.Endpoints))
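The quorum arithmetic in roundRobinQuorumBackoff decides when the retry interceptor actually sleeps: with n endpoints, quorum = n/2 + 1, and the backoff returns 0 for every attempt that is not a multiple of the quorum, so the client sweeps a quorum's worth of endpoints back to back before waiting. A self-contained sketch of the resulting schedule (jitterUp mirrors backoffutils.JitterUp's documented behavior; this is an illustration, not code from the PR):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitterUp scales wait by a random factor in [1-jitter, 1+jitter],
// mirroring backoffutils.JitterUp from go-grpc-middleware.
func jitterUp(wait time.Duration, jitter float64) time.Duration {
	multiplier := jitter * (rand.Float64()*2 - 1)
	return time.Duration(float64(wait) * (1 + multiplier))
}

// quorumBackoff reproduces roundRobinQuorumBackoff's schedule for a
// fixed endpoint count: sleep only once per quorum-sized pass.
func quorumBackoff(endpoints uint, wait time.Duration, jitter float64) func(attempt uint) time.Duration {
	quorum := endpoints/2 + 1
	return func(attempt uint) time.Duration {
		if attempt%quorum == 0 {
			return jitterUp(wait, jitter)
		}
		return 0
	}
}

func main() {
	backoff := quorumBackoff(3, 25*time.Millisecond, 0.10)
	for attempt := uint(1); attempt <= 6; attempt++ {
		// With 3 endpoints, quorum = 2: attempts 2, 4, 6 sleep ~25ms
		// (+/- 10% jitter); attempts 1, 3, 5 retry immediately against
		// the next endpoint in the round robin.
		fmt.Printf("attempt %d: backoff %v\n", attempt, backoff(attempt))
	}
}
```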
7 changes: 3 additions & 4 deletions clientv3/integration/dial_test.go
@@ -83,7 +83,9 @@ func TestDialTLSNoConfig(t *testing.T) {

// TODO: this should not be required when we set grpc.WithBlock()
if c != nil {
_, err = c.KV.Get(context.Background(), "/")
ctx, cancel := context.WithTimeout(context.Background(), integration.RequestWaitTimeout)
_, err = c.KV.Get(ctx, "/")
cancel()
}
if !isClientTimeout(err) {
t.Fatalf("expected dial timeout error, got %v", err)
@@ -157,9 +159,6 @@ func TestSwitchSetEndpoints(t *testing.T) {

cli.SetEndpoints(eps...)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, cli)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if _, err := cli.Get(ctx, "foo"); err != nil {
17 changes: 5 additions & 12 deletions clientv3/integration/kv_test.go
@@ -438,15 +438,12 @@ func TestKVGetErrConnClosed(t *testing.T) {

cli := clus.Client(0)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, cli)

donec := make(chan struct{})
go func() {
defer close(donec)
_, err := cli.Get(context.TODO(), "foo")
if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
t.Fatalf("expected %v, %v or server unavailable, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
}
}()

@@ -689,8 +686,6 @@ func TestKVGetRetry(t *testing.T) {

donec := make(chan struct{})
go func() {
// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, kv)
// Get will fail, but reconnect will trigger
gresp, gerr := kv.Get(ctx, "foo")
if gerr != nil {
@@ -741,8 +736,6 @@ func TestKVPutFailGetRetry(t *testing.T) {

donec := make(chan struct{})
go func() {
// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, kv)
// Get will fail, but reconnect will trigger
gresp, gerr := kv.Get(context.TODO(), "foo")
if gerr != nil {
@@ -800,7 +793,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) {
// this Get fails and triggers an asynchronous connection retry
_, err := cli.Get(ctx, "abc")
cancel()
if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
if err != nil && !(isCanceled(err) || isClientTimeout(err)) {
t.Fatal(err)
}
}
@@ -822,15 +815,15 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
// grpc finds out the original connection is down due to the member shutdown.
_, err := cli.Get(ctx, "abc")
cancel()
if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
if err != nil && !(isCanceled(err) || isClientTimeout(err)) {
t.Fatal(err)
}

ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
// this Put fails and triggers an asynchronous connection retry
_, err = cli.Put(ctx, "abc", "123")
cancel()
if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
if err != nil && !(isCanceled(err) || isClientTimeout(err) || isUnavailable(err)) {
t.Fatal(err)
}
}
10 changes: 7 additions & 3 deletions clientv3/integration/lease_test.go
@@ -145,6 +145,10 @@ func TestLeaseKeepAlive(t *testing.T) {
t.Errorf("chan is closed, want not closed")
}

if kresp == nil {
t.Fatalf("unexpected nil response")
}

if kresp.ID != resp.ID {
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
}
@@ -292,7 +296,7 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
go func() {
defer close(donec)
_, err := cli.Grant(context.TODO(), 5)
if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled && !isServerUnavailable(err) {
if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled {
// grpc.ErrClientConnClosing if grpc-go balancer calls 'Get' after client.Close.
// context.Canceled if grpc-go balancer calls 'Get' with an inflight client.Close.
t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
@@ -324,7 +328,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {

donec := make(chan struct{})
go func() {
if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
@@ -356,7 +360,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {

donec := make(chan struct{})
go func() {
if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
13 changes: 2 additions & 11 deletions clientv3/integration/leasing_test.go
@@ -869,9 +869,6 @@ func TestLeasingTxnCancel(t *testing.T) {
}
clus.Members[0].Stop(t)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, clus.Client(1))

// wait for leader election, if any
if _, err = clus.Client(1).Get(context.TODO(), "abc"); err != nil {
t.Fatal(err)
@@ -1536,9 +1533,6 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) {
}
}

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, lkv)

lresp, lerr := lkv.Get(context.TODO(), "k")
if lerr != nil {
t.Fatal(lerr)
@@ -1820,9 +1814,6 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) {
// lkv shouldn't need to call out to server for updated leased keys
clus.Members[0].Stop(t)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, clus.Client(1))

for i := 0; i < n; i++ {
k := fmt.Sprintf("tree/%d", i)
lkvResp, err := lkv.Get(context.TODO(), k)
@@ -1994,7 +1985,7 @@ func TestLeasingSessionExpireCancel(t *testing.T) {

select {
case err := <-errc:
if !(err == ctx.Err() || isServerUnavailable(err)) {
if err != ctx.Err() {
t.Errorf("#%d: expected %v, got %v", i, ctx.Err(), err)
}
case <-time.After(5 * time.Second):
@@ -2025,7 +2016,7 @@ func waitForExpireAck(t *testing.T, kv clientv3.KV) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
_, err := kv.Get(ctx, "abc")
cancel()
if err == ctx.Err() || isServerUnavailable(err) {
if err == ctx.Err() {
return
} else if err != nil {
t.Logf("current error: %v", err)
1 change: 0 additions & 1 deletion clientv3/integration/maintenance_test.go
@@ -157,7 +157,6 @@ func TestMaintenanceSnapshotErrorInflight(t *testing.T) {
clus.Members[0].Restart(t)

cli := clus.RandClient()
integration.WaitClientV3(t, cli)
// reading snapshot with canceled context should error out
ctx, cancel := context.WithCancel(context.Background())
rc1, err := cli.Snapshot(ctx)
3 changes: 0 additions & 3 deletions clientv3/integration/network_partition_test.go
@@ -186,9 +186,6 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T
// isolate leader
clus.Members[lead].InjectPartition(t, clus.Members[(lead+1)%3], clus.Members[(lead+2)%3])

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, cli)

// expects balancer to round robin to leader within two attempts
for i := 0; i < 2; i++ {
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
18 changes: 8 additions & 10 deletions clientv3/integration/server_shutdown_test.go
@@ -17,7 +17,6 @@ package integration
import (
"bytes"
"context"
"reflect"
"strings"
"testing"
"time"
@@ -352,11 +351,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
}
cancel()
if err != nil {
if linearizable && isServerUnavailable(err) {
t.Logf("TODO: FIX THIS after balancer rewrite! %v %v", reflect.TypeOf(err), err)
} else {
t.Fatalf("expected linearizable=true and a server unavailable error, but got linearizable=%t and '%v'", linearizable, err)
}
t.Fatalf("unexpected error: %v", err)
}
}()

Expand Down Expand Up @@ -402,19 +397,22 @@ func isClientTimeout(err error) bool {
return code == codes.DeadlineExceeded || ev.Message() == transport.ErrConnClosing.Desc
}

func isServerUnavailable(err error) bool {
func isCanceled(err error) bool {
if err == nil {
return false
}
if err == context.Canceled {
return true
}
ev, ok := status.FromError(err)
if !ok {
return false
}
code := ev.Code()
return code == codes.Unavailable
return code == codes.Canceled
}

func isCanceled(err error) bool {
func isUnavailable(err error) bool {
if err == nil {
return false
}
@@ -426,5 +424,5 @@ func isCanceled(err error) bool {
return false
}
code := ev.Code()
return code == codes.Canceled
return code == codes.Unavailable
}
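All three helpers follow the same pattern: unwrap the error with status.FromError and branch on the gRPC status code. A hedged illustration of that pattern outside the test code (classify is hypothetical, not part of the PR):

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// classify reports a coarse category for a client-side RPC error, using the
// same status.FromError + Code() pattern as the helpers above.
func classify(err error) string {
	if err == nil {
		return "ok"
	}
	if err == context.Canceled {
		return "canceled"
	}
	ev, ok := status.FromError(err)
	if !ok {
		return "not-a-grpc-error"
	}
	switch ev.Code() {
	case codes.Canceled:
		return "canceled"
	case codes.DeadlineExceeded:
		return "timeout"
	case codes.Unavailable:
		return "unavailable"
	}
	return "other"
}

func main() {
	fmt.Println(classify(status.Error(codes.Unavailable, "etcdserver: leader changed"))) // unavailable
	fmt.Println(classify(context.Canceled))                                              // canceled
}
```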
3 changes: 0 additions & 3 deletions clientv3/integration/txn_test.go
@@ -79,9 +79,6 @@ func TestTxnWriteFail(t *testing.T) {
t.Fatalf("timed out waiting for txn fail")
case <-txnc:
}
// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, kv)

// and ensure the put didn't take
gresp, gerr := clus.Client(1).Get(context.TODO(), "foo")
if gerr != nil {
2 changes: 1 addition & 1 deletion clientv3/lease.go
@@ -466,7 +466,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...)
if err != nil {
cancel()
return nil, err
2 changes: 1 addition & 1 deletion clientv3/maintenance.go
@@ -188,7 +188,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
}

func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, m.callOpts...)
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
if err != nil {
return nil, toErr(ctx, err)
}
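LeaseKeepAlive and Snapshot sit on opposite sides of the stream-retry rule stated in client.go: the keep-alive stream is bidirectional and cannot be safely replayed by the interceptor, so it is pinned to withMax(0), while Snapshot is a read-only server stream and is opted in with withMax(defaultStreamMaxRetries). The definitions of withMax and withBackoff are not part of this diff; a plausible sketch of their shape, assuming they follow go-grpc-middleware's convention of carrying retry settings through grpc.CallOption:

```go
package clientv3

import (
	"time"

	"google.golang.org/grpc"
)

// backoffFunc computes how long to wait before the given retry attempt.
type backoffFunc func(attempt uint) time.Duration

// retryOptions collects the per-call retry settings read by the interceptors.
type retryOptions struct {
	max     uint // maximum retry attempts; 0 disables retry
	backoff backoffFunc
}

// retryOption embeds grpc.EmptyCallOption so it is a legal grpc.CallOption:
// gRPC itself ignores it, and the retry interceptor filters it back out of
// the call options to configure retry for that particular call.
type retryOption struct {
	grpc.EmptyCallOption
	applyFunc func(*retryOptions)
}

func withMax(n uint) retryOption {
	return retryOption{applyFunc: func(o *retryOptions) { o.max = n }}
}

func withBackoff(b backoffFunc) retryOption {
	return retryOption{applyFunc: func(o *retryOptions) { o.backoff = b }}
}
```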
17 changes: 17 additions & 0 deletions clientv3/options.go
@@ -16,6 +16,7 @@ package clientv3

import (
"math"
"time"

"google.golang.org/grpc"
)
@@ -37,6 +38,22 @@ var (
// because range response can easily exceed request send limits
// Default to math.MaxInt32; writes exceeding server-side send limit fail anyway
defaultMaxCallRecvMsgSize = grpc.MaxCallRecvMsgSize(math.MaxInt32)

// client-side non-streaming retry limit, only applied to requests where the server responds with
// an error code clearly indicating it was unable to process the request, such as codes.Unavailable.
// If set to 0, retry is disabled.
defaultUnaryMaxRetries uint = 100

// client-side streaming retry limit, only applied to requests where the server responds with
// an error code clearly indicating it was unable to process the request, such as codes.Unavailable.
// If set to 0, retry is disabled.
defaultStreamMaxRetries uint = ^uint(0) // max uint

// client-side retry backoff wait between requests.
defaultBackoffWaitBetween = 25 * time.Millisecond

// client-side retry backoff default jitter fraction.
defaultBackoffJitterFraction = 0.10
)

// defaultCallOpts defines a list of default "gRPC.CallOption".
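Concretely, these defaults bound the worst case: a unary call retries up to 100 times, sleeping once per quorum-sized pass, and each sleep lands within ±10% of 25ms. A quick check of the jitter bounds, assuming JitterUp's uniform [1-jitter, 1+jitter] scaling:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Bounds implied by defaultBackoffWaitBetween = 25ms and
	// defaultBackoffJitterFraction = 0.10.
	wait := 25 * time.Millisecond
	jitter := 0.10
	fmt.Println(time.Duration(float64(wait) * (1 - jitter))) // 22.5ms
	fmt.Println(time.Duration(float64(wait) * (1 + jitter))) // 27.5ms
}
```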