[1.17] Retry on leader lease renewal failure (#9639)
* retry on leader lease renewal failure

* Die if unable to recover

* use env var

* add basic tests

* update tests

* add comments

* Add comments around CI changes

* update tests

* refactor

* cleanup

* update test

* rename env var

* add changelog

* address comments v1

* address comments v2

* fix test

* use GinkgoHelper

* Adding changelog file to new location

* Deleting changelog file from old location

* specify a duration

* Adding changelog file to new location

* Deleting changelog file from old location

* remove default

* Adding changelog file to new location

* Deleting changelog file from old location

* fix tests

* move changelog

* move counter

---------

Co-authored-by: soloio-bulldozer[bot] <48420018+soloio-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: Nathan Fudenberg <nathan.fudenberg@solo.io>
3 people authored Jun 19, 2024
1 parent 4be2838 commit 52ea4ab
Showing 15 changed files with 476 additions and 45 deletions.
4 changes: 4 additions & 0 deletions changelog/v1.17.0-rc5/dont-crash-on-failed-lease-renewal.yaml
@@ -0,0 +1,4 @@
changelog:
- type: NEW_FEATURE
issueLink: https://github.com/solo-io/gloo/issues/8107
description: Adds the ability to recover if the Kubernetes API server becomes unreachable after the gloo pod comes up. The `MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER` environment variable defines the maximum duration the gloo pod can run while attempting to reconnect to the kube apiserver; exceeding this duration causes the pod to quit. To enable this feature, set the `MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER` environment variable to the desired duration in the gloo container, either by modifying the gloo deployment or by specifying the `gloo.deployment.customEnv[0].Name=MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER` and `gloo.deployment.customEnv[0].Value=60s` Helm values.
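
For illustration, the Helm route described above can also be expressed as a values snippet; a minimal sketch, assuming the `customEnv` entries take `Name`/`Value` pairs as in the description (the `60s` duration is only an example):

gloo:
  deployment:
    customEnv:
      - Name: MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER
        Value: "60s"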
4 changes: 4 additions & 0 deletions ci/kind/cluster.yaml
@@ -1,5 +1,9 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
# Do not use the default CNI, as kindnet does not support custom network policies.
# Instead, cilium is installed as the CNI, since we need to test Kube API server unavailability in the kube2e tests
networking:
disableDefaultCNI: true
kubeadmConfigPatches:
- |
apiVersion: kubeadm.k8s.io/v1beta3
9 changes: 9 additions & 0 deletions ci/kind/setup-kind.sh
@@ -21,6 +21,7 @@ ISTIO_VERSION="${ISTIO_VERSION:-1.22.0}"
IMAGE_VARIANT="${IMAGE_VARIANT:-standard}"
# If true, run extra steps to set up k8s gateway api conformance test environment
CONFORMANCE="${CONFORMANCE:-false}"
CILIUM_VERSION="${CILIUM_VERSION:-1.15.5}"

function create_kind_cluster_or_skip() {
activeClusters=$(kind get clusters)
@@ -36,6 +37,14 @@ function create_kind_cluster_or_skip() {
--name "$CLUSTER_NAME" \
--image "kindest/node:$CLUSTER_NODE_VERSION" \
--config="$SCRIPT_DIR/cluster.yaml"

# Install cilium as we need to define custom network policies to simulate kube api server unavailability
# in some of our kube2e tests
helm repo add cilium https://helm.cilium.io/
helm install cilium cilium/cilium --version $CILIUM_VERSION \
--namespace kube-system \
--set image.pullPolicy=IfNotPresent \
--set ipam.mode=kubernetes
echo "Finished setting up cluster $CLUSTER_NAME"

# so that you can just build the kind image alone if needed
5 changes: 5 additions & 0 deletions pkg/bootstrap/leaderelector/identity.go
@@ -37,3 +37,8 @@ func (i identityImpl) IsLeader() bool {
func (i identityImpl) Elected() <-chan struct{} {
return i.elected
}

// Reset updates the current identity to a follower. It can be promoted to a leader by closing the elected channel
func (i identityImpl) Reset(elected <-chan struct{}) {
i.elected = elected
}
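
To make the elected-channel contract concrete, here is a minimal, self-contained Go sketch (simplified names, not the actual Gloo types): an open channel means follower, and closing it promotes the identity to leader, which is how `Reset` plus a later `close(elected)` interact in the factory change below.

package main

import "fmt"

// identity mimics the pattern above: leadership is signalled by closing
// the elected channel, so a freshly made (open) channel means "follower".
type identity struct {
	elected <-chan struct{}
}

func (i identity) isLeader() bool {
	select {
	case <-i.elected:
		return true // channel closed: leader
	default:
		return false // channel still open: follower
	}
}

func main() {
	elected := make(chan struct{})
	id := identity{elected: elected}
	fmt.Println(id.isLeader()) // false: follower after a Reset-style swap
	close(elected)
	fmt.Println(id.isLeader()) // true: promoted by closing the channel
}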
139 changes: 109 additions & 30 deletions pkg/bootstrap/leaderelector/kube/factory.go
@@ -3,9 +3,11 @@ package kube
import (
"context"
"os"
"sync/atomic"
"time"

"github.com/solo-io/gloo/pkg/bootstrap/leaderelector"
"github.com/solo-io/gloo/pkg/utils/envutils"
"github.com/solo-io/go-utils/contextutils"
"k8s.io/client-go/rest"
k8sleaderelection "k8s.io/client-go/tools/leaderelection"
@@ -21,9 +23,12 @@ const (
defaultRetryPeriod = 2 * time.Second
defaultRenewPeriod = 10 * time.Second

leaseDurationEnvName = "LEADER_ELECTION_LEASE_DURATION"
retryPeriodEnvName = "LEADER_ELECTION_RETRY_PERIOD"
renewPeriodEnvName = "LEADER_ELECTION_RENEW_PERIOD"
defaultRecoveryTimeout = 60 * time.Second

leaseDurationEnvName = "LEADER_ELECTION_LEASE_DURATION"
retryPeriodEnvName = "LEADER_ELECTION_RETRY_PERIOD"
renewPeriodEnvName = "LEADER_ELECTION_RENEW_PERIOD"
MaxRecoveryDurationWithoutKubeAPIServer = "MAX_RECOVERY_DURATION_WITHOUT_KUBE_API_SERVER"
)

// kubeElectionFactory is the implementation for coordinating leader election using
@@ -39,6 +44,20 @@ func NewElectionFactory(config *rest.Config) *kubeElectionFactory {
}

func (f *kubeElectionFactory) StartElection(ctx context.Context, config *leaderelector.ElectionConfig) (leaderelector.Identity, error) {
var recoveryTimeoutIfKubeAPIServerIsUnreachable time.Duration
var recoverIfKubeAPIServerIsUnreachable bool
var err error
if envutils.IsEnvDefined(MaxRecoveryDurationWithoutKubeAPIServer) {
recoveryTimeoutIfKubeAPIServerIsUnreachable, err = time.ParseDuration(os.Getenv(MaxRecoveryDurationWithoutKubeAPIServer))
if err != nil {
contextutils.LoggerFrom(ctx).Errorf("%s is not a valid duration. Defaulting to 60s", MaxRecoveryDurationWithoutKubeAPIServer)
recoveryTimeoutIfKubeAPIServerIsUnreachable = defaultRecoveryTimeout
} else {
contextutils.LoggerFrom(ctx).Infof("max recovery from kube apiserver unavailability set to %s", recoveryTimeoutIfKubeAPIServerIsUnreachable)
}
recoverIfKubeAPIServerIsUnreachable = true
}

elected := make(chan struct{})
identity := leaderelector.NewIdentity(elected)

@@ -55,39 +74,99 @@ func (f *kubeElectionFactory) StartElection(ctx context.Context, config *leadere
return identity, err
}

l, err := k8sleaderelection.NewLeaderElector(
k8sleaderelection.LeaderElectionConfig{
Lock: resourceLock,
LeaseDuration: getLeaseDuration(),
RenewDeadline: getRenewPeriod(),
RetryPeriod: getRetryPeriod(),
Callbacks: k8sleaderelection.LeaderCallbacks{
OnStartedLeading: func(callbackCtx context.Context) {
contextutils.LoggerFrom(callbackCtx).Debug("Started Leading")
close(elected)
config.OnStartedLeading(callbackCtx)
},
OnStoppedLeading: func() {
contextutils.LoggerFrom(ctx).Error("Stopped Leading")
config.OnStoppedLeading()
},
OnNewLeader: func(identity string) {
contextutils.LoggerFrom(ctx).Debugf("New Leader Elected with Identity: %s", identity)
config.OnNewLeader(identity)
var justFailed = false
var dontDie func()

// dieIfUnrecoverable causes gloo to exit after the recoveryTimeout (default 60s) if the context is not cancelled.
// This function is called when this container is a leader but unable to renew the leader lease (caused by an unreachable kube api server).
// The context is cancelled if this container is able to participate in leader election again, irrespective of whether it becomes a leader or a follower.
dieIfUnrecoverable := func(ctx context.Context) {
timer := time.NewTimer(recoveryTimeoutIfKubeAPIServerIsUnreachable)
select {
case <-timer.C:
contextutils.LoggerFrom(ctx).Fatalf("unable to recover from failed leader election, quitting app")
case <-ctx.Done():
contextutils.LoggerFrom(ctx).Infof("recovered from lease renewal failure")
}
}

newLeaderElector := func() (*k8sleaderelection.LeaderElector, error) {
recoveryCtx, cancel := context.WithCancel(ctx)

return k8sleaderelection.NewLeaderElector(
k8sleaderelection.LeaderElectionConfig{
Lock: resourceLock,
LeaseDuration: getLeaseDuration(),
RenewDeadline: getRenewPeriod(),
RetryPeriod: getRetryPeriod(),
Callbacks: k8sleaderelection.LeaderCallbacks{
OnStartedLeading: func(callbackCtx context.Context) {
contextutils.LoggerFrom(callbackCtx).Debug("Started Leading")
close(elected)
config.OnStartedLeading(callbackCtx)
},
OnStoppedLeading: func() {
contextutils.LoggerFrom(ctx).Error("Stopped Leading")
config.OnStoppedLeading()
if recoverIfKubeAPIServerIsUnreachable {
// Recreate the elected channel and reset the identity to a follower
// Ref: https://github.com/solo-io/gloo/issues/7346
elected = make(chan struct{})
identity.Reset(elected)
// Die if we are unable to recover from this within the recoveryTimeout
go dieIfUnrecoverable(recoveryCtx)
// Set dontDie to cancel the recovery context; it is used the next time `OnNewLeader` is called
dontDie = cancel
justFailed = true
}
},
OnNewLeader: func(identity string) {
contextutils.LoggerFrom(ctx).Debugf("New Leader Elected with Identity: %s", identity)
config.OnNewLeader(identity)
// Recover since we were able to re-negotiate leader election
// Do this only when we just failed and not when someone becomes a leader
if recoverIfKubeAPIServerIsUnreachable && justFailed {
dontDie()
justFailed = false
}
},
},
Name: config.Id,
ReleaseOnCancel: true,
},
Name: config.Id,
ReleaseOnCancel: true,
},
)
)
}

// The returned error only reflects validation of the config passed in. If the config passes validation once, it always will
_, err = newLeaderElector()
if err != nil {
return identity, err
}

// Start the leader elector process in a goroutine
contextutils.LoggerFrom(ctx).Debug("Starting Kube Leader Election")
go l.Run(ctx)

// leaderElector.Run() is a blocking method, but we need to return the identity of this container to sub-components so they can
// perform their respective tasks, hence it runs within a goroutine.
// It runs within an infinite loop so that, if this container is the leader but fails to renew the lease, it can recover
// and renegotiate leader election where possible. Such a failure is typically caused by an unreachable kube api server
go func() {
var counter atomic.Uint32

for {
l, _ := newLeaderElector()
// Start the leader elector process
contextutils.LoggerFrom(ctx).Debug("Starting Kube Leader Election")
l.Run(ctx)

if !recoverIfKubeAPIServerIsUnreachable {
contextutils.LoggerFrom(ctx).Fatalf("lost leadership, quitting app")
}

contextutils.LoggerFrom(ctx).Errorf("Leader election cycle %v lost. Trying again", counter.Load())
counter.Add(1)
// Sleep for the lease duration so another container has a chance to become the leader, rather than this container
// trying to renew the lease while the kube api server is unreachable to it
time.Sleep(getLeaseDuration())
}
}()
return identity, nil
}

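The recovery flow above boils down to a small pattern: arm a countdown when the lease is lost, and defuse it once leader election is re-negotiated. A self-contained sketch of just that pattern, assuming a 60s timeout (an illustration, not the Gloo code itself):

package main

import (
	"context"
	"log"
	"time"
)

// dieIfUnrecoverable exits the process unless ctx is cancelled before the
// recovery timeout elapses, mirroring the factory logic above.
func dieIfUnrecoverable(ctx context.Context, timeout time.Duration) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-timer.C:
		log.Fatal("unable to recover from failed leader election, quitting app")
	case <-ctx.Done():
		log.Print("recovered from lease renewal failure")
	}
}

func main() {
	// On lease loss: start the countdown and keep the cancel func around.
	recoveryCtx, dontDie := context.WithCancel(context.Background())
	go dieIfUnrecoverable(recoveryCtx, 60*time.Second)

	// On the next successful OnNewLeader callback: defuse the countdown.
	dontDie()
	time.Sleep(100 * time.Millisecond) // give the goroutine time to log
}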
23 changes: 19 additions & 4 deletions pkg/utils/kubeutils/kubectl/cli.go
@@ -249,17 +249,17 @@ func (c *Cli) CurlFromPod(ctx context.Context, podOpts PodExecOptions, options .
"5",
}, curlArgs...)

stdout, stderr, err := c.ExecuteOn(ctx, c.kubeContext, nil, args...)
stdout, stderr, err := c.ExecuteOn(ctx, c.kubeContext, args...)

return &CurlResponse{StdOut: stdout, StdErr: stderr}, err
}

func (c *Cli) ExecuteOn(ctx context.Context, kubeContext string, stdin *bytes.Buffer, args ...string) (string, string, error) {
func (c *Cli) ExecuteOn(ctx context.Context, kubeContext string, args ...string) (string, string, error) {
args = append([]string{"--context", kubeContext}, args...)
return c.Execute(ctx, stdin, args...)
return c.Execute(ctx, args...)
}

func (c *Cli) Execute(ctx context.Context, stdin *bytes.Buffer, args ...string) (string, string, error) {
func (c *Cli) Execute(ctx context.Context, args ...string) (string, string, error) {
stdout := new(strings.Builder)
stderr := new(strings.Builder)

@@ -271,3 +271,18 @@ func (c *Cli) Execute(ctx context.Context, stdin *bytes.Buffer, args ...string)

return stdout.String(), stderr.String(), err
}

func (c *Cli) Scale(ctx context.Context, namespace string, resource string, replicas uint) error {
err := c.RunCommand(ctx, "scale", "-n", namespace, fmt.Sprintf("--replicas=%d", replicas), resource, "--timeout=300s")
if err != nil {
return err
}
time.Sleep(2 * time.Second) // Sleep a bit so the container starts
return c.RunCommand(ctx, "wait", "-n", namespace, "--for=condition=available", resource, "--timeout=300s")
}

// GetContainerLogs retrieves the logs for the specified container
func (c *Cli) GetContainerLogs(ctx context.Context, namespace string, name string) (string, error) {
stdout, stderr, err := c.Execute(ctx, "-n", namespace, "logs", name)
return stdout + stderr, err
}
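
As a usage illustration only, a hypothetical test helper built on the two methods added above (the namespace, resource, and pod name are placeholders; `Cli` construction is elided since it is not part of this diff):

package example

import (
	"context"

	"github.com/solo-io/gloo/pkg/utils/kubeutils/kubectl"
)

// cycleGlooAndFetchLogs scales the gloo deployment down and back up, then
// returns the named pod's logs, using the helpers introduced in this commit.
func cycleGlooAndFetchLogs(ctx context.Context, cli *kubectl.Cli, podName string) (string, error) {
	if err := cli.Scale(ctx, "gloo-system", "deployment/gloo", 0); err != nil {
		return "", err
	}
	if err := cli.Scale(ctx, "gloo-system", "deployment/gloo", 1); err != nil {
		return "", err
	}
	return cli.GetContainerLogs(ctx, "gloo-system", podName)
}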
8 changes: 3 additions & 5 deletions projects/gloo/pkg/setup/setup.go
@@ -41,11 +41,9 @@ func startSetupLoop(ctx context.Context) error {
contextutils.LoggerFrom(ctx).Infof("new leader elected with ID: %s", leaderId)
},
OnStoppedLeading: func() {
// Kill app if we lose leadership, we need to be VERY sure we don't continue
// any leader election processes.
// https://github.com/solo-io/gloo/issues/7346
// There is follow-up work to handle lost leadership more gracefully
contextutils.LoggerFrom(ctx).Fatalf("lost leadership, quitting app")
// Don't die if we fall from grace. Instead we can retry leader election
// Ref: https://github.com/solo-io/gloo/issues/7346
contextutils.LoggerFrom(ctx).Errorf("lost leadership")
},
},
})
12 changes: 12 additions & 0 deletions test/kube2e/gloo/artifacts/block-gloo-apiserver.yaml
@@ -0,0 +1,12 @@
apiVersion: cilium.io/v2
kind: CiliumNetworkPolicy
metadata:
name: deny-gloo-to-kube-apiserver
namespace: gloo-system
spec:
endpointSelector:
matchLabels:
gloo: gloo
egressDeny:
- toEntities:
- kube-apiserver
12 changes: 12 additions & 0 deletions test/kube2e/gloo/artifacts/block-labels.yaml
@@ -0,0 +1,12 @@
apiVersion: cilium.io/v2
kind: CiliumNetworkPolicy
metadata:
name: deny-gloo-to-kube-apiserver
namespace: gloo-system
spec:
endpointSelector:
matchLabels:
block: this
egressDeny:
- toEntities:
- kube-apiserver
2 changes: 1 addition & 1 deletion test/kube2e/gloo/artifacts/helm.yaml
@@ -17,7 +17,7 @@ gateway:
alwaysAcceptResources: false
gloo:
logLevel: info
disableLeaderElection: true
disableLeaderElection: false
gatewayProxies:
gatewayProxy:
healthyPanicThreshold: 0
