Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Etcd downgrade] Implement downgrade validate, enable and cancel #11801

Merged
merged 2 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ var (
ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err()

ErrGRPCClusterVersionUnavailable = status.New(codes.Unavailable, "etcdserver: cluster version not found during downgrade").Err()
ErrGRPCWrongDowngradeVersionFormat = status.New(codes.InvalidArgument, "etcdserver: wrong downgrade target version format").Err()
ErrGRPCInvalidDowngradeTargetVersion = status.New(codes.InvalidArgument, "etcdserver: invalid downgrade target version").Err()
ErrGRPCDowngradeInProcess = status.New(codes.FailedPrecondition, "etcdserver: cluster has a downgrade job in progress").Err()
ErrGRPCNoInflightDowngrade = status.New(codes.FailedPrecondition, "etcdserver: no inflight downgrade job").Err()

errStringToError = map[string]error{
ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
ErrorDesc(ErrGRPCKeyNotFound): ErrGRPCKeyNotFound,
Expand Down Expand Up @@ -132,6 +138,12 @@ var (
ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt,
ErrorDesc(ErrGPRCNotSupportedForLearner): ErrGPRCNotSupportedForLearner,
ErrorDesc(ErrGRPCBadLeaderTransferee): ErrGRPCBadLeaderTransferee,

ErrorDesc(ErrGRPCClusterVersionUnavailable): ErrGRPCClusterVersionUnavailable,
ErrorDesc(ErrGRPCWrongDowngradeVersionFormat): ErrGRPCWrongDowngradeVersionFormat,
ErrorDesc(ErrGRPCInvalidDowngradeTargetVersion): ErrGRPCInvalidDowngradeTargetVersion,
ErrorDesc(ErrGRPCDowngradeInProcess): ErrGRPCDowngradeInProcess,
ErrorDesc(ErrGRPCNoInflightDowngrade): ErrGRPCNoInflightDowngrade,
}
)

Expand Down Expand Up @@ -190,6 +202,12 @@ var (
ErrUnhealthy = Error(ErrGRPCUnhealthy)
ErrCorrupt = Error(ErrGRPCCorrupt)
ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)

ErrClusterVersionUnavailable = Error(ErrGRPCClusterVersionUnavailable)
ErrWrongDowngradeVersionFormat = Error(ErrGRPCWrongDowngradeVersionFormat)
ErrInvalidDowngradeTargetVersion = Error(ErrGRPCInvalidDowngradeTargetVersion)
ErrDowngradeInProcess = Error(ErrGRPCDowngradeInProcess)
ErrNoInflightDowngrade = Error(ErrGRPCNoInflightDowngrade)
)

// EtcdError defines gRPC server errors.
Expand Down
6 changes: 6 additions & 0 deletions etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ var toGRPCErrorMap = map[error]error{
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,

etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
etcdserver.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
etcdserver.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
etcdserver.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,

lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,
lease.ErrLeaseTTLTooLarge: rpctypes.ErrGRPCLeaseTTLTooLarge,
Expand Down
11 changes: 11 additions & 0 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type applyResult struct {
type applierV3Internal interface {
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest)
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest)
DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest)
}

// applierV3 is the interface for processing V3 raft messages
Expand Down Expand Up @@ -195,6 +196,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
case r.ClusterMemberAttrSet != nil:
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
case r.DowngradeInfoSet != nil:
a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet)
default:
panic("not implemented")
}
Expand Down Expand Up @@ -882,6 +885,14 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt
)
}

func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) {
d := membership.DowngradeInfo{Enabled: false}
if r.Enabled {
d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
}
a.s.cluster.SetDowngradeInfo(&d)
}

type quotaApplierV3 struct {
applierV3
q Quota
Expand Down
19 changes: 19 additions & 0 deletions etcdserver/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,22 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R
}
return membs, nil
}

func convertToClusterVersion(v string) (*semver.Version, error) {
ver, err := semver.NewVersion(v)
if err != nil {
// allow input version format Major.Minor
ver, err = semver.NewVersion(v + ".0")
if err != nil {
return nil, ErrWrongDowngradeVersionFormat
}
}
// cluster version only keeps major.minor, remove patch version
ver = &semver.Version{Major: ver.Major, Minor: ver.Minor}
return ver, nil
}

