Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore services in endpoint controller using consul.hashicorp.com/service-ignore #858

Merged
merged 37 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
42c9971
connect-ignore deregisters or ignores service endpoint
Nov 11, 2021
b8501ca
Add some comments as a clean up
Nov 11, 2021
df488f1
Refactor deregisterServiceOnAllAgents to return the same as Reconcile
Nov 11, 2021
38bf002
Add a warning in connect-init about running multiple services which p…
Nov 12, 2021
a6a1449
Add a line break before the multiservice warning
Nov 12, 2021
a422942
Move labelConnectIgnore to annotations
Nov 12, 2021
bbcd557
Inline the check on hasBeenInjected
Nov 12, 2021
c868850
Refactor address mapping to a function
Nov 12, 2021
9c9ea9d
Test mapAddresses
Nov 12, 2021
1918b48
connect-ignore -> service-ignore
Nov 12, 2021
551967c
Clarify Reconcile comment
Nov 12, 2021
766e870
Add CHANGELOG entry
Nov 15, 2021
0ffc49b
Stub out test
Nov 12, 2021
1591d56
Stub out isRegistered
Nov 12, 2021
9a99f92
Just a baby grammar fix
Nov 16, 2021
72a3318
Undo deregister refactor
Nov 17, 2021
9a7eb00
Pull the deregistration back into the normal path
Nov 17, 2021
016aea4
Extend test cases for mapAddresses
Nov 17, 2021
239ca75
Move the stubbed out test down
Nov 17, 2021
a1cf92e
Add comment to deregistration process
Nov 18, 2021
e1ceeb9
Fix connect ignore to service ignore
Nov 19, 2021
04f72a2
Test service-ignore
Nov 19, 2021
e066d11
Delete isRegistered
Nov 19, 2021
77a36ef
Update control-plane/connect-inject/endpoints_controller.go
Nov 23, 2021
40af11b
Update control-plane/connect-inject/endpoints_controller.go
Nov 23, 2021
5717f4c
Update control-plane/subcommand/connect-init/command.go
Nov 23, 2021
3118bb7
Update CHANGELOG.md
Nov 29, 2021
01917b1
Use strconv.ParseBool on label value
Nov 29, 2021
6e8fdcb
Separate error for multiple Consul services registered.
Nov 29, 2021
588ceed
Update control-plane/connect-inject/endpoints_controller_test.go
Nov 29, 2021
9f3ad73
Split out isLabeledIgnore
Nov 30, 2021
a321891
Rework test to not run Reconcile twice
Nov 30, 2021
9abb504
Clean up space on CHANGELOG
Nov 30, 2021
a9d88ef
Add logging when an endpoint is getting ignored
Nov 30, 2021
d558bd7
Fix comment on service ignore test
Dec 1, 2021
db957e9
Add proxy service to test
Dec 1, 2021
d978d20
Update control-plane/subcommand/connect-init/command.go
Dec 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ BREAKING CHANGES:
IMPROVEMENTS:
* CLI
* Pre-check in the `install` command to verify the correct license secret exists when using an enterprise Consul image. [[GH-875](https://github.com/hashicorp/consul-k8s/pull/875)]
* Control Plane
* Add a label "managed-by" to every secret the control-plane creates. Only delete said secrets on an uninstall. [[GH-835](https://github.com/hashicorp/consul-k8s/pull/835)]
* Add support for labeling a Kubernetes service with `consul.hashicorp.com/service-ignore` to prevent services from being registered in Consul. [[GH-858](https://github.com/hashicorp/consul-k8s/pull/858)]

## 0.37.0 (November 18, 2021)

Expand Down
4 changes: 4 additions & 0 deletions control-plane/connect-inject/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ const (
// webhook/handler.
annotationOriginalPod = "consul.hashicorp.com/original-pod"

// labelServiceIgnore is a label that can be added to a service to prevent it from being
// registered with Consul.
labelServiceIgnore = "consul.hashicorp.com/service-ignore"

// injected is used as the annotation value for annotationInjected.
injected = "injected"

Expand Down
63 changes: 43 additions & 20 deletions control-plane/connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net"
"regexp"
"strconv"
"strings"

mapset "github.com/deckarep/golang-set"
Expand Down Expand Up @@ -117,10 +118,13 @@ type EndpointsController struct {
context.Context
}

// Reconcile reads the state of an Endpoints object for a Kubernetes Service and reconciles Consul services which
// correspond to the Kubernetes Service. These events are driven by changes to the Pods backing the Kube service.
func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var errs error
var serviceEndpoints corev1.Endpoints

// Ignore the request if the namespace of the endpoint is not allowed.
if shouldIgnore(req.Namespace, r.DenyK8sNamespacesSet, r.AllowK8sNamespacesSet) {
return ctrl.Result{}, nil
}
Expand All @@ -137,15 +141,22 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
if k8serrors.IsNotFound(err) {
// Deregister all instances in Consul for this service. The function deregisterServiceOnAllAgents handles
// the case where the Consul service name is different from the Kubernetes service name.
if err = r.deregisterServiceOnAllAgents(ctx, req.Name, req.Namespace, nil, endpointPods); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
err = r.deregisterServiceOnAllAgents(ctx, req.Name, req.Namespace, nil, endpointPods)
return ctrl.Result{}, err
} else if err != nil {
r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace)
return ctrl.Result{}, err
}

// If the endpoints object has the label "consul.hashicorp.com/service-ignore" set to true, deregister all instances in Consul for this service.
// It is possible that the endpoints object has never been registered, in which case deregistration is a no-op.
if isLabeledIgnore(serviceEndpoints.Labels) {
// We always deregister the service to handle the case where a user has registered the service, then added the label later.
r.Log.Info("Ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace)
err = r.deregisterServiceOnAllAgents(ctx, req.Name, req.Namespace, nil, endpointPods)
return ctrl.Result{}, err
}

r.Log.Info("retrieved", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)

// endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare
Expand All @@ -154,17 +165,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (

// Register all addresses of this Endpoints object as service instances in Consul.
for _, subset := range serviceEndpoints.Subsets {
// Combine all addresses to a map, with a value indicating whether an address is ready or not.
allAddresses := make(map[corev1.EndpointAddress]string)
for _, readyAddress := range subset.Addresses {
allAddresses[readyAddress] = api.HealthPassing
}

for _, notReadyAddress := range subset.NotReadyAddresses {
allAddresses[notReadyAddress] = api.HealthCritical
}

for address, healthStatus := range allAddresses {
for address, healthStatus := range mapAddresses(subset) {
if address.TargetRef != nil && address.TargetRef.Kind == "Pod" {
endpointPods.Add(address.TargetRef.Name)
if err := r.registerServicesAndHealthCheck(ctx, serviceEndpoints, address, healthStatus, endpointAddressMap); err != nil {
Expand Down Expand Up @@ -200,7 +201,7 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
).Complete(r)
}

// registerServicesAndHealthCheck creates Consul registrations for the service and proxy and register them with Consul.
// registerServicesAndHealthCheck creates Consul registrations for the service and proxy and registers them with Consul.
t-eckert marked this conversation as resolved.
Show resolved Hide resolved
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *EndpointsController) registerServicesAndHealthCheck(ctx context.Context, serviceEndpoints corev1.Endpoints, address corev1.EndpointAddress, healthStatus string, endpointAddressMap map[string]bool) error {
// Get pod associated with this address.
Expand Down Expand Up @@ -726,6 +727,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,
}
}
}

return nil
}

Expand Down Expand Up @@ -1004,10 +1006,31 @@ func (r *EndpointsController) consulNamespace(namespace string) string {

// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected.
func hasBeenInjected(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[keyInjectStatus]; ok {
if anno == injected {
return true
}
if anno, ok := pod.Annotations[keyInjectStatus]; ok && anno == injected {
return true
}
return false
}

t-eckert marked this conversation as resolved.
Show resolved Hide resolved
// mapAddresses combines all addresses to a mapping of address to its health status.
func mapAddresses(addresses corev1.EndpointSubset) map[corev1.EndpointAddress]string {
m := make(map[corev1.EndpointAddress]string)
for _, readyAddress := range addresses.Addresses {
m[readyAddress] = api.HealthPassing
}

for _, notReadyAddress := range addresses.NotReadyAddresses {
m[notReadyAddress] = api.HealthCritical
}

return m
}

// isLabeledIgnore checks the value of the label `consul.hashicorp.com/service-ignore` and returns true if the
// label exists and is "truthy". Otherwise, it returns false.
func isLabeledIgnore(labels map[string]string) bool {
value, labelExists := labels[labelServiceIgnore]
shouldIgnore, err := strconv.ParseBool(value)

return shouldIgnore && labelExists && err == nil
}
211 changes: 211 additions & 0 deletions control-plane/connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2717,6 +2717,149 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
}
}

// TestReconcileIgnoresServiceIgnoreLabel tests that the endpoints controller correctly ignores services
// with the service-ignore label and deregisters services previously registered if the service-ignore
// label is added.
func TestReconcileIgnoresServiceIgnoreLabel(t *testing.T) {
t-eckert marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()
nodeName := "test-node"
serviceName := "service-ignored"
namespace := "default"

cases := map[string]struct {
svcInitiallyRegistered bool
serviceLabels map[string]string
expectedNumSvcInstances int
}{
"Registered endpoint with label is deregistered.": {
svcInitiallyRegistered: true,
serviceLabels: map[string]string{
labelServiceIgnore: "true",
},
expectedNumSvcInstances: 0,
},
"Not registered endpoint with label is never registered": {
svcInitiallyRegistered: false,
serviceLabels: map[string]string{
labelServiceIgnore: "true",
},
expectedNumSvcInstances: 0,
},
"Registered endpoint without label is unaffected": {
svcInitiallyRegistered: true,
serviceLabels: map[string]string{},
expectedNumSvcInstances: 1,
},
"Not registered endpoint without label is registered": {
svcInitiallyRegistered: false,
serviceLabels: map[string]string{},
expectedNumSvcInstances: 1,
},
}

for name, tt := range cases {
t.Run(name, func(t *testing.T) {
// Set up the fake Kubernetes client with an endpoint, pod, consul client, and the default namespace.
endpoint := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: namespace,
Labels: tt.serviceLabels,
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "1.2.3.4",
NodeName: &nodeName,
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Name: "pod1",
Namespace: namespace,
},
},
},
},
},
}
pod1 := createPod("pod1", "1.2.3.4", true, true)
fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false, true)
fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"}
ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
k8sObjects := []runtime.Object{endpoint, pod1, fakeClientPod, &ns}
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()

// Create test Consul server.
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { c.NodeName = nodeName })
require.NoError(t, err)
defer consul.Stop()
consul.WaitForServiceIntentions(t)
cfg := &api.Config{Address: consul.HTTPAddr}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]

// Set up the initial Consul services.
if tt.svcInitiallyRegistered {
err = consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{
ID: "pod1-" + serviceName,
Name: serviceName,
Port: 0,
Address: "1.2.3.4",
Meta: map[string]string{
"k8s-namespace": namespace,
"k8s-service-name": serviceName,
"managed-by": "consul-k8s-endpoints-controller",
"pod-name": "pod1",
},
})
t-eckert marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
err = consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{
ID: "pod1-sidecar-proxy-" + serviceName,
Name: serviceName + "-sidecar-proxy",
Port: 0,
Meta: map[string]string{
"k8s-namespace": namespace,
"k8s-service-name": serviceName,
"managed-by": "consul-k8s-endpoints-controller",
"pod-name": "pod1",
},
})
require.NoError(t, err)
}

