Skip to content

Commit

Permalink
chore(*) support Service-less Pods (#1460) (#1549)
Browse files Browse the repository at this point in the history
* chore(*) support Service-less Pods

* fix(*) test e2e

* fix(*) outbound converter handles service less

* fix(*) update error message in inbound converter

Signed-off-by: Nikolay Nikolaev <nikolay.nikolaev@konghq.com>
(cherry picked from commit 5c18cb8)

Co-authored-by: Nikolay Nikolaev <nikolay.nikolaev@konghq.com>
  • Loading branch information
mergify[bot] and Nikolay Nikolaev authored Feb 10, 2021
1 parent b7ecfda commit 555672e
Show file tree
Hide file tree
Showing 13 changed files with 283 additions and 113 deletions.
7 changes: 7 additions & 0 deletions api/mesh/v1alpha1/dataplane_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
RegularDpType DpType = "regular"
IngressDpType DpType = "ingress"
GatewayDpType DpType = "gateway"

// Used for Service-less dataplanes
TCPPortReserved = 49151 // IANA Reserved
)

type DpType string
Expand All @@ -56,6 +59,10 @@ func (i InboundInterface) String() string {
return fmt.Sprintf("%s:%d:%d", i.DataplaneIP, i.DataplanePort, i.WorkloadPort)
}

func (i *InboundInterface) IsServiceLess() bool {
return i.DataplanePort == TCPPortReserved
}

