Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(*) support Service-less Pods (bp #1460) #1549

Merged
merged 1 commit into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/mesh/v1alpha1/dataplane_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
RegularDpType DpType = "regular"
IngressDpType DpType = "ingress"
GatewayDpType DpType = "gateway"

// Used for Service-less dataplanes
TCPPortReserved = 49151 // IANA Reserved
)

type DpType string
Expand All @@ -56,6 +59,10 @@ func (i InboundInterface) String() string {
return fmt.Sprintf("%s:%d:%d", i.DataplaneIP, i.DataplanePort, i.WorkloadPort)
}

func (i *InboundInterface) IsServiceLess() bool {
return i.DataplanePort == TCPPortReserved
}

type OutboundInterface struct {
DataplaneIP string
DataplanePort uint32
Expand Down
41 changes: 35 additions & 6 deletions dev/examples/k8s/example-app/example-app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,32 @@
apiVersion: v1
kind: Service
metadata:
name: example-app
name: example-server
spec:
ports:
- port: 80
name: http
selector:
app: example-app
app: example-server
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: example-app
name: example-server
labels:
app: example-app
app: example-server
spec:
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
selector:
matchLabels:
app: example-app
app: example-server
template:
metadata:
labels:
app: example-app
app: example-server
spec:
containers:
- name: nginx
Expand All @@ -39,3 +39,32 @@ spec:
requests:
cpu: 10m
memory: 32Mi
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: example-client
labels:
app: example-client
spec:
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
selector:
matchLabels:
app: example-client
template:
metadata:
labels:
app: example-client
spec:
containers:
- name: alpine
image: "alpine:latest"
imagePullPolicy: IfNotPresent
command: ["sh", "-c", "tail -f /dev/null"]
resources:
requests:
cpu: 10m
memory: 32Mi
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1599,7 +1599,6 @@ google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.34.0 h1:raiipEjMOIC/TO2AvyTxP25XFdLxNIBwzDh3FM3XztI=
google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
google.golang.org/grpc/examples v0.0.0-20201130180447-c456688b1860/go.mod h1:Ly7ZA/ARzg8fnPU9TyZIxoz33sEUuWX7txiqs8lPTgE=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
168 changes: 124 additions & 44 deletions pkg/plugins/runtime/k8s/controllers/inbound_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"fmt"
"strings"

"github.com/pkg/errors"
kube_core "k8s.io/api/core/v1"
Expand All @@ -11,67 +12,116 @@ import (
util_k8s "github.com/kumahq/kuma/pkg/plugins/runtime/k8s/util"
)

func InboundInterfacesFor(zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
var ifaces []*mesh_proto.Dataplane_Networking_Inbound
for _, svc := range services {
for _, svcPort := range svc.Spec.Ports {
if svcPort.Protocol != "" && svcPort.Protocol != kube_core.ProtocolTCP {
// ignore non-TCP ports
continue
}
containerPort, container, err := util_k8s.FindPort(pod, &svcPort)
if err != nil {
converterLog.Error(err, "failed to find a container port in a given Pod that would match a given Service port", "namespace", pod.Namespace, "podName", pod.Name, "serviceName", svc.Name, "servicePortName", svcPort.Name)
// ignore those cases where a Pod doesn't have all the ports a Service has
continue
func inboundForService(zone string, pod *kube_core.Pod, service *kube_core.Service) (ifaces []*mesh_proto.Dataplane_Networking_Inbound) {
for _, svcPort := range service.Spec.Ports {
if svcPort.Protocol != "" && svcPort.Protocol != kube_core.ProtocolTCP {
// ignore non-TCP ports
continue
}
containerPort, container, err := util_k8s.FindPort(pod, &svcPort)
if err != nil {
converterLog.Error(err, "failed to find a container port in a given Pod that would match a given Service port", "namespace", pod.Namespace, "podName", pod.Name, "serviceName", service.Name, "servicePortName", svcPort.Name)
// ignore those cases where a Pod doesn't have all the ports a Service has
continue
}

tags := InboundTagsForService(zone, pod, service, &svcPort)
var health *mesh_proto.Dataplane_Networking_Inbound_Health

// if container is not equal nil then port is explicitly defined as containerPort so we're able
// to figure out which container implements which service. Since we know container we can check its status
// and map it to the Dataplane health
if container != nil {
if cs := util_k8s.FindContainerStatus(pod, container.Name); cs != nil {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
}
}

tags := InboundTagsFor(zone, pod, svc, &svcPort)
var health *mesh_proto.Dataplane_Networking_Inbound_Health

// if container is not equal nil then port is explicitly defined as containerPort so we're able
// to figure out which container implements which service. Since we know container we can check its status
// and map it to the Dataplane health
if container != nil {
if cs := util_k8s.FindContainerStatus(pod, container.Name); cs != nil {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
// also we're checking whether kuma-sidecar container is ready
if cs := util_k8s.FindContainerStatus(pod, util_k8s.KumaSidecarContainerName); cs != nil {
if health != nil {
health.Ready = health.Ready && cs.Ready
} else {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
}
}

// also we're checking whether kuma-sidecar container is ready
if cs := util_k8s.FindContainerStatus(pod, util_k8s.KumaSidecarContainerName); cs != nil {
if health != nil {
health.Ready = health.Ready && cs.Ready
} else {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
ifaces = append(ifaces, &mesh_proto.Dataplane_Networking_Inbound{
Port: uint32(containerPort),
Tags: tags,
Health: health,
})
}

return
}

func inboundForServiceless(zone string, pod *kube_core.Pod) (ifaces []*mesh_proto.Dataplane_Networking_Inbound) {
// The Pod does not have any services associated with it, just get the data from the Pod itself

// We still need that extra listener with a service because it is required in many places of the code (e.g. mTLS)
// TCPPortReserved, is a special port that will never be allocated from the TCP/IP stack. We use it as special
// designator that this is actually a service-less inbound.

// NOTE: It is cleaner to implement an equivalent of Gateway which is inbound-less dataplane. However such approch
// will create lots of code changes to account for this other type of dataplne (we already have GW and Ingress),
// including GUI and CLI changes

tags := InboundTagsForPod(zone, pod)
var health *mesh_proto.Dataplane_Networking_Inbound_Health

for _, container := range pod.Spec.Containers {
if container.Name != util_k8s.KumaSidecarContainerName {
if cs := util_k8s.FindContainerStatus(pod, container.Name); cs != nil {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
}
}
}

ifaces = append(ifaces, &mesh_proto.Dataplane_Networking_Inbound{
Port: uint32(containerPort),
Tags: tags,
Health: health,
})
// also we're checking whether kuma-sidecar container is ready
if cs := util_k8s.FindContainerStatus(pod, util_k8s.KumaSidecarContainerName); cs != nil {
if health != nil {
health.Ready = health.Ready && cs.Ready
} else {
health = &mesh_proto.Dataplane_Networking_Inbound_Health{
Ready: cs.Ready,
}
}
}

ifaces = append(ifaces, &mesh_proto.Dataplane_Networking_Inbound{
Port: mesh_proto.TCPPortReserved,
Tags: tags,
Health: health,
})

return
}

func InboundInterfacesFor(zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
ifaces := []*mesh_proto.Dataplane_Networking_Inbound{}
for _, svc := range services {
svcIfaces := inboundForService(zone, pod, svc)
ifaces = append(ifaces, svcIfaces...)
}

if len(ifaces) == 0 {
// Notice that here we return an error immediately
// instead of leaving Dataplane validation up to a ValidatingAdmissionWebHook.
// We do it this way in order to provide the most descriptive error message.
cause := "However, there are no Services that select this Pod."
if len(services) > 0 {
cause = "However, this Pod doesn't have any container ports that would satisfy matching Service(s)."
return nil, errors.Errorf("A service that selects pod %s was found, but it doesn't match any container ports.", pod.GetName())
}
return nil, errors.Errorf("Kuma requires every Pod in a Mesh to be a part of at least one Service. %s", cause)

ifaces = append(ifaces, inboundForServiceless(zone, pod)...)
}
return ifaces, nil
}

func InboundTagsFor(zone string, pod *kube_core.Pod, svc *kube_core.Service, svcPort *kube_core.ServicePort) map[string]string {
func InboundTagsForService(zone string, pod *kube_core.Pod, svc *kube_core.Service, svcPort *kube_core.ServicePort) map[string]string {
tags := util_k8s.CopyStringMap(pod.Labels)
for key, value := range tags {
if value == "" {
Expand Down Expand Up @@ -110,3 +160,33 @@ func ProtocolTagFor(svc *kube_core.Service, svcPort *kube_core.ServicePort) stri
// to a user that at least `<port>.service.kuma.io/protocol` has an effect
return protocolValue
}

func InboundTagsForPod(zone string, pod *kube_core.Pod) map[string]string {
tags := util_k8s.CopyStringMap(pod.Labels)
for key, value := range tags {
if value == "" {
delete(tags, key)
}
}
if tags == nil {
tags = make(map[string]string)
}
tags[mesh_proto.ServiceTag] = fmt.Sprintf("%s_%s_svc", nameFromPod(pod), pod.Namespace)
if zone != "" {
tags[mesh_proto.ZoneTag] = zone
}
tags[mesh_proto.ProtocolTag] = mesh_core.ProtocolTCP
tags[mesh_proto.InstanceTag] = pod.Name

return tags
}

func nameFromPod(pod *kube_core.Pod) string {
// the name is in format <name>-<replica set id>-<pod id>
split := strings.Split(pod.Name, "-")
if len(split) > 2 {
split = split[:len(split)-2]
}

return strings.Join(split, "-")
}
38 changes: 29 additions & 9 deletions pkg/plugins/runtime/k8s/controllers/outbound_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ func (p *PodConverter) OutboundInterfacesFor(
converterLog.Error(err, "could not get K8S Service for service tag")
continue // one invalid Dataplane definition should not break the entire mesh
}

// Do not generate outbounds for service-less
if isServiceLess(port) {
continue
}

if isHeadlessService(service) {
// Generate outbound listeners for every endpoint of services.
for _, endpoint := range endpoints[serviceTag] {
Expand Down Expand Up @@ -91,10 +97,17 @@ func isHeadlessService(svc *kube_core.Service) bool {
return svc.Spec.ClusterIP == "None"
}

func isServiceLess(port uint32) bool {
return port == mesh_proto.TCPPortReserved
}

func (p *PodConverter) k8sService(serviceTag string) (*kube_core.Service, uint32, error) {
name, ns, port, err := ParseService(serviceTag)
name, ns, port, err := parseService(serviceTag)
if err != nil {
return nil, 0, errors.Errorf("failed to parse `service` host %q as FQDN", serviceTag)
return nil, 0, errors.Wrapf(err, "failed to parse `service` host %q as FQDN", serviceTag)
}
if isServiceLess(port) {
return nil, port, nil
}

svc := &kube_core.Service{}
Expand All @@ -105,17 +118,24 @@ func (p *PodConverter) k8sService(serviceTag string) (*kube_core.Service, uint32
return svc, port, nil
}

func ParseService(host string) (name string, namespace string, port uint32, err error) {
func parseService(host string) (name string, namespace string, port uint32, err error) {
// split host into <name>_<namespace>_svc_<port>
segments := strings.Split(host, "_")
if len(segments) != 4 {
switch len(segments) {
case 4:
p, err := strconv.Atoi(segments[3])
if err != nil {
return "", "", 0, err
}
port = uint32(p)
case 3:
// service less service names have no port, so we just put the reserved
// one here to note that this service is actually
port = mesh_proto.TCPPortReserved
default:
return "", "", 0, errors.Errorf("service tag in unexpected format")
}
p, err := strconv.Atoi(segments[3])
if err != nil {
return "", "", 0, err
}
port = uint32(p)

name, namespace = segments[0], segments[1]
return
}
Loading