Skip to content

Commit

Permalink
poc of client fail over
Browse files Browse the repository at this point in the history
Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
chaochn47 committed Sep 13, 2023
1 parent cc282a8 commit deb6582
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 6 deletions.
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3client/v3client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}

mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s, v3rpc.NewHealthChecker(nil, s)))
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)

clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
Expand Down
5 changes: 3 additions & 2 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))

// server should register all the services manually
// use empty service name for all etcd services' health status,
// see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(grpcServer, hsrv)
hc := NewHealthChecker(hsrv, s)

pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, hc))

// set zero values for metrics registered for this grpc server
grpc_prometheus.Register(grpcServer)
Expand Down
139 changes: 139 additions & 0 deletions server/etcdserver/api/v3rpc/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3rpc

import (
"context"
"fmt"
"time"

"go.etcd.io/raft/v3"
"go.uber.org/zap"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
)

type healthNotifier interface {
StartServe()
StopServe(reason string)
}

type healthChecker struct {
hs *health.Server
served bool

// implements readyz prober
s *etcdserver.EtcdServer
// prober configuration TODO 2023-09-13 externalize as etcd server flag
successThreshold int
failureThreshold int
healthCheckInterval time.Duration
// prober state
successCount int
failureCount int
}

func NewHealthChecker(hs *health.Server, s *etcdserver.EtcdServer) healthNotifier {
hc := &healthChecker{hs: hs, s: s, successThreshold: 2, failureThreshold: 3, healthCheckInterval: 100 * time.Millisecond}
// set grpc health server as serving status blindly since
// the grpc server will serve iff s.ReadyNotify() is closed.
hc.StartServe()

go hc.startProbe()
return hc
}

func (hc *healthChecker) StartServe() {
if hc.hs != nil {
if hc.s.Logger() != nil {
hc.s.Logger().Info("start serving gRPC requests from client")
}
hc.hs.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
hc.served = true
}
}

func (hc *healthChecker) StopServe(reason string) {
if hc.hs != nil {
if hc.s.Logger() != nil {
hc.s.Logger().Warn("stop serving gRPC requests from client", zap.String("reason", reason))
}
hc.hs.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
hc.served = false
}
}

func (hc *healthChecker) startProbe() {
// stop probing if there is no consumer.
if hc.hs == nil {
return
}

ticker := time.NewTicker(hc.healthCheckInterval)
defer ticker.Stop()
for {
select {
case <-hc.s.StoppingNotify():
return
case <-ticker.C:
if ready, notReadyReason := checkReadyz(hc.s, 2*time.Second); ready {
hc.successCount += 1
hc.failureCount = 0
if hc.successCount >= hc.successThreshold && !hc.served {
hc.StartServe()
}
} else {
hc.failureCount += 1
hc.successCount = 0
hc.s.Logger().Warn("readyz check failed", zap.Int("failure-count", hc.failureCount))
if hc.failureCount >= hc.failureThreshold && hc.served {
hc.StopServe(notReadyReason)
}
}
}
}
}

// checkReadyz prober implementation should be in sync with readyz design in the future.
// https://docs.google.com/document/d/1109lUxD326yRwmMVX-tkJMpm8pTbOw7MD1XsSIo37MU
// it is now in sync with etcdhttp.HandleHealth implementation
// 1. checkLeader
// 2. checkAlarms
// 3. checkQuorumRead
func checkReadyz(s *etcdserver.EtcdServer, timeout time.Duration) (ready bool, notReadyReason string) {
if s.Leader() == types.ID(raft.None) {
return false, "local member does not have leader"
}
for _, alarm := range s.Alarms() {
if types.ID(alarm.MemberID) == s.MemberId() && alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
return false, "local member has corrupt alarm"
}
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
// range requests will only be timed-out in the LinearizableReadNotify phase, and
// blocking in the mvcc phase in case of defrag.
// However, use solely LinearizableReadNotify to indicate readyz will produce false positive serving status when defrag is active.
_, err := s.Range(ctx, &etcdserverpb.RangeRequest{KeysOnly: true, Limit: 1})
cancel()
if err != nil && err != auth.ErrUserEmpty && err != auth.ErrPermissionDenied {
return false, fmt.Sprintf("local member quorum read with %s timeout failed due to %v", timeout.String(), err)
}
return true, ""
}
10 changes: 7 additions & 3 deletions server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"github.com/dustin/go-humanize"

