* Runs by default
* Watches for pod delete events and ensures there are no stale Consul service instances
* Runs a Reconcile loop on a timer that checks all Consul services
* Adds new connect-inject flags: `-enable-cleanup-controller`, `-cleanup-controller-reconcile-period`
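The flag-parsing side of the connect-inject command is not part of the file shown below, so the following is only a rough Go sketch of how the two new flags might be declared with the standard flag package. The defaults (on by default, a five-minute period) and the wrapping main function are illustrative assumptions, not values taken from this commit.

    package main

    import (
        "flag"
        "fmt"
        "time"
    )

    func main() {
        // Illustrative declarations of the new connect-inject flags; the
        // default values here are assumptions, not taken from this commit.
        enableCleanupController := flag.Bool("enable-cleanup-controller", true,
            "Enables a controller that cleans up stale Consul service instances for deleted pods.")
        cleanupReconcilePeriod := flag.Duration("cleanup-controller-reconcile-period", 5*time.Minute,
            "How often the cleanup controller reconciles all Consul services against running pods.")
        flag.Parse()

        fmt.Printf("cleanup controller enabled=%t reconcile period=%s\n",
            *enableCleanupController, *cleanupReconcilePeriod)
    }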
Showing 6 changed files with 603 additions and 6 deletions.
@@ -0,0 +1,214 @@
package connectinject

import (
    "fmt"
    "net/url"
    "sync"
    "time"

    capi "github.com/hashicorp/consul/api"
    "github.com/hashicorp/go-hclog"
    "golang.org/x/net/context"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

// CleanupResource deregisters Consul service instances that belong to pods
// which no longer exist. It handles pod delete events and also runs a full
// reconcile on a timer to catch any instances that slip through.
type CleanupResource struct {
    Log              hclog.Logger
    KubernetesClient kubernetes.Interface
    ConsulClient     *capi.Client
    ReconcilePeriod  time.Duration
    Ctx              context.Context
    ConsulURL        *url.URL

    lock sync.Mutex
}

func (c *CleanupResource) Run(stopCh <-chan struct{}) {
    reconcileTimer := time.NewTimer(c.ReconcilePeriod)
    defer reconcileTimer.Stop()

    for {
        c.Reconcile()
        reconcileTimer.Reset(c.ReconcilePeriod)

        select {
        case <-stopCh:
            c.Log.Info("received stop signal, shutting down")
            return

        case <-reconcileTimer.C:
            // Fall through and continue the loop.
        }
    }
}

func (c *CleanupResource) Reconcile() {
    c.Log.Debug("starting reconcile")

    // currentPods is a map of currently existing pods. Keys are in the form
    // "namespace/pod-name". Value doesn't matter since we're using this
    // solely for quick "exists" checks.
    currentPods := make(map[string]bool)

    // Gather needed data on nodes, services and pods.
    kubeNodes, err := c.KubernetesClient.CoreV1().Nodes().List(c.Ctx, metav1.ListOptions{})
    if err != nil {
        c.Log.Error("unable to get nodes", "error", err)
        return
    }
    serviceNames, _, err := c.ConsulClient.Catalog().Services(nil)
    if err != nil {
        c.Log.Error("unable to get Consul services", "error", err)
        return
    }
    podList, err := c.KubernetesClient.CoreV1().Pods(corev1.NamespaceAll).List(c.Ctx,
        metav1.ListOptions{LabelSelector: labelInject})
    if err != nil {
        c.Log.Error("unable to get pods", "error", err)
        return
    }

    // Build up our map of currently running pods.
    for _, pod := range podList.Items {
        currentPods[currentPodsKey(pod.Name, pod.Namespace)] = true
    }

    // For each registered service, find the associated pod.
    for serviceName := range serviceNames {
        serviceInstances, _, err := c.ConsulClient.Catalog().Service(serviceName, "", nil)
        if err != nil {
            c.Log.Error("unable to get Consul service", "name", serviceName, "error", err)
            return
        }
        for _, instance := range serviceInstances {
            podName, hasPodMeta := instance.ServiceMeta[MetaKeyPodName]
            k8sNamespace, hasNSMeta := instance.ServiceMeta[MetaKeyKubeNS]
            if hasPodMeta && hasNSMeta {

                // Check if the instance matches a running pod.
                if _, podExists := currentPods[currentPodsKey(podName, k8sNamespace)]; !podExists {
                    if !nodeInCluster(kubeNodes, instance.Node) {
                        c.Log.Debug("skipping deregistration because instance is from node not in this cluster",
                            "pod", podName, "id", instance.ServiceID, "node", instance.Node)
                        continue
                    }

                    c.Log.Info("found service instance from terminated pod still registered", "pod", podName, "id", instance.ServiceID)
                    err := c.deregisterInstance(instance, instance.Address)
                    if err != nil {
                        c.Log.Error("unable to deregister service instance", "id", instance.ServiceID, "error", err)
                        continue
                    }
                    c.Log.Info("service instance deregistered", "id", instance.ServiceID)
                }
            }
        }
    }

    c.Log.Debug("finished reconcile")
}

func (c *CleanupResource) Delete(key string, obj interface{}) error {
    pod, ok := obj.(*corev1.Pod)
    if !ok {
        return fmt.Errorf("expected pod, got: %#v", obj)
    }
    if pod == nil {
        return fmt.Errorf("object for key %s was nil", key)
    }
    serviceName, ok := pod.ObjectMeta.Annotations[annotationService]
    if !ok {
        return fmt.Errorf("pod did not have %s annotation", annotationService)
    }
    kubeNS := pod.Namespace
    podName := pod.Name

    // Look for both the service and its sidecar proxy.
    consulServiceNames := []string{serviceName, fmt.Sprintf("%s-sidecar-proxy", serviceName)}

    for _, consulServiceName := range consulServiceNames {
        instances, _, err := c.ConsulClient.Catalog().Service(consulServiceName, "", &capi.QueryOptions{
            Filter: fmt.Sprintf(`ServiceMeta[%q] == %q and ServiceMeta[%q] == %q`, MetaKeyPodName, podName, MetaKeyKubeNS, kubeNS),
        })
        if err != nil {
            c.Log.Error("unable to get Consul services", "error", err)
            return err
        }
        if len(instances) == 0 {
            c.Log.Debug("terminated pod had no still-registered instances", "service", consulServiceName)
            continue
        }

        // NOTE: We only expect a single instance because there's only one
        // per pod, but we may as well range over all of them just to be safe.
        for _, instance := range instances {
            // NOTE: We don't need to check if this instance belongs to a kube
            // node in this cluster (like we do in Reconcile) because we only
            // get the delete event if a pod in this cluster is deleted, so
            // we know this is one of our instances.

            c.Log.Info("found service instance from terminated pod still registered", "pod", podName, "id", instance.ServiceID)
            err := c.deregisterInstance(instance, pod.Status.HostIP)
            if err != nil {
                c.Log.Error("unable to deregister service instance", "id", instance.ServiceID, "error", err)
                return err
            }
            c.Log.Info("service instance deregistered", "id", instance.ServiceID)
        }
    }
    return nil
}

func (c *CleanupResource) Upsert(_ string, _ interface{}) error {
    return nil
}

func (c *CleanupResource) Informer() cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).List(c.Ctx,
                    metav1.ListOptions{LabelSelector: labelInject})
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).Watch(c.Ctx,
                    metav1.ListOptions{LabelSelector: labelInject})
            },
        },
        &corev1.Pod{},
        // Resync is 0 because we perform our own reconcile loop on our own timer.
        0,
        cache.Indexers{},
    )
}

// deregisterInstance deregisters the service instance by building a Consul
// client pointed at the agent on the given host IP and calling its
// deregister endpoint.
func (c *CleanupResource) deregisterInstance(instance *capi.CatalogService, hostIP string) error {
    fullAddr := fmt.Sprintf("%s://%s:%s", c.ConsulURL.Scheme, hostIP, c.ConsulURL.Port())
    localConfig := capi.DefaultConfig()
    localConfig.Address = fullAddr
    client, err := capi.NewClient(localConfig)
    if err != nil {
        return fmt.Errorf("constructing client for address %q: %s", hostIP, err)
    }

    return client.Agent().ServiceDeregister(instance.ServiceID)
}

// currentPodsKey returns the "namespace/pod-name" key used in the
// currentPods map.
func currentPodsKey(podName, k8sNamespace string) string {
    return fmt.Sprintf("%s/%s", k8sNamespace, podName)
}

// nodeInCluster returns true if nodeName is a node in this Kubernetes
// cluster.
func nodeInCluster(nodes *corev1.NodeList, nodeName string) bool {
    for _, n := range nodes.Items {
        if n.Name == nodeName {
            return true
        }
    }
    return false
}
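This file only defines the controller; how it is constructed and started lives elsewhere in the changeset. The sketch below is a minimal, hypothetical wiring under stated assumptions: an in-cluster Kubernetes config, a default Consul client, an illustrative local agent URL, and a made-up reconcile period. Neither these values nor the runCleanupExample helper come from this commit.

    package connectinject

    import (
        "net/url"
        "time"

        capi "github.com/hashicorp/consul/api"
        "github.com/hashicorp/go-hclog"
        "golang.org/x/net/context"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
    )

    // runCleanupExample is a hypothetical helper (not part of this commit) that
    // shows how a caller might build a CleanupResource and run it until stopCh
    // is closed.
    func runCleanupExample(stopCh <-chan struct{}) error {
        consulClient, err := capi.NewClient(capi.DefaultConfig())
        if err != nil {
            return err
        }
        kubeConfig, err := rest.InClusterConfig()
        if err != nil {
            return err
        }
        kubeClient, err := kubernetes.NewForConfig(kubeConfig)
        if err != nil {
            return err
        }
        // Illustrative address of the local Consul client agent.
        consulURL, err := url.Parse("http://127.0.0.1:8500")
        if err != nil {
            return err
        }

        cleanup := &CleanupResource{
            Log:              hclog.Default().Named("cleanup-controller"),
            KubernetesClient: kubeClient,
            ConsulClient:     consulClient,
            ReconcilePeriod:  5 * time.Minute, // illustrative; would come from -cleanup-controller-reconcile-period
            Ctx:              context.Background(),
            ConsulURL:        consulURL,
        }

        // Run blocks until stopCh is closed, so callers typically start it in a goroutine.
        cleanup.Run(stopCh)
        return nil
    }

Per the commit description, the connect-inject command starts this controller by default and skips it only when `-enable-cleanup-controller` is disabled; that flag handling sits outside the file shown here.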