Skip to content

Commit

Permalink
Don't support syncing multiple addresses when k8s service has multipl…
Browse files Browse the repository at this point in the history
…e ports.

Previously, we set a tagged address for each Kubernetes service port,
using the "virtual-" prefix for the tagged address key. For example,
if a service port is named "tcp", then we synced it to Consul with
the tagged address key "virtual-tcp".

However, since Consul doesn't really support multiple ports on a service yet,
this is unnecessary. So instead, we will find the service's target port
that is equal to the service port we are registering with Consul
and use that as the Port for the tagged address. Otherwise,
if we can't find the target port, we will set the tagged address's port to 0
(it will still have the cluster IP set).
  • Loading branch information
ishustava committed May 4, 2021
1 parent caa9f60 commit 21e535c
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 35 deletions.
52 changes: 39 additions & 13 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -215,13 +216,14 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) {
// If a port is specified, then we determine the value of that port
// and register that port for the host service.
var servicePort int
// The handler will always set the port annotation if one is not provided on the pod.
var consulServicePort int
if raw, ok := pod.Annotations[annotationPort]; ok && raw != "" {
if port, err := portValue(pod, raw); port > 0 {
if err != nil {
return nil, nil, err
}
servicePort = int(port)
consulServicePort = int(port)
}
}

Expand Down Expand Up @@ -259,7 +261,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
service := &api.AgentServiceRegistration{
ID: serviceID,
Name: serviceName,
Port: servicePort,
Port: consulServicePort,
Address: pod.Status.PodIP,
Meta: meta,
Namespace: r.consulNamespace(pod.Namespace),
Expand Down Expand Up @@ -302,9 +304,9 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
proxyConfig.Config[envoyPrometheusBindAddr] = prometheusScrapeListener
}

if servicePort > 0 {
if consulServicePort > 0 {
proxyConfig.LocalServiceAddress = "127.0.0.1"
proxyConfig.LocalServicePort = servicePort
proxyConfig.LocalServicePort = consulServicePort
}

upstreams, err := r.processUpstreams(pod)
Expand Down Expand Up @@ -356,18 +358,42 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
parsedIP := net.ParseIP(k8sService.Spec.ClusterIP)
if parsedIP != nil {
taggedAddresses := make(map[string]api.ServiceAddress)
for _, servicePort := range k8sService.Spec.Ports {
taggedAddressKey := clusterIPTaggedAddressName
if servicePort.Name != "" {
taggedAddressKey = fmt.Sprintf("%s-%s", clusterIPTaggedAddressName, servicePort.Name)
}

taggedAddresses[taggedAddressKey] = api.ServiceAddress{
Address: k8sService.Spec.ClusterIP,
Port: int(servicePort.Port),
// When a service has multiple ports, we need to choose the port that is registered with Consul
// and only set that port as the tagged address because Consul currently does not support multiple port
// on a single service.
var k8sServicePort int32
for _, sp := range k8sService.Spec.Ports {
// If target port is a name, then we need to find the port value from the pod.
if sp.TargetPort.Type == intstr.String {
targetPortValue, err := portValue(pod, sp.TargetPort.StrVal)
if err != nil {
return nil, nil, err
}

// If the targetPortValue is the consulServicePort, then this is the service port we'll use as the tagged address.
if targetPortValue == int32(consulServicePort) {
k8sServicePort = sp.Port
}
} else if sp.TargetPort.Type == intstr.Int && sp.TargetPort.IntVal != 0 {
// If the target port is a non-zero int, we can compare that port directly with the Consul service port.
if sp.TargetPort.IntVal == int32(consulServicePort) {
k8sServicePort = sp.Port
}
} else {
// If targetPort is not specified, then the service port is used as the target port,
// and we can compare the service port with the Consul service port.
if sp.Port == int32(consulServicePort) {
k8sServicePort = sp.Port
}
}
}

taggedAddresses[clusterIPTaggedAddressName] = api.ServiceAddress{
Address: k8sService.Spec.ClusterIP,
Port: int(k8sServicePort),
}

service.TaggedAddresses = taggedAddresses
proxyService.TaggedAddresses = taggedAddresses

Expand Down
114 changes: 92 additions & 22 deletions connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -2590,7 +2591,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2599,7 +2600,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand Down Expand Up @@ -2637,7 +2638,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2646,7 +2647,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand Down Expand Up @@ -2705,7 +2706,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2714,7 +2715,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand All @@ -2729,7 +2730,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
proxyMode: api.ProxyModeDefault,
expErr: "services \"test-service\" not found",
},
"service with a single port without a name": {
"service with a single port without a target port": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -2740,7 +2741,33 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 8081,
},
},
expErr: "",
},
"service with a single port and a target port that is a port name": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
TargetPort: intstr.Parse("tcp"),
},
},
},
Expand All @@ -2754,7 +2781,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
},
expErr: "",
},
"service with a single port with a name": {
"service with a single port and a target port that is a int": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -2765,15 +2792,15 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Name: "tcp",
Port: 80,
Port: 80,
TargetPort: intstr.FromInt(8081),
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual-tcp": {
"virtual": {
Address: "10.0.0.1",
Port: 80,
},
Expand All @@ -2791,25 +2818,52 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Name: "tcp",
Port: 80,
Name: "tcp",
Port: 80,
TargetPort: intstr.FromString("tcp"),
},
{
Name: "http",
Port: 8080,
Name: "http",
Port: 81,
TargetPort: intstr.FromString("http"),
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual-tcp": {
"virtual": {
Address: "10.0.0.1",
Port: 80,
},
"virtual-http": {
},
expErr: "",
},
// When target port is not equal to the port we're registering with Consul,
// then we want to register the zero-value for the port. This could happen
// for client services that don't have a container port that they're listening on.
"target port is not found": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
TargetPort: intstr.Parse("http"),
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 8080,
Port: 0,
},
},
expErr: "",
Expand Down Expand Up @@ -2885,7 +2939,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "2001:db8::68",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2894,7 +2948,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "2001:db8::68",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand All @@ -2903,10 +2957,26 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *

for name, c := range cases {
t.Run(name, func(t *testing.T) {
pod := createPod("test-pod-1", "1.2.3.4", false)
pod := createPod("test-pod-1", "1.2.3.4", true)
if c.annotationEnabled != nil {
pod.Annotations[annotationTransparentProxy] = strconv.FormatBool(*c.annotationEnabled)
}
pod.Spec.Containers = []corev1.Container{
{
Name: "test",
Ports: []corev1.ContainerPort{
{
Name: "tcp",
ContainerPort: 8081,
},
{
Name: "http",
ContainerPort: 8080,
},
},
},
}
pod.Annotations[annotationPort] = "tcp"
endpoints := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Expand Down

0 comments on commit 21e535c

Please sign in to comment.