diff --git a/CHANGELOG.md b/CHANGELOG.md index 11988f4b27..a8a1f26d65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,9 @@ BREAKING CHANGES: IMPROVEMENTS: * CLI * Pre-check in the `install` command to verify the correct license secret exists when using an enterprise Consul image. [[GH-875](https://github.com/hashicorp/consul-k8s/pull/875)] +* Control Plane * Add a label "managed-by" to every secret the control-plane creates. Only delete said secrets on an uninstall. [[GH-835](https://github.com/hashicorp/consul-k8s/pull/835)] + * Add support for labeling a Kubernetes service with `consul.hashicorp.com/service-ignore` to prevent services from being registered in Consul. [[GH-858](https://github.com/hashicorp/consul-k8s/pull/858)] ## 0.37.0 (November 18, 2021) diff --git a/control-plane/connect-inject/annotations.go b/control-plane/connect-inject/annotations.go index ccc9ab6341..c8bf650e08 100644 --- a/control-plane/connect-inject/annotations.go +++ b/control-plane/connect-inject/annotations.go @@ -122,6 +122,10 @@ const ( // webhook/handler. annotationOriginalPod = "consul.hashicorp.com/original-pod" + // labelServiceIgnore is a label that can be added to a service to prevent it from being + // registered with Consul. + labelServiceIgnore = "consul.hashicorp.com/service-ignore" + // injected is used as the annotation value for annotationInjected. injected = "injected" diff --git a/control-plane/connect-inject/endpoints_controller.go b/control-plane/connect-inject/endpoints_controller.go index 4e6a2b73ba..78b1748d4b 100644 --- a/control-plane/connect-inject/endpoints_controller.go +++ b/control-plane/connect-inject/endpoints_controller.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "regexp" + "strconv" "strings" mapset "github.com/deckarep/golang-set" @@ -117,10 +118,13 @@ type EndpointsController struct { context.Context } +// Reconcile reads the state of an Endpoints object for a Kubernetes Service and reconciles Consul services which +// correspond to the Kubernetes Service. These events are driven by changes to the Pods backing the Kube service. func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { var errs error var serviceEndpoints corev1.Endpoints + // Ignore the request if the namespace of the endpoint is not allowed. if shouldIgnore(req.Namespace, r.DenyK8sNamespacesSet, r.AllowK8sNamespacesSet) { return ctrl.Result{}, nil } @@ -137,15 +141,22 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( if k8serrors.IsNotFound(err) { // Deregister all instances in Consul for this service. The function deregisterServiceOnAllAgents handles // the case where the Consul service name is different from the Kubernetes service name. - if err = r.deregisterServiceOnAllAgents(ctx, req.Name, req.Namespace, nil, endpointPods); err != nil { - return ctrl.Result{}, err - } - return ctrl.Result{}, nil + err = r.deregisterServiceOnAllAgents(ctx, req.Name, req.Namespace, nil, endpointPods) + return ctrl.Result{}, err } else if err != nil { r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace) return ctrl.Result{}, err } + // If the endpoints object has the label "consul.hashicorp.com/service-ignore" set to true, deregister all instances in Consul for this service. + // It is possible that the endpoints object has never been registered, in which case deregistration is a no-op. + if isLabeledIgnore(serviceEndpoints.Labels) { + // We always deregister the service to handle the case where a user has registered the service, then added the label later. + r.Log.Info("Ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace) + err = r.deregisterServiceOnAllAgents(ctx, req.Name, req.Namespace, nil, endpointPods) + return ctrl.Result{}, err + } + r.Log.Info("retrieved", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) // endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare @@ -154,17 +165,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( // Register all addresses of this Endpoints object as service instances in Consul. for _, subset := range serviceEndpoints.Subsets { - // Combine all addresses to a map, with a value indicating whether an address is ready or not. - allAddresses := make(map[corev1.EndpointAddress]string) - for _, readyAddress := range subset.Addresses { - allAddresses[readyAddress] = api.HealthPassing - } - - for _, notReadyAddress := range subset.NotReadyAddresses { - allAddresses[notReadyAddress] = api.HealthCritical - } - - for address, healthStatus := range allAddresses { + for address, healthStatus := range mapAddresses(subset) { if address.TargetRef != nil && address.TargetRef.Kind == "Pod" { endpointPods.Add(address.TargetRef.Name) if err := r.registerServicesAndHealthCheck(ctx, serviceEndpoints, address, healthStatus, endpointAddressMap); err != nil { @@ -200,7 +201,7 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error { ).Complete(r) } -// registerServicesAndHealthCheck creates Consul registrations for the service and proxy and register them with Consul. +// registerServicesAndHealthCheck creates Consul registrations for the service and proxy and registers them with Consul. // It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready. func (r *EndpointsController) registerServicesAndHealthCheck(ctx context.Context, serviceEndpoints corev1.Endpoints, address corev1.EndpointAddress, healthStatus string, endpointAddressMap map[string]bool) error { // Get pod associated with this address. @@ -726,6 +727,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, } } } + return nil } @@ -1004,10 +1006,31 @@ func (r *EndpointsController) consulNamespace(namespace string) string { // hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected. func hasBeenInjected(pod corev1.Pod) bool { - if anno, ok := pod.Annotations[keyInjectStatus]; ok { - if anno == injected { - return true - } + if anno, ok := pod.Annotations[keyInjectStatus]; ok && anno == injected { + return true } return false } + +// mapAddresses combines all addresses to a mapping of address to its health status. +func mapAddresses(addresses corev1.EndpointSubset) map[corev1.EndpointAddress]string { + m := make(map[corev1.EndpointAddress]string) + for _, readyAddress := range addresses.Addresses { + m[readyAddress] = api.HealthPassing + } + + for _, notReadyAddress := range addresses.NotReadyAddresses { + m[notReadyAddress] = api.HealthCritical + } + + return m +} + +// isLabeledIgnore checks the value of the label `consul.hashicorp.com/service-ignore` and returns true if the +// label exists and is "truthy". Otherwise, it returns false. +func isLabeledIgnore(labels map[string]string) bool { + value, labelExists := labels[labelServiceIgnore] + shouldIgnore, err := strconv.ParseBool(value) + + return shouldIgnore && labelExists && err == nil +} diff --git a/control-plane/connect-inject/endpoints_controller_test.go b/control-plane/connect-inject/endpoints_controller_test.go index 5f1a0dadd8..de021ec198 100644 --- a/control-plane/connect-inject/endpoints_controller_test.go +++ b/control-plane/connect-inject/endpoints_controller_test.go @@ -2717,6 +2717,149 @@ func TestReconcileDeleteEndpoint(t *testing.T) { } } +// TestReconcileIgnoresServiceIgnoreLabel tests that the endpoints controller correctly ignores services +// with the service-ignore label and deregisters services previously registered if the service-ignore +// label is added. +func TestReconcileIgnoresServiceIgnoreLabel(t *testing.T) { + t.Parallel() + nodeName := "test-node" + serviceName := "service-ignored" + namespace := "default" + + cases := map[string]struct { + svcInitiallyRegistered bool + serviceLabels map[string]string + expectedNumSvcInstances int + }{ + "Registered endpoint with label is deregistered.": { + svcInitiallyRegistered: true, + serviceLabels: map[string]string{ + labelServiceIgnore: "true", + }, + expectedNumSvcInstances: 0, + }, + "Not registered endpoint with label is never registered": { + svcInitiallyRegistered: false, + serviceLabels: map[string]string{ + labelServiceIgnore: "true", + }, + expectedNumSvcInstances: 0, + }, + "Registered endpoint without label is unaffected": { + svcInitiallyRegistered: true, + serviceLabels: map[string]string{}, + expectedNumSvcInstances: 1, + }, + "Not registered endpoint without label is registered": { + svcInitiallyRegistered: false, + serviceLabels: map[string]string{}, + expectedNumSvcInstances: 1, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + // Set up the fake Kubernetes client with an endpoint, pod, consul client, and the default namespace. + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName, + Namespace: namespace, + Labels: tt.serviceLabels, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: namespace, + }, + }, + }, + }, + }, + } + pod1 := createPod("pod1", "1.2.3.4", true, true) + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false, true) + fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} + ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} + k8sObjects := []runtime.Object{endpoint, pod1, fakeClientPod, &ns} + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + // Create test Consul server. + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { c.NodeName = nodeName }) + require.NoError(t, err) + defer consul.Stop() + consul.WaitForServiceIntentions(t) + cfg := &api.Config{Address: consul.HTTPAddr} + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + addr := strings.Split(consul.HTTPAddr, ":") + consulPort := addr[1] + + // Set up the initial Consul services. + if tt.svcInitiallyRegistered { + err = consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{ + ID: "pod1-" + serviceName, + Name: serviceName, + Port: 0, + Address: "1.2.3.4", + Meta: map[string]string{ + "k8s-namespace": namespace, + "k8s-service-name": serviceName, + "managed-by": "consul-k8s-endpoints-controller", + "pod-name": "pod1", + }, + }) + require.NoError(t, err) + err = consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{ + ID: "pod1-sidecar-proxy-" + serviceName, + Name: serviceName + "-sidecar-proxy", + Port: 0, + Meta: map[string]string{ + "k8s-namespace": namespace, + "k8s-service-name": serviceName, + "managed-by": "consul-k8s-endpoints-controller", + "pod-name": "pod1", + }, + }) + require.NoError(t, err) + } + + // Create the endpoints controller. + ep := &EndpointsController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: "http", + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ReleaseName: "consul", + ReleaseNamespace: namespace, + ConsulClientCfg: cfg, + } + + // Run the reconcile process to deregister the service if it was registered before. + namespacedName := types.NamespacedName{Namespace: namespace, Name: serviceName} + resp, err := ep.Reconcile(context.Background(), ctrl.Request{NamespacedName: namespacedName}) + require.NoError(t, err) + require.False(t, resp.Requeue) + + // Check that the correct number of services are registered with Consul. + serviceInstances, _, err := consulClient.Catalog().Service(serviceName, "", nil) + require.NoError(t, err) + require.Len(t, serviceInstances, tt.expectedNumSvcInstances) + proxyServiceInstances, _, err := consulClient.Catalog().Service(serviceName+"-sidecar-proxy", "", nil) + require.NoError(t, err) + require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances) + }) + } +} + func TestFilterAgentPods(t *testing.T) { t.Parallel() cases := map[string]struct { @@ -4761,6 +4904,74 @@ func TestGetTokenMetaFromDescription(t *testing.T) { } } +func TestMapAddresses(t *testing.T) { + t.Parallel() + cases := map[string]struct { + addresses corev1.EndpointSubset + expected map[corev1.EndpointAddress]string + }{ + "ready and not ready addresses": { + addresses: corev1.EndpointSubset{ + Addresses: []corev1.EndpointAddress{ + {Hostname: "host1"}, + {Hostname: "host2"}, + }, + NotReadyAddresses: []corev1.EndpointAddress{ + {Hostname: "host3"}, + {Hostname: "host4"}, + }, + }, + expected: map[corev1.EndpointAddress]string{ + {Hostname: "host1"}: api.HealthPassing, + {Hostname: "host2"}: api.HealthPassing, + {Hostname: "host3"}: api.HealthCritical, + {Hostname: "host4"}: api.HealthCritical, + }, + }, + "ready addresses only": { + addresses: corev1.EndpointSubset{ + Addresses: []corev1.EndpointAddress{ + {Hostname: "host1"}, + {Hostname: "host2"}, + {Hostname: "host3"}, + {Hostname: "host4"}, + }, + NotReadyAddresses: []corev1.EndpointAddress{}, + }, + expected: map[corev1.EndpointAddress]string{ + {Hostname: "host1"}: api.HealthPassing, + {Hostname: "host2"}: api.HealthPassing, + {Hostname: "host3"}: api.HealthPassing, + {Hostname: "host4"}: api.HealthPassing, + }, + }, + "not ready addresses only": { + addresses: corev1.EndpointSubset{ + Addresses: []corev1.EndpointAddress{}, + NotReadyAddresses: []corev1.EndpointAddress{ + {Hostname: "host1"}, + {Hostname: "host2"}, + {Hostname: "host3"}, + {Hostname: "host4"}, + }, + }, + expected: map[corev1.EndpointAddress]string{ + {Hostname: "host1"}: api.HealthCritical, + {Hostname: "host2"}: api.HealthCritical, + {Hostname: "host3"}: api.HealthCritical, + {Hostname: "host4"}: api.HealthCritical, + }, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + actual := mapAddresses(c.addresses) + require.Equal(t, c.expected, actual) + }) + } +} + func createPod(name, ip string, inject bool, managedByEndpointsController bool) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ diff --git a/control-plane/subcommand/connect-init/command.go b/control-plane/subcommand/connect-init/command.go index e7487fb9e1..f035fcfc41 100644 --- a/control-plane/subcommand/connect-init/command.go +++ b/control-plane/subcommand/connect-init/command.go @@ -179,6 +179,12 @@ func (c *Command) Run(args []string) int { c.logger.Info("Check to ensure a Kubernetes service has been created for this application." + " If your pod is not starting also check the connect-inject deployment logs.") } + if len(serviceList) > 2 { + c.logger.Error("There are multiple Consul services registered for this pod when there must only be one." + + " Check if there are multiple Kubernetes services selecting this pod and add the label" + + " `consul.hashicorp.com/service-ignore: \"true\"` to all services except the one used by Consul for handling requests.") + } + return fmt.Errorf("did not find correct number of services: %d", len(serviceList)) } for _, svc := range serviceList {