type OutboundInterface struct {
DataplaneIP string
DataplanePort uint32
Expand Down
41 changes: 35 additions & 6 deletions dev/examples/k8s/example-app/example-app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,32 @@
apiVersion: v1
kind: Service
metadata:
name: example-app
name: example-server
spec:
ports:
- port: 80
name: http
selector:
app: example-app
app: example-server
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: example-app
name: example-server
labels:
app: example-app
app: example-server
spec:
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
selector:
matchLabels:
app: example-app
app: example-server
template:
metadata:
labels:
app: example-app
app: example-server
spec:
containers:
- name: nginx
Expand All @@ -39,3 +39,32 @@ spec:
requests:
cpu: 10m
memory: 32Mi
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: example-client
labels:
app: example-client
spec:
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
selector:
matchLabels:
app: example-client
template:
metadata:
labels:
app: example-client
spec:
containers:
- name: alpine
image: "alpine:latest"
imagePullPolicy: IfNotPresent
command: ["sh", "-c", "tail -f /dev/null"]
resources:
requests:
cpu: 10m
memory: 32Mi
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1600,7 +1600,6 @@ google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.34.0 h1:raiipEjMOIC/TO2AvyTxP25XFdLxNIBwzDh3FM3XztI=
google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
google.golang.org/grpc/examples v0.0.0-20201130180447-c456688b1860/go.mod h1:Ly7ZA/ARzg8fnPU9TyZIxoz33sEUuWX7txiqs8lPTgE=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
168 changes: 124 additions & 44 deletions pkg/plugins/runtime/k8s/controllers/inbound_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"fmt"
"strings"

"github.com/pkg/errors"
kube_core "k8s.io/api/core/v1"
Expand All @@ -11,67 +12,116 @@ import (
util_k8s "github.com/kumahq/kuma/pkg/plugins/runtime/k8s/util"
)

func InboundInterfacesFor(zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
var ifaces []*mesh_proto.Dataplane_Networking_Inbound
for _, svc := range services {
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol != "" && svcPort.Protocol != kube_core.ProtocolTCP {
// ignore non-TCP ports
continue
}
containerPort, container, err := util_k8s.FindPort(pod, &svcPort)
if err != nil {
converterLog.Error(err, "failed to find a container port in a given Pod that would match a given Service port", "namespace", pod.Namespace, "podName", pod.Name, "serviceName", svc.Name, "servicePortName", svcPort.Name)
// ignore those cases where a Pod doesn't have all the ports a Service has
continue
func inboundForService(zone string, pod *kube_core.Pod, service *kube_core.Service) (ifaces []*mesh_proto.Dataplane_Networking_Inbound) {
for _, svcPort := range service.Spec.Ports {
if svcPort.Protocol != "" && svcPort.Protocol != kube_core.ProtocolTCP {
// ignore non-TCP ports
continue
}
containerPort, container, err := util_k8s.FindPort(pod, &svcPort)
if err != nil {
converterLog.Error(err, "failed to find a container port in a given Pod that would match a given Service port", "namespace", pod.Namespace, "podName", pod.Name, "serviceName", service.Name, "servicePortName", svcPort.Name)
// ignore those cases where a Pod doesn't have all the ports a Service has
continue
}

tags := InboundTagsForService(zone, pod, service, &svcPort)
var health *mesh_proto.Dataplane_Networking_Inbound_Health

// if container is not equal nil then port is explicitly defined as containerPort so we're able
// to figure out which container implements which service. Since we know container we can check its status
// and map it to the Dataplane health
if container != nil {
if cs := util_k8s.FindContainerStatus(pod, container.Name); cs != nil {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
}
}

tags := InboundTagsFor(zone, pod, svc, &svcPort)
var health *mesh_proto.Dataplane_Networking_Inbound_Health

// if container is not equal nil then port is explicitly defined as containerPort so we're able
// to figure out which container implements which service. Since we know container we can check its status
// and map it to the Dataplane health
if container != nil {
if cs := util_k8s.FindContainerStatus(pod, container.Name); cs != nil {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
// also we're checking whether kuma-sidecar container is ready
if cs := util_k8s.FindContainerStatus(pod, util_k8s.KumaSidecarContainerName); cs != nil {
if health != nil {
health.Ready = health.Ready && cs.Ready
} else {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
}
}

// also we're checking whether kuma-sidecar container is ready
if cs := util_k8s.FindContainerStatus(pod, util_k8s.KumaSidecarContainerName); cs != nil {
if health != nil {
health.Ready = health.Ready && cs.Ready
} else {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
ifaces = append(ifaces, &mesh_proto.Dataplane_Networking_Inbound{
Port: uint32(containerPort),
Tags: tags,
Health: health,
})
}

return
}

func inboundForServiceless(zone string, pod *kube_core.Pod) (ifaces []*mesh_proto.Dataplane_Networking_Inbound) {
// The Pod does not have any services associated with it, just get the data from the Pod itself

// We still need that extra listener with a service because it is required in many places of the code (e.g. mTLS)
// TCPPortReserved, is a special port that will never be allocated from the TCP/IP stack. We use it as special
// designator that this is actually a service-less inbound.

// NOTE: It is cleaner to implement an equivalent of Gateway which is inbound-less dataplane. However such approch
// will create lots of code changes to account for this other type of dataplne (we already have GW and Ingress),
// including GUI and CLI changes

tags := InboundTagsForPod(zone, pod)
var health *mesh_proto.Dataplane_Networking_Inbound_Health

for _, container := range pod.Spec.Containers {
if container.Name != util_k8s.KumaSidecarContainerName {
if cs := util_k8s.FindContainerStatus(pod, container.Name); cs != nil {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
}
}
}

ifaces = append(ifaces, &mesh_proto.Dataplane_Networking_Inbound{
Port: uint32(containerPort),
Tags: tags,
Health: health,
})
// also we're checking whether kuma-sidecar container is ready
if cs := util_k8s.FindContainerStatus(pod, util_k8s.KumaSidecarContainerName); cs != nil {
if health != nil {
health.Ready = health.Ready && cs.Ready
} else {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
}
}

ifaces = append(ifaces, &mesh_proto.Dataplane_Networking_Inbound{
Port: mesh_proto.TCPPortReserved,
Tags: tags,
Health: health,
})

return
}

func InboundInterfacesFor(zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
ifaces := []*mesh_proto.Dataplane_Networking_Inbound{}
for _, svc := range services {
svcIfaces := inboundForService(zone, pod, svc)
ifaces = append(ifaces, svcIfaces...)
}

if len(ifaces) == 0 {
// Notice that here we return an error immediately
// instead of leaving Dataplane validation up to a ValidatingAdmissionWebHook.
// We do it this way in order to provide the most descriptive error message.
cause := "However, there are no Services that select this Pod."
if len(services) > 0 {
cause = "However, this Pod doesn't have any container ports that would satisfy matching Service(s)."
return nil, errors.Errorf("A service that selects pod %s was found, but it doesn't match any container ports.", pod.GetName())
}
return nil, errors.Errorf("Kuma requires every Pod in a Mesh to be a part of at least one Service. %s", cause)

ifaces = append(ifaces, inboundForServiceless(zone, pod)...)
}
return ifaces, nil
}

func InboundTagsFor(zone string, pod *kube_core.Pod, svc *kube_core.Service, svcPort *kube_core.ServicePort) map[string]string {
func InboundTagsForService(zone string, pod *kube_core.Pod, svc *kube_core.Service, svcPort *kube_core.ServicePort) map[string]string {
tags := util_k8s.CopyStringMap(pod.Labels)
for key, value := range tags {
if value == "" {
Expand Down Expand Up @@ -110,3 +160,33 @@ func ProtocolTagFor(svc *kube_core.Service, svcPort *kube_core.ServicePort) stri
// to a user that at least `<port>.service.kuma.io/protocol` has an effect
return protocolValue
}

func InboundTagsForPod(zone string, pod *kube_core.Pod) map[string]string {
tags := util_k8s.CopyStringMap(pod.Labels)
for key, value := range tags {
if value == "" {
delete(tags, key)
}
}
if tags == nil {
tags = make(map[string]string)
}
tags[mesh_proto.ServiceTag] = fmt.Sprintf("%s_%s_svc", nameFromPod(pod), pod.Namespace)
if zone != "" {
tags[mesh_proto.ZoneTag] = zone
}
tags[mesh_proto.ProtocolTag] = mesh_core.ProtocolTCP
tags[mesh_proto.InstanceTag] = pod.Name

return tags
}

func nameFromPod(pod *kube_core.Pod) string {
// the name is in format <name>-<replica set id>-<pod id>
split := strings.Split(pod.Name, "-")
if len(split) > 2 {
split = split[:len(split)-2]
}

return strings.Join(split, "-")
}
38 changes: 29 additions & 9 deletions pkg/plugins/runtime/k8s/controllers/outbound_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ func (p *PodConverter) OutboundInterfacesFor(
converterLog.Error(err, "could not get K8S Service for service tag")
continue // one invalid Dataplane definition should not break the entire mesh
}

// Do not generate outbounds for service-less
if isServiceLess(port) {
continue
}

if isHeadlessService(service) {
// Generate outbound listeners for every endpoint of services.
for _, endpoint := range endpoints[serviceTag] {
Expand Down Expand Up @@ -91,10 +97,17 @@ func isHeadlessService(svc *kube_core.Service) bool {
return svc.Spec.ClusterIP == "None"
}

func isServiceLess(port uint32) bool {
return port == mesh_proto.TCPPortReserved
}

func (p *PodConverter) k8sService(serviceTag string) (*kube_core.Service, uint32, error) {
name, ns, port, err := ParseService(serviceTag)
name, ns, port, err := parseService(serviceTag)
if err != nil {
return nil, 0, errors.Errorf("failed to parse `service` host %q as FQDN", serviceTag)
return nil, 0, errors.Wrapf(err, "failed to parse `service` host %q as FQDN", serviceTag)
}
if isServiceLess(port) {
return nil, port, nil
}

svc := &kube_core.Service{}
Expand All @@ -105,17 +118,24 @@ func (p *PodConverter) k8sService(serviceTag string) (*kube_core.Service, uint32
return svc, port, nil
}

func ParseService(host string) (name string, namespace string, port uint32, err error) {
func parseService(host string) (name string, namespace string, port uint32, err error) {
// split host into <name>_<namespace>_svc_<port>
segments := strings.Split(host, "_")
if len(segments) != 4 {
switch len(segments) {
case 4:
p, err := strconv.Atoi(segments[3])
if err != nil {
return "", "", 0, err
}
port = uint32(p)
case 3:
// service less service names have no port, so we just put the reserved
// one here to note that this service is actually
port = mesh_proto.TCPPortReserved
default:
return "", "", 0, errors.Errorf("service tag in unexpected format")
}
p, err := strconv.Atoi(segments[3])
if err != nil {
return "", "", 0, err
}
port = uint32(p)

name, namespace = segments[0], segments[1]
return
}
Loading

0 comments on commit 555672e

Please sign in to comment.