// Create the endpoints controller.
ep := &EndpointsController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: "http",
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: namespace,
ConsulClientCfg: cfg,
}

// Run the reconcile process to deregister the service if it was registered before.
namespacedName := types.NamespacedName{Namespace: namespace, Name: serviceName}
resp, err := ep.Reconcile(context.Background(), ctrl.Request{NamespacedName: namespacedName})
require.NoError(t, err)
require.False(t, resp.Requeue)

// Check that the correct number of services are registered with Consul.
serviceInstances, _, err := consulClient.Catalog().Service(serviceName, "", nil)
require.NoError(t, err)
require.Len(t, serviceInstances, tt.expectedNumSvcInstances)
proxyServiceInstances, _, err := consulClient.Catalog().Service(serviceName+"-sidecar-proxy", "", nil)
require.NoError(t, err)
require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances)
})
}
t-eckert marked this conversation as resolved.
Show resolved Hide resolved
}

func TestFilterAgentPods(t *testing.T) {
t.Parallel()
cases := map[string]struct {
Expand Down Expand Up @@ -4761,6 +4904,74 @@ func TestGetTokenMetaFromDescription(t *testing.T) {
}
}

func TestMapAddresses(t *testing.T) {
t.Parallel()
cases := map[string]struct {
addresses corev1.EndpointSubset
expected map[corev1.EndpointAddress]string
}{
"ready and not ready addresses": {
addresses: corev1.EndpointSubset{
Addresses: []corev1.EndpointAddress{
{Hostname: "host1"},
{Hostname: "host2"},
},
NotReadyAddresses: []corev1.EndpointAddress{
{Hostname: "host3"},
{Hostname: "host4"},
},
},
expected: map[corev1.EndpointAddress]string{
{Hostname: "host1"}: api.HealthPassing,
{Hostname: "host2"}: api.HealthPassing,
{Hostname: "host3"}: api.HealthCritical,
{Hostname: "host4"}: api.HealthCritical,
},
},
"ready addresses only": {
addresses: corev1.EndpointSubset{
Addresses: []corev1.EndpointAddress{
{Hostname: "host1"},
{Hostname: "host2"},
{Hostname: "host3"},
{Hostname: "host4"},
},
NotReadyAddresses: []corev1.EndpointAddress{},
},
expected: map[corev1.EndpointAddress]string{
{Hostname: "host1"}: api.HealthPassing,
{Hostname: "host2"}: api.HealthPassing,
{Hostname: "host3"}: api.HealthPassing,
{Hostname: "host4"}: api.HealthPassing,
},
},
"not ready addresses only": {
addresses: corev1.EndpointSubset{
Addresses: []corev1.EndpointAddress{},
NotReadyAddresses: []corev1.EndpointAddress{
{Hostname: "host1"},
{Hostname: "host2"},
{Hostname: "host3"},
{Hostname: "host4"},
},
},
expected: map[corev1.EndpointAddress]string{
{Hostname: "host1"}: api.HealthCritical,
{Hostname: "host2"}: api.HealthCritical,
{Hostname: "host3"}: api.HealthCritical,
{Hostname: "host4"}: api.HealthCritical,
},
},
}

for name, c := range cases {
t.Run(name, func(t *testing.T) {
actual := mapAddresses(c.addresses)
require.Equal(t, c.expected, actual)
})
}
t-eckert marked this conversation as resolved.
Show resolved Hide resolved
}

func createPod(name, ip string, inject bool, managedByEndpointsController bool) *corev1.Pod {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down
6 changes: 6 additions & 0 deletions control-plane/subcommand/connect-init/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ func (c *Command) Run(args []string) int {
c.logger.Info("Check to ensure a Kubernetes service has been created for this application." +
" If your pod is not starting also check the connect-inject deployment logs.")
}
if len(serviceList) > 2 {
c.logger.Error("There are multiple Consul services registered for this pod when there must only be one." +
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@t-eckert I am running multiple stateful sets in kubernets (kafka, zookeeper, elasticsearch, redis) and all of those have two services normal + headless but I dont get this error? shouldnt this fail the pod scheduling?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yes it should be failing scheduling if both the headless and non-headless services select the same pods. Maybe open up an issue about this.

" Check if there are multiple Kubernetes services selecting this pod and add the label" +
" `consul.hashicorp.com/service-ignore: \"true\"` to all services except the one used by Consul for handling requests.")
}

return fmt.Errorf("did not find correct number of services: %d", len(serviceList))
}
for _, svc := range serviceList {
Expand Down