"go.etcd.io/raft/v3"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/api/v3/version"
Expand All @@ -32,7 +34,6 @@ import (
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/raft/v3"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -75,10 +76,11 @@ type maintenanceServer struct {
cs ClusterStatusGetter
d Downgrader
vs serverversion.Server
hs healthNotifier
}

func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
func NewMaintenanceServer(s *etcdserver.EtcdServer, hs healthNotifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), hs: hs}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
Expand All @@ -87,6 +89,8 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
ms.hs.StopServe("defrag is active")
defer ms.hs.StartServe()
err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
Expand Down
160 changes: 160 additions & 0 deletions tests/e2e/failover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

const (
// in sync with how kubernetes uses etcd
// https://github.com/kubernetes/kubernetes/blob/release-1.28/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L59-L71
keepaliveTime = 30 * time.Second
keepaliveTimeout = 10 * time.Second
dialTimeout = 20 * time.Second

clientRuntime = 10 * time.Second
failureDetectionLatency = 6 * time.Second
// expect no more than 5 failed requests
failedRequests = 5
)

func TestFailover(t *testing.T) {
tcs := []struct {
name string
clusterOptions []e2e.EPClusterOption
failureInjector func(t *testing.T, clus *e2e.EtcdProcessCluster)
failureDetectionLatency time.Duration
}{
{
name: "network_partition",
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithIsPeerTLS(true), e2e.WithPeerProxy(true)},
failureInjector: blackhole,
failureDetectionLatency: failureDetectionLatency,
},
{
name: "stalled_disk_write",
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithGoFailEnabled(true)},
failureInjector: blockDiskWrite,
failureDetectionLatency: failureDetectionLatency,
},
{
name: "defrag",
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithGoFailEnabled(true)},
failureInjector: triggerDefrag,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
e2e.BeforeTest(t)
clus, err := e2e.NewEtcdProcessCluster(context.TODO(), t, tc.clusterOptions...)
t.Cleanup(func() { clus.Stop() })
endpoints := clus.EndpointsGRPC()

cnt, success := 0, 0
donec := make(chan struct{})
errc := make(chan error, 1)

go func() {
var lastErr error
var cc *clientv3.Client
defer func() {
if cc != nil {
cc.Close()
}
errc <- lastErr
close(donec)
close(errc)
}()
cc, cerr := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
Endpoints: endpoints,
DialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
// the following service config will disable grpc client health check
//grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
},
})
require.NoError(t, cerr)
timeout := time.After(clientRuntime)

time.Sleep(tc.failureDetectionLatency)
for {
select {
case <-timeout:
return
default:
}
cctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
//start := time.Now()
_, err = cc.Get(cctx, "health")
//t.Logf("TS (%s): number #%d health check took %v", time.Now().UTC().Format(time.RFC3339), cnt, time.Since(start))
cancel()
cnt++
if err != nil {
lastErr = err
continue
}
success++
}
}()

tc.failureInjector(t, clus)

<-donec
err, ok := <-errc
if ok && err != nil {
t.Logf("etcd client failed to fail over, error (%v)", err)
}
t.Logf("request failure rate is %.2f%%, traffic volume success %d requests, total %d requests", (1-float64(success)/float64(cnt))*100, success, cnt)
// expect no more than 5 failed requests
require.InDelta(t, cnt, success, failedRequests)
})
}
}

func blackhole(t *testing.T, clus *e2e.EtcdProcessCluster) {
member := clus.Procs[0]
proxy := member.PeerProxy()
t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
}

func blockDiskWrite(t *testing.T, clus *e2e.EtcdProcessCluster) {
err := clus.Procs[0].Failpoints().Setup(context.Background(), "raftBeforeSave", `sleep(10000)`)
require.NoError(t, err)
clus.Procs[0].Etcdctl().Put(context.Background(), "foo", "bar", config.PutOptions{})
}

func triggerDefrag(t *testing.T, clus *e2e.EtcdProcessCluster) {
err := clus.Procs[0].Failpoints().Setup(context.Background(), "defragBeforeCopy", `sleep(8000)`)
require.NoError(t, err)
err = clus.Procs[0].Etcdctl().Defragment(context.Background(), config.DefragOption{Timeout: time.Minute})
require.NoError(t, err)
}

0 comments on commit deb6582

Please sign in to comment.