Skip to content

Commit

Permalink
clientv3: balancer.go and retry.go
Browse files Browse the repository at this point in the history
when a endpoint is unavailable, switch it in retry

fix: etcd-io#8326
  • Loading branch information
HardySimpson authored and Abel committed Aug 25, 2017
1 parent c1b7e78 commit 978a6ea
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 3 deletions.
84 changes: 81 additions & 3 deletions clientv3/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ type simpleBalancer struct {
// intialization and shutdown.
pinAddr string

// pinAddrcond is a bell rings when pinAddr change
pinAddrCond *sync.Cond

// last pinAddr, avoid switch too frequently
lastPinAddr string

closed bool
}

Expand All @@ -88,12 +94,15 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
updateAddrsC: make(chan struct{}, 1),
host2ep: getHost2ep(eps),
}
sb.pinAddrCond = sync.NewCond(&sb.mu)
close(sb.downc)
go sb.updateNotifyLoop()
return sb
}

func (b *simpleBalancer) Start(target string, config grpc.BalancerConfig) error { return nil }
func (b *simpleBalancer) Start(target string, config grpc.BalancerConfig) error {
return nil
}

func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
b.mu.Lock()
Expand Down Expand Up @@ -251,8 +260,11 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
b.pinAddrCond.Broadcast()
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
b.readyOnce.Do(func() {
close(b.readyc)
})
return func(err error) {
b.mu.Lock()
b.upc = make(chan struct{})
Expand Down Expand Up @@ -298,6 +310,7 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions)
b.mu.RLock()
closed = b.closed
addr = b.pinAddr
b.pinAddrCond.Broadcast()
b.mu.RUnlock()
// Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed.
if closed {
Expand All @@ -310,7 +323,9 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions)
return grpc.Address{Addr: addr}, func() {}, nil
}

func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
func (b *simpleBalancer) Notify() <-chan []grpc.Address {
return b.notifyCh
}

func (b *simpleBalancer) Close() error {
b.mu.Lock()
Expand Down Expand Up @@ -354,3 +369,66 @@ func getHost(ep string) string {
}
return url.Host
}

func (b *simpleBalancer) trySwitchEndpoint() (doSwitch bool) {
var prevPinAddr string
var prevEp string
doSwitch = true

func() {
b.mu.Lock()
defer b.mu.Unlock()

if len(b.host2ep) <= 1 {
doSwitch = false
return
}

// in another switching
if b.lastPinAddr == b.pinAddr {
doSwitch = false
return
}
b.lastPinAddr = b.pinAddr
prevPinAddr = b.pinAddr
prevEp = b.host2ep[b.pinAddr]
delete(b.host2ep, prevPinAddr)

addrs := make([]grpc.Address, 0, len(b.addrs))

for _, addr := range b.addrs {
if prevPinAddr != addr.Addr {
addrs = append(addrs, addr)
}
}
b.addrs = addrs

select {
case b.updateAddrsC <- struct{}{}:
case <-b.stopc:
}
}()

if !doSwitch {
return
}

go func() {
b.mu.Lock()
for prevPinAddr == b.pinAddr || b.pinAddr == "" {
b.pinAddrCond.Wait()
}

b.host2ep[prevPinAddr] = prevEp
b.addrs = append(b.addrs, grpc.Address{Addr: prevPinAddr})

b.mu.Unlock()

select {
case b.updateAddrsC <- struct{}{}:
case <-b.stopc:
}
}()

return
}
76 changes: 76 additions & 0 deletions clientv3/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"

"fmt"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -237,3 +238,78 @@ func (kcl *killConnListener) close() {
close(kcl.stopc)
kcl.wg.Wait()
}

func TestBalancerTrySwitch(t *testing.T) {

endpoints := []string{"localhost:2379", "localhost:22379", "localhost:32379"}
sb := newSimpleBalancer(endpoints)
defer sb.Close()

prevAddrs := map[string]func(error){}
simuGrpc := func(n int) {
for i := 0; ; i++ {
nowAddrs := map[string]func(error){}
addrs := []grpc.Address{}
ok := true
select {
case addrs, ok = <-sb.Notify():
if !ok {
return
}
case <-time.After(time.Second):
fmt.Println("-----")
return
}
fmt.Println("get from notify addr: ", addrs)

for _, addr := range addrs {
nowAddrs[addr.Addr] = nil
}
for addr, fc := range prevAddrs {
if _, ok := nowAddrs[addr]; !ok && fc != nil {
fc(errors.New("stop"))
}
}
for addr := range nowAddrs {
if _, ok := prevAddrs[addr]; !ok {
fc := sb.Up(grpc.Address{Addr: addr})
nowAddrs[addr] = fc
} else {
nowAddrs[addr] = prevAddrs[addr]
}
}
prevAddrs = nowAddrs

}
}

simuGrpc(2)

blockingOpts := grpc.BalancerGetOptions{BlockingWait: true}
ctx := context.Background()

// test at least one time switch do works
for i := 0; i < 100; i++ {
addr1, _, err := sb.Get(ctx, blockingOpts)
if err != nil {
t.Fatal(err)
}

doSwitch := sb.trySwitchEndpoint()
if !doSwitch {
continue
}

simuGrpc(2)

addr2, _, err := sb.Get(ctx, blockingOpts)
if err != nil {
t.Fatal(err)
}
if addr2.Addr == addr1.Addr {
t.Errorf("addr switch not work, addr[%v]", addr1)
}

}

}
7 changes: 7 additions & 0 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ func (c *Client) SetEndpoints(eps ...string) {
c.balancer.updateAddrs(eps)
}

// TrySwitchEndpoint try to make balancer change it's fix endpoint.
// It may do nothing while their is only one endpoint, or switch has just happened
// It is called when the endpoint now is not available, like network partition happens
func (c *Client) TrySwitchEndpoint() {
c.balancer.trySwitchEndpoint()
}

// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync(ctx context.Context) error {
mresp, err := c.MemberList(ctx)
Expand Down
3 changes: 3 additions & 0 deletions clientv3/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
return func(rpcCtx context.Context, f rpcFunc) error {
for {
if err := f(rpcCtx); err == nil || isStop(err) {
if grpc.Code(err) == codes.Unavailable {
c.TrySwitchEndpoint()
}
return err
}
select {
Expand Down

0 comments on commit 978a6ea

Please sign in to comment.