Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.4] Backport e2e tests for livez/readyz. #17144

Merged
merged 2 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ jobs:
PASSES='build grpcproxy' CPU='4' RACE='true' ./test
;;
linux-amd64-e2e)
GOARCH=amd64 make test-e2e-release
make install-gofail
GOARCH=amd64 FAILPOINTS='true' make test-e2e-release
;;
linux-386-unit)
GOARCH=386 make test-unit
Expand Down
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -532,3 +532,9 @@ pull-docker-functional:
$(info GO_VERSION: $(GO_VERSION))
$(info ETCD_VERSION: $(ETCD_VERSION))
docker pull gcr.io/etcd-development/etcd-functional:go$(GO_VERSION)

# Failpoints
GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/gofail)
.PHONY: install-gofail
install-gofail:
go install go.etcd.io/gofail@${GOFAIL_VERSION}
17 changes: 16 additions & 1 deletion build
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,26 @@ fi
# Set GO_LDFLAGS="-s" for building without symbols for debugging.
GO_LDFLAGS="${GO_LDFLAGS:-} -X ${REPO_PATH}/version.GitSHA=${GIT_SHA}"

GOFAIL_VERSION=$(cd tools/mod && go list -m -f "{{.Version}}" go.etcd.io/gofail)
# enable/disable failpoints
toggle_failpoints() {
mode="$1"
if command -v gofail >/dev/null 2>&1; then
gofail "$mode" etcdserver/ mvcc/backend/ wal/
gofail "$mode" etcdserver/ mvcc/ mvcc/backend/ wal/
# shellcheck disable=SC2086
if [[ "$mode" == "enable" ]]; then
go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdserver && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ../etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ../tests && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ../
else
go mod tidy
cd ./etcdserver && go mod tidy
cd ../etcdctl && go mod tidy
cd ../tests && go mod tidy
cd ../
fi
elif [[ "$mode" != "disable" ]]; then
echo "FAILPOINTS set but gofail not found"
exit 1
Expand Down
1 change: 1 addition & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2179,6 +2179,7 @@ func (s *EtcdServer) apply(
e := es[i]
switch e.Type {
case raftpb.EntryNormal:
// gofail: var beforeApplyOneEntryNormal struct{}
s.applyEntryNormal(&e)
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
Expand Down
1 change: 1 addition & 0 deletions mvcc/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {
func (t *batchTxBuffered) Unlock() {
if t.pending != 0 {
t.backend.readTx.Lock() // blocks txReadBuffer for writing.
// gofail: var beforeWritebackBuf struct{}
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit {
Expand Down
4 changes: 4 additions & 0 deletions pkg/expect/expect.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,7 @@ func (ep *ExpectProcess) Send(command string) error {
_, err := io.WriteString(ep.fpty, command)
return err
}

func (ep *ExpectProcess) IsRunning() bool {
return ep.cmd != nil
}
28 changes: 11 additions & 17 deletions tests/e2e/cluster_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import (
)

type proxyEtcdProcess struct {
etcdProc etcdProcess
proxyV2 *proxyV2Proc
proxyV3 *proxyV3Proc
*etcdServerProcess
proxyV2 *proxyV2Proc
proxyV3 *proxyV3Proc
}

func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) {
Expand All @@ -45,15 +45,13 @@ func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error
return nil, err
}
pep := &proxyEtcdProcess{
etcdProc: ep,
proxyV2: newProxyV2Proc(cfg),
proxyV3: newProxyV3Proc(cfg),
etcdServerProcess: ep,
proxyV2: newProxyV2Proc(cfg),
proxyV3: newProxyV3Proc(cfg),
}
return pep, nil
}

func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() }

func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() }
func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() }
Expand All @@ -63,7 +61,7 @@ func (p *proxyEtcdProcess) EndpointsMetrics() []string {
}

func (p *proxyEtcdProcess) Start() error {
if err := p.etcdProc.Start(); err != nil {
if err := p.etcdServerProcess.Start(); err != nil {
return err
}
if err := p.proxyV2.Start(); err != nil {
Expand All @@ -73,7 +71,7 @@ func (p *proxyEtcdProcess) Start() error {
}

func (p *proxyEtcdProcess) Restart() error {
if err := p.etcdProc.Restart(); err != nil {
if err := p.etcdServerProcess.Restart(); err != nil {
return err
}
if err := p.proxyV2.Restart(); err != nil {
Expand All @@ -87,7 +85,7 @@ func (p *proxyEtcdProcess) Stop() error {
if v3err := p.proxyV3.Stop(); err == nil {
err = v3err
}
if eerr := p.etcdProc.Stop(); eerr != nil && err == nil {
if eerr := p.etcdServerProcess.Stop(); eerr != nil && err == nil {
// fails on go-grpc issue #1384
if !strings.Contains(eerr.Error(), "exit status 2") {
err = eerr
Expand All @@ -101,7 +99,7 @@ func (p *proxyEtcdProcess) Close() error {
if v3err := p.proxyV3.Close(); err == nil {
err = v3err
}
if eerr := p.etcdProc.Close(); eerr != nil && err == nil {
if eerr := p.etcdServerProcess.Close(); eerr != nil && err == nil {
// fails on go-grpc issue #1384
if !strings.Contains(eerr.Error(), "exit status 2") {
err = eerr
Expand All @@ -113,11 +111,7 @@ func (p *proxyEtcdProcess) Close() error {
func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
p.proxyV3.WithStopSignal(sig)
p.proxyV3.WithStopSignal(sig)
return p.etcdProc.WithStopSignal(sig)
}

func (p *proxyEtcdProcess) Logs() logsExpect {
return p.etcdProc.Logs()
return p.etcdServerProcess.WithStopSignal(sig)
}

type proxyProc struct {
Expand Down
61 changes: 50 additions & 11 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"net/url"
"os"
"strings"
"testing"
"time"

"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/pkg/proxy"
"go.uber.org/zap"
)

const etcdProcessBasePort = 20000
Expand Down Expand Up @@ -97,10 +100,12 @@ type etcdProcessCluster struct {
}

type etcdProcessClusterConfig struct {
execPath string
dataDirPath string
keepDataDir bool
envVars map[string]string
execPath string
dataDirPath string
keepDataDir bool
goFailEnabled bool
peerProxy bool
envVars map[string]string

clusterSize int

Expand Down Expand Up @@ -141,13 +146,13 @@ type etcdProcessClusterConfig struct {

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new etcdProcessCluster once all nodes are ready to accept client requests.
func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
func newEtcdProcessCluster(t testing.TB, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
epc, err := initEtcdProcessCluster(cfg)
if err != nil {
return nil, err
}

return startEtcdProcessCluster(epc, cfg)
return startEtcdProcessCluster(t, epc, cfg)
}

// `initEtcdProcessCluster` initializes a new cluster based on the given config.
Expand All @@ -174,7 +179,7 @@ func initEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
}

// `startEtcdProcessCluster` launches a new cluster from etcd processes.
func startEtcdProcessCluster(epc *etcdProcessCluster, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
func startEtcdProcessCluster(t testing.TB, epc *etcdProcessCluster, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
if err := epc.Start(); err != nil {
return nil, err
}
Expand All @@ -185,6 +190,12 @@ func startEtcdProcessCluster(epc *etcdProcessCluster, cfg *etcdProcessClusterCon
proc.WithStopSignal(cfg.stopSignal)
}
}
for _, proc := range epc.procs {
if cfg.goFailEnabled && !proc.Failpoints().Enabled() {
epc.Close()
t.Skip("please run test with 'FAILPOINTS=true'")
}
}
return epc, nil
}

Expand Down Expand Up @@ -223,6 +234,8 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
var curls []string
var curl string
port := cfg.basePort + 5*i
peerPort := port + 1
peer2Port := port + 3
clientPort := port
clientHttpPort := port + 4

Expand All @@ -235,6 +248,20 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
}

purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
peerAdvertiseUrl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
var proxyCfg *proxy.ServerConfig
if cfg.peerProxy {
if !cfg.isPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
}
peerAdvertiseUrl.Host = fmt.Sprintf("localhost:%d", peer2Port)
proxyCfg = &proxy.ServerConfig{
Logger: zap.NewNop(),
To: purl,
From: peerAdvertiseUrl,
}
}

name := fmt.Sprintf("testname%d", i)
dataDirPath := cfg.dataDirPath
if cfg.dataDirPath == "" {
Expand All @@ -244,14 +271,14 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
}
}
initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
initialCluster[i] = fmt.Sprintf("%s=%s", name, peerAdvertiseUrl.String())

args := []string{
"--name", name,
"--listen-client-urls", strings.Join(curls, ","),
"--advertise-client-urls", strings.Join(curls, ","),
"--listen-peer-urls", purl.String(),
"--initial-advertise-peer-urls", purl.String(),
"--initial-advertise-peer-urls", peerAdvertiseUrl.String(),
"--initial-cluster-token", cfg.initialToken,
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
Expand Down Expand Up @@ -309,19 +336,31 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
args = append(args, "--debug")
}

envVars := map[string]string{}
for key, value := range cfg.envVars {
envVars[key] = value
}
var gofailPort int
if cfg.goFailEnabled {
gofailPort = (i+1)*10000 + 2381
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
}

etcdCfgs[i] = &etcdServerProcessConfig{
execPath: cfg.execPath,
args: args,
envVars: cfg.envVars,
envVars: envVars,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
name: name,
purl: purl,
purl: peerAdvertiseUrl,
acurl: curl,
murl: murl,
initialToken: cfg.initialToken,
clientHttpUrl: clientHttpUrl,
goFailPort: gofailPort,
proxy: proxyCfg,
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/cmux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestConnectionMultiplexing(t *testing.T) {
clientHttpSeparate: tc.separateHttpPort,
stopSignal: syscall.SIGTERM, // check graceful stop
}
clus, err := newEtcdProcessCluster(&cfg)
clus, err := newEtcdProcessCluster(t, &cfg)
require.NoError(t, err)

defer func() {
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func setupEtcdctlTest(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool)
if !quorum {
cfg = configStandalone(*cfg)
}
epc, err := newEtcdProcessCluster(cfg)
epc, err := newEtcdProcessCluster(t, cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_auth_no_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestCtlV3AuthCertCNWithWithConcurrentOperation(t *testing.T) {
initialToken: "new",
}

epc, err := newEtcdProcessCluster(&cx.cfg)
epc, err := newEtcdProcessCluster(t, &cx.cfg)
if err != nil {
t.Fatalf("Failed to start etcd cluster: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestAuthority(t *testing.T) {
// Enable debug mode to get logs with http2 headers (including authority)
cfg.envVars = map[string]string{"GODEBUG": "http2debug=2"}

epc, err := newEtcdProcessCluster(&cfg)
epc, err := newEtcdProcessCluster(t, &cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_make_mirror_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs []kv, destkvs []kvEx
dialTimeout: 7 * time.Second,
}

mirrorepc, err := newEtcdProcessCluster(&mirrorctx.cfg)
mirrorepc, err := newEtcdProcessCluster(cx.t, &mirrorctx.cfg)
if err != nil {
cx.t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/ctl_v3_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestIssue6361(t *testing.T) {
os.Setenv("ETCDCTL_API", "3")
defer os.Unsetenv("ETCDCTL_API")

epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{
epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
clusterSize: 1,
initialToken: "new",
keepDataDir: true,
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestIssue6361(t *testing.T) {
func TestRestoreCompactionRevBump(t *testing.T) {
defer testutil.AfterTest(t)

epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{
epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
clusterSize: 1,
initialToken: "new",
keepDataDir: true,
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
ret.cfg.initialCorruptCheck = ret.initialCorruptCheck
}

epc, err := newEtcdProcessCluster(&ret.cfg)
epc, err := newEtcdProcessCluster(t, &ret.cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/etcd_corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestInPlaceRecovery(t *testing.T) {
corruptCheckTime: time.Second,
basePort: basePort,
}
epcOld, err := newEtcdProcessCluster(&cfgOld)
epcOld, err := newEtcdProcessCluster(t, &cfgOld)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand Down
Loading
Loading