diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index e6a281460d5..26e3fd378c1 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -76,6 +76,12 @@ var ( ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err() ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err() + ErrGRPCClusterVersionUnavailable = status.New(codes.Unavailable, "etcdserver: cluster version not found during downgrade").Err() + ErrGRPCWrongDowngradeVersionFormat = status.New(codes.InvalidArgument, "etcdserver: wrong downgrade target version format").Err() + ErrGRPCInvalidDowngradeTargetVersion = status.New(codes.InvalidArgument, "etcdserver: invalid downgrade target version").Err() + ErrGRPCDowngradeInProcess = status.New(codes.FailedPrecondition, "etcdserver: cluster has a downgrade job in progress").Err() + ErrGRPCNoInflightDowngrade = status.New(codes.FailedPrecondition, "etcdserver: no inflight downgrade job").Err() + errStringToError = map[string]error{ ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey, ErrorDesc(ErrGRPCKeyNotFound): ErrGRPCKeyNotFound, @@ -132,6 +138,12 @@ var ( ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt, ErrorDesc(ErrGPRCNotSupportedForLearner): ErrGPRCNotSupportedForLearner, ErrorDesc(ErrGRPCBadLeaderTransferee): ErrGRPCBadLeaderTransferee, + + ErrorDesc(ErrGRPCClusterVersionUnavailable): ErrGRPCClusterVersionUnavailable, + ErrorDesc(ErrGRPCWrongDowngradeVersionFormat): ErrGRPCWrongDowngradeVersionFormat, + ErrorDesc(ErrGRPCInvalidDowngradeTargetVersion): ErrGRPCInvalidDowngradeTargetVersion, + ErrorDesc(ErrGRPCDowngradeInProcess): ErrGRPCDowngradeInProcess, + ErrorDesc(ErrGRPCNoInflightDowngrade): ErrGRPCNoInflightDowngrade, } ) @@ -190,6 +202,12 @@ var ( ErrUnhealthy = Error(ErrGRPCUnhealthy) ErrCorrupt = Error(ErrGRPCCorrupt) ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee) + + ErrClusterVersionUnavailable = Error(ErrGRPCClusterVersionUnavailable) + ErrWrongDowngradeVersionFormat = Error(ErrGRPCWrongDowngradeVersionFormat) + ErrInvalidDowngradeTargetVersion = Error(ErrGRPCInvalidDowngradeTargetVersion) + ErrDowngradeInProcess = Error(ErrGRPCDowngradeInProcess) + ErrNoInflightDowngrade = Error(ErrGRPCNoInflightDowngrade) ) // EtcdError defines gRPC server errors. diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 6027724b251..2ae37af05bf 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -58,6 +58,12 @@ var toGRPCErrorMap = map[error]error{ etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt, etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee, + etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable, + etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat, + etcdserver.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion, + etcdserver.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess, + etcdserver.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade, + lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound, lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist, lease.ErrLeaseTTLTooLarge: rpctypes.ErrGRPCLeaseTTLTooLarge, diff --git a/etcdserver/apply.go b/etcdserver/apply.go index b85dd50d654..0135f8bcd37 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -55,6 +55,7 @@ type applyResult struct { type applierV3Internal interface { ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) + DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) } // applierV3 is the interface for processing V3 raft messages @@ -195,6 +196,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet) case r.ClusterMemberAttrSet != nil: a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet) + case r.DowngradeInfoSet != nil: + a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet) default: panic("not implemented") } @@ -882,6 +885,14 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt ) } +func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) { + d := membership.DowngradeInfo{Enabled: false} + if r.Enabled { + d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver} + } + a.s.cluster.SetDowngradeInfo(&d) +} + type quotaApplierV3 struct { applierV3 q Quota diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index 37351519c59..7990a31d58c 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -355,3 +355,22 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R } return membs, nil } + +func convertToClusterVersion(v string) (*semver.Version, error) { + ver, err := semver.NewVersion(v) + if err != nil { + // allow input version format Major.Minor + ver, err = semver.NewVersion(v + ".0") + if err != nil { + return nil, ErrWrongDowngradeVersionFormat + } + } + // cluster version only keeps major.minor, remove patch version + ver = &semver.Version{Major: ver.Major, Minor: ver.Minor} + return ver, nil +} + +// Todo: handle the case that downgrading from higher major version(e.g. downgrade from v4.0 to v3.x) +func allowedDowngradeVersion(ver *semver.Version) *semver.Version { + return &semver.Version{Major: ver.Major, Minor: ver.Minor - 1} +} diff --git a/etcdserver/cluster_util_test.go b/etcdserver/cluster_util_test.go index de90d60396f..04b9925ff95 100644 --- a/etcdserver/cluster_util_test.go +++ b/etcdserver/cluster_util_test.go @@ -133,3 +133,46 @@ func TestIsCompatibleWithVers(t *testing.T) { } } } + +func TestConvertToClusterVersion(t *testing.T) { + tests := []struct { + name string + inputVerStr string + expectedVer string + hasError bool + }{ + { + "Succeeded: Major.Minor.Patch", + "3.4.2", + "3.4.0", + false, + }, + { + "Succeeded: Major.Minor", + "3.4", + "3.4.0", + false, + }, + { + "Failed: wrong version format", + "3*.9", + "", + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ver, err := convertToClusterVersion(tt.inputVerStr) + hasError := err != nil + if hasError != tt.hasError { + t.Errorf("Expected error status is %v; Got %v", tt.hasError, err) + } + if tt.hasError { + return + } + if ver == nil || tt.expectedVer != ver.String() { + t.Errorf("Expected output cluster version is %v; Got %v", tt.expectedVer, ver) + } + }) + } +} diff --git a/etcdserver/errors.go b/etcdserver/errors.go index d0fe28970d1..dc2a85fdd47 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -20,25 +20,30 @@ import ( ) var ( - ErrUnknownMethod = errors.New("etcdserver: unknown method") - ErrStopped = errors.New("etcdserver: server stopped") - ErrCanceled = errors.New("etcdserver: request cancelled") - ErrTimeout = errors.New("etcdserver: request timed out") - ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") - ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") - ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long") - ErrLeaderChanged = errors.New("etcdserver: leader changed") - ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") - ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader") - ErrNoLeader = errors.New("etcdserver: no leader") - ErrNotLeader = errors.New("etcdserver: not leader") - ErrRequestTooLarge = errors.New("etcdserver: request is too large") - ErrNoSpace = errors.New("etcdserver: no space") - ErrTooManyRequests = errors.New("etcdserver: too many requests") - ErrUnhealthy = errors.New("etcdserver: unhealthy cluster") - ErrKeyNotFound = errors.New("etcdserver: key not found") - ErrCorrupt = errors.New("etcdserver: corrupt cluster") - ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee") + ErrUnknownMethod = errors.New("etcdserver: unknown method") + ErrStopped = errors.New("etcdserver: server stopped") + ErrCanceled = errors.New("etcdserver: request cancelled") + ErrTimeout = errors.New("etcdserver: request timed out") + ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") + ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") + ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long") + ErrLeaderChanged = errors.New("etcdserver: leader changed") + ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") + ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader") + ErrNoLeader = errors.New("etcdserver: no leader") + ErrNotLeader = errors.New("etcdserver: not leader") + ErrRequestTooLarge = errors.New("etcdserver: request is too large") + ErrNoSpace = errors.New("etcdserver: no space") + ErrTooManyRequests = errors.New("etcdserver: too many requests") + ErrUnhealthy = errors.New("etcdserver: unhealthy cluster") + ErrKeyNotFound = errors.New("etcdserver: key not found") + ErrCorrupt = errors.New("etcdserver: corrupt cluster") + ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee") + ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade") + ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format") + ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version") + ErrDowngradeInProcess = errors.New("etcdserver: cluster has a downgrade job in progress") + ErrNoInflightDowngrade = errors.New("etcdserver: no inflight downgrade job") ) type DiscoveryError struct { diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 75df7c48126..92d8fb5348e 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -22,6 +22,7 @@ import ( "go.etcd.io/etcd/v3/auth" "go.etcd.io/etcd/v3/etcdserver/api/membership" + "go.etcd.io/etcd/v3/etcdserver/api/membership/membershippb" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" "go.etcd.io/etcd/v3/lease" "go.etcd.io/etcd/v3/lease/leasehttp" @@ -806,5 +807,93 @@ func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error } func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) { - return nil, nil + switch r.Action { + case pb.DowngradeRequest_VALIDATE: + return s.downgradeValidate(ctx, r.Version) + case pb.DowngradeRequest_ENABLE: + return s.downgradeEnable(ctx, r) + case pb.DowngradeRequest_CANCEL: + return s.downgradeCancel(ctx) + default: + return nil, ErrUnknownMethod + } +} + +func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.DowngradeResponse, error) { + resp := &pb.DowngradeResponse{} + + targetVersion, err := convertToClusterVersion(v) + if err != nil { + return nil, err + } + + // gets leaders commit index and wait for local store to finish applying that index + // to avoid using stale downgrade information + err = s.linearizableReadNotify(ctx) + if err != nil { + return nil, err + } + + cv := s.ClusterVersion() + if cv == nil { + return nil, ErrClusterVersionUnavailable + } + resp.Version = cv.String() + + allowedTargetVersion := allowedDowngradeVersion(cv) + if !targetVersion.Equal(*allowedTargetVersion) { + return nil, ErrInvalidDowngradeTargetVersion + } + + downgradeInfo := s.cluster.DowngradeInfo() + if downgradeInfo.Enabled { + // Todo: return the downgrade status along with the error msg + return nil, ErrDowngradeInProcess + } + return resp, nil +} + +func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) { + // validate downgrade capability before starting downgrade + v := r.Version + lg := s.getLogger() + if resp, err := s.downgradeValidate(ctx, v); err != nil { + lg.Warn("reject downgrade request", zap.Error(err)) + return resp, err + } + targetVersion, err := convertToClusterVersion(v) + if err != nil { + lg.Warn("reject downgrade request", zap.Error(err)) + return nil, err + } + + raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()} + _, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest}) + if err != nil { + lg.Warn("reject downgrade request", zap.Error(err)) + return nil, err + } + resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()} + return &resp, nil +} + +func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) { + // gets leaders commit index and wait for local store to finish applying that index + // to avoid using stale downgrade information + if err := s.linearizableReadNotify(ctx); err != nil { + return nil, err + } + + downgradeInfo := s.cluster.DowngradeInfo() + if !downgradeInfo.Enabled { + return nil, ErrNoInflightDowngrade + } + + raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false} + _, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest}) + if err != nil { + return nil, err + } + resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()} + return &resp, nil }