clientv3: balancer.go and retry.go
when an endpoint is unavailable, switch to another endpoint in retry

fix: etcd-io#8326
HardySimpson committed Aug 4, 2017
1 parent ae74871 commit 9716712
Showing 4 changed files with 158 additions and 5 deletions.
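
In outline, the change makes the client's retry wrapper react to a terminal Unavailable error by asking the balancer to pin a different endpoint. A condensed sketch of that control flow follows; the signatures are abbreviations, and the wait between attempts (elided in the retry.go hunk below) is simplified to a placeholder backoff, so treat this as an illustration rather than the real clientv3 code.

package retrysketch

import (
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

// retrySketch condenses the wrapped retry loop after this commit:
// retry f until it succeeds or hits a stop error, and if the final
// error is Unavailable, move the balancer off the pinned endpoint.
func retrySketch(ctx context.Context, f func(context.Context) error,
	isStop func(error) bool, trySwitchEndpoint func()) error {
	for {
		if err := f(ctx); err == nil || isStop(err) {
			if grpc.Code(err) == codes.Unavailable {
				trySwitchEndpoint() // pinned endpoint looks dead
			}
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(10 * time.Millisecond): // placeholder backoff
		}
	}
}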
78 changes: 75 additions & 3 deletions clientv3/balancer.go
@@ -18,7 +18,9 @@ import (
"fmt"
"net/url"
"strings"
"sync"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -68,6 +70,12 @@ type simpleBalancer struct {
// initialization and shutdown.
pinAddr string

// pinAddrCond is broadcast whenever pinAddr changes
pinAddrCond *sync.Cond

// lastSwitchTime records the last endpoint switch, to avoid switching too frequently
lastSwitchTime time.Time

closed bool
}

@@ -88,12 +96,15 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
updateAddrsC: make(chan struct{}, 1),
host2ep: getHost2ep(eps),
}
sb.pinAddrCond = sync.NewCond(&sb.mu)
close(sb.downc)
go sb.updateNotifyLoop()
return sb
}

func (b *simpleBalancer) Start(target string, config grpc.BalancerConfig) error {
return nil
}

func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
b.mu.Lock()
@@ -251,8 +262,11 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
b.pinAddrCond.Broadcast()
// notify client that a connection is up
b.readyOnce.Do(func() {
close(b.readyc)
})
return func(err error) {
b.mu.Lock()
b.upc = make(chan struct{})
@@ -298,6 +312,7 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions)
b.mu.RLock()
closed = b.closed
addr = b.pinAddr
b.pinAddrCond.Broadcast()
b.mu.RUnlock()
// Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed.
if closed {
@@ -310,7 +325,9 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions)
return grpc.Address{Addr: addr}, func() {}, nil
}

func (b *simpleBalancer) Notify() <-chan []grpc.Address {
return b.notifyCh
}

func (b *simpleBalancer) Close() error {
b.mu.Lock()
@@ -354,3 +371,58 @@ func getHost(ep string) string {
}
return url.Host
}

func (b *simpleBalancer) trySwitchEndpoint(d time.Duration) bool {
tNow := time.Now()

b.mu.Lock()
addrs := make([]grpc.Address, 0, len(b.addrs))
lastSwitchTime := b.lastSwitchTime
prevPinAddr := b.pinAddr

if len(b.addrs) == 1 || prevPinAddr == "" || (d != 0 && lastSwitchTime.Add(d).After(tNow)) {
// skip the switch: only one endpoint, nothing pinned yet, or the last switch happened too recently
fmt.Println("not switching prevAddr", prevPinAddr, "duration", d)
if lastSwitchTime.After(tNow) {
// the system clock jumped backwards; reset so switching is not blocked indefinitely
b.lastSwitchTime = tNow
}
b.mu.Unlock()
return false
}

for _, addr := range b.addrs {
if prevPinAddr != addr.Addr {
addrs = append(addrs, addr)
}
}
prevPinAddrEp := b.host2ep[prevPinAddr]
delete(b.host2ep, prevPinAddr)
b.addrs = addrs
b.lastSwitchTime = tNow
b.mu.Unlock()

select {
case b.updateAddrsC <- struct{}{}:
case <-b.stopc:
}

b.mu.Lock()
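// wait until a different, non-empty address gets pinned; Up() and Get() broadcast on pinAddrCond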
for prevPinAddr == b.pinAddr || b.pinAddr == "" {
b.pinAddrCond.Wait()
}

// put the previous endpoint back so it can be pinned again later
b.host2ep[prevPinAddr] = prevPinAddrEp
b.addrs = append(b.addrs, grpc.Address{Addr: prevPinAddr})

b.mu.Unlock()

select {
case b.updateAddrsC <- struct{}{}:
case <-b.stopc:
}

return true
}
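
The blocking step above is a condition-variable handoff: trySwitchEndpoint unpins the current address, pushes an address update, and then waits on pinAddrCond until Up() pins a different one. Below is a minimal, self-contained sketch of that pattern; pinBox and its method names are illustrative stand-ins, not the etcd API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// pinBox mimics the balancer's mutex-guarded pinned address and its
// condition variable.
type pinBox struct {
	mu   sync.Mutex
	cond *sync.Cond
	pin  string
}

func newPinBox() *pinBox {
	b := &pinBox{}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// set plays the role of Up(): publish a new pin and wake all waiters.
func (b *pinBox) set(addr string) {
	b.mu.Lock()
	b.pin = addr
	b.cond.Broadcast()
	b.mu.Unlock()
}

// waitForChange plays the role of the wait loop in trySwitchEndpoint:
// block until the pin is non-empty and differs from prev.
func (b *pinBox) waitForChange(prev string) string {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.pin == prev || b.pin == "" {
		b.cond.Wait() // releases mu while blocked, reacquires on wake-up
	}
	return b.pin
}

func main() {
	box := newPinBox()
	go func() {
		time.Sleep(10 * time.Millisecond)
		box.set("localhost:22379") // a reconnect pins a new address
	}()
	fmt.Println("switched to", box.waitForChange("localhost:2379"))
}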
75 changes: 73 additions & 2 deletions clientv3/balancer_test.go
@@ -84,7 +84,7 @@ func TestBalancerGetBlocking(t *testing.T) {
}
blockingOpts := grpc.BalancerGetOptions{BlockingWait: true}

ctx, _ := context.WithTimeout(context.Background(), time.Millisecond * 100)
_, _, err := sb.Get(ctx, blockingOpts)
if err != context.DeadlineExceeded {
t.Errorf("Get() with no up endpoints should timeout, got %v", err)
@@ -124,7 +124,7 @@ func TestBalancerGetBlocking(t *testing.T) {
t.Errorf("closing the only connection should triggered balancer to send the all endpoints via Notify chan so that we can establish a connection")
}
down2(errors.New("error"))
ctx, _ = context.WithTimeout(context.Background(), time.Millisecond * 100)
_, _, err = sb.Get(ctx, blockingOpts)
if err != context.DeadlineExceeded {
t.Errorf("Get() with no up endpoints should timeout, got %v", err)
@@ -237,3 +237,74 @@ func (kcl *killConnListener) close() {
close(kcl.stopc)
kcl.wg.Wait()
}

func TestBalancerTrySwitch(t *testing.T) {
endpoints := []string{"localhost:2379", "localhost:22379", "localhost:32379"}
sb := newSimpleBalancer(endpoints)
defer sb.Close()

go func() {
prevAddrs := map[string]func(error){}
for {
nowAddrs := map[string]func(error){}
addrs, ok := <-sb.Notify()
if !ok {
return
}

for _, addr := range addrs {
nowAddrs[addr.Addr] = nil
}
for addr, fc := range prevAddrs {
if _, ok := nowAddrs[addr]; !ok && fc != nil {
fc(errors.New("stop"))
}
}
for addr := range nowAddrs {
if _, ok := prevAddrs[addr]; !ok {
fc := sb.Up(grpc.Address{Addr: addr})
nowAddrs[addr] = fc
} else {
nowAddrs[addr] = prevAddrs[addr]
}
}
prevAddrs = nowAddrs
}
}()

blockingOpts := grpc.BalancerGetOptions{BlockingWait: true}
ctx := context.Background()

// verify that at least one switch actually changes the pinned endpoint
equal := true

for i := 0; i < 100; i++ {
addr1, _, err := sb.Get(ctx, blockingOpts)
if err != nil {
t.Fatal(err)
}

done := sb.trySwitchEndpoint(0)
if !done {
continue
}

addr2, _, err := sb.Get(ctx, blockingOpts)
if err != nil {
t.Fatal(err)
}
if addr2.Addr != addr1.Addr {
equal = false
break
}
}
if equal {
t.Errorf("endpoint switch never changed the pinned address")
}
}
7 changes: 7 additions & 0 deletions clientv3/client.go
@@ -119,6 +119,13 @@ func (c *Client) SetEndpoints(eps ...string) {
c.balancer.updateAddrs(eps)
}

// TrySwitchEndpoint tries to make the balancer pin a different endpoint.
// It may do nothing when there is only one endpoint or when a switch has just happened.
// It is meant to be called when the current endpoint becomes unavailable, e.g. when a network partition happens.
func (c *Client) TrySwitchEndpoint() {
c.balancer.trySwitchEndpoint(5 * time.Second)
}

// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync(ctx context.Context) error {
mresp, err := c.MemberList(ctx)
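
For context, here is a sketch of how a caller might exercise the new method, assuming a local cluster on the same three ports the test above uses; the key name and error handling are illustrative only.

package main

import (
	"log"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// The retry wrapper already calls TrySwitchEndpoint on Unavailable
	// errors; a caller can also nudge the balancer by hand.
	if _, err := cli.Get(context.Background(), "foo"); grpc.Code(err) == codes.Unavailable {
		cli.TrySwitchEndpoint() // internally rate-limited to one switch per 5s
	}
}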
3 changes: 3 additions & 0 deletions clientv3/retry.go
@@ -45,6 +45,9 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
return func(rpcCtx context.Context, f rpcFunc) error {
for {
if err := f(rpcCtx); err == nil || isStop(err) {
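// the final error is Unavailable: the pinned endpoint looks dead, so switch off it before returning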
if grpc.Code(err) == codes.Unavailable {
c.TrySwitchEndpoint()
}
return err
}
select {
