From 573e9d7b85ca51b10f6bae77ea29a49ea78f57c8 Mon Sep 17 00:00:00 2001 From: Luke Kysow <1034429+lkysow@users.noreply.github.com> Date: Wed, 3 Feb 2021 18:06:29 -0800 Subject: [PATCH] Implement cleanup controller * Runs by default * Watches for pod delete events and ensures there are no stale Consul service instances * Runs a Reconcile loop on a timer that checks all Consul services * Adds new connect-inject flags: `-enable-cleanup-controller`, `-cleanup-controller-reconcile-period`. --- connect-inject/cleanup_resource.go | 231 ++++++++++++++++ connect-inject/cleanup_resource_test.go | 322 ++++++++++++++++++++++ connect-inject/container_init.go | 16 +- connect-inject/container_init_test.go | 24 ++ subcommand/inject-connect/command.go | 33 ++- subcommand/inject-connect/command_test.go | 3 +- 6 files changed, 623 insertions(+), 6 deletions(-) create mode 100644 connect-inject/cleanup_resource.go create mode 100644 connect-inject/cleanup_resource_test.go diff --git a/connect-inject/cleanup_resource.go b/connect-inject/cleanup_resource.go new file mode 100644 index 0000000000..31ae101260 --- /dev/null +++ b/connect-inject/cleanup_resource.go @@ -0,0 +1,231 @@ +package connectinject + +import ( + "fmt" + "sync" + "time" + + capi "github.com/hashicorp/consul/api" + "github.com/hashicorp/go-hclog" + "golang.org/x/net/context" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +// CleanupResource implements Resource and is used to clean up Consul service +// instances that weren't deregistered when their pods were deleted. +// Usually the preStop hook in the pods handles this but during a force delete +// or OOM the preStop hook doesn't run. +type CleanupResource struct { + Log hclog.Logger + KubernetesClient kubernetes.Interface + // ConsulClient points at the agent on the same node as this pod. + ConsulClient *capi.Client + ReconcilePeriod time.Duration + Ctx context.Context + // ConsulScheme is the scheme to use when making API calls to Consul, + // i.e. "http" or "https". + ConsulScheme string + // ConsulPort is the port to make HTTP API calls to Consul agents on. + ConsulPort string + + lock sync.Mutex +} + +// Run starts the long-running Reconcile loop that runs on a timer. +func (c *CleanupResource) Run(stopCh <-chan struct{}) { + reconcileTimer := time.NewTimer(c.ReconcilePeriod) + defer reconcileTimer.Stop() + + for { + c.reconcile() + reconcileTimer.Reset(c.ReconcilePeriod) + + select { + case <-stopCh: + c.Log.Info("received stop signal, shutting down") + return + + case <-reconcileTimer.C: + // Fall through and continue the loop. + } + } +} + +// reconcile checks all registered Consul connect service instances and ensures +// the pod they represent is still running. If the pod is no longer running, +// it deregisters the service instance from its agent. +func (c *CleanupResource) reconcile() { + c.Log.Debug("starting reconcile") + + // currentPods is a map of currently existing pods. Keys are in the form + // "namespace/pod-name". Value doesn't matter since we're using this + // solely for quick "exists" checks. + // Use currentPodsKey() to construct the key when accessing this map. + currentPods := make(map[string]bool) + + // Gather needed data on nodes, services and pods. 
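+	// The node list lets us tell whether a service instance was registered
+	// by an agent in this Kubernetes cluster: instances whose Consul node
+	// doesn't match a Kubernetes node here (for example, agents running
+	// outside the cluster) are skipped rather than deregistered.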
+	kubeNodes, err := c.KubernetesClient.CoreV1().Nodes().List(c.Ctx, metav1.ListOptions{})
+	if err != nil {
+		c.Log.Error("unable to get nodes", "error", err)
+		return
+	}
+	serviceNames, _, err := c.ConsulClient.Catalog().Services(nil)
+	if err != nil {
+		c.Log.Error("unable to get Consul services", "error", err)
+		return
+	}
+	podList, err := c.KubernetesClient.CoreV1().Pods(corev1.NamespaceAll).List(c.Ctx,
+		metav1.ListOptions{LabelSelector: labelInject})
+	if err != nil {
+		c.Log.Error("unable to get pods", "error", err)
+		return
+	}
+
+	// Build up our map of currently running pods.
+	for _, pod := range podList.Items {
+		currentPods[currentPodsKey(pod.Name, pod.Namespace)] = true
+	}
+
+	// For each registered service, find the associated pod.
+	for serviceName := range serviceNames {
+		serviceInstances, _, err := c.ConsulClient.Catalog().Service(serviceName, "", nil)
+		if err != nil {
+			c.Log.Error("unable to get Consul service", "name", serviceName, "error", err)
+			return
+		}
+		for _, instance := range serviceInstances {
+			podName, hasPodMeta := instance.ServiceMeta[MetaKeyPodName]
+			k8sNamespace, hasNSMeta := instance.ServiceMeta[MetaKeyKubeNS]
+			if hasPodMeta && hasNSMeta {
+
+				// Check if the instance matches a running pod. If not, deregister it.
+				if _, podExists := currentPods[currentPodsKey(podName, k8sNamespace)]; !podExists {
+					if !nodeInCluster(kubeNodes, instance.Node) {
+						c.Log.Debug("skipping deregistration because instance is from node not in this cluster",
+							"pod", podName, "id", instance.ServiceID, "node", instance.Node)
+						continue
+					}
+
+					c.Log.Info("found service instance from terminated pod still registered", "pod", podName, "id", instance.ServiceID)
+					err := c.deregisterInstance(instance, instance.Address)
+					if err != nil {
+						c.Log.Error("unable to deregister service instance", "id", instance.ServiceID, "error", err)
+						continue
+					}
+					c.Log.Info("service instance deregistered", "id", instance.ServiceID)
+				}
+			}
+		}
+	}
+
+	c.Log.Debug("finished reconcile")
+	return
+}
+
+// Delete is called when a pod is deleted. It checks that the Consul service
+// instance for that pod is no longer registered with Consul.
+// If the instance is still registered, it deregisters it.
+func (c *CleanupResource) Delete(key string, obj interface{}) error {
+	pod, ok := obj.(*corev1.Pod)
+	if !ok {
+		return fmt.Errorf("expected pod, got: %#v", obj)
+	}
+	if pod == nil {
+		return fmt.Errorf("object for key %s was nil", key)
+	}
+	serviceName, ok := pod.ObjectMeta.Annotations[annotationService]
+	if !ok {
+		return fmt.Errorf("pod did not have %s annotation", annotationService)
+	}
+	kubeNS := pod.Namespace
+	podName := pod.Name
+
+	// Look for both the service and its sidecar proxy.
+	consulServiceNames := []string{serviceName, fmt.Sprintf("%s-sidecar-proxy", serviceName)}
+
+	for _, consulServiceName := range consulServiceNames {
+		instances, _, err := c.ConsulClient.Catalog().Service(consulServiceName, "", &capi.QueryOptions{
+			Filter: fmt.Sprintf(`ServiceMeta[%q] == %q and ServiceMeta[%q] == %q`, MetaKeyPodName, podName, MetaKeyKubeNS, kubeNS),
+		})
+		if err != nil {
+			c.Log.Error("unable to get Consul services", "error", err)
+			return err
+		}
+		if len(instances) == 0 {
+			c.Log.Debug("terminated pod had no still-registered instances", "service", consulServiceName, "pod", podName)
+			continue
+		}
+
+		// NOTE: We only expect a single instance because there's only one
+		// per pod, but we may as well range over all of them just to be safe.
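+		// Deregistration goes through the agent on the pod's host
+		// (pod.Status.HostIP) because that's the agent the service was
+		// registered with; a catalog-level deregistration elsewhere would
+		// likely be undone by that agent's anti-entropy sync.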
+ for _, instance := range instances { + // NOTE: We don't need to check if this instance belongs to a kube + // node in this cluster (like we do in Reconcile) because we only + // get the delete event if a pod in this cluster is deleted so + // we know this is one of our instances. + + c.Log.Info("found service instance from terminated pod still registered", "pod", podName, "id", instance.ServiceID) + err := c.deregisterInstance(instance, pod.Status.HostIP) + if err != nil { + c.Log.Error("Unable to deregister service instance", "id", instance.ServiceID, "error", err) + return err + } + c.Log.Info("service instance deregistered", "id", instance.ServiceID) + } + } + return nil +} + +// Upsert is a no-op because we're only interested in pods that are deleted. +func (c *CleanupResource) Upsert(_ string, _ interface{}) error { + return nil +} + +func (c *CleanupResource) Informer() cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).List(c.Ctx, + metav1.ListOptions{LabelSelector: labelInject}) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).Watch(c.Ctx, + metav1.ListOptions{LabelSelector: labelInject}) + }, + }, + &corev1.Pod{}, + // Resync is 0 because we perform our own reconcile loop on our own timer. + 0, + cache.Indexers{}, + ) +} + +func (c *CleanupResource) deregisterInstance(instance *capi.CatalogService, hostIP string) error { + fullAddr := fmt.Sprintf("%s://%s:%s", c.ConsulScheme, hostIP, c.ConsulPort) + localConfig := capi.DefaultConfig() + localConfig.Address = fullAddr + client, err := capi.NewClient(localConfig) + if err != nil { + return fmt.Errorf("constructing client for address %q: %s", hostIP, err) + } + + return client.Agent().ServiceDeregister(instance.ServiceID) +} + +func currentPodsKey(podName, k8sNamespace string) string { + return fmt.Sprintf("%s/%s", k8sNamespace, podName) +} + +func nodeInCluster(nodes *corev1.NodeList, nodeName string) bool { + for _, n := range nodes.Items { + if n.Name == nodeName { + return true + } + } + return false +} diff --git a/connect-inject/cleanup_resource_test.go b/connect-inject/cleanup_resource_test.go new file mode 100644 index 0000000000..d3626c098f --- /dev/null +++ b/connect-inject/cleanup_resource_test.go @@ -0,0 +1,322 @@ +package connectinject + +import ( + "net/url" + "testing" + + capi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" +) + +func TestReconcile(t *testing.T) { + t.Parallel() + + cases := map[string]struct { + ConsulServices []capi.AgentServiceRegistration + KubePods []runtime.Object + ExpConsulServiceIDs []string + // OutOfClusterNode controls whether the services are registered on a + // node that does not exist in this Kube cluster. 
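+		// When true, no matching Kubernetes Node is created for the test
+		// agent, so reconcile is expected to skip (not deregister) these
+		// instances.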
+ OutOfClusterNode bool + }{ + "no instances running": { + ConsulServices: nil, + KubePods: nil, + ExpConsulServiceIDs: nil, + }, + "instance does not have pod-name meta key": { + ConsulServices: []capi.AgentServiceRegistration{consulNoPodNameMetaSvc}, + ExpConsulServiceIDs: []string{"no-pod-name-meta"}, + }, + "instance does not have k8s-namespace meta key": { + ConsulServices: []capi.AgentServiceRegistration{consulNoK8sNSMetaSvc}, + ExpConsulServiceIDs: []string{"no-k8s-ns-meta"}, + }, + "out of cluster node": { + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, + ExpConsulServiceIDs: []string{"foo-abc123-foo", "foo-abc123-foo-sidecar-proxy"}, + OutOfClusterNode: true, + }, + "app and sidecar still running": { + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, + KubePods: []runtime.Object{fooPod}, + ExpConsulServiceIDs: []string{"foo-abc123-foo", "foo-abc123-foo-sidecar-proxy"}, + }, + "app and sidecar terminated": { + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, + KubePods: nil, + ExpConsulServiceIDs: nil, + }, + "only app is registered, no sidecar": { + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc}, + KubePods: nil, + ExpConsulServiceIDs: nil, + }, + "only sidecar is registered, no app": { + ConsulServices: []capi.AgentServiceRegistration{consulFooSvcSidecar}, + KubePods: nil, + ExpConsulServiceIDs: nil, + }, + "multiple instances of the same service": { + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvc, + consulFooSvcSidecar, + consulFooSvcPod2, + consulFooSvcSidecarPod2, + }, + KubePods: []runtime.Object{fooPod}, + ExpConsulServiceIDs: []string{"foo-abc123-foo", "foo-abc123-foo-sidecar-proxy"}, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + require := require.New(t) + + // Start Consul server. + server, err := testutil.NewTestServerConfigT(t, nil) + defer server.Stop() + require.NoError(err) + server.WaitForLeader(t) + consulClient, err := capi.NewClient(&capi.Config{Address: server.HTTPAddr}) + require.NoError(err) + + // Register Consul services. + for _, svc := range c.ConsulServices { + require.NoError(consulClient.Agent().ServiceRegister(&svc)) + } + + // Create the cleanup resource. + log := hclog.Default().Named("cleanupResource") + log.SetLevel(hclog.Debug) + consulURL, err := url.Parse("http://" + server.HTTPAddr) + require.NoError(err) + kubeResources := c.KubePods + if !c.OutOfClusterNode { + node := nodeName(t, consulClient) + // NOTE: we need to add the node because the reconciler checks if + // the node the service is registered with actually exists in this + // cluster. + kubeResources = append(kubeResources, &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node, + }, + }) + + } + cleanupResource := CleanupResource{ + Log: log, + KubernetesClient: fake.NewSimpleClientset(kubeResources...), + ConsulClient: consulClient, + ConsulScheme: consulURL.Scheme, + ConsulPort: consulURL.Port(), + } + + // Run Reconcile. + cleanupResource.reconcile() + + // Test that the remaining services are what we expect. 
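+			// Agent().Services() returns the services registered with this
+			// local agent, keyed by service ID, so the keys can be compared
+			// directly against ExpConsulServiceIDs.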
+ services, err := consulClient.Agent().Services() + require.NoError(err) + var actualServiceIDs []string + for id := range services { + actualServiceIDs = append(actualServiceIDs, id) + } + require.ElementsMatch(actualServiceIDs, c.ExpConsulServiceIDs) + }) + } +} + +func TestDelete(t *testing.T) { + t.Parallel() + + cases := map[string]struct { + Pod *corev1.Pod + ConsulServices []capi.AgentServiceRegistration + ExpConsulServiceIDs []string + ExpErr string + }{ + "pod is nil": { + Pod: nil, + ExpErr: "object for key default/foo was nil", + }, + "pod does not have service-name annotation": { + Pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo-abc123", + Namespace: "default", + }, + Status: corev1.PodStatus{ + HostIP: "127.0.0.1", + }, + }, + ExpErr: "pod did not have consul.hashicorp.com/connect-service annotation", + }, + "no instances still registered": { + Pod: fooPod, + ConsulServices: nil, + ExpConsulServiceIDs: nil, + }, + "app and sidecar terminated": { + Pod: fooPod, + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, + ExpConsulServiceIDs: nil, + }, + "only app is registered, no sidecar": { + Pod: fooPod, + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc}, + ExpConsulServiceIDs: nil, + }, + "only sidecar is registered, no app": { + Pod: fooPod, + ConsulServices: []capi.AgentServiceRegistration{consulFooSvcSidecar}, + ExpConsulServiceIDs: nil, + }, + "multiple instances of the same service": { + Pod: fooPod, + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvc, + consulFooSvcSidecar, + consulFooSvcPod2, + consulFooSvcSidecarPod2, + }, + ExpConsulServiceIDs: []string{"foo-def456-foo", "foo-def456-foo-sidecar-proxy"}, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + require := require.New(t) + + // Start Consul server. + server, err := testutil.NewTestServerConfigT(t, nil) + defer server.Stop() + require.NoError(err) + server.WaitForLeader(t) + consulClient, err := capi.NewClient(&capi.Config{Address: server.HTTPAddr}) + require.NoError(err) + + // Register Consul services. + for _, svc := range c.ConsulServices { + require.NoError(consulClient.Agent().ServiceRegister(&svc)) + } + + // Create the cleanup resource. + log := hclog.Default().Named("cleanupResource") + log.SetLevel(hclog.Debug) + consulURL, err := url.Parse("http://" + server.HTTPAddr) + require.NoError(err) + cleanupResource := CleanupResource{ + Log: log, + KubernetesClient: fake.NewSimpleClientset(), + ConsulClient: consulClient, + ConsulScheme: consulURL.Scheme, + ConsulPort: consulURL.Port(), + } + + // Run Delete. + err = cleanupResource.Delete("default/foo", c.Pod) + if c.ExpErr != "" { + require.EqualError(err, c.ExpErr) + } else { + require.NoError(err) + + // Test that the remaining services are what we expect. + services, err := consulClient.Agent().Services() + require.NoError(err) + var actualServiceIDs []string + for id := range services { + actualServiceIDs = append(actualServiceIDs, id) + } + require.ElementsMatch(actualServiceIDs, c.ExpConsulServiceIDs) + } + }) + } +} + +// nodeName returns the Consul node name for the agent that client +// points at. 
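+// It is used by TestReconcile to create a Kubernetes Node of the same name
+// so that nodeInCluster treats the test agent's node as in-cluster.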
+func nodeName(t *testing.T, client *capi.Client) string { + self, err := client.Agent().Self() + require.NoError(t, err) + require.Contains(t, self, "Config") + require.Contains(t, self["Config"], "NodeName") + return self["Config"]["NodeName"].(string) +} + +var ( + consulFooSvc = capi.AgentServiceRegistration{ + ID: "foo-abc123-foo", + Name: "foo", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-abc123", + MetaKeyKubeNS: "default", + }, + } + consulFooSvcSidecar = capi.AgentServiceRegistration{ + ID: "foo-abc123-foo-sidecar-proxy", + Name: "foo-sidecar-proxy", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-abc123", + MetaKeyKubeNS: "default", + }, + } + consulFooSvcPod2 = capi.AgentServiceRegistration{ + ID: "foo-def456-foo", + Name: "foo", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-def456", + MetaKeyKubeNS: "default", + }, + } + consulFooSvcSidecarPod2 = capi.AgentServiceRegistration{ + ID: "foo-def456-foo-sidecar-proxy", + Name: "foo-sidecar-proxy", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-def456", + MetaKeyKubeNS: "default", + }, + } + consulNoPodNameMetaSvc = capi.AgentServiceRegistration{ + ID: "no-pod-name-meta", + Name: "no-pod-name-meta", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyKubeNS: "default", + }, + } + consulNoK8sNSMetaSvc = capi.AgentServiceRegistration{ + ID: "no-k8s-ns-meta", + Name: "no-k8s-ns-meta", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "no-k8s-ns-meta", + }, + } + fooPod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo-abc123", + Namespace: "default", + Labels: map[string]string{ + labelInject: injected, + }, + Annotations: map[string]string{ + annotationStatus: injected, + annotationService: "foo", + }, + }, + Status: corev1.PodStatus{ + HostIP: "127.0.0.1", + }, + } +) diff --git a/connect-inject/container_init.go b/connect-inject/container_init.go index 4d2c6c0c9c..628a51c95c 100644 --- a/connect-inject/container_init.go +++ b/connect-inject/container_init.go @@ -11,7 +11,11 @@ import ( corev1 "k8s.io/api/core/v1" ) -const InjectInitContainerName = "consul-connect-inject-init" +const ( + InjectInitContainerName = "consul-connect-inject-init" + MetaKeyPodName = "pod-name" + MetaKeyKubeNS = "k8s-namespace" +) type initContainerCommandData struct { ServiceName string @@ -32,6 +36,8 @@ type initContainerCommandData struct { Upstreams []initContainerCommandUpstreamData Tags string Meta map[string]string + MetaKeyPodName string + MetaKeyKubeNS string // The PEM-encoded CA certificate to use when // communicating with Consul clients @@ -56,6 +62,8 @@ func (h *Handler) containerInit(pod *corev1.Pod, k8sNamespace string) (corev1.Co ConsulNamespace: h.consulNamespace(k8sNamespace), NamespaceMirroringEnabled: h.EnableK8SNSMirroring, ConsulCACert: h.ConsulCACert, + MetaKeyPodName: MetaKeyPodName, + MetaKeyKubeNS: MetaKeyKubeNS, } if data.ServiceName == "" { // Assertion, since we call defaultAnnotations above and do @@ -281,7 +289,8 @@ services { {{$key}} = "{{$value}}" {{- end }} {{- end }} - pod-name = "${POD_NAME}" + {{ .MetaKeyPodName }} = "${POD_NAME}" + {{ .MetaKeyKubeNS }} = "${POD_NAMESPACE}" } } @@ -303,7 +312,8 @@ services { {{$key}} = "{{$value}}" {{- end }} {{- end }} - pod-name = "${POD_NAME}" + {{ .MetaKeyPodName }} = "${POD_NAME}" + {{ .MetaKeyKubeNS }} = "${POD_NAMESPACE}" } proxy { diff --git a/connect-inject/container_init_test.go b/connect-inject/container_init_test.go index 
0d779ef478..47b0b378cf 100644 --- a/connect-inject/container_init_test.go +++ b/connect-inject/container_init_test.go @@ -65,6 +65,7 @@ services { port = 0 meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } } @@ -76,6 +77,7 @@ services { port = 20000 meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } proxy { @@ -124,6 +126,7 @@ cp /bin/consul /consul/connect-inject/consul`, port = 1234 meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } } @@ -135,6 +138,7 @@ services { port = 20000 meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } proxy { @@ -303,6 +307,7 @@ services { tags = ["abc"] meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } } @@ -315,6 +320,7 @@ services { tags = ["abc"] meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } proxy { @@ -355,6 +361,7 @@ services { tags = ["abc","123"] meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } } @@ -367,6 +374,7 @@ services { tags = ["abc","123"] meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } proxy { @@ -407,6 +415,7 @@ services { tags = ["abc","123"] meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } } @@ -419,6 +428,7 @@ services { tags = ["abc","123"] meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } proxy { @@ -460,6 +470,7 @@ services { tags = ["abc","123","abc","123","def","456"] meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } } @@ -472,6 +483,7 @@ services { tags = ["abc","123","abc","123","def","456"] meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } proxy { @@ -523,6 +535,7 @@ services { name = "abc" version = "2" pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } } @@ -536,6 +549,7 @@ services { name = "abc" version = "2" pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } proxy { @@ -568,6 +582,7 @@ services { }, ` meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } `, "", @@ -581,6 +596,7 @@ services { }, ` meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } `, "", @@ -691,6 +707,7 @@ services { namespace = "default" meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } } @@ -703,6 +720,7 @@ services { namespace = "default" meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } proxy { @@ -765,6 +783,7 @@ services { namespace = "non-default" meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } } @@ -777,6 +796,7 @@ services { namespace = "non-default" meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } proxy { @@ -840,6 +860,7 @@ services { namespace = "non-default" meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } } @@ -852,6 +873,7 @@ services { namespace = "non-default" meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } proxy { @@ -924,6 +946,7 @@ services { namespace = "k8snamespace" meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } } @@ -936,6 +959,7 @@ services { namespace = "k8snamespace" meta = { pod-name = "${POD_NAME}" + k8s-namespace = "${POD_NAMESPACE}" } proxy { diff --git a/subcommand/inject-connect/command.go b/subcommand/inject-connect/command.go index 6b34be89dd..bce770c49e 100644 --- a/subcommand/inject-connect/command.go +++ b/subcommand/inject-connect/command.go @@ -64,6 +64,10 @@ type Command struct { flagEnableHealthChecks bool // 
Start the health check controller.
 	flagHealthChecksReconcilePeriod time.Duration // Period for health check reconcile.
 
+	// Flags for cleanup controller.
+	flagEnableCleanupController          bool          // Start the cleanup controller.
+	flagCleanupControllerReconcilePeriod time.Duration // Period for cleanup controller reconcile.
+
 	// Proxy resource settings.
 	flagDefaultSidecarProxyCPULimit   string
 	flagDefaultSidecarProxyCPURequest string
@@ -128,7 +132,10 @@ func (c *Command) init() {
 		"K8s namespaces to explicitly deny. Takes precedence over allow. May be specified multiple times.")
 	c.flagSet.BoolVar(&c.flagEnableHealthChecks, "enable-health-checks-controller", false,
 		"Enables health checks controller.")
+	c.flagSet.BoolVar(&c.flagEnableCleanupController, "enable-cleanup-controller", true,
+		"Enables cleanup controller that cleans up stale Consul service instances.")
 	c.flagSet.DurationVar(&c.flagHealthChecksReconcilePeriod, "health-checks-reconcile-period", 1*time.Minute, "Reconcile period for health checks controller.")
+	c.flagSet.DurationVar(&c.flagCleanupControllerReconcilePeriod, "cleanup-controller-reconcile-period", 5*time.Minute, "Reconcile period for cleanup controller.")
 	c.flagSet.BoolVar(&c.flagEnableNamespaces, "enable-namespaces", false,
 		"[Enterprise Only] Enables namespaces, in either a single Consul namespace or mirrored.")
 	c.flagSet.StringVar(&c.flagConsulDestinationNamespace, "consul-destination-namespace", "default",
@@ -384,8 +391,32 @@ func (c *Command) Run(args []string) int {
 		}
 	}()
 
-	// Start the health checks controller.
+	// Start the cleanup controller that cleans up Consul service instances
+	// still registered after the pod has been deleted (usually due to a force delete).
 	ctrlExitCh := make(chan error)
+
+	if c.flagEnableCleanupController {
+		cleanupResource := connectinject.CleanupResource{
+			Log:              logger.Named("cleanupResource"),
+			KubernetesClient: c.clientset,
+			Ctx:              ctx,
+			ReconcilePeriod:  c.flagCleanupControllerReconcilePeriod,
+			ConsulClient:     c.consulClient,
+			ConsulScheme:     consulURL.Scheme,
+			ConsulPort:       consulURL.Port(),
+		}
+		cleanupCtrl := &controller.Controller{
+			Log:      logger.Named("cleanupController"),
+			Resource: &cleanupResource,
+		}
+		go func() {
+			cleanupCtrl.Run(ctx.Done())
+			if ctx.Err() == nil {
+				ctrlExitCh <- fmt.Errorf("cleanup controller exited unexpectedly")
+			}
+		}()
+	}
+
 	if c.flagEnableHealthChecks {
 		healthResource := connectinject.HealthCheckResource{
 			Log: logger.Named("healthCheckResource"),
diff --git a/subcommand/inject-connect/command_test.go b/subcommand/inject-connect/command_test.go
index d466e0bb79..040e959574 100644
--- a/subcommand/inject-connect/command_test.go
+++ b/subcommand/inject-connect/command_test.go
@@ -227,13 +227,12 @@ func TestRun_CommandFailsWithInvalidListener(t *testing.T) {
 		UI:        ui,
 		clientset: k8sClient,
 	}
-	os.Setenv(api.HTTPAddrEnvName, "http://0.0.0.0:9999")
 	code := cmd.Run([]string{
 		"-consul-k8s-image", "hashicorp/consul-k8s",
 		"-consul-image", "foo",
 		"-envoy-image", "envoy:1.16.0",
 		"-enable-health-checks-controller=true",
+		"-http-addr=http://0.0.0.0:9999",
 		"-listen", "999999",
 	})
-	os.Unsetenv(api.HTTPAddrEnvName)
 	require.Equal(t, 1, code)
 	require.Contains(t, ui.ErrorWriter.String(), "Error listening: listen tcp: address 999999: missing port in address")
 }