Skip to content

Commit

Permalink
fix(kuma-cp) switch leader election to leader-for-life (#3023)
Browse files Browse the repository at this point in the history
* fix(kuma-cp) Switch leader election to leader-for-life

Signed-off-by: Paul Parkanzky <paul.parkanzky@konghq.com>

* tests(kuma-cp) Make check

Signed-off-by: Paul Parkanzky <paul.parkanzky@konghq.com>

* fix(kuma-cp) Add pod name to install env for leader election

Signed-off-by: Paul Parkanzky <paul.parkanzky@konghq.com>

* fix(kuma-cp) Force acquire deprecated locks

Signed-off-by: Paul Parkanzky <paul.parkanzky@konghq.com>

* fix(kuma-cp) PR comments - minor style changes

Signed-off-by: Paul Parkanzky <paul.parkanzky@konghq.com>
  • Loading branch information
parkanzky authored Nov 10, 2021
1 parent 4b8351a commit f5b2b27
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,10 @@ spec:
value: "kuma-system"
- name: KUMA_STORE_TYPE
value: "kubernetes"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
args:
- run
- --log-level=info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,10 @@ spec:
value: "kuma-system"
- name: KUMA_STORE_TYPE
value: "kubernetes"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
args:
- run
- --log-level=info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,10 @@ spec:
value: "kuma-system"
- name: KUMA_STORE_TYPE
value: "kubernetes"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
args:
- run
- --log-level=info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,10 @@ spec:
value: "kuma-system"
- name: KUMA_STORE_TYPE
value: "kubernetes"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
args:
- run
- --log-level=info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,10 @@ spec:
value: "kuma"
- name: KUMA_STORE_TYPE
value: "kubernetes"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
args:
- run
- --log-level=info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,10 @@ spec:
value: "kuma-system"
- name: KUMA_STORE_TYPE
value: "kubernetes"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
args:
- run
- --log-level=info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,10 @@ spec:
value: "kuma-system"
- name: KUMA_STORE_TYPE
value: "kubernetes"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
args:
- run
- --log-level=info
Expand Down
4 changes: 4 additions & 0 deletions deployments/charts/kuma/templates/cp-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ spec:
name: {{ $element.Secret }}
key: {{ $element.Key }}
{{- end }}
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
args:
- run
- --log-level={{ .Values.controlPlane.logLevel }}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/gruntwork-io/terratest v0.38.2
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69
github.com/iancoleman/orderedmap v0.2.0
github.com/kelseyhightower/envconfig v1.4.0
Expand All @@ -30,6 +29,7 @@ require (
github.com/lib/pq v1.10.3
github.com/miekg/dns v1.1.43
github.com/onsi/ginkgo v1.16.5
github.com/operator-framework/operator-lib v0.8.0
github.com/onsi/gomega v1.17.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
Expand Down
42 changes: 38 additions & 4 deletions go.sum

Large diffs are not rendered by default.

149 changes: 140 additions & 9 deletions pkg/plugins/bootstrap/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package k8s

import (
"context"
"encoding/json"
"os"
"strconv"
"time"

"github.com/operator-framework/operator-lib/leader"
"github.com/pkg/errors"
kube_core "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
kube_ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -50,12 +54,11 @@ func (p *plugin) BeforeBootstrap(b *core_runtime.Builder, _ core_plugins.PluginC
Scheme: scheme,
NewCache: kuma_kube_cache.New,
// Admission WebHook Server
Host: b.Config().Runtime.Kubernetes.AdmissionServer.Address,
Port: int(b.Config().Runtime.Kubernetes.AdmissionServer.Port),
CertDir: b.Config().Runtime.Kubernetes.AdmissionServer.CertDir,
LeaderElection: true,
LeaderElectionID: "kuma-cp-leader",
LeaderElectionNamespace: b.Config().Store.Kubernetes.SystemNamespace,
Host: b.Config().Runtime.Kubernetes.AdmissionServer.Address,
Port: int(b.Config().Runtime.Kubernetes.AdmissionServer.Port),
CertDir: b.Config().Runtime.Kubernetes.AdmissionServer.CertDir,
LeaderElection: false,

// Disable metrics bind address as we serve metrics some other way.
MetricsBindAddress: "0",
},
Expand All @@ -64,12 +67,14 @@ func (p *plugin) BeforeBootstrap(b *core_runtime.Builder, _ core_plugins.PluginC
return err
}

secretClient, err := createSecretClient(b.AppCtx(), scheme, b.Config().Store.Kubernetes.SystemNamespace, config, mgr.GetRESTMapper())
systemNamespace := b.Config().Store.Kubernetes.SystemNamespace

secretClient, err := createSecretClient(b.AppCtx(), scheme, systemNamespace, config, mgr.GetRESTMapper())
if err != nil {
return err
}

b.WithComponentManager(&kubeComponentManager{mgr})
b.WithComponentManager(&kubeComponentManager{mgr, systemNamespace, nil})
b.WithExtensions(k8s_extensions.NewManagerContext(b.Extensions(), mgr))
b.WithExtensions(k8s_extensions.NewSecretClientContext(b.Extensions(), secretClient))
if expTime := b.Config().Runtime.Kubernetes.MarshalingCacheExpirationTime; expTime > 0 {
Expand Down Expand Up @@ -143,16 +148,140 @@ func (p *plugin) AfterBootstrap(b *core_runtime.Builder, _ core_plugins.PluginCo

type kubeComponentManager struct {
kube_ctrl.Manager
oldLeaderElectionNamespace string
leaderComponents []component.Component
}

var _ component.Manager = &kubeComponentManager{}

type leaderAnnotation struct {
HolderIdentity string `json:"holderIdentity"`
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
AcquireTime string `json:"acquireTime"`
RenewTime string `json:"renewTime"`
LeaderTransitions int `json:"leaderTransistions"`
}

var blockerHolderId = "cp-leader-lock-transition"
var oldLeaderConfigMapName = "kuma-cp-leader"

func makeOldLockAnnotation() string {
nowStr := time.Now().Format(time.RFC3339)
annot := &leaderAnnotation{
HolderIdentity: blockerHolderId,
LeaseDurationSeconds: 99999999999999999,
AcquireTime: nowStr,
RenewTime: nowStr,
LeaderTransitions: 0,
}

annotJson, _ := json.Marshal(annot)
return string(annotJson)
}

// Previous versions of kuma-cp used a timeout lock for leader election. We now
// keep the election for the lifetime of the pod. This function forces any previous
// style leader to see itself as having lost its election, and locks out any
// further old leaders.
//
// Only call this after acquiring new-style leader election, so as to only contend
// with old leaders over old locks.
func (cm *kubeComponentManager) forceTakeOldLock(ctx context.Context) error {
log.Info("checking for deprecated leader locks")
client := cm.Manager.GetClient()
ns := cm.oldLeaderElectionNamespace

pod := &kube_core.Pod{}
if err := client.Get(ctx, kube_client.ObjectKey{
Namespace: ns,
Name: os.Getenv("POD_NAME"),
}, pod); err != nil {
log.Error(err, "unable to retrieve this pod")
return err
}

owner := &metav1.OwnerReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
}

var mustWait = false

for {
newLock := &kube_core.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: oldLeaderConfigMapName,
Namespace: ns,
OwnerReferences: []metav1.OwnerReference{*owner},
Annotations: map[string]string{
"control-plane.alpha.kubernetes.io/leader": makeOldLockAnnotation(),
},
},
}

// Numerous potential races between new and old CP leader in this loop. Just keep
// trying to grab the lock until it succeeds. Since the old leader will
// politely die when we acquire lock, and we are relentless, we will eventually
// prevail.

err := client.Create(ctx, newLock)
switch {
case err == nil:
// Acquired old lock.
if mustWait {
log.Info("waiting 30 seconds for old leader to terminate")
time.Sleep(30 * time.Second)
}
return nil
case apierrors.IsAlreadyExists(err):
log.Info("existing deprecated lock found; stealing")
mustWait = true

existing := &kube_core.ConfigMap{}
key := kube_client.ObjectKey{Namespace: ns, Name: oldLeaderConfigMapName}
err = client.Get(ctx, key, existing)
if err != nil {
log.Error(err, "error reading old lock; trying again")
break
}

err := client.Delete(ctx, existing)
if err != nil {
log.Error(err, "error deleting old lock; trying again")
}
default:
log.Error(err, "error creating ConfigMap; trying again")
}
time.Sleep(1 * time.Second)
}
}

func (cm *kubeComponentManager) Start(done <-chan struct{}) error {
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
<-done
}()

go func() {
if err := leader.Become(ctx, "cp-leader"); err != nil {
log.Error(err, "leader lock failure")
os.Exit(1)
}
// This CP will now be leader. But first, destroy deprecated leader lock,
// forcing any old leaders to restart as non-leaders.
if err := cm.forceTakeOldLock(ctx); err != nil {
log.Error(err, "error attempting to clean up deprecated lock")
os.Exit(1)
}
for _, c := range cm.leaderComponents {
if err := cm.Manager.Add(&componentRunnableAdaptor{Component: c}); err != nil {
log.Error(err, "add component error")
}
}
}()
return cm.Manager.Start(ctx)
}

Expand All @@ -163,7 +292,9 @@ var _ kube_manager.LeaderElectionRunnable = component.ComponentFunc(func(i <-cha

func (k *kubeComponentManager) Add(components ...component.Component) error {
for _, c := range components {
if err := k.Manager.Add(&componentRunnableAdaptor{Component: c}); err != nil {
if c.NeedLeaderElection() {
k.leaderComponents = append(k.leaderComponents, c)
} else if err := k.Manager.Add(&componentRunnableAdaptor{Component: c}); err != nil {
return err
}
}
Expand Down

0 comments on commit f5b2b27

Please sign in to comment.