clientv3: balancer.go and retry.go
when an endpoint is unavailable, switch away from it in retry

fix: etcd-io#8326
HardySimpson committed Jul 29, 2017
1 parent b36463e commit 0b87a29
Showing 3 changed files with 106 additions and 0 deletions.
91 changes: 91 additions & 0 deletions clientv3/balancer.go
@@ -16,6 +16,7 @@ package clientv3

import (
"net/url"
"time"
"strings"
"sync"

@@ -68,6 +69,13 @@ type simpleBalancer struct {
// initialization and shutdown.
pinAddr string

// pinAddrCond is broadcast whenever pinAddr changes
pinAddrCond sync.Cond

// lastSwitchTime records the last endpoint switch, to avoid switching too frequently
lastSwitchTime time.Time

closed bool
}

@@ -251,6 +259,7 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
b.pinAddrCond.Broadcast()
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
@@ -354,3 +363,85 @@ func getHost(ep string) string {
}
return url.Host
}

func (b *simpleBalancer) removeAddrs(eps []string) {
np := getHost2ep(eps)
addrs := make([]grpc.Address, 0, len(eps))

b.mu.Lock()

change := false

for k, v := range b.host2ep {
if np[k] == v {
delete(b.host2ep, k)
change = true
} else {
addrs = append(addrs, grpc.Address{Addr: k})
}
}

if !change {
// no endpoint was removed, so no need to update addresses
b.mu.Unlock()
return
}

b.addrs = addrs

// updating notifyCh can trigger new connections,
// only update addrs if all connections are down
// or addrs does not include pinAddr.
update := !hasAddr(addrs, b.pinAddr)
b.mu.Unlock()

if update {
select {
case b.updateAddrsC <- struct{}{}:
case <-b.stopc:
}
}
}

func (b *simpleBalancer) switchEndpoint() {
tNow := time.Now()
b.mu.RLock()
lastSwitchTime := b.lastSwitchTime
b.mu.RUnlock()

if lastSwitchTime.Add(30 * time.Second).After(tNow) {
// avoid switching too frequently
return
}

b.mu.Lock()
b.lastSwitchTime = tNow
prevPinAddr := b.pinAddr
prevPinAddrEp := b.host2ep[prevPinAddr]
// temporarily remove the pinned endpoint so the next update pins another one
delete(b.host2ep, prevPinAddr)
addrs := make([]grpc.Address, 0, len(b.addrs))
for _, addr := range b.addrs {
if addr.Addr != prevPinAddr {
addrs = append(addrs, addr)
}
}
b.addrs = addrs
b.mu.Unlock()

select {
case b.updateAddrsC <- struct{}{}:
case <-b.stopc:
}

b.mu.Lock()
// wait until gRPC pins a different endpoint (Up broadcasts pinAddrCond)
for prevPinAddr == b.pinAddr {
b.pinAddrCond.Wait()
}

// restore the previously removed endpoint so it can be pinned again later
b.host2ep[prevPinAddr] = prevPinAddrEp
b.addrs = append(b.addrs, grpc.Address{Addr: prevPinAddr})
b.mu.Unlock()
}
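Note: the diff declares pinAddrCond as a sync.Cond but never sets its Locker; cond.Wait and cond.Broadcast require cond.L to be non-nil, otherwise Wait panics. A minimal sketch of the wiring that would presumably live in newSimpleBalancer (the constructor is not part of this diff, so its exact shape here is an assumption):

func newSimpleBalancer(eps []string) *simpleBalancer {
	sb := &simpleBalancer{ /* existing field initialization elided */ }
	// assumption: pinAddrCond shares the balancer's mutex so that
	// switchEndpoint can Wait on it while holding b.mu.
	sb.pinAddrCond.L = &sb.mu
	return sb
}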
6 changes: 6 additions & 0 deletions clientv3/client.go
@@ -121,6 +121,12 @@ func (c *Client) SetEndpoints(eps ...string) {
c.balancer.updateAddrs(eps)
}

// SwitchEndpoint makes the balancer switch away from its currently pinned endpoint.
// It is called when that endpoint is unavailable, e.g. during a network partition.
func (c *Client) SwitchEndpoint() {
c.balancer.switchEndpoint()
}

// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync(ctx context.Context) error {
mresp, err := c.MemberList(ctx)
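A caller-side usage sketch (not part of this commit): a client that sees an Unavailable error could ask the balancer to pin a different endpoint before its next request. The helper name, key, timeout, and import paths below are illustrative assumptions:

package example

import (
	"context"
	"time"

	"github.com/coreos/etcd/clientv3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

func getWithSwitch(cli *clientv3.Client, key string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	_, err := cli.Get(ctx, key)
	cancel()
	if err != nil && grpc.Code(err) == codes.Unavailable {
		// the pinned endpoint looks unreachable; ask the balancer to pin
		// another endpoint before the next RPC (rate limited to one
		// switch per 30 seconds, safe for concurrent callers).
		cli.SwitchEndpoint()
	}
	return err
}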
9 changes: 9 additions & 0 deletions clientv3/retry.go
@@ -36,6 +36,14 @@ func (c *Client) newRetryWrapper() retryRpcFunc {
eErr := rpctypes.Error(err)
// always stop retry on etcd errors
if _, ok := eErr.(rpctypes.EtcdError); ok {
if grpc.Code(err) == codes.Unavailable {
// ask the balancer to pin another endpoint for the next rpc;
// do not retry here. SwitchEndpoint is rate limited to at most
// one switch per 30 seconds and is safe to call concurrently.
c.SwitchEndpoint()
}
return err
}

@@ -44,6 +52,7 @@ func (c *Client) newRetryWrapper() retryRpcFunc {
return err
}


select {
case <-c.balancer.ConnectNotify():
case <-rpcCtx.Done():
