From b6c03735c9fdc9476ce592d9b48bceab8d13a665 Mon Sep 17 00:00:00 2001 From: Iryna Shustava Date: Tue, 4 May 2021 13:00:10 -0700 Subject: [PATCH] Don't support syncing multiple addresses when k8s service has multiple ports (#511) Previously, we set a tagged address for each Kubernetes service port, using the "virtual-" prefix for the tagged address key. For example, if a service port is named "tcp", then we synced it to Consul with the tagged address key "virtual-tcp". However, since Consul doesn't really support multiple ports on a service yet, this is unnecessary. So instead, we will find the service's target port that is equal to the service port we are registering with Consul and use that as the Port for the tagged address. Otherwise, if we can't find the target port, we will set the tagged address's port to 0 (it will still have the cluster IP set). --- CHANGELOG.md | 2 + connect-inject/endpoints_controller.go | 59 +++++++--- connect-inject/endpoints_controller_test.go | 114 ++++++++++++++++---- 3 files changed, 140 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f00e0dd90a..a04bbf78e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ IMPROVEMENTS: using this CRD but via annotations. [[GH-502](https://github.com/hashicorp/consul-k8s/pull/502)], [[GH-485](https://github.com/hashicorp/consul-k8s/pull/485)] * CRDs: Update ProxyDefaults with Mode and TransparentProxy fields. Note: Mode and TransparentProxy should not be set using the CRD but via annotations. [[GH-505](https://github.com/hashicorp/consul-k8s/pull/505)], [[GH-485](https://github.com/hashicorp/consul-k8s/pull/485)] +* Connect: No longer set multiple tagged addresses in Consul when k8s service has multiple ports and Transparent Proxy is enabled. + [[GH-511](https://github.com/hashicorp/consul-k8s/pull/511)] * Connect: Allow exclusion of inbound ports, outbound ports and CIDRs, and additional user IDs when Transparent Proxy is enabled. [[GH-506](https://github.com/hashicorp/consul-k8s/pull/506)] diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index e14b3094b9..13c87e825d 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -17,6 +17,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -32,6 +33,10 @@ const ( MetaKeyKubeNS = "k8s-namespace" kubernetesSuccessReasonMsg = "Kubernetes health checks passing" envoyPrometheusBindAddr = "envoy_prometheus_bind_addr" + + // clusterIPTaggedAddressName is the key for the tagged address to store the service's cluster IP and service port + // in Consul. Note: This value should not be changed without a corresponding change in Consul. + // TODO: change this to a constant shared with Consul to avoid accidentally changing this. clusterIPTaggedAddressName = "virtual" ) @@ -215,13 +220,14 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error { func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) { // If a port is specified, then we determine the value of that port // and register that port for the host service. - var servicePort int + // The handler will always set the port annotation if one is not provided on the pod. + var consulServicePort int if raw, ok := pod.Annotations[annotationPort]; ok && raw != "" { if port, err := portValue(pod, raw); port > 0 { if err != nil { return nil, nil, err } - servicePort = int(port) + consulServicePort = int(port) } } @@ -259,7 +265,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service service := &api.AgentServiceRegistration{ ID: serviceID, Name: serviceName, - Port: servicePort, + Port: consulServicePort, Address: pod.Status.PodIP, Meta: meta, Namespace: r.consulNamespace(pod.Namespace), @@ -302,9 +308,9 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service proxyConfig.Config[envoyPrometheusBindAddr] = prometheusScrapeListener } - if servicePort > 0 { + if consulServicePort > 0 { proxyConfig.LocalServiceAddress = "127.0.0.1" - proxyConfig.LocalServicePort = servicePort + proxyConfig.LocalServicePort = consulServicePort } upstreams, err := r.processUpstreams(pod) @@ -356,18 +362,45 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service parsedIP := net.ParseIP(k8sService.Spec.ClusterIP) if parsedIP != nil { taggedAddresses := make(map[string]api.ServiceAddress) - for _, servicePort := range k8sService.Spec.Ports { - taggedAddressKey := clusterIPTaggedAddressName - if servicePort.Name != "" { - taggedAddressKey = fmt.Sprintf("%s-%s", clusterIPTaggedAddressName, servicePort.Name) - } - taggedAddresses[taggedAddressKey] = api.ServiceAddress{ - Address: k8sService.Spec.ClusterIP, - Port: int(servicePort.Port), + // When a service has multiple ports, we need to choose the port that is registered with Consul + // and only set that port as the tagged address because Consul currently does not support multiple port + // on a single service. + var k8sServicePort int32 + for _, sp := range k8sService.Spec.Ports { + // If target port is a name, then we need to find the port value from the pod. + if sp.TargetPort.Type == intstr.String { + targetPortValue, err := portValue(pod, sp.TargetPort.StrVal) + if err != nil { + return nil, nil, err + } + + // If the targetPortValue is the consulServicePort, then this is the service port we'll use as the tagged address. + if targetPortValue == int32(consulServicePort) { + k8sServicePort = sp.Port + break + } + } else if sp.TargetPort.Type == intstr.Int && sp.TargetPort.IntVal != 0 { + // If the target port is a non-zero int, we can compare that port directly with the Consul service port. + if sp.TargetPort.IntVal == int32(consulServicePort) { + k8sServicePort = sp.Port + break + } + } else { + // If targetPort is not specified, then the service port is used as the target port, + // and we can compare the service port with the Consul service port. + if sp.Port == int32(consulServicePort) { + k8sServicePort = sp.Port + break + } } } + taggedAddresses[clusterIPTaggedAddressName] = api.ServiceAddress{ + Address: k8sService.Spec.ClusterIP, + Port: int(k8sServicePort), + } + service.TaggedAddresses = taggedAddresses proxyService.TaggedAddresses = taggedAddresses diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index 104033c4f8..d739701cee 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -19,6 +19,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -2590,7 +2591,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * ClusterIP: "10.0.0.1", Ports: []corev1.ServicePort{ { - Port: 80, + Port: 8081, }, }, }, @@ -2599,7 +2600,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * expTaggedAddresses: map[string]api.ServiceAddress{ "virtual": { Address: "10.0.0.1", - Port: 80, + Port: 8081, }, }, expErr: "", @@ -2637,7 +2638,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * ClusterIP: "10.0.0.1", Ports: []corev1.ServicePort{ { - Port: 80, + Port: 8081, }, }, }, @@ -2646,7 +2647,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * expTaggedAddresses: map[string]api.ServiceAddress{ "virtual": { Address: "10.0.0.1", - Port: 80, + Port: 8081, }, }, expErr: "", @@ -2705,7 +2706,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * ClusterIP: "10.0.0.1", Ports: []corev1.ServicePort{ { - Port: 80, + Port: 8081, }, }, }, @@ -2714,7 +2715,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * expTaggedAddresses: map[string]api.ServiceAddress{ "virtual": { Address: "10.0.0.1", - Port: 80, + Port: 8081, }, }, expErr: "", @@ -2729,7 +2730,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * proxyMode: api.ProxyModeDefault, expErr: "services \"test-service\" not found", }, - "service with a single port without a name": { + "service with a single port without a target port": { globalEnabled: true, service: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -2740,7 +2741,33 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * ClusterIP: "10.0.0.1", Ports: []corev1.ServicePort{ { - Port: 80, + Port: 8081, + }, + }, + }, + }, + proxyMode: api.ProxyModeTransparent, + expTaggedAddresses: map[string]api.ServiceAddress{ + "virtual": { + Address: "10.0.0.1", + Port: 8081, + }, + }, + expErr: "", + }, + "service with a single port and a target port that is a port name": { + globalEnabled: true, + service: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName, + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.0.0.1", + Ports: []corev1.ServicePort{ + { + Port: 80, + TargetPort: intstr.Parse("tcp"), }, }, }, @@ -2754,7 +2781,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * }, expErr: "", }, - "service with a single port with a name": { + "service with a single port and a target port that is a int": { globalEnabled: true, service: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -2765,15 +2792,15 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * ClusterIP: "10.0.0.1", Ports: []corev1.ServicePort{ { - Name: "tcp", - Port: 80, + Port: 80, + TargetPort: intstr.FromInt(8081), }, }, }, }, proxyMode: api.ProxyModeTransparent, expTaggedAddresses: map[string]api.ServiceAddress{ - "virtual-tcp": { + "virtual": { Address: "10.0.0.1", Port: 80, }, @@ -2791,25 +2818,52 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * ClusterIP: "10.0.0.1", Ports: []corev1.ServicePort{ { - Name: "tcp", - Port: 80, + Name: "tcp", + Port: 80, + TargetPort: intstr.FromString("tcp"), }, { - Name: "http", - Port: 8080, + Name: "http", + Port: 81, + TargetPort: intstr.FromString("http"), }, }, }, }, proxyMode: api.ProxyModeTransparent, expTaggedAddresses: map[string]api.ServiceAddress{ - "virtual-tcp": { + "virtual": { Address: "10.0.0.1", Port: 80, }, - "virtual-http": { + }, + expErr: "", + }, + // When target port is not equal to the port we're registering with Consul, + // then we want to register the zero-value for the port. This could happen + // for client services that don't have a container port that they're listening on. + "target port is not found": { + globalEnabled: true, + service: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName, + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.0.0.1", + Ports: []corev1.ServicePort{ + { + Port: 80, + TargetPort: intstr.Parse("http"), + }, + }, + }, + }, + proxyMode: api.ProxyModeTransparent, + expTaggedAddresses: map[string]api.ServiceAddress{ + "virtual": { Address: "10.0.0.1", - Port: 8080, + Port: 0, }, }, expErr: "", @@ -2885,7 +2939,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * ClusterIP: "2001:db8::68", Ports: []corev1.ServicePort{ { - Port: 80, + Port: 8081, }, }, }, @@ -2894,7 +2948,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * expTaggedAddresses: map[string]api.ServiceAddress{ "virtual": { Address: "2001:db8::68", - Port: 80, + Port: 8081, }, }, expErr: "", @@ -2903,10 +2957,26 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * for name, c := range cases { t.Run(name, func(t *testing.T) { - pod := createPod("test-pod-1", "1.2.3.4", false) + pod := createPod("test-pod-1", "1.2.3.4", true) if c.annotationEnabled != nil { pod.Annotations[annotationTransparentProxy] = strconv.FormatBool(*c.annotationEnabled) } + pod.Spec.Containers = []corev1.Container{ + { + Name: "test", + Ports: []corev1.ContainerPort{ + { + Name: "tcp", + ContainerPort: 8081, + }, + { + Name: "http", + ContainerPort: 8080, + }, + }, + }, + } + pod.Annotations[annotationPort] = "tcp" endpoints := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName,