Skip to content

Commit

Permalink
clientv3: balancer.go and retry.go
Browse files Browse the repository at this point in the history
when a endpoint is unavailable, switch it in retry

fix: #8326
  • Loading branch information
HardySimpson committed Jul 31, 2017
1 parent b36463e commit 9da2545
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 0 deletions.
59 changes: 59 additions & 0 deletions clientv3/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package clientv3

import (
"net/url"
"time"
"strings"
"sync"

Expand Down Expand Up @@ -68,6 +69,12 @@ type simpleBalancer struct {
// intialization and shutdown.
pinAddr string

// pinAddrcond is a bell rings when pinAddr change
pinAddrCond sync.Cond

// last switch time, avoid switch too frequently
lastSwitchTime time.Time

closed bool
}

Expand All @@ -77,16 +84,19 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
for i := range eps {
addrs[i].Addr = getHost(eps[i])
}
mu := sync.RWMutex{}
sb := &simpleBalancer{
addrs: addrs,
notifyCh: notifyCh,
mu: mu,
readyc: make(chan struct{}),
upc: make(chan struct{}),
stopc: make(chan struct{}),
downc: make(chan struct{}),
donec: make(chan struct{}),
updateAddrsC: make(chan struct{}, 1),
host2ep: getHost2ep(eps),
pinAddrCond:sync.NewCond(mu),
}
close(sb.downc)
go sb.updateNotifyLoop()
Expand Down Expand Up @@ -251,6 +261,7 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
b.pinAddrCond.Broadcast()
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
Expand Down Expand Up @@ -354,3 +365,51 @@ func getHost(ep string) string {
}
return url.Host
}

func (b *simpleBalancer) trySwitchEndpoint() {
tNow := time.Now()
b.mu.RLock()

addrs := make([]grpc.Address, 0, len(b.addrs))
lastSwitchTime := b.lastSwitchTime

b.mu.RUnlock()

if len(addrs) == 1 {
// only one addr, do nothing
return
}

if lastSwitchTime.Add(5 * time.Second).After(tNow) {
// avoid too frequently switch
return
}

b.mu.Lock()
prevPinAddr := b.pinAddr
prevPinAddrEp := b.host2ep[prevPinAddr]
delete(b.host2ep, prevPinAddr)
for _, addr := range b.addrs {
if prevPinAddr != addr.Addr {
addrs = append(addrs, addr)
}

}
b.addrs = addrs
b.lastSwitchTime = tNow
b.mu.Unlock()

select {
case b.updateAddrsC <- struct{}{}:
case <-b.stopc:
}

b.mu.Lock()
for (prevPinAddr == b.pinAddr) {
b.pinAddrCond.Wait()
}

b.host2ep[prevPinAddr] = prevPinAddrEp
b.addrs = append(b.addrs, grpc.Address{Addr: prevPinAddr})
b.mu.Unlock()
}
7 changes: 7 additions & 0 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ func (c *Client) SetEndpoints(eps ...string) {
c.balancer.updateAddrs(eps)
}

// TrySwitchEndpoint try to make balancer change it's fix endpoint.
// It may do nothing while their is only one endpoint, or switch has just happened
// It is called when the endpoint now is not available, like network partition happens
func (c *Client) TrySwitchEndpoint() {
c.balancer.trySwitchEndpoint()
}

// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync(ctx context.Context) error {
mresp, err := c.MemberList(ctx)
Expand Down
5 changes: 5 additions & 0 deletions clientv3/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (c *Client) newRetryWrapper() retryRpcFunc {
return nil
}

if grpc.Code(err) == codes.Unavailable {
c.TrySwitchEndpoint()
}


eErr := rpctypes.Error(err)
// always stop retry on etcd errors
if _, ok := eErr.(rpctypes.EtcdError); ok {
Expand Down

0 comments on commit 9da2545

Please sign in to comment.