clientv3: balancer.go and retry.go
when an endpoint is unavailable, switch to another one during retry

fix: etcd-io#8326
HardySimpson committed Aug 3, 2017
1 parent ae74871 commit 0bfa92a
Showing 4 changed files with 113 additions and 0 deletions.
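Below is a minimal end-to-end sketch of the behavior this commit aims for (the endpoints, key, and timeouts are hypothetical, and the import path assumes the coreos/etcd layout of that era): when an RPC fails with codes.Unavailable, the new retry wrapper asks the balancer to switch away from the pinned endpoint, so a later attempt can reach a different member.

package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// hypothetical cluster endpoints; any reachable member may end up pinned
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// if the pinned endpoint becomes unreachable (e.g. a network partition),
	// the retry wrapper in this commit sees codes.Unavailable and calls
	// TrySwitchEndpoint, so a later attempt can go to a different endpoint
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if _, err := cli.Get(ctx, "sample-key"); err != nil {
		log.Println("get failed:", err)
	}
}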
55 changes: 55 additions & 0 deletions clientv3/balancer.go
@@ -18,6 +18,7 @@ import (
"net/url"
"strings"
"sync"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
@@ -68,6 +69,12 @@ type simpleBalancer struct {
// initialization and shutdown.
pinAddr string

// pinAddrCond is signaled whenever pinAddr changes
pinAddrCond *sync.Cond

// lastSwitchTime records the last endpoint switch, to avoid switching too frequently
lastSwitchTime time.Time

closed bool
}
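For readers unfamiliar with the pattern, the following self-contained sketch (names and addresses are illustrative, not part of this diff) shows how a sync.Cond built on the balancer's mutex lets one goroutine block until another changes pinAddr and broadcasts; trySwitchEndpoint below waits this way until Up pins a new endpoint.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.RWMutex
	cond := sync.NewCond(&mu) // same construction as sb.pinAddrCond

	pinAddr := "10.0.0.1:2379" // illustrative value

	done := make(chan struct{})
	go func() {
		// waiter: blocks until pinAddr changes, like trySwitchEndpoint
		mu.Lock()
		for pinAddr == "10.0.0.1:2379" {
			cond.Wait() // releases mu while blocked, reacquires it before returning
		}
		fmt.Println("re-pinned to", pinAddr)
		mu.Unlock()
		close(done)
	}()

	// updater: changes pinAddr and rings the bell, like Up()
	time.Sleep(100 * time.Millisecond)
	mu.Lock()
	pinAddr = "10.0.0.2:2379"
	cond.Broadcast()
	mu.Unlock()

	<-done
}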

@@ -88,6 +95,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
updateAddrsC: make(chan struct{}, 1),
host2ep: getHost2ep(eps),
}
sb.pinAddrCond = sync.NewCond(&sb.mu)
close(sb.downc)
go sb.updateNotifyLoop()
return sb
@@ -251,6 +259,7 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
b.pinAddrCond.Broadcast()
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
@@ -354,3 +363,49 @@ func getHost(ep string) string {
}
return url.Host
}

func (b *simpleBalancer) trySwitchEndpoint() {
tNow := time.Now()

b.mu.RLock()
numAddrs := len(b.addrs)
addrs := make([]grpc.Address, 0, numAddrs)
lastSwitchTime := b.lastSwitchTime
b.mu.RUnlock()

// nothing to switch to when there is at most one endpoint
if numAddrs <= 1 {
return
}

if lastSwitchTime.Add(5 * time.Second).After(tNow) {
// avoid switching too frequently
return
}

b.mu.Lock()
prevPinAddr := b.pinAddr
prevPinAddrEp := b.host2ep[prevPinAddr]
// temporarily remove the pinned endpoint so the gRPC balancer must pick another one
delete(b.host2ep, prevPinAddr)
for _, addr := range b.addrs {
if prevPinAddr != addr.Addr {
addrs = append(addrs, addr)
}
}
b.addrs = addrs
b.lastSwitchTime = tNow
b.mu.Unlock()

select {
case b.updateAddrsC <- struct{}{}:
case <-b.stopc:
}

b.mu.Lock()
// wait until a different endpoint is pinned; Up broadcasts on pinAddrCond
for prevPinAddr == b.pinAddr {
b.pinAddrCond.Wait()
}

// make the previous endpoint available again for future switches
b.host2ep[prevPinAddr] = prevPinAddrEp
b.addrs = append(b.addrs, grpc.Address{Addr: prevPinAddr})
b.mu.Unlock()
}
48 changes: 48 additions & 0 deletions clientv3/balancer_test.go
@@ -237,3 +237,51 @@ func (kcl *killConnListener) close() {
close(kcl.stopc)
kcl.wg.Wait()
}

func TestBalancerTrySwitch(t *testing.T) {
sb := newSimpleBalancer(endpoints)
defer sb.Close()
if addrs := <-sb.Notify(); len(addrs) != len(endpoints) {
t.Errorf("Initialize newSimpleBalancer should have triggered Notify() chan, but it didn't")
}

go func() {
// drain Notify so address updates do not block
for {
if _, ok := <-sb.Notify(); !ok {
return
}
}
}()

blockingOpts := grpc.BalancerGetOptions{BlockingWait: true}
ctx := context.Background()
addr1, _, err := sb.Get(ctx, blockingOpts)
if err != nil {
t.Fatal(err)
}
sb.trySwitchEndpoint()
addr2, _, err := sb.Get(ctx, blockingOpts)
if err != nil {
t.Fatal(err)
}
if addr2 == addr1 {
t.Errorf("switch not work, addr[%v]", addr1)
}
time.Sleep(2 * time.Second)
// still within the 5-second throttle window, so this attempt should be a no-op
sb.trySwitchEndpoint()
addr3, _, err := sb.Get(ctx, blockingOpts)
if err != nil {
t.Fatal(err)
}
if addr3 != addr2 {
t.Errorf("frequent switch avoid failed, addr2[%v],addr3[%v]", addr2, addr3)
}
time.Sleep(4 * time.Second)
// more than 5 seconds have passed since the last switch, so this attempt should succeed
sb.trySwitchEndpoint()
addr4, _, err := sb.Get(ctx, blockingOpts)
if err != nil {
t.Fatal(err)
}
if addr4 == addr3 {
t.Errorf("switch not work, addr[%v]", addr3)
}
}
7 changes: 7 additions & 0 deletions clientv3/client.go
@@ -119,6 +119,13 @@ func (c *Client) SetEndpoints(eps ...string) {
c.balancer.updateAddrs(eps)
}

// TrySwitchEndpoint tries to make the balancer switch its pinned endpoint.
// It may do nothing when there is only one endpoint or when a switch has just happened.
// It is meant to be called when the current endpoint is unavailable, e.g. during a network partition.
func (c *Client) TrySwitchEndpoint() {
c.balancer.trySwitchEndpoint()
}
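A hedged usage sketch (repinIfPartitioned and isUnreachable are hypothetical application-side names, not part of this commit): a caller that detects its current endpoint has become unreachable can ask the client to re-pin before issuing further requests.

package main

import "github.com/coreos/etcd/clientv3"

// repinIfPartitioned is a sketch; isUnreachable is a hypothetical
// application-level probe of the currently pinned endpoint.
func repinIfPartitioned(cli *clientv3.Client, isUnreachable func() bool) {
	if isUnreachable() {
		// may do nothing when there is only one endpoint or a switch
		// happened within the last few seconds (see trySwitchEndpoint)
		cli.TrySwitchEndpoint()
	}
}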

// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync(ctx context.Context) error {
mresp, err := c.MemberList(ctx)
3 changes: 3 additions & 0 deletions clientv3/retry.go
@@ -45,6 +45,9 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
return func(rpcCtx context.Context, f rpcFunc) error {
for {
if err := f(rpcCtx); err == nil || isStop(err) {
// the pinned endpoint is unavailable; ask the balancer to switch before returning
if grpc.Code(err) == codes.Unavailable {
c.TrySwitchEndpoint()
}
return err
}
select {
