Implement cleanup controller (#433)
* Implement cleanup controller

* Runs by default
* Watches for pod delete events and ensures there are no stale Consul service instances
* Runs a Reconcile loop on a timer that checks all Consul services
* Adds new connect-inject flags: `-enable-cleanup-controller` (default true) and `-cleanup-controller-reconcile-period` (see the flag sketch below).
* Adds new server-acl-init flag: `-enable-cleanup-controller` (default true) and modifies the ACL templates.
lkysow committed Feb 12, 2021
1 parent 1e75484 commit b5784de
Showing 12 changed files with 772 additions and 34 deletions.
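
As an aside, here is a minimal sketch of how the two new connect-inject flags might be declared with Go's standard flag package. The variable names and the reconcile-period default are illustrative assumptions, not the command's actual code; only the flag names come from the commit message above.

package main

import (
	"flag"
	"fmt"
	"os"
	"time"
)

func main() {
	// Hypothetical flag wiring for the connect-inject command; the real
	// consul-k8s command defines its flags in its own command struct, so
	// the variable names and the reconcile-period default are assumptions.
	fs := flag.NewFlagSet("inject-connect", flag.ExitOnError)
	enableCleanupController := fs.Bool("enable-cleanup-controller", true,
		"Enables a controller that cleans up stale Consul service instances.")
	reconcilePeriod := fs.Duration("cleanup-controller-reconcile-period", 5*time.Minute,
		"How often the cleanup controller reconciles all Consul services.")
	_ = fs.Parse(os.Args[1:])

	fmt.Println("cleanup enabled:", *enableCleanupController, "period:", *reconcilePeriod)
}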
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -42,7 +42,11 @@ BREAKING CHANGES

FEATURES:
* CRDs: support annotation `consul.hashicorp.com/migrate-entry` on custom resources
that will allow an existing config entry to be migrated onto a Kubernetes custom resource. [[GH-419](https://github.com/hashicorp/consul-k8s/pull/419)]
* Connect: add new cleanup controller that runs in the connect-inject deployment. This
controller cleans up Consul service instances that remain registered despite their
pods being deleted. This could happen if the pod's `preStop` hook failed to execute
for some reason. [[GH-433](https://github.com/hashicorp/consul-k8s/pull/433)]

IMPROVEMENTS:
* CRDs: give a more descriptive error when a config entry already exists in Consul. [[GH-420](https://github.com/hashicorp/consul-k8s/pull/420)]
238 changes: 238 additions & 0 deletions connect-inject/cleanup_resource.go
@@ -0,0 +1,238 @@
package connectinject

import (
	"fmt"
	"sync"
	"time"

	"github.com/hashicorp/consul-k8s/consul"
	capi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/go-hclog"
	"golang.org/x/net/context"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// CleanupResource implements Resource and is used to clean up Consul service
// instances that weren't deregistered when their pods were deleted.
// Usually the preStop hook in the pods handles this but during a force delete
// or OOM the preStop hook doesn't run.
type CleanupResource struct {
	Log              hclog.Logger
	KubernetesClient kubernetes.Interface
	// ConsulClient points at the agent on the same node as this pod.
	ConsulClient    *capi.Client
	ReconcilePeriod time.Duration
	Ctx             context.Context
	// ConsulScheme is the scheme to use when making API calls to Consul,
	// i.e. "http" or "https".
	ConsulScheme string
	// ConsulPort is the port to make HTTP API calls to Consul agents on.
	ConsulPort string

	lock sync.Mutex
}

// Run starts the long-running Reconcile loop that runs on a timer.
func (c *CleanupResource) Run(stopCh <-chan struct{}) {
	reconcileTimer := time.NewTimer(c.ReconcilePeriod)
	defer reconcileTimer.Stop()

	for {
		c.reconcile()
		reconcileTimer.Reset(c.ReconcilePeriod)

		select {
		case <-stopCh:
			c.Log.Info("received stop signal, shutting down")
			return

		case <-reconcileTimer.C:
			// Fall through and continue the loop.
		}
	}
}

// reconcile checks all registered Consul connect service instances and ensures
// the pod they represent is still running. If the pod is no longer running,
// it deregisters the service instance from its agent.
func (c *CleanupResource) reconcile() {
	c.Log.Debug("starting reconcile")

	// currentPods is a map of currently existing pods. Keys are in the form
	// "namespace/pod-name". Value doesn't matter since we're using this
	// solely for quick "exists" checks.
	// Use currentPodsKey() to construct the key when accessing this map.
	currentPods := make(map[string]bool)

	// Gather needed data on nodes, services and pods.
	kubeNodes, err := c.KubernetesClient.CoreV1().Nodes().List(c.Ctx, metav1.ListOptions{})
	if err != nil {
		c.Log.Error("unable to get nodes", "error", err)
		return
	}
	serviceNames, _, err := c.ConsulClient.Catalog().Services(nil)
	if err != nil {
		c.Log.Error("unable to get Consul services", "error", err)
		return
	}
	podList, err := c.KubernetesClient.CoreV1().Pods(corev1.NamespaceAll).List(c.Ctx,
		metav1.ListOptions{LabelSelector: labelInject})
	if err != nil {
		c.Log.Error("unable to get pods", "error", err)
		return
	}

	// Build up our map of currently running pods.
	for _, pod := range podList.Items {
		currentPods[currentPodsKey(pod.Name, pod.Namespace)] = true
	}

	// For each registered service, find the associated pod.
	for serviceName := range serviceNames {
		serviceInstances, _, err := c.ConsulClient.Catalog().Service(serviceName, "", nil)
		if err != nil {
			c.Log.Error("unable to get Consul service", "name", serviceName, "error", err)
			return
		}
		for _, instance := range serviceInstances {
			podName, hasPodMeta := instance.ServiceMeta[MetaKeyPodName]
			k8sNamespace, hasNSMeta := instance.ServiceMeta[MetaKeyKubeNS]
			if hasPodMeta && hasNSMeta {

				// Check if the instance matches a running pod. If not, deregister it.
				if _, podExists := currentPods[currentPodsKey(podName, k8sNamespace)]; !podExists {
					if !nodeInCluster(kubeNodes, instance.Node) {
						c.Log.Debug("skipping deregistration because instance is from node not in this cluster",
							"pod", podName, "id", instance.ServiceID, "node", instance.Node)
						continue
					}

					c.Log.Info("found service instance from terminated pod still registered", "pod", podName, "id", instance.ServiceID)
					err := c.deregisterInstance(instance, instance.Address)
					if err != nil {
						c.Log.Error("unable to deregister service instance", "id", instance.ServiceID, "error", err)
						continue
					}
					c.Log.Info("service instance deregistered", "id", instance.ServiceID)
				}
			}
		}
	}

	c.Log.Debug("finished reconcile")
	return
}

// Delete is called when a pod is deleted. It checks that the Consul service
// instance for that pod is no longer registered with Consul.
// If the instance is still registered, it deregisters it.
func (c *CleanupResource) Delete(key string, obj interface{}) error {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		return fmt.Errorf("expected pod, got: %#v", obj)
	}
	if pod == nil {
		return fmt.Errorf("object for key %s was nil", key)
	}
	serviceName, ok := pod.ObjectMeta.Annotations[annotationService]
	if !ok {
		return fmt.Errorf("pod did not have %s annotation", annotationService)
	}
	kubeNS := pod.Namespace
	podName := pod.Name

	// Look for both the service and its sidecar proxy.
	consulServiceNames := []string{serviceName, fmt.Sprintf("%s-sidecar-proxy", serviceName)}

	for _, consulServiceName := range consulServiceNames {
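		// Query the Consul catalog for instances of this service whose metadata
		// ties them to this exact pod. The rendered filter looks roughly like
		// (assuming MetaKeyPodName/MetaKeyKubeNS resolve to "pod-name"/"k8s-namespace"):
		//   ServiceMeta["pod-name"] == "web-abc123" and ServiceMeta["k8s-namespace"] == "default"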
		instances, _, err := c.ConsulClient.Catalog().Service(consulServiceName, "", &capi.QueryOptions{
			Filter: fmt.Sprintf(`ServiceMeta[%q] == %q and ServiceMeta[%q] == %q`, MetaKeyPodName, podName, MetaKeyKubeNS, kubeNS),
		})
		if err != nil {
			c.Log.Error("unable to get Consul Services", "error", err)
			return err
		}
		if len(instances) == 0 {
			c.Log.Debug("terminated pod had no still-registered instances", "service", consulServiceName, "pod", podName)
			continue
		}

		// NOTE: We only expect a single instance because there's only one
		// per pod but we may as well range over all of them just to be safe.
		for _, instance := range instances {
			// NOTE: We don't need to check if this instance belongs to a kube
			// node in this cluster (like we do in Reconcile) because we only
			// get the delete event if a pod in this cluster is deleted so
			// we know this is one of our instances.

			c.Log.Info("found service instance from terminated pod still registered", "pod", podName, "id", instance.ServiceID)
			err := c.deregisterInstance(instance, pod.Status.HostIP)
			if err != nil {
				c.Log.Error("unable to deregister service instance", "id", instance.ServiceID, "error", err)
				return err
			}
			c.Log.Info("service instance deregistered", "id", instance.ServiceID)
		}
	}
	return nil
}

// Upsert is a no-op because we're only interested in pods that are deleted.
func (c *CleanupResource) Upsert(_ string, _ interface{}) error {
	return nil
}

func (c *CleanupResource) Informer() cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).List(c.Ctx,
					metav1.ListOptions{LabelSelector: labelInject})
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).Watch(c.Ctx,
					metav1.ListOptions{LabelSelector: labelInject})
			},
		},
		&corev1.Pod{},
		// Resync is 0 because we perform our own reconcile loop on our own timer.
		0,
		cache.Indexers{},
	)
}

// deregisterInstance deregisters instance from Consul by calling the agent at
// hostIP's deregister service API.
func (c *CleanupResource) deregisterInstance(instance *capi.CatalogService, hostIP string) error {
	fullAddr := fmt.Sprintf("%s://%s:%s", c.ConsulScheme, hostIP, c.ConsulPort)
	localConfig := capi.DefaultConfig()
	localConfig.Address = fullAddr
	client, err := consul.NewClient(localConfig)
	if err != nil {
		return fmt.Errorf("constructing client for address %q: %s", hostIP, err)
	}

	return client.Agent().ServiceDeregister(instance.ServiceID)
}

// currentPodsKey should be used to construct the key to access the
// currentPods map.
func currentPodsKey(podName, k8sNamespace string) string {
	return fmt.Sprintf("%s/%s", k8sNamespace, podName)
}

// nodeInCluster returns whether nodeName is the name of a node in nodes, i.e.
// it's the name of a node in this cluster.
func nodeInCluster(nodes *corev1.NodeList, nodeName string) bool {
	for _, n := range nodes.Items {
		if n.Name == nodeName {
			return true
		}
	}
	return false
}
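
To show how the pieces above fit together, here is a hedged sketch of constructing and starting CleanupResource. The real connect-inject command wires this up differently (it also feeds pod delete events into Delete via a controller built on Informer), so every concrete value below is illustrative.

package main

import (
	"context"
	"time"

	connectinject "github.com/hashicorp/consul-k8s/connect-inject"
	capi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/go-hclog"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// Kubernetes client from the in-cluster service account (assumes this
	// runs as a pod inside the cluster, like the connect-inject deployment).
	kubeConfig, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	kubeClient, err := kubernetes.NewForConfig(kubeConfig)
	if err != nil {
		panic(err)
	}

	// Consul client pointed at the local agent; address comes from the
	// default config / environment here, which is an assumption.
	consulClient, err := capi.NewClient(capi.DefaultConfig())
	if err != nil {
		panic(err)
	}

	cleanup := &connectinject.CleanupResource{
		Log:              hclog.Default().Named("cleanupResource"),
		KubernetesClient: kubeClient,
		ConsulClient:     consulClient,
		ReconcilePeriod:  5 * time.Minute, // would come from -cleanup-controller-reconcile-period
		Ctx:              context.Background(),
		ConsulScheme:     "http",
		ConsulPort:       "8500",
	}

	// Run the periodic reconcile loop until the stop channel is closed.
	// (In the real deployment a controller also dispatches pod delete
	// events to cleanup.Delete using the informer from cleanup.Informer().)
	stopCh := make(chan struct{})
	cleanup.Run(stopCh)
}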