
Merge pull request #17330 from siyuanfoundation/3.4-downgrade
[3.4] allow downgrade from 3.5
ahrtr committed Feb 20, 2024
2 parents 2662038 + 2caf0f0 commit c057f47
Showing 34 changed files with 1,303 additions and 444 deletions.
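The commit threads a single new option, NextClusterVersionCompatible, from the `--next-cluster-version-compatible` command-line flag through `embed.Config` and `etcdserver.ServerConfig` into the membership layer, so that a 3.4 server can join a cluster (and open a data directory) that is already on 3.5. Before the per-file diff, here is a minimal sketch of how the option is consumed through the embedded API; only the `NextClusterVersionCompatible` field comes from this commit, the rest (data directory, ready/close handling) is illustrative boilerplate.

```go
package main

import (
	"log"

	"go.etcd.io/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // assumed: a data directory that may still carry the 3.5 schema
	// Added by this commit: tolerate a cluster version one minor release ahead (3.5).
	cfg.NextClusterVersionCompatible = true

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
	log.Println("etcd 3.4 member started with next-cluster-version compatibility enabled")
}
```

The equivalent operator-facing switch is the `--next-cluster-version-compatible` flag registered in etcdmain/config.go below.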
2 changes: 1 addition & 1 deletion clientv3/snapshot/v3_snapshot.go
@@ -277,7 +277,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
return err
}

s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics)
s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics, true)
if err != nil {
return err
}
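Note that this restore path passes the new parameter as a hard-coded `true`: a cluster rebuilt from a snapshot always constructs its membership with the one-minor-version-ahead tolerance enabled, rather than reading it from configuration.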
2 changes: 2 additions & 0 deletions embed/config.go
@@ -363,6 +363,8 @@ type Config struct {
// UnsafeNoFsync disables all uses of fsync.
// Setting this is unsafe and will cause data loss.
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
// NextClusterVersionCompatible enables a 3.4 server to be compatible with the next minor version (3.5),
// allowing it to join a 3.5 cluster and start on the 3.5 schema.
NextClusterVersionCompatible bool `json:"next-cluster-version-compatible"`
}

// configYAML holds the config suitable for yaml parsing
106 changes: 56 additions & 50 deletions embed/etcd.go
@@ -38,6 +38,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/v2v3"
"go.etcd.io/etcd/etcdserver/api/v3client"
"go.etcd.io/etcd/etcdserver/api/v3rpc"
"go.etcd.io/etcd/etcdserver/verify"
"go.etcd.io/etcd/pkg/debugutil"
runtimeutil "go.etcd.io/etcd/pkg/runtime"
"go.etcd.io/etcd/pkg/transport"
@@ -164,56 +165,57 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
backendFreelistType := parseBackendFreelistType(cfg.ExperimentalBackendFreelistType)

srvcfg := etcdserver.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.AdvertiseClientUrls,
PeerURLs: cfg.AdvertisePeerUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
BcryptCost: cfg.BcryptCost,
TokenTTL: cfg.AuthTokenTTL,
CORS: cfg.CORS,
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
LoggerConfig: cfg.loggerConfig,
LoggerCore: cfg.loggerCore,
LoggerWriteSyncer: cfg.loggerWriteSyncer,
Debug: cfg.Debug,
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
Name: cfg.Name,
ClientURLs: cfg.AdvertiseClientUrls,
PeerURLs: cfg.AdvertisePeerUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
BcryptCost: cfg.BcryptCost,
TokenTTL: cfg.AuthTokenTTL,
CORS: cfg.CORS,
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
LoggerConfig: cfg.loggerConfig,
LoggerCore: cfg.loggerCore,
LoggerWriteSyncer: cfg.loggerWriteSyncer,
Debug: cfg.Debug,
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
UnsafeNoFsync: cfg.UnsafeNoFsync,
NextClusterVersionCompatible: cfg.NextClusterVersionCompatible,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
@@ -375,6 +377,10 @@ func (e *Etcd) Close() {
defer func() {
if lg != nil {
lg.Info("closed etcd server", fields...)
verify.MustVerifyIfEnabled(verify.Config{
Logger: lg,
DataDir: e.cfg.Dir,
})
lg.Sync()
}
}()
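Two things change here besides the re-alignment of the `ServerConfig` literal: `NextClusterVersionCompatible` is passed through to `etcdserver.ServerConfig`, and the shutdown path in `Close` now calls `verify.MustVerifyIfEnabled` on the data directory before the final logger sync; judging by its name, the check is a no-op unless verification has been enabled.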
2 changes: 1 addition & 1 deletion etcdctl/ctlv3/command/migrate_command.go
@@ -128,7 +128,7 @@ func prepareBackend() backend.Backend {

func rebuildStoreV2() (v2store.Store, uint64) {
var index uint64
cl := membership.NewCluster(zap.NewExample(), "")
cl := membership.NewCluster(zap.NewExample(), "", true)

waldir := migrateWALdir
if len(waldir) == 0 {
1 change: 1 addition & 0 deletions etcdmain/config.go
@@ -194,6 +194,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.StrictReconfigCheck, "strict-reconfig-check", cfg.ec.StrictReconfigCheck, "Reject reconfiguration requests that would cause quorum loss.")
fs.BoolVar(&cfg.ec.EnableV2, "enable-v2", cfg.ec.EnableV2, "Accept etcd V2 client requests.")
fs.BoolVar(&cfg.ec.PreVote, "pre-vote", cfg.ec.PreVote, "Enable to run an additional Raft election phase.")
fs.BoolVar(&cfg.ec.NextClusterVersionCompatible, "next-cluster-version-compatible", false, "Enable a 3.4 server to be compatible with the next minor version (3.5), allowing it to join a 3.5 cluster and start on the 3.5 schema.")

// proxy
fs.Var(cfg.cf.proxy, "proxy", fmt.Sprintf("Valid values include %q", cfg.cf.proxy.Valids()))
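For operators this surfaces as a plain boolean flag, defaulting to false, e.g. `etcd --next-cluster-version-compatible=true` (other flags omitted); its help text is added to etcdmain/help.go below.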
2 changes: 1 addition & 1 deletion etcdmain/etcd.go
@@ -435,7 +435,7 @@ func startProxy(cfg *config) error {

clientURLs := []string{}
uf := func() []string {
gcls, gerr := etcdserver.GetClusterFromRemotePeers(lg, peerURLs, tr)
gcls, gerr := etcdserver.GetClusterFromRemotePeers(lg, peerURLs, tr, cfg.ec.NextClusterVersionCompatible)
if gerr != nil {
if lg != nil {
lg.Warn(
2 changes: 2 additions & 0 deletions etcdmain/help.go
@@ -125,6 +125,8 @@ Clustering:
Interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.
--enable-v2 '` + strconv.FormatBool(embed.DefaultEnableV2) + `'
Accept etcd V2 client requests.
--next-cluster-version-compatible 'false'
Enable a 3.4 server to be compatible with the next minor version (3.5), allowing it to join a 3.5 cluster and start on the 3.5 schema.
Security:
--cert-file ''
1 change: 1 addition & 0 deletions etcdserver/api/capability.go
@@ -41,6 +41,7 @@ var (
"3.2.0": {AuthCapability: true, V3rpcCapability: true},
"3.3.0": {AuthCapability: true, V3rpcCapability: true},
"3.4.0": {AuthCapability: true, V3rpcCapability: true},
"3.5.0": {AuthCapability: true, V3rpcCapability: true},
}

enableMapMu sync.RWMutex
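Registering a "3.5.0" entry here means a 3.4 server keeps the Auth and V3 RPC capabilities enabled when the detected cluster version is 3.5.0, which becomes reachable once the downgrade check in membership/cluster.go below accepts that version.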
58 changes: 40 additions & 18 deletions etcdserver/api/membership/cluster.go
@@ -59,6 +59,8 @@ type RaftCluster struct {
// removed contains the ids of removed members in the cluster.
// removed id cannot be reused.
removed map[types.ID]bool
// NextClusterVersionCompatible allows downgrade from 3.5 to 3.4.
NextClusterVersionCompatible bool
}

// ConfigChangeContext represents a context for confChange.
@@ -72,8 +74,8 @@ type ConfigChangeContext struct {

// NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating
// cluster with raft learner member.
func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
c := NewCluster(lg, token)
func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap, nextClusterVersionCompatible bool) (*RaftCluster, error) {
c := NewCluster(lg, token, nextClusterVersionCompatible)
for name, urls := range urlsmap {
m := NewMember(name, urls, token, nil)
if _, ok := c.members[m.ID]; ok {
@@ -88,21 +90,22 @@ func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap)
return c, nil
}

func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*Member) *RaftCluster {
c := NewCluster(lg, token)
func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*Member, nextClusterVersionCompatible bool) *RaftCluster {
c := NewCluster(lg, token, nextClusterVersionCompatible)
c.cid = id
for _, m := range membs {
c.members[m.ID] = m
}
return c
}

func NewCluster(lg *zap.Logger, token string) *RaftCluster {
func NewCluster(lg *zap.Logger, token string, nextClusterVersionCompatible bool) *RaftCluster {
return &RaftCluster{
lg: lg,
token: token,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
lg: lg,
token: token,
NextClusterVersionCompatible: nextClusterVersionCompatible,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
}
}

@@ -248,7 +251,7 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {

c.members, c.removed = membersFromStore(c.lg, c.v2store)
c.version = clusterVersionFromStore(c.lg, c.v2store)
mustDetectDowngrade(c.lg, c.version)
mustDetectDowngrade(c.lg, c.version, c.NextClusterVersionCompatible)
onSet(c.lg, c.version)

for _, m := range c.members {
@@ -567,7 +570,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
}
oldVer := c.version
c.version = ver
mustDetectDowngrade(c.lg, c.version)
mustDetectDowngrade(c.lg, c.version, c.NextClusterVersionCompatible)
if c.v2store != nil {
mustSaveClusterVersionToStore(c.v2store, ver)
}
@@ -786,23 +789,42 @@ func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *R
return nil
}

func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version) {
lv := semver.Must(semver.NewVersion(version.Version))
// only keep major.minor version for comparison against cluster version
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
if cv != nil && lv.LessThan(*cv) {
func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version, nextClusterVersionCompatible bool) {
err := detectDowngrade(cv, nextClusterVersionCompatible)
if err != nil {
if lg != nil {
lg.Fatal(
"invalid downgrade; server version is lower than determined cluster version",
err.Error(),
zap.String("current-server-version", version.Version),
zap.String("determined-cluster-version", version.Cluster(cv.String())),
)
} else {
plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
plog.Fatal(err)
}
}
}

func detectDowngrade(cv *semver.Version, nextClusterVersionCompatible bool) error {
if cv == nil {
return nil
}
lv := semver.Must(semver.NewVersion(version.Version))
// only keep major.minor version for comparison against cluster version
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
if !lv.LessThan(*cv) {
return nil
}
// Allow a 3.4 server to join a 3.5 cluster. Note that the local data schema
// will be automatically migrated to 3.4 if `--next-cluster-version-compatible`
// is enabled (true). Users can also execute `etcdutl migrate` to migrate
// the data before starting the server.
oneMinorVersionDown := &semver.Version{Major: cv.Major, Minor: cv.Minor - 1}
if !nextClusterVersionCompatible || !lv.Equal(*oneMinorVersionDown) {
return fmt.Errorf("invalid downgrade; (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
}
return nil
}

// IsLocalMemberLearner returns if the local member is raft learner
func (c *RaftCluster) IsLocalMemberLearner() bool {
c.Lock()
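The core of the change is `detectDowngrade` above: the local version is truncated to major.minor, and a newer cluster version is tolerated only when it is exactly one minor release ahead and the compatibility option is set, so 3.4 may run under a 3.5 cluster version but never under 3.6. A small standalone sketch of that arithmetic, using the same go-semver package the test file below imports (hard-coded versions, not part of the commit):

```go
package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

// allowed mirrors the rule in detectDowngrade: a downgrade is tolerated only when
// the cluster version is exactly one minor release ahead of the local major.minor
// version and the compatibility option is enabled.
func allowed(local, cluster string, nextClusterVersionCompatible bool) bool {
	lv := semver.Must(semver.NewVersion(local))
	cv := semver.Must(semver.NewVersion(cluster))
	lv = &semver.Version{Major: lv.Major, Minor: lv.Minor} // keep major.minor only
	if !lv.LessThan(*cv) {
		return true // local version is not behind the cluster: no downgrade involved
	}
	oneMinorVersionDown := &semver.Version{Major: cv.Major, Minor: cv.Minor - 1}
	return nextClusterVersionCompatible && lv.Equal(*oneMinorVersionDown)
}

func main() {
	fmt.Println(allowed("3.4.0", "3.5.0", true))  // true: exactly one minor release down
	fmt.Println(allowed("3.4.0", "3.5.0", false)) // false: option not enabled
	fmt.Println(allowed("3.4.0", "3.6.0", true))  // false: two minor releases down
}
```

The table-driven TestDetectDowngrade added below exercises the same cases against the real function.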
54 changes: 53 additions & 1 deletion etcdserver/api/membership/cluster_test.go
@@ -21,6 +21,7 @@ import (
"reflect"
"testing"

"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/etcdserver/api/v2store"
"go.etcd.io/etcd/pkg/mock/mockstore"
"go.etcd.io/etcd/pkg/testutil"
@@ -276,7 +277,7 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
}

func TestClusterValidateConfigurationChange(t *testing.T) {
cl := NewCluster(zap.NewExample(), "")
cl := NewCluster(zap.NewExample(), "", false)
cl.SetStore(v2store.New())
for i := 1; i <= 4; i++ {
attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
@@ -946,3 +947,54 @@ func TestIsReadyToPromoteMember(t *testing.T) {
}
}
}

func TestDetectDowngrade(t *testing.T) {
tests := []struct {
clusterVersion string
nextClusterVersionCompatible bool
expectErr bool
}{
{
expectErr: false,
},
{
clusterVersion: "3.5.0",
expectErr: true,
},
{
clusterVersion: "3.5.0",
nextClusterVersionCompatible: true,
expectErr: false,
},
{
clusterVersion: "3.6.0",
expectErr: true,
},
{
clusterVersion: "3.6.0",
nextClusterVersionCompatible: true,
expectErr: true,
},
{
clusterVersion: "3.4.0",
expectErr: false,
},
{
clusterVersion: "3.3.0",
expectErr: false,
},
}
for i, tt := range tests {
var cv *semver.Version
if len(tt.clusterVersion) > 0 {
cv = semver.Must(semver.NewVersion(tt.clusterVersion))
}
err := detectDowngrade(cv, tt.nextClusterVersionCompatible)
if tt.expectErr && err == nil {
t.Errorf("%d: expect detectDowngrade error, got nil", i)
}
if !tt.expectErr && err != nil {
t.Errorf("%d: expect no detectDowngrade error, got %v", i, err)
}
}
}
