diff --git a/clientv3/balancer.go b/clientv3/balancer.go index 6ae047e98419..cc4194e0a70c 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -68,6 +68,12 @@ type simpleBalancer struct { // intialization and shutdown. pinAddr string + // pinAddrcond is a bell rings when pinAddr change + pinAddrCond *sync.Cond + + // last pinAddr, avoid switch too frequently + lastPinAddr string + closed bool } @@ -88,12 +94,15 @@ func newSimpleBalancer(eps []string) *simpleBalancer { updateAddrsC: make(chan struct{}, 1), host2ep: getHost2ep(eps), } + sb.pinAddrCond = sync.NewCond(&sb.mu) close(sb.downc) go sb.updateNotifyLoop() return sb } -func (b *simpleBalancer) Start(target string, config grpc.BalancerConfig) error { return nil } +func (b *simpleBalancer) Start(target string, config grpc.BalancerConfig) error { + return nil +} func (b *simpleBalancer) ConnectNotify() <-chan struct{} { b.mu.Lock() @@ -251,8 +260,11 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) { close(b.upc) b.downc = make(chan struct{}) b.pinAddr = addr.Addr + b.pinAddrCond.Broadcast() // notify client that a connection is up - b.readyOnce.Do(func() { close(b.readyc) }) + b.readyOnce.Do(func() { + close(b.readyc) + }) return func(err error) { b.mu.Lock() b.upc = make(chan struct{}) @@ -298,6 +310,7 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) b.mu.RLock() closed = b.closed addr = b.pinAddr + b.pinAddrCond.Broadcast() b.mu.RUnlock() // Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed. if closed { @@ -310,7 +323,9 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) return grpc.Address{Addr: addr}, func() {}, nil } -func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh } +func (b *simpleBalancer) Notify() <-chan []grpc.Address { + return b.notifyCh +} func (b *simpleBalancer) Close() error { b.mu.Lock() @@ -354,3 +369,66 @@ func getHost(ep string) string { } return url.Host } + +func (b *simpleBalancer) trySwitchEndpoint() (doSwitch bool) { + var prevPinAddr string + var prevEp string + doSwitch = true + + func() { + b.mu.Lock() + defer b.mu.Unlock() + + if len(b.host2ep) <= 1 { + doSwitch = false + return + } + + // in another switching + if b.lastPinAddr == b.pinAddr { + doSwitch = false + return + } + b.lastPinAddr = b.pinAddr + prevPinAddr = b.pinAddr + prevEp = b.host2ep[b.pinAddr] + delete(b.host2ep, prevPinAddr) + + addrs := make([]grpc.Address, 0, len(b.addrs)) + + for _, addr := range b.addrs { + if prevPinAddr != addr.Addr { + addrs = append(addrs, addr) + } + } + b.addrs = addrs + + select { + case b.updateAddrsC <- struct{}{}: + case <-b.stopc: + } + }() + + if !doSwitch { + return + } + + go func() { + b.mu.Lock() + for prevPinAddr == b.pinAddr || b.pinAddr == "" { + b.pinAddrCond.Wait() + } + + b.host2ep[prevPinAddr] = prevEp + b.addrs = append(b.addrs, grpc.Address{Addr: prevPinAddr}) + + b.mu.Unlock() + + select { + case b.updateAddrsC <- struct{}{}: + case <-b.stopc: + } + }() + + return +} diff --git a/clientv3/balancer_test.go b/clientv3/balancer_test.go index 5245b69a2aa8..cc963cef592f 100644 --- a/clientv3/balancer_test.go +++ b/clientv3/balancer_test.go @@ -24,6 +24,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" + "fmt" "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -237,3 +238,78 @@ func (kcl *killConnListener) close() { close(kcl.stopc) kcl.wg.Wait() } + +func TestBalancerTrySwitch(t *testing.T) { + + endpoints := []string{"localhost:2379", "localhost:22379", "localhost:32379"} + sb := newSimpleBalancer(endpoints) + defer sb.Close() + + prevAddrs := map[string]func(error){} + simuGrpc := func(n int) { + for i := 0; ; i++ { + nowAddrs := map[string]func(error){} + addrs := []grpc.Address{} + ok := true + select { + case addrs, ok = <-sb.Notify(): + if !ok { + return + } + case <-time.After(time.Second): + fmt.Println("-----") + return + } + fmt.Println("get from notify addr: ", addrs) + + for _, addr := range addrs { + nowAddrs[addr.Addr] = nil + } + for addr, fc := range prevAddrs { + if _, ok := nowAddrs[addr]; !ok && fc != nil { + fc(errors.New("stop")) + } + } + for addr := range nowAddrs { + if _, ok := prevAddrs[addr]; !ok { + fc := sb.Up(grpc.Address{Addr: addr}) + nowAddrs[addr] = fc + } else { + nowAddrs[addr] = prevAddrs[addr] + } + } + prevAddrs = nowAddrs + + } + } + + simuGrpc(2) + + blockingOpts := grpc.BalancerGetOptions{BlockingWait: true} + ctx := context.Background() + + // test at least one time switch do works + for i := 0; i < 100; i++ { + addr1, _, err := sb.Get(ctx, blockingOpts) + if err != nil { + t.Fatal(err) + } + + doSwitch := sb.trySwitchEndpoint() + if !doSwitch { + continue + } + + simuGrpc(2) + + addr2, _, err := sb.Get(ctx, blockingOpts) + if err != nil { + t.Fatal(err) + } + if addr2.Addr == addr1.Addr { + t.Errorf("addr switch not work, addr[%v]", addr1) + } + + } + +} diff --git a/clientv3/client.go b/clientv3/client.go index 1f8c83f5750c..c5ca17a64fe1 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -119,6 +119,13 @@ func (c *Client) SetEndpoints(eps ...string) { c.balancer.updateAddrs(eps) } +// TrySwitchEndpoint try to make balancer change it's fix endpoint. +// It may do nothing while their is only one endpoint, or switch has just happened +// It is called when the endpoint now is not available, like network partition happens +func (c *Client) TrySwitchEndpoint() { + c.balancer.trySwitchEndpoint() +} + // Sync synchronizes client's endpoints with the known endpoints from the etcd membership. func (c *Client) Sync(ctx context.Context) error { mresp, err := c.MemberList(ctx) diff --git a/clientv3/retry.go b/clientv3/retry.go index b4b8b3d38da5..41c6d705a921 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -45,6 +45,9 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc { return func(rpcCtx context.Context, f rpcFunc) error { for { if err := f(rpcCtx); err == nil || isStop(err) { + if grpc.Code(err) == codes.Unavailable { + c.TrySwitchEndpoint() + } return err } select {