From 93d7fbac95f59942c62c8935fbf98773d9638dcc Mon Sep 17 00:00:00 2001 From: David Jumani Date: Tue, 18 Jun 2024 13:58:25 -0400 Subject: [PATCH] Retry on leader lease renewal failure (#9563) * retry on leader lease renewal failure * Die if unable to recover * use env var * add basic tests * udpate tests * add comments * Add commnets around ci changes * update tests * refactor * cleanup * udpate test * rename env var * add changelog * address comments v1 * address comments v2 * fix test * sue GinkgoHelper * Adding changelog file to new location * Deleting changelog file from old location * specify a duration * Adding changelog file to new location * Deleting changelog file from old location * remove default * Adding changelog file to new location * Deleting changelog file from old location * fix tests * move changelog * move conter --------- Co-authored-by: soloio-bulldozer[bot] <48420018+soloio-bulldozer[bot]@users.noreply.github.com> Co-authored-by: changelog-bot Co-authored-by: Nathan Fudenberg --- .../dont-crash-on-failed-lease-renewal.yaml | 4 + ci/kind/cluster.yaml | 4 + ci/kind/setup-kind.sh | 9 + pkg/bootstrap/leaderelector/identity.go | 5 + pkg/bootstrap/leaderelector/kube/factory.go | 139 ++++++++--- pkg/utils/kubeutils/kubectl/cli.go | 23 +- projects/gloo/pkg/setup/setup.go | 8 +- .../gloo/artifacts/block-gloo-apiserver.yaml | 12 + test/kube2e/gloo/artifacts/block-labels.yaml | 12 + test/kube2e/gloo/artifacts/helm.yaml | 2 +- test/kube2e/gloo/bootstrap_clients_test.go | 229 ++++++++++++++++++ test/kube2e/gloo/gloo_suite_test.go | 5 +- test/kube2e/gloo/setup_syncer_test.go | 2 +- test/kube2e/helper/install.go | 7 + test/kube2e/helper/kube.go | 60 +++++ 15 files changed, 476 insertions(+), 45 deletions(-) create mode 100644 changelog/v1.17.0-rc5/dont-crash-on-failed-lease-renewal.yaml create mode 100644 test/kube2e/gloo/artifacts/block-gloo-apiserver.yaml create mode 100644 test/kube2e/gloo/artifacts/block-labels.yaml create mode 100644 test/kube2e/helper/kube.go diff --git a/changelog/v1.17.0-rc5/dont-crash-on-failed-lease-renewal.yaml b/changelog/v1.17.0-rc5/dont-crash-on-failed-lease-renewal.yaml new file mode 100644 index 00000000000..adc4aece7f9 --- /dev/null +++ b/changelog/v1.17.0-rc5/dont-crash-on-failed-lease-renewal.yaml @@ -0,0 +1,4 @@ +changelog: +- type: NEW_FEATURE + issueLink: https://github.com/solo-io/gloo/issues/8107 + description: Adds the ability to recover if the Kubernetes API server is unreachable once the gloo pod comes up. The `MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER` environment variable defines the maximum duration the gloo pod can run and attempt to reconnect to the kube apiserver if it is unreachable. Exceeding this duration will lead to the pod quitting. To enable this feature, set the `MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER` environment variable to the desired duration in the gloo container. This can be done either by modifying the gloo deployment or by specifying the `gloo.deployment.customEnv[0].Name=MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER` and `gloo.deployment.customEnv[0].Value=60s` helm values. diff --git a/ci/kind/cluster.yaml b/ci/kind/cluster.yaml index 1b48be705f0..abe8e649aba 100644 --- a/ci/kind/cluster.yaml +++ b/ci/kind/cluster.yaml @@ -1,5 +1,9 @@ kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 +# Do not use the default CNI as kindnet does not support custom network policies. +# Instead, cilium is installed as CNI as we need to test Kube API server unavailability in the kube2e tests +networking: + disableDefaultCNI: true kubeadmConfigPatches: - | apiVersion: kubeadm.k8s.io/v1beta3 diff --git a/ci/kind/setup-kind.sh b/ci/kind/setup-kind.sh index 4b126944ab6..fd15c5c13ca 100755 --- a/ci/kind/setup-kind.sh +++ b/ci/kind/setup-kind.sh @@ -21,6 +21,7 @@ ISTIO_VERSION="${ISTIO_VERSION:-1.22.0}" IMAGE_VARIANT="${IMAGE_VARIANT:-standard}" # If true, run extra steps to set up k8s gateway api conformance test environment CONFORMANCE="${CONFORMANCE:-false}" +CILIUM_VERSION="${CILIUM_VERSION:-1.15.5}" function create_kind_cluster_or_skip() { activeClusters=$(kind get clusters) @@ -36,6 +37,14 @@ function create_kind_cluster_or_skip() { --name "$CLUSTER_NAME" \ --image "kindest/node:$CLUSTER_NODE_VERSION" \ --config="$SCRIPT_DIR/cluster.yaml" + + # Install cilium as we need to define custom network policies to simulate kube api server unavailability + # in some of our kube2e tests + helm repo add cilium https://helm.cilium.io/ + helm install cilium cilium/cilium --version $CILIUM_VERSION \ + --namespace kube-system \ + --set image.pullPolicy=IfNotPresent \ + --set ipam.mode=kubernetes echo "Finished setting up cluster $CLUSTER_NAME" # so that you can just build the kind image alone if needed diff --git a/pkg/bootstrap/leaderelector/identity.go b/pkg/bootstrap/leaderelector/identity.go index 89ecc1b745b..49d70b69b38 100644 --- a/pkg/bootstrap/leaderelector/identity.go +++ b/pkg/bootstrap/leaderelector/identity.go @@ -37,3 +37,8 @@ func (i identityImpl) IsLeader() bool { func (i identityImpl) Elected() <-chan struct{} { return i.elected } + +// Reset updates the current identity to a follower. It can be promoted to a leader by closing the elected channel +func (i identityImpl) Reset(elected <-chan struct{}) { + i.elected = elected +} diff --git a/pkg/bootstrap/leaderelector/kube/factory.go b/pkg/bootstrap/leaderelector/kube/factory.go index d3f54281372..1baca62ef96 100644 --- a/pkg/bootstrap/leaderelector/kube/factory.go +++ b/pkg/bootstrap/leaderelector/kube/factory.go @@ -3,9 +3,11 @@ package kube import ( "context" "os" + "sync/atomic" "time" "github.com/solo-io/gloo/pkg/bootstrap/leaderelector" + "github.com/solo-io/gloo/pkg/utils/envutils" "github.com/solo-io/go-utils/contextutils" "k8s.io/client-go/rest" k8sleaderelection "k8s.io/client-go/tools/leaderelection" @@ -21,9 +23,12 @@ const ( defaultRetryPeriod = 2 * time.Second defaultRenewPeriod = 10 * time.Second - leaseDurationEnvName = "LEADER_ELECTION_LEASE_DURATION" - retryPeriodEnvName = "LEADER_ELECTION_RETRY_PERIOD" - renewPeriodEnvName = "LEADER_ELECTION_RENEW_PERIOD" + defaultRecoveryTimeout = 60 * time.Second + + leaseDurationEnvName = "LEADER_ELECTION_LEASE_DURATION" + retryPeriodEnvName = "LEADER_ELECTION_RETRY_PERIOD" + renewPeriodEnvName = "LEADER_ELECTION_RENEW_PERIOD" + MaxRecoveryDurationWithoutKubeAPIServer = "MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER" ) // kubeElectionFactory is the implementation for coordinating leader election using @@ -39,6 +44,20 @@ func NewElectionFactory(config *rest.Config) *kubeElectionFactory { } func (f *kubeElectionFactory) StartElection(ctx context.Context, config *leaderelector.ElectionConfig) (leaderelector.Identity, error) { + var recoveryTimeoutIfKubeAPIServerIsUnreachable time.Duration + var recoverIfKubeAPIServerIsUnreachable bool + var err error + if envutils.IsEnvDefined(MaxRecoveryDurationWithoutKubeAPIServer) { + recoveryTimeoutIfKubeAPIServerIsUnreachable, err = time.ParseDuration(os.Getenv(MaxRecoveryDurationWithoutKubeAPIServer)) + if err != nil { + contextutils.LoggerFrom(ctx).Errorf("%s is not a valid duration. Defaulting to 60s", MaxRecoveryDurationWithoutKubeAPIServer) + recoveryTimeoutIfKubeAPIServerIsUnreachable = defaultRecoveryTimeout + } else { + contextutils.LoggerFrom(ctx).Infof("max recovery from kube apiserver unavailability set to %s", recoveryTimeoutIfKubeAPIServerIsUnreachable) + } + recoverIfKubeAPIServerIsUnreachable = true + } + elected := make(chan struct{}) identity := leaderelector.NewIdentity(elected) @@ -55,39 +74,99 @@ func (f *kubeElectionFactory) StartElection(ctx context.Context, config *leadere return identity, err } - l, err := k8sleaderelection.NewLeaderElector( - k8sleaderelection.LeaderElectionConfig{ - Lock: resourceLock, - LeaseDuration: getLeaseDuration(), - RenewDeadline: getRenewPeriod(), - RetryPeriod: getRetryPeriod(), - Callbacks: k8sleaderelection.LeaderCallbacks{ - OnStartedLeading: func(callbackCtx context.Context) { - contextutils.LoggerFrom(callbackCtx).Debug("Started Leading") - close(elected) - config.OnStartedLeading(callbackCtx) - }, - OnStoppedLeading: func() { - contextutils.LoggerFrom(ctx).Error("Stopped Leading") - config.OnStoppedLeading() - }, - OnNewLeader: func(identity string) { - contextutils.LoggerFrom(ctx).Debugf("New Leader Elected with Identity: %s", identity) - config.OnNewLeader(identity) + var justFailed = false + var dontDie func() + + // dieIfUnrecoverable causes gloo to exit after the recoveryTimeout (default 60s) if the context is not cancelled. + // This function is called when this container is a leader but unable to renew the leader lease (caused by an unreachable kube api server). + // The context is cancelled if it is able to participate in leader election again, irrespective if it becomes a leader or follower. + dieIfUnrecoverable := func(ctx context.Context) { + timer := time.NewTimer(recoveryTimeoutIfKubeAPIServerIsUnreachable) + select { + case <-timer.C: + contextutils.LoggerFrom(ctx).Fatalf("unable to recover from failed leader election, quitting app") + case <-ctx.Done(): + contextutils.LoggerFrom(ctx).Infof("recovered from lease renewal failure") + } + } + + newLeaderElector := func() (*k8sleaderelection.LeaderElector, error) { + recoveryCtx, cancel := context.WithCancel(ctx) + + return k8sleaderelection.NewLeaderElector( + k8sleaderelection.LeaderElectionConfig{ + Lock: resourceLock, + LeaseDuration: getLeaseDuration(), + RenewDeadline: getRenewPeriod(), + RetryPeriod: getRetryPeriod(), + Callbacks: k8sleaderelection.LeaderCallbacks{ + OnStartedLeading: func(callbackCtx context.Context) { + contextutils.LoggerFrom(callbackCtx).Debug("Started Leading") + close(elected) + config.OnStartedLeading(callbackCtx) + }, + OnStoppedLeading: func() { + contextutils.LoggerFrom(ctx).Error("Stopped Leading") + config.OnStoppedLeading() + if recoverIfKubeAPIServerIsUnreachable { + // Recreate the elected channel and reset the identity to a follower + // Ref: https://github.com/solo-io/gloo/issues/7346 + elected = make(chan struct{}) + identity.Reset(elected) + // Die if we are unable to recover from this within the recoveryTimeout + go dieIfUnrecoverable(recoveryCtx) + // Set recover to cancel the context to be used the next time `OnNewLeader` is called + dontDie = cancel + justFailed = true + } + }, + OnNewLeader: func(identity string) { + contextutils.LoggerFrom(ctx).Debugf("New Leader Elected with Identity: %s", identity) + config.OnNewLeader(identity) + // Recover since we were able to re-negotiate leader election + // Do this only when we just failed and not when someone becomes a leader + if recoverIfKubeAPIServerIsUnreachable && justFailed { + dontDie() + justFailed = false + } + }, }, + Name: config.Id, + ReleaseOnCancel: true, }, - Name: config.Id, - ReleaseOnCancel: true, - }, - ) + ) + } + + // The error returned is just validating the config passed. If it passes validation once, it will again + _, err = newLeaderElector() if err != nil { return identity, err } - // Start the leader elector process in a goroutine - contextutils.LoggerFrom(ctx).Debug("Starting Kube Leader Election") - go l.Run(ctx) - + // leaderElector.Run() is a blocking method but we need to return the identity of this container to sub-components so they can + // perform their respective tasks, hence it runs within a go routine. + // It runs within an infinite loop so that we can recover if this container is a leader but fails to renew the lease and renegotiate leader election if possible. + // This can be caused when there is a failure to connect to the kube api server + go func() { + var counter atomic.Uint32 + + for { + l, _ := newLeaderElector() + // Start the leader elector process + contextutils.LoggerFrom(ctx).Debug("Starting Kube Leader Election") + l.Run(ctx) + + if !recoverIfKubeAPIServerIsUnreachable { + contextutils.LoggerFrom(ctx).Fatalf("lost leadership, quitting app") + } + + contextutils.LoggerFrom(ctx).Errorf("Leader election cycle %v lost. Trying again", counter.Load()) + counter.Add(1) + // Sleep for the lease duration so another container has a chance to become the leader rather than try to renew + // in when the kube api server is unreachable by this container + time.Sleep(getLeaseDuration()) + } + }() return identity, nil } diff --git a/pkg/utils/kubeutils/kubectl/cli.go b/pkg/utils/kubeutils/kubectl/cli.go index b07576e2a73..3185c208b77 100644 --- a/pkg/utils/kubeutils/kubectl/cli.go +++ b/pkg/utils/kubeutils/kubectl/cli.go @@ -249,17 +249,17 @@ func (c *Cli) CurlFromPod(ctx context.Context, podOpts PodExecOptions, options . "5", }, curlArgs...) - stdout, stderr, err := c.ExecuteOn(ctx, c.kubeContext, nil, args...) + stdout, stderr, err := c.ExecuteOn(ctx, c.kubeContext, args...) return &CurlResponse{StdOut: stdout, StdErr: stderr}, err } -func (c *Cli) ExecuteOn(ctx context.Context, kubeContext string, stdin *bytes.Buffer, args ...string) (string, string, error) { +func (c *Cli) ExecuteOn(ctx context.Context, kubeContext string, args ...string) (string, string, error) { args = append([]string{"--context", kubeContext}, args...) - return c.Execute(ctx, stdin, args...) + return c.Execute(ctx, args...) } -func (c *Cli) Execute(ctx context.Context, stdin *bytes.Buffer, args ...string) (string, string, error) { +func (c *Cli) Execute(ctx context.Context, args ...string) (string, string, error) { stdout := new(strings.Builder) stderr := new(strings.Builder) @@ -271,3 +271,18 @@ func (c *Cli) Execute(ctx context.Context, stdin *bytes.Buffer, args ...string) return stdout.String(), stderr.String(), err } + +func (c *Cli) Scale(ctx context.Context, namespace string, resource string, replicas uint) error { + err := c.RunCommand(ctx, "scale", "-n", namespace, fmt.Sprintf("--replicas=%d", replicas), resource, "--timeout=300s") + if err != nil { + return err + } + time.Sleep(2 * time.Second) // Sleep a bit so the container starts + return c.RunCommand(ctx, "wait", "-n", namespace, "--for=condition=available", resource, "--timeout=300s") +} + +// GetContainerLogs retrieves the logs for the specified container +func (c *Cli) GetContainerLogs(ctx context.Context, namespace string, name string) (string, error) { + stdout, stderr, err := c.Execute(ctx, "-n", namespace, "logs", name) + return stdout + stderr, err +} diff --git a/projects/gloo/pkg/setup/setup.go b/projects/gloo/pkg/setup/setup.go index 62b146f7048..8f0d6d0b209 100644 --- a/projects/gloo/pkg/setup/setup.go +++ b/projects/gloo/pkg/setup/setup.go @@ -41,11 +41,9 @@ func startSetupLoop(ctx context.Context) error { contextutils.LoggerFrom(ctx).Infof("new leader elected with ID: %s", leaderId) }, OnStoppedLeading: func() { - // Kill app if we lose leadership, we need to be VERY sure we don't continue - // any leader election processes. - // https://github.com/solo-io/gloo/issues/7346 - // There is follow-up work to handle lost leadership more gracefully - contextutils.LoggerFrom(ctx).Fatalf("lost leadership, quitting app") + // Don't die if we fall from grace. Instead we can retry leader election + // Ref: https://github.com/solo-io/gloo/issues/7346 + contextutils.LoggerFrom(ctx).Errorf("lost leadership") }, }, }) diff --git a/test/kube2e/gloo/artifacts/block-gloo-apiserver.yaml b/test/kube2e/gloo/artifacts/block-gloo-apiserver.yaml new file mode 100644 index 00000000000..157588b2d92 --- /dev/null +++ b/test/kube2e/gloo/artifacts/block-gloo-apiserver.yaml @@ -0,0 +1,12 @@ +apiVersion: cilium.io/v2 +kind: CiliumNetworkPolicy +metadata: + name: deny-gloo-to-kube-apiserver + namespace: gloo-system +spec: + endpointSelector: + matchLabels: + gloo: gloo + egressDeny: + - toEntities: + - kube-apiserver diff --git a/test/kube2e/gloo/artifacts/block-labels.yaml b/test/kube2e/gloo/artifacts/block-labels.yaml new file mode 100644 index 00000000000..ed9b7eb6e6a --- /dev/null +++ b/test/kube2e/gloo/artifacts/block-labels.yaml @@ -0,0 +1,12 @@ +apiVersion: cilium.io/v2 +kind: CiliumNetworkPolicy +metadata: + name: deny-gloo-to-kube-apiserver + namespace: gloo-system +spec: + endpointSelector: + matchLabels: + block: this + egressDeny: + - toEntities: + - kube-apiserver \ No newline at end of file diff --git a/test/kube2e/gloo/artifacts/helm.yaml b/test/kube2e/gloo/artifacts/helm.yaml index bc23ae06f65..8a090431b7e 100644 --- a/test/kube2e/gloo/artifacts/helm.yaml +++ b/test/kube2e/gloo/artifacts/helm.yaml @@ -17,7 +17,7 @@ gateway: alwaysAcceptResources: false gloo: logLevel: info - disableLeaderElection: true + disableLeaderElection: false gatewayProxies: gatewayProxy: healthyPanicThreshold: 0 \ No newline at end of file diff --git a/test/kube2e/gloo/bootstrap_clients_test.go b/test/kube2e/gloo/bootstrap_clients_test.go index e7993314498..6debf0e91db 100644 --- a/test/kube2e/gloo/bootstrap_clients_test.go +++ b/test/kube2e/gloo/bootstrap_clients_test.go @@ -3,25 +3,31 @@ package gloo_test import ( "context" "fmt" + "strings" "time" + "github.com/solo-io/gloo/pkg/bootstrap/leaderelector/kube" + "github.com/solo-io/gloo/test/kube2e/helper" kubetestclients "github.com/solo-io/gloo/test/kubernetes/testutils/clients" "github.com/onsi/gomega/gstruct" "github.com/solo-io/solo-kit/pkg/api/v1/clients/factory" "github.com/solo-io/solo-kit/pkg/api/v1/clients/kubesecret" "github.com/solo-io/solo-kit/pkg/api/v1/clients/vault" + "github.com/solo-io/solo-kit/pkg/api/v1/resources" "github.com/solo-io/solo-kit/pkg/api/v1/resources/core" skhelpers "github.com/solo-io/solo-kit/test/helpers" corev1 "k8s.io/api/core/v1" "github.com/hashicorp/consul/api" + gatewaydefaults "github.com/solo-io/gloo/projects/gateway/pkg/defaults" v1 "github.com/solo-io/gloo/projects/gloo/pkg/api/v1" "github.com/solo-io/gloo/test/helpers" "github.com/solo-io/gloo/test/services" skclients "github.com/solo-io/solo-kit/pkg/api/v1/clients" corecache "github.com/solo-io/solo-kit/pkg/api/v1/clients/kube/cache" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -418,4 +424,227 @@ var _ = Describe("Bootstrap Clients", func() { }) }) }) + + Context("Retry leader election failure", func() { + + var deploymentClient clientsv1.DeploymentInterface + var verifyTranslation func() + + BeforeEach(func() { + deploymentClient = resourceClientset.KubeClients().AppsV1().Deployments(testHelper.InstallNamespace) + + // verifyTranslation creates a VS with a directActionRoute and verifies it has been accepted + // and translated by curling against the route specified route + verifyTranslation = func() { + name := "test-vs" + domain := "test-vs-domain" + path := "/test" + response := "OK" + + testVS := helpers.NewVirtualServiceBuilder(). + WithName(name). + WithNamespace(testHelper.InstallNamespace). + WithDomain(domain). + WithRoutePrefixMatcher("test", path). + WithRouteDirectResponseAction("test", &v1.DirectResponseAction{ + Status: 200, + Body: response, + }). + Build() + + resourceClientset.VirtualServiceClient().Write(testVS, skclients.WriteOpts{}) + // Since the kube api server can be down when the VS is written, + // specify a long enough interval for it to be accepted when the kube api server comes back up + helpers.EventuallyResourceAccepted(func() (resources.InputResource, error) { + return resourceClientset.VirtualServiceClient().Read(testHelper.InstallNamespace, testVS.Metadata.Name, skclients.ReadOpts{}) + }, "120s", "10s") + defer resourceClientset.VirtualServiceClient().Delete(testHelper.InstallNamespace, testVS.Metadata.Name, skclients.DeleteOpts{}) + + testHelper.CurlEventuallyShouldRespond(helper.CurlOpts{ + Protocol: "http", + Method: "GET", + Path: path, + Host: domain, + Service: gatewaydefaults.GatewayProxyName, + Port: gatewayPort, + ConnectionTimeout: 1, + WithoutStats: true, + }, response, 1, 60*time.Second, 1*time.Second) + } + }) + + AfterEach(func() { + testHelper.ModifyDeploymentEnv(ctx, deploymentClient, testHelper.InstallNamespace, "gloo", 0, corev1.EnvVar{ + Name: kube.MaxRecoveryDurationWithoutKubeAPIServer, + Value: "", + }) + }) + + It("does not recover by default", func() { + waitUntilStartsLeading() + simulateKubeAPIServerUnavailability() + + // By default it should crash + Eventually(func(g Gomega) { + logs := testHelper.GetContainerLogs(ctx, testHelper.InstallNamespace, "deploy/gloo") + g.Expect(logs).To(ContainSubstring("lost leadership, quitting app")) + }, "30s", "1s").Should(Succeed()) + + verifyTranslation() + }) + + It("recovers when MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER is set", func() { + testHelper.ModifyDeploymentEnv(ctx, deploymentClient, testHelper.InstallNamespace, "gloo", 0, corev1.EnvVar{ + Name: kube.MaxRecoveryDurationWithoutKubeAPIServer, + // This invalid value will test whether it falls back to the default value of 60s + Value: "abcd", + }) + + // Since a new deployment has been rolled out by changing the MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER env var, + // we can be sure that the logs fetched were generated only after this test has begun + waitUntilStartsLeading() + + // Simulate the kube api server is down and bring it up after 15 seconds. While it is down : + // - The leader should lose the lease + // - Create a VS + // When the kube api server is back up : + // - The VS should be accepted + // - It should be translated into an envoy config + restoreKubeAPIServer := simulateKubeAPIServerDown() + go func() { + time.Sleep(15 * time.Second) + restoreKubeAPIServer() + }() + + // This creates a VS and ensures that it is accepted and translated. Run this while the kube api server is down to verify that + // reports are written once the pod recovers and becomes a leader + verified := make(chan struct{}) + go func() { + verifyTranslation() + verified <- struct{}{} + }() + + Eventually(func(g Gomega) { + logs := testHelper.GetContainerLogs(ctx, testHelper.InstallNamespace, "deploy/gloo") + g.Expect(logs).To(ContainSubstring(fmt.Sprintf("%s is not a valid duration. Defaulting to 60s", kube.MaxRecoveryDurationWithoutKubeAPIServer))) + g.Expect(logs).To(ContainSubstring("Leader election cycle 0 lost. Trying again")) + g.Expect(logs).To(ContainSubstring("recovered from lease renewal failure")) + g.Expect(logs).NotTo(ContainSubstring("lost leadership, quitting app")) + }, "60s", "1s").Should(Succeed()) + + // Wait for the goroutine to finish + <-verified + }) + + // During this test : + // - Scale up the deployment to 2 pods + // - Block kube api access to the leader pod + // - Verify the leader pod loses leadership + // - Verify the second pod becomes the leader + // - Create a resource and verify it has been translated + It("concedes leadership to another pod", func() { + testHelper.ModifyDeploymentEnv(ctx, deploymentClient, testHelper.InstallNamespace, "gloo", 0, corev1.EnvVar{ + Name: kube.MaxRecoveryDurationWithoutKubeAPIServer, + Value: "45s", + }) + + // Since a new deployment has been rolled out by changing the MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER env var, + // we can be sure that the logs fetched were generated only after this test had begun + waitUntilStartsLeading() + + // Get the leader pod. Since there is only one pod it is the leader + leader, _, err := testHelper.Execute(ctx, "get", "pods", "-n", testHelper.InstallNamespace, "-l", "gloo=gloo", "-o", "jsonpath='{.items[0].metadata.name}'") + Expect(err).ToNot(HaveOccurred()) + leader = strings.ReplaceAll(leader, "'", "") + + // Scale the deployment to 2 replicas so the other can take over when the leader is unable to communicate with the Kube API server + err = testHelper.Scale(ctx, testHelper.InstallNamespace, "deploy/gloo", 2) + Expect(err).ToNot(HaveOccurred()) + defer func() { + err = testHelper.Scale(ctx, testHelper.InstallNamespace, "deploy/gloo", 1) + Expect(err).ToNot(HaveOccurred()) + }() + + // Get the follower pod. + follower, _, err := testHelper.Execute(ctx, "get", "pods", "-n", testHelper.InstallNamespace, "-l", "gloo=gloo", "-o", "jsonpath='{.items[*].metadata.name}'") + Expect(err).ToNot(HaveOccurred()) + // Before: 'gloo-7bd4788f8c-6qvd8' (leader) 'gloo-7bd4788f8c-qdjn6' (follower) + // After: gloo-7bd4788f8c-qdjn6 (follower) + follower = strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(follower, "'", ""), " ", ""), leader, "") + + // Verify that the follower is indeed not leading + Eventually(func(g Gomega) { + logs := testHelper.GetContainerLogs(ctx, testHelper.InstallNamespace, "pod/"+follower) + g.Expect(logs).To(ContainSubstring("new leader elected with ID: " + leader)) + g.Expect(logs).ToNot(ContainSubstring("starting leadership")) + }, "60s", "1s").Should(Succeed()) + + // Label the leader so the network policy can block communication to the Kube API server + pod, err := resourceClientset.KubeClients().CoreV1().Pods(testHelper.InstallNamespace).Get(ctx, leader, metav1.GetOptions{}) + Expect(err).ToNot(HaveOccurred()) + pod.Labels["block"] = "this" + _, err = resourceClientset.KubeClients().CoreV1().Pods(testHelper.InstallNamespace).Update(ctx, pod, metav1.UpdateOptions{}) + Expect(err).ToNot(HaveOccurred()) + + // Block the leader's communication to the Kube API server + err = testHelper.ApplyFile(ctx, testHelper.RootDir+"/test/kube2e/gloo/artifacts/block-labels.yaml") + Expect(err).ToNot(HaveOccurred()) + + // Verify that the leader has stopped leading + Eventually(func(g Gomega) { + logs := testHelper.GetContainerLogs(ctx, testHelper.InstallNamespace, "pod/"+leader) + g.Expect(logs).To(ContainSubstring("max recovery from kube apiserver unavailability set to 45s")) + g.Expect(logs).To(ContainSubstring("lost leadership")) + }, "600s", "10s").Should(Succeed()) + + // Verify that the follower has become the new leader + Eventually(func(g Gomega) { + logs := testHelper.GetContainerLogs(ctx, testHelper.InstallNamespace, "pod/"+follower) + g.Expect(logs).To(ContainSubstring("starting leadership")) + }, "60s", "1s").Should(Succeed()) + + // Cleanup the network policy. + err = testHelper.DeleteFile(ctx, testHelper.RootDir+"/test/kube2e/gloo/artifacts/block-labels.yaml") + Expect(err).ToNot(HaveOccurred()) + + // With connectivity restored, the old leader can become a follower + // Verify that the old leader has become a follower + Eventually(func(g Gomega) { + logs := testHelper.GetContainerLogs(ctx, testHelper.InstallNamespace, "pod/"+leader) + g.Expect(logs).To(ContainSubstring("recovered from lease renewal failure")) + g.Expect(logs).To(ContainSubstring("new leader elected with ID: " + follower)) + }, "60s", "1s").Should(Succeed()) + + // Ensure that we can still operate. + verifyTranslation() + }) + }) }) + +// simulateKubeAPIServerDown blocks network connectivity between the gloo pod and the kube api server. +// It returns a function that restores network connectivity. +func simulateKubeAPIServerDown() func() { + err := testHelper.ApplyFile(ctx, testHelper.RootDir+"/test/kube2e/gloo/artifacts/block-gloo-apiserver.yaml") + Expect(err).ToNot(HaveOccurred()) + + return func() { + err := testHelper.DeleteFile(ctx, testHelper.RootDir+"/test/kube2e/gloo/artifacts/block-gloo-apiserver.yaml") + Expect(err).ToNot(HaveOccurred()) + } +} + +// simulateKubeAPIServerUnavailability temporarily blocks network connectivity between the gloo pod and the kube api server +func simulateKubeAPIServerUnavailability() { + restoreKubeAPIServer := simulateKubeAPIServerDown() + time.Sleep(15 * time.Second) + restoreKubeAPIServer() +} + +func waitUntilStartsLeading() { + // Initially sleep as the new deployment might be rolling out + time.Sleep(10 * time.Second) + Eventually(func(g Gomega) { + out := testHelper.GetContainerLogs(ctx, testHelper.InstallNamespace, "deploy/gloo") + g.Expect(out).To(ContainSubstring("starting leadership")) + }, "120s", "10s").Should(Succeed()) +} diff --git a/test/kube2e/gloo/gloo_suite_test.go b/test/kube2e/gloo/gloo_suite_test.go index 3fa04f0d509..91f5eaac24e 100644 --- a/test/kube2e/gloo/gloo_suite_test.go +++ b/test/kube2e/gloo/gloo_suite_test.go @@ -52,8 +52,6 @@ var ( envoyFactory envoy.Factory vaultFactory *services.VaultFactory - - kubeCli *kubectl.Cli ) var _ = BeforeSuite(func() { @@ -64,10 +62,9 @@ var _ = BeforeSuite(func() { ctx, cancel = context.WithCancel(context.Background()) testHelper, err = kube2e.GetTestHelper(ctx, namespace) Expect(err).NotTo(HaveOccurred()) + testHelper.SetKubeCli(kubectl.NewCli().WithReceiver(GinkgoWriter)) skhelpers.RegisterPreFailHandler(helpers.StandardGlooDumpOnFail(GinkgoWriter, metav1.ObjectMeta{Namespace: testHelper.InstallNamespace})) - kubeCli = kubectl.NewCli().WithReceiver(GinkgoWriter) - // Allow skipping of install step for running multiple times if !glootestutils.ShouldSkipInstall() { installGloo() diff --git a/test/kube2e/gloo/setup_syncer_test.go b/test/kube2e/gloo/setup_syncer_test.go index f6a1716570f..00a62ca26b4 100644 --- a/test/kube2e/gloo/setup_syncer_test.go +++ b/test/kube2e/gloo/setup_syncer_test.go @@ -114,7 +114,7 @@ var _ = Describe("Setup Syncer", func() { }) It("restarts validation grpc server when settings change", func() { - portForwarder, err := kubeCli.StartPortForward(ctx, + portForwarder, err := testHelper.StartPortForward(ctx, portforward.WithDeployment(kubeutils.GlooDeploymentName, testHelper.InstallNamespace), portforward.WithRemotePort(defaults.GlooValidationPort), ) diff --git a/test/kube2e/helper/install.go b/test/kube2e/helper/install.go index ec6f1634f91..7e41682e201 100644 --- a/test/kube2e/helper/install.go +++ b/test/kube2e/helper/install.go @@ -15,6 +15,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/rotisserie/eris" + "github.com/solo-io/gloo/pkg/utils/kubeutils/kubectl" "github.com/solo-io/go-utils/log" "github.com/solo-io/go-utils/testutils/exec" "github.com/solo-io/k8s-utils/testutils/kube" @@ -88,6 +89,8 @@ type TestConfig struct { type SoloTestHelper struct { *TestConfig TestUpstreamServer + // The kubernetes helper + *kubectl.Cli } // NewSoloTestHelper is meant to provide a standard way of deploying Gloo/GlooE to a k8s cluster during tests. @@ -151,6 +154,10 @@ func NewSoloTestHelper(configFunc TestConfigFunc) (*SoloTestHelper, error) { return testHelper, nil } +func (h *SoloTestHelper) SetKubeCli(cli *kubectl.Cli) { + h.Cli = cli +} + // Return the version of the Helm chart func (h *SoloTestHelper) ChartVersion() string { return h.version diff --git a/test/kube2e/helper/kube.go b/test/kube2e/helper/kube.go new file mode 100644 index 00000000000..5cd556c2998 --- /dev/null +++ b/test/kube2e/helper/kube.go @@ -0,0 +1,60 @@ +package helper + +import ( + "context" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func (h *SoloTestHelper) ModifyDeploymentEnv(ctx context.Context, deploymentClient clientsv1.DeploymentInterface, namespace string, deploymentName string, containerIndex int, envVar corev1.EnvVar) { + GinkgoHelper() + + d, err := deploymentClient.Get(ctx, deploymentName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + // make sure we are referencing a valid container + Expect(len(d.Spec.Template.Spec.Containers)).To(BeNumerically(">", containerIndex)) + + // if an env var with the given name already exists, modify it + exists := false + for i, env := range d.Spec.Template.Spec.Containers[containerIndex].Env { + if env.Name == envVar.Name { + d.Spec.Template.Spec.Containers[containerIndex].Env[i].Value = envVar.Value + exists = true + break + } + } + // otherwise add a new env var + if !exists { + d.Spec.Template.Spec.Containers[containerIndex].Env = append(d.Spec.Template.Spec.Containers[containerIndex].Env, envVar) + } + _, err = deploymentClient.Update(ctx, d, metav1.UpdateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + h.WaitForRollout(ctx, deploymentName, namespace, "60s", "1s") +} + +// WaitForRollout waits for the specified deployment to be rolled out successfully. +func (h *SoloTestHelper) WaitForRollout(ctx context.Context, deploymentName string, deploymentNamespace string, intervals ...interface{}) { + GinkgoHelper() + + Eventually(func() (bool, error) { + out, _, err := h.Cli.Execute(ctx, "rollout", "status", "-n", deploymentNamespace, fmt.Sprintf("deployment/%s", deploymentName)) + return strings.Contains(out, "successfully rolled out"), err + }, "30s", "1s").Should(BeTrue()) +} + +func (h *SoloTestHelper) GetContainerLogs(ctx context.Context, namespace string, name string) string { + GinkgoHelper() + + out, _, err := h.Cli.Execute(ctx, "-n", namespace, "logs", name) + Expect(err).ToNot(HaveOccurred()) + return out +}