From b08eac44499dfef0397e7528b67eed885dfc7012 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Tue, 7 Nov 2017 14:13:19 -0800 Subject: [PATCH] clientv3: combine "healthBalancer" into "simpleBalancer" Signed-off-by: Gyu-Ho Lee --- clientv3/balancer.go | 297 +++++++++++++++++++++++++++++------- clientv3/balancer_test.go | 9 +- clientv3/client.go | 21 ++- clientv3/health_balancer.go | 249 ------------------------------ 4 files changed, 259 insertions(+), 317 deletions(-) delete mode 100644 clientv3/health_balancer.go diff --git a/clientv3/balancer.go b/clientv3/balancer.go index 19a298cbf897..514fd7feaa08 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -16,15 +16,23 @@ package clientv3 import ( "context" + "errors" "net/url" "strings" "sync" + "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" + healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" ) +const minHealthRetryDuration = 3 * time.Second +const unknownService = "unknown service grpc.health.v1.Health" + +type healthCheckFunc func(ep string) (bool, error) + // ErrNoAddrAvilable is returned by Get() when the balancer does not have // any active connection to endpoints at the time. // This error is returned only when opts.BlockingWait is true. @@ -53,6 +61,13 @@ type simpleBalancer struct { readyc chan struct{} readyOnce sync.Once + // healthCheck checks an endpoint's health. + healthCheck healthCheckFunc + healthCheckTimeout time.Duration + + unhealthyMu sync.RWMutex + unhealthyHostPorts map[string]time.Time + // mu protects all fields below. mu sync.RWMutex @@ -63,7 +78,9 @@ type simpleBalancer struct { downc chan struct{} // stopc is closed to signal updateNotifyLoop should stop. - stopc chan struct{} + stopc chan struct{} + stopOnce sync.Once + wg sync.WaitGroup // donec closes when all goroutines are exited donec chan struct{} @@ -83,23 +100,35 @@ type simpleBalancer struct { closed bool } -func newSimpleBalancer(eps []string) *simpleBalancer { +func newSimpleBalancer(eps []string, timeout time.Duration, hc healthCheckFunc) *simpleBalancer { notifyCh := make(chan []grpc.Address) addrs := eps2addrs(eps) sb := &simpleBalancer{ - addrs: addrs, - eps: eps, - notifyCh: notifyCh, - readyc: make(chan struct{}), - upc: make(chan struct{}), - stopc: make(chan struct{}), - downc: make(chan struct{}), - donec: make(chan struct{}), - updateAddrsC: make(chan notifyMsg), - hostPort2ep: getHostPort2ep(eps), + addrs: addrs, + eps: eps, + notifyCh: notifyCh, + readyc: make(chan struct{}), + healthCheck: hc, + unhealthyHostPorts: make(map[string]time.Time), + upc: make(chan struct{}), + stopc: make(chan struct{}), + downc: make(chan struct{}), + donec: make(chan struct{}), + updateAddrsC: make(chan notifyMsg), + hostPort2ep: getHostPort2ep(eps), } + if timeout < minHealthRetryDuration { + timeout = minHealthRetryDuration + } + sb.healthCheckTimeout = timeout + close(sb.downc) go sb.updateNotifyLoop() + sb.wg.Add(1) + go func() { + defer sb.wg.Done() + sb.updateUnhealthy() + }() return sb } @@ -114,65 +143,140 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} { func (b *simpleBalancer) ready() <-chan struct{} { return b.readyc } func (b *simpleBalancer) endpoint(hostPort string) string { - b.mu.Lock() - defer b.mu.Unlock() + b.mu.RLock() + defer b.mu.RUnlock() return b.hostPort2ep[hostPort] } -func (b *simpleBalancer) endpoints() []string { +func (b *simpleBalancer) pinned() string { b.mu.RLock() defer b.mu.RUnlock() - return b.eps + return b.pinAddr } -func (b *simpleBalancer) 
pinned() string { +func (b *simpleBalancer) hostPortError(hostPort string, err error) { + if b.endpoint(hostPort) == "" { + if logger.V(4) { + logger.Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error()) + } + return + } + + b.unhealthyMu.Lock() + b.unhealthyHostPorts[hostPort] = time.Now() + b.unhealthyMu.Unlock() + if logger.V(4) { + logger.Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error()) + } +} + +func (b *simpleBalancer) removeUnhealthy(hostPort, msg string) { + if b.endpoint(hostPort) == "" { + if logger.V(4) { + logger.Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg) + } + return + } + + b.unhealthyMu.Lock() + delete(b.unhealthyHostPorts, hostPort) + b.unhealthyMu.Unlock() + if logger.V(4) { + logger.Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg) + } +} + +func (b *simpleBalancer) countUnhealthy() (count int) { + b.unhealthyMu.RLock() + count = len(b.unhealthyHostPorts) + b.unhealthyMu.RUnlock() + return count +} + +func (b *simpleBalancer) isUnhealthy(hostPort string) (unhealthy bool) { + b.unhealthyMu.RLock() + _, unhealthy = b.unhealthyHostPorts[hostPort] + b.unhealthyMu.RUnlock() + return unhealthy +} + +func (b *simpleBalancer) cleanupUnhealthy() { + b.unhealthyMu.Lock() + for k, v := range b.unhealthyHostPorts { + if time.Since(v) > b.healthCheckTimeout { + delete(b.unhealthyHostPorts, k) + if logger.V(4) { + logger.Infof("clientv3/balancer: removes %q from unhealthy after %v", k, b.healthCheckTimeout) + } + } + } + b.unhealthyMu.Unlock() +} + +func (b *simpleBalancer) liveAddrs() []grpc.Address { + unhealthyCnt := b.countUnhealthy() + b.mu.RLock() defer b.mu.RUnlock() - return b.pinAddr + + hbAddrs := b.addrs + if len(b.addrs) == 1 || unhealthyCnt == 0 || unhealthyCnt == len(b.addrs) { + return hbAddrs + } + + addrs := make([]grpc.Address, 0, len(b.addrs)-unhealthyCnt) + for _, addr := range b.addrs { + if !b.isUnhealthy(addr.Addr) { + addrs = append(addrs, addr) + } + } + return addrs } -func getHostPort2ep(eps []string) map[string]string { - hm := make(map[string]string, len(eps)) - for i := range eps { - _, host, _ := parseEndpoint(eps[i]) - hm[host] = eps[i] +func (b *simpleBalancer) updateUnhealthy() { + for { + select { + case <-time.After(b.healthCheckTimeout): + b.cleanupUnhealthy() + if b.isUnhealthy(b.pinned()) { + select { + case b.updateAddrsC <- notifyNext: + case <-b.stopc: + return + } + } + case <-b.stopc: + return + } } - return hm } func (b *simpleBalancer) updateAddrs(eps ...string) { np := getHostPort2ep(eps) b.mu.Lock() + defer b.mu.Unlock() match := len(np) == len(b.hostPort2ep) - for k, v := range np { - if b.hostPort2ep[k] != v { - match = false - break + if match { + for k, v := range np { + if b.hostPort2ep[k] != v { + match = false + break + } } } if match { // same endpoints, so no need to update address - b.mu.Unlock() return } b.hostPort2ep = np b.addrs, b.eps = eps2addrs(eps), eps - // updating notifyCh can trigger new connections, - // only update addrs if all connections are down - // or addrs does not include pinAddr. 
- update := !hasAddr(b.addrs, b.pinAddr) - b.mu.Unlock() - - if update { - select { - case b.updateAddrsC <- notifyNext: - case <-b.stopc: - } - } + b.unhealthyMu.Lock() + b.unhealthyHostPorts = make(map[string]time.Time) + b.unhealthyMu.Unlock() } func (b *simpleBalancer) next() { @@ -190,15 +294,6 @@ func (b *simpleBalancer) next() { } } -func hasAddr(addrs []grpc.Address, targetAddr string) bool { - for _, addr := range addrs { - if targetAddr == addr.Addr { - return true - } - } - return false -} - func (b *simpleBalancer) updateNotifyLoop() { defer close(b.donec) @@ -291,11 +386,10 @@ func (b *simpleBalancer) notifyAddrs(msg notifyMsg) { } func (b *simpleBalancer) Up(addr grpc.Address) func(error) { - f, _ := b.up(addr) - return f -} + if !b.mayPin(addr) { + return func(err error) {} + } -func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) { b.mu.Lock() defer b.mu.Unlock() @@ -303,18 +397,18 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) { // to "fix" it up at application layer. Otherwise, will panic // if b.upc is already closed. if b.closed { - return func(err error) {}, false + return func(err error) {} } // gRPC might call Up on a stale address. // Prevent updating pinAddr with a stale address. if !hasAddr(b.addrs, addr.Addr) { - return func(err error) {}, false + return func(err error) {} } if b.pinAddr != "" { if logger.V(4) { logger.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr) } - return func(err error) {}, false + return func(err error) {} } // notify waiting Get()s and pin first connected address close(b.upc) @@ -326,6 +420,12 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) { // notify client that a connection is up b.readyOnce.Do(func() { close(b.readyc) }) return func(err error) { + // If connected to a black hole endpoint or a killed server, the gRPC ping + // timeout will induce a network I/O error, and retrying until success; + // finding healthy endpoint on retry could take several timeouts and redials. + // To avoid wasting retries, gray-list unhealthy endpoints. + b.hostPortError(addr.Addr, err) + b.mu.Lock() b.upc = make(chan struct{}) close(b.downc) @@ -334,7 +434,45 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) { if logger.V(4) { logger.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error()) } - }, true + } +} + +func (b *simpleBalancer) mayPin(addr grpc.Address) bool { + if b.endpoint(addr.Addr) == "" { // stale host:port + return false + } + + b.unhealthyMu.RLock() + unhealthyCnt := len(b.unhealthyHostPorts) + failedTime, bad := b.unhealthyHostPorts[addr.Addr] + b.unhealthyMu.RUnlock() + + b.mu.RLock() + skip := len(b.addrs) == 1 || unhealthyCnt == 0 || len(b.addrs) == unhealthyCnt + b.mu.RUnlock() + if skip || !bad { + return true + } + + // prevent isolated member's endpoint from being infinitely retried, as follows: + // 1. keepalive pings detects GoAway with http2.ErrCodeEnhanceYourCalm + // 2. balancer 'Up' unpins with grpc: failed with network I/O error + // 3. 
grpc-healthcheck still SERVING, thus retry to pin + // instead, return before grpc-healthcheck if failed within healthcheck timeout + if elapsed := time.Since(failedTime); elapsed < b.healthCheckTimeout { + if logger.V(4) { + logger.Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout) + } + return false + } + + if ok, _ := b.healthCheck(addr.Addr); ok { + b.removeUnhealthy(addr.Addr, "health check success") + return true + } + + b.hostPortError(addr.Addr, errors.New("health check failed")) + return false } func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { @@ -397,7 +535,7 @@ func (b *simpleBalancer) Close() error { return nil } b.closed = true - close(b.stopc) + b.stopOnce.Do(func() { close(b.stopc) }) b.pinAddr = "" // In the case of following scenario: @@ -414,6 +552,7 @@ func (b *simpleBalancer) Close() error { } b.mu.Unlock() + b.wg.Wait() // wait for updateNotifyLoop to finish <-b.donec @@ -422,6 +561,28 @@ func (b *simpleBalancer) Close() error { return nil } +func grpcHealthCheck(client *Client, ep string) (bool, error) { + conn, err := client.dial(ep) + if err != nil { + return false, err + } + defer conn.Close() + cli := healthpb.NewHealthClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{}) + cancel() + if err != nil { + if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { + if s.Message() == unknownService { + // etcd < v3.3.0 + return true, nil + } + } + return false, err + } + return resp.Status == healthpb.HealthCheckResponse_SERVING, nil +} + func getHost(ep string) string { url, uerr := url.Parse(ep) if uerr != nil || !strings.Contains(ep, "://") { @@ -437,3 +598,21 @@ func eps2addrs(eps []string) []grpc.Address { } return addrs } + +func getHostPort2ep(eps []string) map[string]string { + hm := make(map[string]string, len(eps)) + for i := range eps { + _, host, _ := parseEndpoint(eps[i]) + hm[host] = eps[i] + } + return hm +} + +func hasAddr(addrs []grpc.Address, targetAddr string) bool { + for _, addr := range addrs { + if targetAddr == addr.Addr { + return true + } + } + return false +} diff --git a/clientv3/balancer_test.go b/clientv3/balancer_test.go index 7048f939c450..0ce4114eccff 100644 --- a/clientv3/balancer_test.go +++ b/clientv3/balancer_test.go @@ -33,7 +33,7 @@ var ( ) func TestBalancerGetUnblocking(t *testing.T) { - sb := newSimpleBalancer(endpoints) + sb := newSimpleBalancer(endpoints, minHealthRetryDuration, func(string) (bool, error) { return true, nil }) defer sb.Close() if addrs := <-sb.Notify(); len(addrs) != len(endpoints) { t.Errorf("Initialize newSimpleBalancer should have triggered Notify() chan, but it didn't") @@ -77,7 +77,7 @@ func TestBalancerGetUnblocking(t *testing.T) { } func TestBalancerGetBlocking(t *testing.T) { - sb := newSimpleBalancer(endpoints) + sb := newSimpleBalancer(endpoints, minHealthRetryDuration, func(string) (bool, error) { return true, nil }) defer sb.Close() if addrs := <-sb.Notify(); len(addrs) != len(endpoints) { t.Errorf("Initialize newSimpleBalancer should have triggered Notify() chan, but it didn't") @@ -168,9 +168,8 @@ func TestHealthBalancerGraylist(t *testing.T) { }() } - sb := newSimpleBalancer(eps) tf := func(s string) (bool, error) { return false, nil } - hb := newHealthBalancer(sb, 5*time.Second, tf) + hb := newSimpleBalancer(eps, 5*time.Second, tf) 
conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb)) testutil.AssertNil(t, err) @@ -203,7 +202,7 @@ func TestBalancerDoNotBlockOnClose(t *testing.T) { defer kcl.close() for i := 0; i < 5; i++ { - sb := newSimpleBalancer(kcl.endpoints()) + sb := newSimpleBalancer(kcl.endpoints(), minHealthRetryDuration, func(string) (bool, error) { return true, nil }) conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(sb)) if err != nil { t.Fatal(err) diff --git a/clientv3/client.go b/clientv3/client.go index bff7d7cc6388..ef6ba981b952 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -55,7 +55,7 @@ type Client struct { cfg Config creds *credentials.TransportCredentials - balancer *healthBalancer + balancer *simpleBalancer mu sync.Mutex ctx context.Context @@ -121,6 +121,19 @@ func (c *Client) SetEndpoints(eps ...string) { c.cfg.Endpoints = eps c.mu.Unlock() c.balancer.updateAddrs(eps...) + + // updating notifyCh can trigger new connections, + // need update addrs if all connections are down + // or addrs does not include pinAddr. + c.balancer.mu.RLock() + update := !hasAddr(c.balancer.addrs, c.balancer.pinAddr) + c.balancer.mu.RUnlock() + if update { + select { + case c.balancer.updateAddrsC <- notifyNext: + case <-c.balancer.stopc: + } + } } // Sync synchronizes client's endpoints with the known endpoints from the etcd membership. @@ -378,9 +391,9 @@ func newClient(cfg *Config) (*Client, error) { client.Password = cfg.Password } - sb := newSimpleBalancer(cfg.Endpoints) - hc := func(ep string) (bool, error) { return grpcHealthCheck(client, ep) } - client.balancer = newHealthBalancer(sb, cfg.DialTimeout, hc) + client.balancer = newSimpleBalancer(cfg.Endpoints, cfg.DialTimeout, func(ep string) (bool, error) { + return grpcHealthCheck(client, ep) + }) // use Endpoints[0] so that for https:// without any tls config given, then // grpc will assume the certificate server name is the endpoint host. diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go deleted file mode 100644 index 8f4ba08ae69e..000000000000 --- a/clientv3/health_balancer.go +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright 2017 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package clientv3 - -import ( - "context" - "sync" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - healthpb "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/status" -) - -const minHealthRetryDuration = 3 * time.Second -const unknownService = "unknown service grpc.health.v1.Health" - -type healthCheckFunc func(ep string) (bool, error) - -// healthBalancer wraps a balancer so that it uses health checking -// to choose its endpoints. -type healthBalancer struct { - *simpleBalancer - - // healthCheck checks an endpoint's health. - healthCheck healthCheckFunc - healthCheckTimeout time.Duration - - // mu protects addrs, eps, unhealthy map, and stopc. - mu sync.RWMutex - - // addrs stores all grpc addresses associated with the balancer. 
- addrs []grpc.Address - - // eps stores all client endpoints - eps []string - - // unhealthy tracks the last unhealthy time of endpoints. - unhealthy map[string]time.Time - - stopc chan struct{} - stopOnce sync.Once - - hostPort2ep map[string]string - - wg sync.WaitGroup -} - -func newHealthBalancer(b *simpleBalancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer { - hb := &healthBalancer{ - simpleBalancer: b, - healthCheck: hc, - eps: b.endpoints(), - addrs: eps2addrs(b.endpoints()), - hostPort2ep: getHostPort2ep(b.endpoints()), - unhealthy: make(map[string]time.Time), - stopc: make(chan struct{}), - } - if timeout < minHealthRetryDuration { - timeout = minHealthRetryDuration - } - hb.healthCheckTimeout = timeout - - hb.wg.Add(1) - go func() { - defer hb.wg.Done() - hb.updateUnhealthy(timeout) - }() - - return hb -} - -func (hb *healthBalancer) Up(addr grpc.Address) func(error) { - f, used := hb.up(addr) - if !used { - return f - } - return func(err error) { - // If connected to a black hole endpoint or a killed server, the gRPC ping - // timeout will induce a network I/O error, and retrying until success; - // finding healthy endpoint on retry could take several timeouts and redials. - // To avoid wasting retries, gray-list unhealthy endpoints. - hb.hostPortError(addr.Addr, err) - f(err) - } -} - -func (hb *healthBalancer) up(addr grpc.Address) (func(error), bool) { - if !hb.mayPin(addr) { - return func(err error) {}, false - } - return hb.simpleBalancer.up(addr) -} - -func (hb *healthBalancer) Close() error { - hb.stopOnce.Do(func() { close(hb.stopc) }) - hb.wg.Wait() - return hb.simpleBalancer.Close() -} - -func (hb *healthBalancer) updateAddrs(eps ...string) { - addrs, hostPort2ep := eps2addrs(eps), getHostPort2ep(eps) - hb.mu.Lock() - hb.addrs, hb.eps, hb.hostPort2ep = addrs, eps, hostPort2ep - hb.unhealthy = make(map[string]time.Time) - hb.mu.Unlock() - hb.simpleBalancer.updateAddrs(eps...) -} - -func (hb *healthBalancer) endpoint(host string) string { - hb.mu.RLock() - defer hb.mu.RUnlock() - return hb.hostPort2ep[host] -} - -func (hb *healthBalancer) endpoints() []string { - hb.mu.RLock() - defer hb.mu.RUnlock() - return hb.eps -} - -func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) { - for { - select { - case <-time.After(timeout): - hb.mu.Lock() - for k, v := range hb.unhealthy { - if time.Since(v) > timeout { - delete(hb.unhealthy, k) - if logger.V(4) { - logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout) - } - } - } - hb.mu.Unlock() - eps := []string{} - for _, addr := range hb.liveAddrs() { - eps = append(eps, hb.endpoint(addr.Addr)) - } - hb.simpleBalancer.updateAddrs(eps...) 
- case <-hb.stopc: - return - } - } -} - -func (hb *healthBalancer) liveAddrs() []grpc.Address { - hb.mu.RLock() - defer hb.mu.RUnlock() - hbAddrs := hb.addrs - if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) { - return hbAddrs - } - addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy)) - for _, addr := range hb.addrs { - if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy { - addrs = append(addrs, addr) - } - } - return addrs -} - -func (hb *healthBalancer) hostPortError(hostPort string, err error) { - hb.mu.Lock() - if _, ok := hb.hostPort2ep[hostPort]; ok { - hb.unhealthy[hostPort] = time.Now() - if logger.V(4) { - logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", hostPort, err.Error()) - } - } - hb.mu.Unlock() -} - -func (hb *healthBalancer) mayPin(addr grpc.Address) bool { - hb.mu.RLock() - if _, ok := hb.hostPort2ep[addr.Addr]; !ok { // stale host:port - hb.mu.RUnlock() - return false - } - skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.addrs) == len(hb.unhealthy) - failedTime, bad := hb.unhealthy[addr.Addr] - dur := hb.healthCheckTimeout - hb.mu.RUnlock() - if skip || !bad { - return true - } - // prevent isolated member's endpoint from being infinitely retried, as follows: - // 1. keepalive pings detects GoAway with http2.ErrCodeEnhanceYourCalm - // 2. balancer 'Up' unpins with grpc: failed with network I/O error - // 3. grpc-healthcheck still SERVING, thus retry to pin - // instead, return before grpc-healthcheck if failed within healthcheck timeout - if elapsed := time.Since(failedTime); elapsed < dur { - if logger.V(4) { - logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur) - } - return false - } - if ok, _ := hb.healthCheck(addr.Addr); ok { - hb.mu.Lock() - delete(hb.unhealthy, addr.Addr) - hb.mu.Unlock() - if logger.V(4) { - logger.Infof("clientv3/health-balancer: %q is healthy (health check success)", addr.Addr) - } - return true - } - hb.mu.Lock() - hb.unhealthy[addr.Addr] = time.Now() - hb.mu.Unlock() - if logger.V(4) { - logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr) - } - return false -} - -func grpcHealthCheck(client *Client, ep string) (bool, error) { - conn, err := client.dial(ep) - if err != nil { - return false, err - } - defer conn.Close() - cli := healthpb.NewHealthClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{}) - cancel() - if err != nil { - if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { - if s.Message() == unknownService { - // etcd < v3.3.0 - return true, nil - } - } - return false, err - } - return resp.Status == healthpb.HealthCheckResponse_SERVING, nil -}
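
Usage sketch (illustration only): with the health-check and gray-listing logic folded into simpleBalancer, a caller inside package clientv3 now builds a single balancer and dials through it, the same way newClient and the updated tests above do. The endpoint, the always-healthy check function, and the helper name below are placeholders for illustration and are not part of this patch; newClient passes grpcHealthCheck(client, ep) as the health-check function.

	package clientv3

	import (
		"time"

		"google.golang.org/grpc"
	)

	// exampleMergedBalancer is a hypothetical helper showing the merged
	// constructor introduced by this patch. The stub health check always
	// reports healthy; real clients use grpcHealthCheck instead.
	func exampleMergedBalancer() (*grpc.ClientConn, error) {
		hc := func(ep string) (bool, error) { return true, nil }
		// Timeouts shorter than minHealthRetryDuration (3s) are raised to
		// that minimum inside newSimpleBalancer.
		sb := newSimpleBalancer([]string{"localhost:2379"}, 3*time.Second, hc)
		// Dial through the balancer; addresses are delivered via sb.Notify(),
		// so the target string stays empty, as in the tests above.
		return grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(sb))
	}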