From c06ec72230e6c90a23fd00ce671072170832560e Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Wed, 7 Feb 2024 15:41:08 -0800 Subject: [PATCH] test: add mix version e2e test. Signed-off-by: Siyuan Zhang --- tests/e2e/ctl_v3_test.go | 6 +- tests/e2e/etcd_mix_versions_test.go | 194 ++++++++++++ tests/e2e/etcd_release_upgrade_test.go | 8 +- tests/framework/e2e/cluster.go | 398 +++++++++++++++---------- tests/framework/e2e/config.go | 28 ++ tests/framework/e2e/etcd_process.go | 10 + tests/framework/e2e/etcdctl.go | 16 + tests/framework/e2e/flags.go | 1 + 8 files changed, 502 insertions(+), 159 deletions(-) create mode 100644 tests/e2e/etcd_mix_versions_test.go create mode 100644 tests/framework/e2e/config.go diff --git a/tests/e2e/ctl_v3_test.go b/tests/e2e/ctl_v3_test.go index e5c446095d3..6265964995f 100644 --- a/tests/e2e/ctl_v3_test.go +++ b/tests/e2e/ctl_v3_test.go @@ -50,13 +50,11 @@ func TestClusterVersion(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - binary := e2e.BinDir + "/etcd" - if !fileutil.Exist(binary) { - t.Skipf("%q does not exist", binary) + if !fileutil.Exist(e2e.BinPath) { + t.Skipf("%q does not exist", e2e.BinPath) } e2e.BeforeTest(t) cfg := e2e.NewConfigNoTLS() - cfg.ExecPath = binary cfg.SnapshotCount = 3 cfg.BasePeerScheme = "unix" // to avoid port conflict cfg.RollingStart = tt.rollingStart diff --git a/tests/e2e/etcd_mix_versions_test.go b/tests/e2e/etcd_mix_versions_test.go new file mode 100644 index 00000000000..04ac376fbc7 --- /dev/null +++ b/tests/e2e/etcd_mix_versions_test.go @@ -0,0 +1,194 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/client/pkg/v3/fileutil" + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +type clusterTestCase struct { + name string + config e2e.EtcdProcessClusterConfig +} + +func clusterTestCases(size int) []clusterTestCase { + tcs := []clusterTestCase{ + { + name: "CurrentVersion", + config: e2e.EtcdProcessClusterConfig{ClusterSize: size}, + }, + } + if !fileutil.Exist(e2e.BinPathLastRelease) { + return tcs + } + + tcs = append(tcs, + clusterTestCase{ + name: "LastVersion", + config: e2e.EtcdProcessClusterConfig{ClusterSize: size, Version: e2e.LastVersion}, + }, + ) + if size > 2 { + tcs = append(tcs, + clusterTestCase{ + name: "MinorityLastVersion", + config: e2e.EtcdProcessClusterConfig{ClusterSize: size, Version: e2e.MinorityLastVersion}, + }, clusterTestCase{ + name: "QuorumLastVersion", + config: e2e.EtcdProcessClusterConfig{ClusterSize: size, Version: e2e.QuorumLastVersion}, + }, + ) + } + return tcs +} + +// TestMixVersionsSnapshotByAddingMember tests the mix version send snapshots by adding member +func TestMixVersionsSnapshotByAddingMember(t *testing.T) { + for _, tc := range clusterTestCases(1) { + t.Run(tc.name+"-adding-new-member-of-current-version", func(t *testing.T) { + mixVersionsSnapshotTestByAddingMember(t, tc.config, e2e.CurrentVersion) + }) + // TODO: uncomment after v3.4.32 release. + // After v3.4.32, etcd can support adding a new member of 3.4 into + // a cluster with 3.5 if the 3.4 member is started with '--next-cluster-version-compatible'. + + // t.Run(tc.name+"-adding-new-member-of-last-version", func(t *testing.T) { + // mixVersionsSnapshotTestByAddingMember(t, tc.config, e2e.LastVersion) + // }) + } +} + +func mixVersionsSnapshotTestByAddingMember(t *testing.T, cfg e2e.EtcdProcessClusterConfig, newInstanceVersion e2e.ClusterVersion) { + e2e.BeforeTest(t) + + t.Logf("Create an etcd cluster with %d member\n", cfg.ClusterSize) + cfg.SnapshotCount = 10 + cfg.BasePeerScheme = "unix" // to avoid port conflict + + epc, err := e2e.NewEtcdProcessCluster(t, &cfg) + + require.NoError(t, err, "failed to start etcd cluster: %v", err) + defer func() { + derr := epc.Close() + require.NoError(t, derr, "failed to close etcd cluster: %v", derr) + }() + + t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot)") + + etcdctl := epc.Procs[0].Etcdctl(e2e.ClientNonTLS, false, false) + writeKVs(t, etcdctl, 0, 20) + + t.Log("Start a new etcd instance, which will receive a snapshot from the leader.") + newCfg := *epc.Cfg + newCfg.Version = newInstanceVersion + t.Log("Starting a new etcd instance") + _, err = epc.StartNewProc(&newCfg, t) + require.NoError(t, err, "failed to start the new etcd instance: %v", err) + defer epc.Close() + + assertKVHash(t, epc) +} + +func TestMixVersionsSnapshotByMockingPartition(t *testing.T) { + mockPartitionNodeIndex := 2 + for _, tc := range clusterTestCases(3) { + t.Run(tc.name, func(t *testing.T) { + mixVersionsSnapshotTestByMockPartition(t, tc.config, mockPartitionNodeIndex) + }) + } +} + +func mixVersionsSnapshotTestByMockPartition(t *testing.T, cfg e2e.EtcdProcessClusterConfig, mockPartitionNodeIndex int) { + e2e.BeforeTest(t) + + if !fileutil.Exist(e2e.BinPathLastRelease) { + t.Skipf("%q does not exist", e2e.BinPathLastRelease) + } + + t.Logf("Create an etcd cluster with %d member\n", cfg.ClusterSize) + cfg.SnapshotCount = 10 + cfg.BasePeerScheme = "unix" // to avoid port conflict + + epc, err := e2e.NewEtcdProcessCluster(t, &cfg) + require.NoError(t, err, "failed to start etcd cluster: %v", err) + defer func() { + derr := epc.Close() + require.NoError(t, derr, "failed to close etcd cluster: %v", derr) + }() + toPartitionedMember := epc.Procs[mockPartitionNodeIndex] + + t.Log("Stop and restart the partitioned member") + err = toPartitionedMember.Stop() + require.NoError(t, err) + time.Sleep(2 * time.Second) + + t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot)") + etcdctl := epc.Procs[0].Etcdctl(e2e.ClientNonTLS, false, false) + writeKVs(t, etcdctl, 0, 20) + + t.Log("Verify logs to check leader has saved snapshot") + leaderEPC := epc.Procs[epc.WaitLeader(t)] + e2e.AssertProcessLogs(t, leaderEPC, "saved snapshot") + + t.Log("Restart the partitioned member") + err = toPartitionedMember.Restart() + require.NoError(t, err) + + assertKVHash(t, epc) +} + +func writeKVs(t *testing.T, etcdctl *e2e.Etcdctl, startIdx, endIdx int) { + for i := startIdx; i < endIdx; i++ { + key := fmt.Sprintf("key-%d", i) + value := fmt.Sprintf("value-%d", i) + err := etcdctl.Put(key, value) + require.NoError(t, err, "failed to put %q, error: %v", key, err) + } +} + +func assertKVHash(t *testing.T, epc *e2e.EtcdProcessCluster) { + clusterSize := len(epc.Procs) + if clusterSize < 2 { + return + } + t.Log("Verify all nodes have exact same revision and hash") + assert.Eventually(t, func() bool { + hashKvs, err := epc.Etcdctl().HashKV(0) + if err != nil { + t.Logf("failed to get HashKV: %v", err) + return false + } + if len(hashKvs) != clusterSize { + t.Logf("expected %d hashkv responses, but got: %d", clusterSize, len(hashKvs)) + return false + } + for i := 1; i < clusterSize; i++ { + if hashKvs[0].Header.Revision != hashKvs[i].Header.Revision { + t.Logf("Got different revisions, [%d, %d]", hashKvs[0].Header.Revision, hashKvs[1].Header.Revision) + return false + } + + assert.Equal(t, hashKvs[0].Hash, hashKvs[i].Hash) + } + return true + }, 10*time.Second, 500*time.Millisecond) +} diff --git a/tests/e2e/etcd_release_upgrade_test.go b/tests/e2e/etcd_release_upgrade_test.go index e548f67c710..b1aeaa75dee 100644 --- a/tests/e2e/etcd_release_upgrade_test.go +++ b/tests/e2e/etcd_release_upgrade_test.go @@ -37,7 +37,7 @@ func TestReleaseUpgrade(t *testing.T) { e2e.BeforeTest(t) copiedCfg := e2e.NewConfigNoTLS() - copiedCfg.ExecPath = lastReleaseBinary + copiedCfg.Version = e2e.LastVersion copiedCfg.SnapshotCount = 3 copiedCfg.BasePeerScheme = "unix" // to avoid port conflict @@ -78,7 +78,7 @@ func TestReleaseUpgrade(t *testing.T) { t.Fatalf("#%d: error closing etcd process (%v)", i, err) } t.Logf("Stopped node: %v", i) - epc.Procs[i].Config().ExecPath = e2e.BinDir + "/etcd" + epc.Procs[i].Config().ExecPath = e2e.BinPath epc.Procs[i].Config().KeepDataDir = true t.Logf("Restarting node in the new version: %v", i) @@ -123,7 +123,7 @@ func TestReleaseUpgradeWithRestart(t *testing.T) { e2e.BeforeTest(t) copiedCfg := e2e.NewConfigNoTLS() - copiedCfg.ExecPath = lastReleaseBinary + copiedCfg.Version = e2e.LastVersion copiedCfg.SnapshotCount = 10 copiedCfg.BasePeerScheme = "unix" @@ -166,7 +166,7 @@ func TestReleaseUpgradeWithRestart(t *testing.T) { wg.Add(len(epc.Procs)) for i := range epc.Procs { go func(i int) { - epc.Procs[i].Config().ExecPath = e2e.BinDir + "/etcd" + epc.Procs[i].Config().ExecPath = e2e.BinPath epc.Procs[i].Config().KeepDataDir = true if err := epc.Procs[i].Restart(); err != nil { t.Errorf("error restarting etcd process (%v)", err) diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index fbe932ded2f..9a5f9958b4d 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -132,19 +132,21 @@ func ConfigStandalone(cfg EtcdProcessClusterConfig) *EtcdProcessClusterConfig { } type EtcdProcessCluster struct { - lg *zap.Logger - Cfg *EtcdProcessClusterConfig - Procs []EtcdProcess + lg *zap.Logger + Cfg *EtcdProcessClusterConfig + Procs []EtcdProcess + nextSeq int // sequence number of the next etcd process (if it will be required) } type EtcdProcessClusterConfig struct { - ExecPath string DataDirPath string KeepDataDir bool + Logger *zap.Logger GoFailEnabled bool GoFailClientTimeout time.Duration PeerProxy bool EnvVars map[string]string + Version ClusterVersion ClusterSize int @@ -207,9 +209,10 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP etcdCfgs := cfg.EtcdServerProcessConfigs(t) epc := &EtcdProcessCluster{ - Cfg: cfg, - lg: zaptest.NewLogger(t), - Procs: make([]EtcdProcess, cfg.ClusterSize), + Cfg: cfg, + lg: zaptest.NewLogger(t), + Procs: make([]EtcdProcess, cfg.ClusterSize), + nextSeq: cfg.ClusterSize, } // launch etcd processes @@ -253,170 +256,190 @@ func (cfg *EtcdProcessClusterConfig) PeerScheme() string { return setupScheme(cfg.BasePeerScheme, cfg.IsPeerTLS) } -func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*EtcdServerProcessConfig { - lg := zaptest.NewLogger(tb) +func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig { + var curls []string + var curl string + port := cfg.BasePort + 5*i + clientPort := port + peerPort := port + 1 + peer2Port := port + 3 + clientHttpPort := port + 4 - if cfg.BasePort == 0 { - cfg.BasePort = EtcdProcessBasePort + if cfg.ClientTLS == ClientTLSAndNonTLS { + curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS) + curls = []string{curl, clientURL(cfg.ClientScheme(), clientPort, ClientTLS)} + } else { + curl = clientURL(cfg.ClientScheme(), clientPort, cfg.ClientTLS) + curls = []string{curl} } - if cfg.ExecPath == "" { - cfg.ExecPath = BinPath + + purl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} + peerAdvertiseUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} + var proxyCfg *proxy.ServerConfig + if cfg.PeerProxy { + if !cfg.IsPeerTLS { + panic("Can't use peer proxy without peer TLS as it can result in malformed packets") + } + peerAdvertiseUrl.Host = fmt.Sprintf("localhost:%d", peer2Port) + proxyCfg = &proxy.ServerConfig{ + Logger: zap.NewNop(), + To: purl, + From: peerAdvertiseUrl, + } } - if cfg.SnapshotCount == 0 { - cfg.SnapshotCount = etcdserver.DefaultSnapshotCount + + name := fmt.Sprintf("test-%d", i) + dataDirPath := cfg.DataDirPath + if cfg.DataDirPath == "" { + dataDirPath = tb.TempDir() } - etcdCfgs := make([]*EtcdServerProcessConfig, cfg.ClusterSize) - initialCluster := make([]string, cfg.ClusterSize) - for i := 0; i < cfg.ClusterSize; i++ { - var curls []string - var curl string - port := cfg.BasePort + 5*i - clientPort := port - peerPort := port + 1 - peer2Port := port + 3 - clientHttpPort := port + 4 - - if cfg.ClientTLS == ClientTLSAndNonTLS { - curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS) - curls = []string{curl, clientURL(cfg.ClientScheme(), clientPort, ClientTLS)} - } else { - curl = clientURL(cfg.ClientScheme(), clientPort, cfg.ClientTLS) - curls = []string{curl} - } + args := []string{ + "--name", name, + "--listen-client-urls", strings.Join(curls, ","), + "--advertise-client-urls", strings.Join(curls, ","), + "--listen-peer-urls", purl.String(), + "--initial-advertise-peer-urls", peerAdvertiseUrl.String(), + "--initial-cluster-token", cfg.InitialToken, + "--data-dir", dataDirPath, + "--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount), + } + var clientHttpUrl string + if cfg.ClientHttpSeparate { + clientHttpUrl = clientURL(cfg.ClientScheme(), clientHttpPort, cfg.ClientTLS) + args = append(args, "--listen-client-http-urls", clientHttpUrl) + } + args = AddV2Args(args) + if cfg.ForceNewCluster { + args = append(args, "--force-new-cluster") + } + if cfg.QuotaBackendBytes > 0 { + args = append(args, + "--quota-backend-bytes", fmt.Sprintf("%d", cfg.QuotaBackendBytes), + ) + } + if cfg.NoStrictReconfig { + args = append(args, "--strict-reconfig-check=false") + } + if cfg.EnableV2 { + args = append(args, "--enable-v2") + } + if cfg.InitialCorruptCheck { + args = append(args, "--experimental-initial-corrupt-check") + } + var murl string + if cfg.MetricsURLScheme != "" { + murl = (&url.URL{ + Scheme: cfg.MetricsURLScheme, + Host: fmt.Sprintf("localhost:%d", port+2), + }).String() + args = append(args, "--listen-metrics-urls", murl) + } - purl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} - peerAdvertiseUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} - var proxyCfg *proxy.ServerConfig - if cfg.PeerProxy { - if !cfg.IsPeerTLS { - panic("Can't use peer proxy without peer TLS as it can result in malformed packets") - } - peerAdvertiseUrl.Host = fmt.Sprintf("localhost:%d", peer2Port) - proxyCfg = &proxy.ServerConfig{ - Logger: zap.NewNop(), - To: purl, - From: peerAdvertiseUrl, - } - } + args = append(args, cfg.TlsArgs()...) - name := fmt.Sprintf("test-%d", i) - dataDirPath := cfg.DataDirPath - if cfg.DataDirPath == "" { - dataDirPath = tb.TempDir() - } - initialCluster[i] = fmt.Sprintf("%s=%s", name, peerAdvertiseUrl.String()) - - args := []string{ - "--name", name, - "--listen-client-urls", strings.Join(curls, ","), - "--advertise-client-urls", strings.Join(curls, ","), - "--listen-peer-urls", purl.String(), - "--initial-advertise-peer-urls", peerAdvertiseUrl.String(), - "--initial-cluster-token", cfg.InitialToken, - "--data-dir", dataDirPath, - "--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount), - } - var clientHttpUrl string - if cfg.ClientHttpSeparate { - clientHttpUrl = clientURL(cfg.ClientScheme(), clientHttpPort, cfg.ClientTLS) - args = append(args, "--listen-client-http-urls", clientHttpUrl) - } - args = AddV2Args(args) - if cfg.ForceNewCluster { - args = append(args, "--force-new-cluster") - } - if cfg.QuotaBackendBytes > 0 { - args = append(args, - "--quota-backend-bytes", fmt.Sprintf("%d", cfg.QuotaBackendBytes), - ) - } - if cfg.NoStrictReconfig { - args = append(args, "--strict-reconfig-check=false") - } - if cfg.EnableV2 { - args = append(args, "--enable-v2") - } - if cfg.InitialCorruptCheck { - args = append(args, "--experimental-initial-corrupt-check") - } - var murl string - if cfg.MetricsURLScheme != "" { - murl = (&url.URL{ - Scheme: cfg.MetricsURLScheme, - Host: fmt.Sprintf("localhost:%d", port+2), - }).String() - args = append(args, "--listen-metrics-urls", murl) - } + if cfg.AuthTokenOpts != "" { + args = append(args, "--auth-token", cfg.AuthTokenOpts) + } - args = append(args, cfg.TlsArgs()...) + if cfg.V2deprecation != "" { + args = append(args, "--v2-deprecation", cfg.V2deprecation) + } - if cfg.AuthTokenOpts != "" { - args = append(args, "--auth-token", cfg.AuthTokenOpts) - } + if cfg.LogLevel != "" { + args = append(args, "--log-level", cfg.LogLevel) + } - if cfg.V2deprecation != "" { - args = append(args, "--v2-deprecation", cfg.V2deprecation) - } + if cfg.MaxConcurrentStreams != 0 { + args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams)) + } - if cfg.LogLevel != "" { - args = append(args, "--log-level", cfg.LogLevel) - } + if cfg.CorruptCheckTime != 0 { + args = append(args, "--experimental-corrupt-check-time", fmt.Sprintf("%s", cfg.CorruptCheckTime)) + } + if cfg.CompactHashCheckEnabled { + args = append(args, "--experimental-compact-hash-check-enabled") + } + if cfg.CompactHashCheckTime != 0 { + args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String()) + } + if cfg.WatchProcessNotifyInterval != 0 { + args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String()) + } + if cfg.CompactionBatchLimit != 0 { + args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit)) + } - if cfg.MaxConcurrentStreams != 0 { - args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams)) - } + envVars := map[string]string{} + for key, value := range cfg.EnvVars { + envVars[key] = value + } + var gofailPort int + if cfg.GoFailEnabled { + gofailPort = (i+1)*10000 + 2381 + envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort) + } - if cfg.CorruptCheckTime != 0 { - args = append(args, "--experimental-corrupt-check-time", fmt.Sprintf("%s", cfg.CorruptCheckTime)) - } - if cfg.CompactHashCheckEnabled { - args = append(args, "--experimental-compact-hash-check-enabled") - } - if cfg.CompactHashCheckTime != 0 { - args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String()) - } - if cfg.WatchProcessNotifyInterval != 0 { - args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String()) + var execPath string + switch cfg.Version { + case CurrentVersion: + execPath = BinPath + case MinorityLastVersion: + if i <= cfg.ClusterSize/2 { + execPath = BinPath + } else { + execPath = BinPathLastRelease } - if cfg.CompactionBatchLimit != 0 { - args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit)) + case QuorumLastVersion: + if i <= cfg.ClusterSize/2 { + execPath = BinPathLastRelease + } else { + execPath = BinPath } + case LastVersion: + execPath = BinPathLastRelease + default: + panic(fmt.Sprintf("Unknown cluster version %v", cfg.Version)) + } - envVars := map[string]string{} - for key, value := range cfg.EnvVars { - envVars[key] = value - } - var gofailPort int - if cfg.GoFailEnabled { - gofailPort = (i+1)*10000 + 2381 - envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort) - } + return &EtcdServerProcessConfig{ + lg: cfg.Logger, + ExecPath: execPath, + Args: args, + EnvVars: envVars, + TlsArgs: cfg.TlsArgs(), + DataDirPath: dataDirPath, + KeepDataDir: cfg.KeepDataDir, + Name: name, + Purl: peerAdvertiseUrl, + Acurl: curl, + Murl: murl, + InitialToken: cfg.InitialToken, + ClientHttpUrl: clientHttpUrl, + GoFailPort: gofailPort, + GoFailClientTimeout: cfg.GoFailClientTimeout, + Proxy: proxyCfg, + } +} - etcdCfgs[i] = &EtcdServerProcessConfig{ - lg: lg, - ExecPath: cfg.ExecPath, - Args: args, - EnvVars: envVars, - TlsArgs: cfg.TlsArgs(), - DataDirPath: dataDirPath, - KeepDataDir: cfg.KeepDataDir, - Name: name, - Purl: peerAdvertiseUrl, - Acurl: curl, - Murl: murl, - InitialToken: cfg.InitialToken, - ClientHttpUrl: clientHttpUrl, - GoFailPort: gofailPort, - GoFailClientTimeout: cfg.GoFailClientTimeout, - Proxy: proxyCfg, - } +func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*EtcdServerProcessConfig { + cfg.Logger = zaptest.NewLogger(tb) + if cfg.BasePort == 0 { + cfg.BasePort = EtcdProcessBasePort + } + if cfg.SnapshotCount == 0 { + cfg.SnapshotCount = etcdserver.DefaultSnapshotCount + } + + etcdCfgs := make([]*EtcdServerProcessConfig, cfg.ClusterSize) + initialCluster := make([]string, cfg.ClusterSize) + for i := 0; i < cfg.ClusterSize; i++ { + etcdCfgs[i] = cfg.EtcdServerProcessConfig(tb, i) + initialCluster[i] = fmt.Sprintf("%s=%s", etcdCfgs[i].Name, etcdCfgs[i].Purl.String()) } - initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")} for i := range etcdCfgs { - etcdCfgs[i].InitialCluster = strings.Join(initialCluster, ",") - etcdCfgs[i].Args = append(etcdCfgs[i].Args, initialClusterArgs...) + cfg.SetInitialCluster(etcdCfgs[i], initialCluster, "") } return etcdCfgs @@ -557,6 +580,10 @@ func (epc *EtcdProcessCluster) Stop() (err error) { return err } +func (epc *EtcdProcessCluster) Etcdctl() *Etcdctl { + return NewEtcdctl(epc.EndpointsV3(), epc.Cfg.ClientTLS, epc.Cfg.IsClientAutoTLS, epc.Cfg.EnableV2) +} + func (epc *EtcdProcessCluster) Close() error { epc.lg.Info("closing test cluster...") err := epc.Stop() @@ -581,6 +608,75 @@ func (epc *EtcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) { return ret } +// StartNewProc grows cluster size by one with two phases +// Phase 1 - Inform cluster of new configuration +// Phase 2 - Start new member +func (epc *EtcdProcessCluster) StartNewProc(cfg *EtcdProcessClusterConfig, tb testing.TB) (memberID uint64, err error) { + memberID, serverCfg, err := epc.AddMember(cfg, tb) + if err != nil { + return 0, err + } + + // Then start process + if err = epc.StartNewProcFromConfig(tb, serverCfg); err != nil { + return 0, err + } + + return memberID, nil +} + +// AddMember adds a new member to the cluster without starting it. +func (epc *EtcdProcessCluster) AddMember(cfg *EtcdProcessClusterConfig, tb testing.TB) (memberID uint64, serverCfg *EtcdServerProcessConfig, err error) { + if cfg != nil { + serverCfg = cfg.EtcdServerProcessConfig(tb, epc.nextSeq) + } else { + serverCfg = epc.Cfg.EtcdServerProcessConfig(tb, epc.nextSeq) + } + + epc.nextSeq++ + + initialCluster := []string{ + fmt.Sprintf("%s=%s", serverCfg.Name, serverCfg.Purl.String()), + } + for _, p := range epc.Procs { + initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().Purl.String())) + } + + epc.Cfg.SetInitialCluster(serverCfg, initialCluster, "existing") + + // First add new member to cluster + tb.Logf("add new member to cluster; member-name %s, member-peer-url %s", serverCfg.Name, serverCfg.Purl.String()) + memberCtl := NewEtcdctl(epc.Procs[0].EndpointsV3(), cfg.ClientTLS, cfg.IsClientAutoTLS, false) + resp, err := memberCtl.MemberAdd(serverCfg.Name, []string{serverCfg.Purl.String()}) + if err != nil { + return 0, nil, fmt.Errorf("failed to add new member: %w", err) + } + + return resp.Member.ID, serverCfg, nil +} + +func (cfg *EtcdProcessClusterConfig) SetInitialCluster(serverCfg *EtcdServerProcessConfig, initialCluster []string, initialClusterState string) { + serverCfg.InitialCluster = strings.Join(initialCluster, ",") + serverCfg.Args = append(serverCfg.Args, "--initial-cluster", serverCfg.InitialCluster) + if len(initialClusterState) > 0 { + serverCfg.Args = append(serverCfg.Args, "--initial-cluster-state", initialClusterState) + } +} + +// StartNewProcFromConfig starts a new member process from the given config. +func (epc *EtcdProcessCluster) StartNewProcFromConfig(tb testing.TB, serverCfg *EtcdServerProcessConfig) error { + tb.Log("start new member") + proc, err := NewEtcdProcess(serverCfg) + if err != nil { + epc.Close() + return fmt.Errorf("cannot configure: %v", err) + } + + epc.Procs = append(epc.Procs, proc) + + return proc.Start() +} + // WaitLeader returns index of the member in c.Members() that is leader // or fails the test (if not established in 30s). func (epc *EtcdProcessCluster) WaitLeader(t testing.TB) int { diff --git a/tests/framework/e2e/config.go b/tests/framework/e2e/config.go new file mode 100644 index 00000000000..7e996e747fd --- /dev/null +++ b/tests/framework/e2e/config.go @@ -0,0 +1,28 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +type ClusterVersion string + +const ( + CurrentVersion ClusterVersion = "" + MinorityLastVersion ClusterVersion = "minority-last-version" + QuorumLastVersion ClusterVersion = "quorum-last-version" + LastVersion ClusterVersion = "last-version" +) + +type ClusterContext struct { + Version ClusterVersion +} diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index 2f6650f19c7..c2084428656 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -24,6 +24,7 @@ import ( "net/url" "os" "strings" + "testing" "time" "go.etcd.io/etcd/client/pkg/v3/fileutil" @@ -35,6 +36,7 @@ import ( var ( EtcdServerReadyLines = []string{"ready to serve client requests"} BinPath string + BinPathLastRelease string CtlBinPath string UtlBinPath string ) @@ -232,6 +234,14 @@ func (ep *EtcdServerProcess) Logs() LogsExpect { return ep.proc } +func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) { + t.Helper() + _, err := ep.Logs().Expect(expectLog) + if err != nil { + t.Fatal(err) + } +} + func (ep *EtcdServerProcess) PeerProxy() proxy.Server { return ep.proxy } diff --git a/tests/framework/e2e/etcdctl.go b/tests/framework/e2e/etcdctl.go index 010b72b6456..7e7361b17cd 100644 --- a/tests/framework/e2e/etcdctl.go +++ b/tests/framework/e2e/etcdctl.go @@ -39,6 +39,22 @@ func NewEtcdctl(endpoints []string, connType ClientConnType, isAutoTLS bool, v2 } } +func (ctl *Etcdctl) HashKV(rev int64) ([]*clientv3.HashKVResponse, error) { + var epHashKVs []*struct { + Endpoint string + HashKV *clientv3.HashKVResponse + } + err := ctl.spawnJsonCmd(&epHashKVs, "endpoint", "hashkv", "--rev", fmt.Sprint(rev)) + if err != nil { + return nil, err + } + resp := make([]*clientv3.HashKVResponse, len(epHashKVs)) + for i, e := range epHashKVs { + resp[i] = e.HashKV + } + return resp, err +} + func (ctl *Etcdctl) Get(key string) (*clientv3.GetResponse, error) { var resp clientv3.GetResponse err := ctl.spawnJsonCmd(&resp, "get", key) diff --git a/tests/framework/e2e/flags.go b/tests/framework/e2e/flags.go index 139cf29c5c2..a092371cb3b 100644 --- a/tests/framework/e2e/flags.go +++ b/tests/framework/e2e/flags.go @@ -55,6 +55,7 @@ func InitFlags() { flag.Parse() BinPath = BinDir + "/etcd" + BinPathLastRelease = BinDir + "/etcd-last-release" CtlBinPath = BinDir + "/etcdctl" UtlBinPath = BinDir + "/etcdutl" CertPath = CertDir + "/server.crt"