Skip to content

Commit

Permalink
Add MemberDowngrade failpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
siyuanfoundation committed Dec 10, 2024
1 parent 854bdd6 commit a809747
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 0 deletions.
7 changes: 7 additions & 0 deletions tests/framework/e2e/curl.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,10 @@ func CURLGet(clus *EtcdProcessCluster, req CURLReq) error {

return SpawnWithExpectsContext(ctx, CURLPrefixArgsCluster(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "GET", req), nil, req.Expected)
}

func CURLGetFromMember(clus *EtcdProcessCluster, member EtcdProcess, req CURLReq) error {
ctx, cancel := context.WithTimeout(context.Background(), req.timeoutDuration())
defer cancel()

return SpawnWithExpectsContext(ctx, CURLPrefixArgsCluster(clus.Cfg, member, "GET", req), nil, req.Expected)
}
116 changes: 116 additions & 0 deletions tests/robustness/failpoint/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
"testing"
"time"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
Expand All @@ -35,6 +38,7 @@ import (
)

var MemberReplace Failpoint = memberReplace{}
var MemberDowngrade Failpoint = memberDowngrade{}

type memberReplace struct{}

Expand Down Expand Up @@ -138,6 +142,92 @@ func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e
return config.ClusterSize > 1 && (config.Version == e2e.QuorumLastVersion || member.Config().ExecPath == e2e.BinPath.Etcd)
}

type memberDowngrade struct{}

func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
if err != nil {
return nil, err
}
targetVersion := semver.Version{Major: v.Major, Minor: v.Minor - 1}
numberOfMembersToDowngrade := rand.Int()%len(clus.Procs) + 1
membersToDowngrade := rand.Perm(len(clus.Procs))[:numberOfMembersToDowngrade]
lg.Info("Test downgrading members", zap.Any("members", membersToDowngrade))

member := clus.Procs[0]
endpoints := []string{member.EndpointsGRPC()[0]}
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
DialKeepAliveTime: 10 * time.Second,
DialKeepAliveTimeout: 100 * time.Millisecond,
})
if err != nil {
return nil, err
}
defer cc.Close()

// Need to wait health interval for cluster to accept changes
time.Sleep(etcdserver.HealthInterval)
lg.Info("Enable downgrade")
err = enableDowngrade(ctx, cc, &targetVersion)
if err != nil {
return nil, err
}
// Need to wait health interval for cluster to prepare for downgrade
time.Sleep(etcdserver.HealthInterval)

for _, memberID := range membersToDowngrade {
member = clus.Procs[memberID]
lg.Info("Downgrading member", zap.String("member", member.Config().Name))
for member.IsRunning() {
err = member.Kill()
if err != nil {
lg.Info("Sending kill signal failed", zap.Error(err))
}
err = member.Wait(ctx)
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
lg.Info("Failed to kill the process", zap.Error(err))
return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
}
}
if lazyfs := member.LazyFS(); lazyfs != nil {
lg.Info("Removing data that was not fsynced")
err := lazyfs.ClearCache(ctx)
if err != nil {
return nil, err
}
}
member.Config().ExecPath = e2e.BinPath.EtcdLastRelease
err = patchArgs(member.Config().Args, "initial-cluster-state", "existing")
if err != nil {
return nil, err
}
lg.Info("Restarting member", zap.String("member", member.Config().Name))
err = member.Start(ctx)
if err != nil {
return nil, err
}
err = verifyVersion(t, clus, member, targetVersion)
}
time.Sleep(etcdserver.HealthInterval)
return nil, err
}

func (f memberDowngrade) Name() string {
return "MemberDowngrade"
}

func (f memberDowngrade) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
if err != nil {
panic("Failed checking etcd version binary")
}
v3_6 := semver.Version{Major: 3, Minor: 6}
// only current version cluster can be downgraded.
return config.ClusterSize > 1 && v.Compare(v3_6) >= 0 && (config.Version == e2e.CurrentVersion && member.Config().ExecPath == e2e.BinPath.Etcd)
}

func getID(ctx context.Context, cc *clientv3.Client, name string) (id uint64, found bool, err error) {
// Ensure linearized MemberList by first making a linearized Get request from the same member.
// This is required for v3.4 support as it doesn't support linearized MemberList https://github.com/etcd-io/etcd/issues/18929
Expand Down Expand Up @@ -170,3 +260,29 @@ func patchArgs(args []string, flag, newValue string) error {
}
return fmt.Errorf("--%s flag not found", flag)
}

func enableDowngrade(ctx context.Context, cc *clientv3.Client, targetVersion *semver.Version) error {
_, err := cc.Maintenance.Downgrade(ctx, clientv3.DowngradeAction(pb.DowngradeRequest_VALIDATE), targetVersion.String())
if err != nil {
return err
}
_, err = cc.Maintenance.Downgrade(ctx, clientv3.DowngradeAction(pb.DowngradeRequest_ENABLE), targetVersion.String())
return err
}

func verifyVersion(t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess, expectedVersion semver.Version) error {
var err error
expected := fmt.Sprintf(`"etcdserver":"%d.%d\..*"etcdcluster":"%d\.%d\.`, expectedVersion.Major, expectedVersion.Minor, expectedVersion.Major, expectedVersion.Minor)
for i := 0; i < 35; i++ {
if err = e2e.CURLGetFromMember(clus, member, e2e.CURLReq{Endpoint: "/version", Expected: expect.ExpectedResponse{Value: expected, IsRegularExpr: true}}); err != nil {
t.Logf("#%d: v3 is not ready yet (%v)", i, err)
time.Sleep(200 * time.Millisecond)
continue
}
break
}
if err != nil {
return fmt.Errorf("failed to verify version, expected %v got (%v)", expected, err)
}
return nil
}
1 change: 1 addition & 0 deletions tests/robustness/failpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var allFailpoints = []Failpoint{
RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
BeforeApplyOneConfChangeSleep,
MemberReplace,
MemberDowngrade,
DropPeerNetwork,
RaftBeforeSaveSleep,
RaftAfterSaveSleep,
Expand Down
2 changes: 2 additions & 0 deletions tests/robustness/report/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
return nil, nil
case raftReq.ClusterVersionSet != nil:
return nil, nil
case raftReq.DowngradeInfoSet != nil:
return nil, nil
case raftReq.Compaction != nil:
request := model.EtcdRequest{
Type: model.Compact,
Expand Down

0 comments on commit a809747

Please sign in to comment.