// Todo: handle the case that downgrading from higher major version(e.g. downgrade from v4.0 to v3.x)
func allowedDowngradeVersion(ver *semver.Version) *semver.Version {
return &semver.Version{Major: ver.Major, Minor: ver.Minor - 1}
}
43 changes: 43 additions & 0 deletions etcdserver/cluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,46 @@ func TestIsCompatibleWithVers(t *testing.T) {
}
}
}

func TestConvertToClusterVersion(t *testing.T) {
tests := []struct {
name string
inputVerStr string
expectedVer string
hasError bool
}{
{
"Succeeded: Major.Minor.Patch",
"3.4.2",
"3.4.0",
false,
},
{
"Succeeded: Major.Minor",
"3.4",
"3.4.0",
false,
},
{
"Failed: wrong version format",
"3*.9",
"",
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ver, err := convertToClusterVersion(tt.inputVerStr)
hasError := err != nil
if hasError != tt.hasError {
t.Errorf("Expected error status is %v; Got %v", tt.hasError, err)
}
if tt.hasError {
return
}
if ver == nil || tt.expectedVer != ver.String() {
t.Errorf("Expected output cluster version is %v; Got %v", tt.expectedVer, ver)
}
})
}
}
43 changes: 24 additions & 19 deletions etcdserver/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,30 @@ import (
)

var (
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrNotLeader = errors.New("etcdserver: not leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrCorrupt = errors.New("etcdserver: corrupt cluster")
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrNotLeader = errors.New("etcdserver: not leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrCorrupt = errors.New("etcdserver: corrupt cluster")
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade")
ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format")
ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version")
ErrDowngradeInProcess = errors.New("etcdserver: cluster has a downgrade job in progress")
ErrNoInflightDowngrade = errors.New("etcdserver: no inflight downgrade job")
)

type DiscoveryError struct {
Expand Down
91 changes: 90 additions & 1 deletion etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.etcd.io/etcd/v3/auth"
"go.etcd.io/etcd/v3/etcdserver/api/membership"
"go.etcd.io/etcd/v3/etcdserver/api/membership/membershippb"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
"go.etcd.io/etcd/v3/lease"
"go.etcd.io/etcd/v3/lease/leasehttp"
Expand Down Expand Up @@ -806,5 +807,93 @@ func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error
}

func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
return nil, nil
switch r.Action {
case pb.DowngradeRequest_VALIDATE:
return s.downgradeValidate(ctx, r.Version)
case pb.DowngradeRequest_ENABLE:
return s.downgradeEnable(ctx, r)
case pb.DowngradeRequest_CANCEL:
return s.downgradeCancel(ctx)
default:
return nil, ErrUnknownMethod
}
}

func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.DowngradeResponse, error) {
resp := &pb.DowngradeResponse{}

targetVersion, err := convertToClusterVersion(v)
if err != nil {
return nil, err
}

// gets leaders commit index and wait for local store to finish applying that index
// to avoid using stale downgrade information
err = s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}

cv := s.ClusterVersion()
if cv == nil {
return nil, ErrClusterVersionUnavailable
}
resp.Version = cv.String()

allowedTargetVersion := allowedDowngradeVersion(cv)
if !targetVersion.Equal(*allowedTargetVersion) {
return nil, ErrInvalidDowngradeTargetVersion
}

downgradeInfo := s.cluster.DowngradeInfo()
if downgradeInfo.Enabled {
// Todo: return the downgrade status along with the error msg
return nil, ErrDowngradeInProcess
}
return resp, nil
}

func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
// validate downgrade capability before starting downgrade
v := r.Version
lg := s.getLogger()
if resp, err := s.downgradeValidate(ctx, v); err != nil {
lg.Warn("reject downgrade request", zap.Error(err))
return resp, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we log when downgrade request is denied?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added warning level log.

}
targetVersion, err := convertToClusterVersion(v)
if err != nil {
lg.Warn("reject downgrade request", zap.Error(err))
return nil, err
}

raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
_, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
if err != nil {
lg.Warn("reject downgrade request", zap.Error(err))
return nil, err
}
resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
return &resp, nil
}

func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) {
// gets leaders commit index and wait for local store to finish applying that index
// to avoid using stale downgrade information
if err := s.linearizableReadNotify(ctx); err != nil {
return nil, err
}

downgradeInfo := s.cluster.DowngradeInfo()
if !downgradeInfo.Enabled {
return nil, ErrNoInflightDowngrade
}

raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
if err != nil {
return nil, err
}
resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
return &resp, nil
}