Skip to content

Commit

Permalink
fix(kuma-cp) outbound generation and dns service map (#860)
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubdyszkiewicz authored Jun 27, 2020
1 parent 7ed1830 commit 7f026b9
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 61 deletions.
10 changes: 5 additions & 5 deletions api/mesh/v1alpha1/dataplane_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,15 +392,15 @@ func (d *Dataplane) HasAvailableServices() bool {
return len(d.Networking.Ingress.AvailableServices) != 0
}

func (d *Dataplane) IsRemoteIngress() bool {
func (d *Dataplane) IsRemoteIngress(localClusterName string) bool {
if !d.IsIngress() {
return false
}
// todo: take into account value itself, not just presence of the 'cluster' tag
if _, ok := d.Networking.Inbound[0].Tags["cluster"]; ok {
return true
cluster, ok := d.Networking.Inbound[0].Tags["cluster"]
if !ok {
return false
}
return false
return cluster != localClusterName
}

func (t MultiValueTagSet) String() string {
Expand Down
11 changes: 10 additions & 1 deletion pkg/dns/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,20 @@ var _ = Describe("DNS sync", func() {
Spec: mesh_proto.Dataplane{
Networking: &mesh_proto.Dataplane_Networking{
Address: "192.168.0.1",
Ingress: &mesh_proto.Dataplane_Networking_Ingress{
AvailableServices: []*mesh_proto.Dataplane_Networking_Ingress_AvailableService{
{
Tags: map[string]string{
"service": "backend",
},
},
},
},
Inbound: []*mesh_proto.Dataplane_Networking_Inbound{
{
Port: 1234,
Tags: map[string]string{
"service": "backend",
"cluster": "cluster-2",
},
},
},
Expand Down
11 changes: 9 additions & 2 deletions pkg/dns/vips_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
"go.uber.org/multierr"

mesh_proto "github.com/Kong/kuma/api/mesh/v1alpha1"
"github.com/Kong/kuma/pkg/core"
core_mesh "github.com/Kong/kuma/pkg/core/resources/apis/mesh"
"github.com/Kong/kuma/pkg/core/resources/manager"
Expand Down Expand Up @@ -83,8 +84,14 @@ func (d *vipsAllocator) synchronize() error {

// TODO: Do we need to reflect somehow the fact this service belongs to a particular `mesh`
for _, dp := range dataplanes.Items {
for _, inbound := range dp.Spec.Networking.Inbound {
serviceMap[inbound.GetService()] = true
if dp.Spec.IsIngress() {
for _, service := range dp.Spec.Networking.Ingress.AvailableServices {
serviceMap[service.Tags[mesh_proto.ServiceTag]] = true
}
} else {
for _, inbound := range dp.Spec.Networking.Inbound {
serviceMap[inbound.GetService()] = true
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/xds/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec
return err
}
destinations := ingress.BuildDestinationMap(dataplane)
endpoints := xds_topology.BuildEndpointMap(destinations, dataplanes.Items)
endpoints := xds_topology.BuildEndpointMap(destinations, dataplanes.Items, rt.Config().General.ClusterName)
proxy := xds.Proxy{
Id: proxyID,
Dataplane: dataplane,
Expand Down Expand Up @@ -170,7 +170,7 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec
destinations := xds_topology.BuildDestinationMap(dataplane, routes)

// resolve all endpoints that match given selectors
outbound, err := xds_topology.GetOutboundTargets(destinations, dataplanes)
outbound, err := xds_topology.GetOutboundTargets(destinations, dataplanes, rt.Config().General.ClusterName)
if err != nil {
return err
}
Expand Down
64 changes: 33 additions & 31 deletions pkg/xds/topology/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,57 +7,59 @@ import (
)

// GetOutboundTargets resolves all endpoints reachable from a given dataplane.
func GetOutboundTargets(destinations core_xds.DestinationMap, dataplanes *mesh_core.DataplaneResourceList) (core_xds.EndpointMap, error) {
func GetOutboundTargets(destinations core_xds.DestinationMap, dataplanes *mesh_core.DataplaneResourceList, localClusterName string) (core_xds.EndpointMap, error) {
if len(destinations) == 0 {
return nil, nil
}
return BuildEndpointMap(destinations, dataplanes.Items), nil
return BuildEndpointMap(destinations, dataplanes.Items, localClusterName), nil
}

// BuildEndpointMap creates a map of all endpoints that match given selectors.
func BuildEndpointMap(destinations core_xds.DestinationMap, dataplanes []*mesh_core.DataplaneResource) core_xds.EndpointMap {
func BuildEndpointMap(destinations core_xds.DestinationMap, dataplanes []*mesh_core.DataplaneResource, localClusterName string) core_xds.EndpointMap {
if len(destinations) == 0 {
return nil
}
outbound := core_xds.EndpointMap{}
for _, dataplane := range dataplanes {
if dataplane.Spec.IsRemoteIngress() && dataplane.Spec.HasAvailableServices() {
for _, ingress := range dataplane.Spec.Networking.GetIngress().GetAvailableServices() {
service := ingress.Tags[mesh_proto.ServiceTag]
if dataplane.Spec.IsIngress() {
if dataplane.Spec.IsRemoteIngress(localClusterName) {
for _, ingress := range dataplane.Spec.Networking.GetIngress().GetAvailableServices() {
service := ingress.Tags[mesh_proto.ServiceTag]
selectors, ok := destinations[service]
if !ok {
continue
}
if !selectors.Matches(ingress.Tags) {
continue
}
outbound[service] = append(outbound[service], core_xds.Endpoint{
Target: dataplane.Spec.Networking.Address,
Port: dataplane.Spec.Networking.Inbound[0].Port,
Tags: ingress.Tags,
Weight: ingress.Instances,
})
}
}
} else {
for _, inbound := range dataplane.Spec.Networking.GetInbound() {
service := inbound.Tags[mesh_proto.ServiceTag]
selectors, ok := destinations[service]
if !ok {
continue
}
if !selectors.Matches(ingress.Tags) {
if !selectors.Matches(inbound.Tags) {
continue
}
iface := dataplane.Spec.Networking.ToInboundInterface(inbound)
// TODO(yskopets): do we need to dedup?
// TODO(yskopets): sort ?
outbound[service] = append(outbound[service], core_xds.Endpoint{
Target: dataplane.Spec.Networking.Address,
Port: dataplane.Spec.Networking.Inbound[0].Port,
Tags: ingress.Tags,
Weight: ingress.Instances,
Target: iface.DataplaneIP,
Port: iface.DataplanePort,
Tags: inbound.Tags,
Weight: 1,
})
}
continue
}
for _, inbound := range dataplane.Spec.Networking.GetInbound() {
service := inbound.Tags[mesh_proto.ServiceTag]
selectors, ok := destinations[service]
if !ok {
continue
}
if !selectors.Matches(inbound.Tags) {
continue
}
iface := dataplane.Spec.Networking.ToInboundInterface(inbound)
// TODO(yskopets): do we need to dedup?
// TODO(yskopets): sort ?
outbound[service] = append(outbound[service], core_xds.Endpoint{
Target: iface.DataplaneIP,
Port: iface.DataplanePort,
Tags: inbound.Tags,
Weight: 1,
})
}
}
return outbound
Expand Down
4 changes: 2 additions & 2 deletions pkg/xds/topology/outbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ var _ = Describe("TrafficRoute", func() {
}

// when
targets, err := GetOutboundTargets(destinations, dataplanes)
targets, err := GetOutboundTargets(destinations, dataplanes, "cluster-1")

// then
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -168,7 +168,7 @@ var _ = Describe("TrafficRoute", func() {
DescribeTable("should include only those dataplanes that match given selectors",
func(given testCase) {
// when
endpoints := BuildEndpointMap(given.destinations, given.dataplanes)
endpoints := BuildEndpointMap(given.destinations, given.dataplanes, "cluster-1")
// then
Expect(endpoints).To(Equal(given.expected))
},
Expand Down
37 changes: 19 additions & 18 deletions pkg/xds/topology/vip_outbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ package topology

import (
"sort"
"strings"

"github.com/pkg/errors"
"go.uber.org/multierr"

mesh_proto "github.com/Kong/kuma/api/mesh/v1alpha1"
mesh_core "github.com/Kong/kuma/pkg/core/resources/apis/mesh"
Expand All @@ -23,22 +19,27 @@ func PatchDataplaneWithVIPOutbounds(dataplane *mesh_core.DataplaneResource,
continue
}

for _, inbound := range dp.Spec.Networking.Inbound {
inService := inbound.GetTags()[mesh_proto.ServiceTag]

if _, found := serviceVIPMap[inService]; !found {
vip, err := resolver.ForwardLookup(inService)
if err != nil {
// TODO: remove this additional lookup once the service tag contains a `flat` service name
// try to get the first part of the FQDN service and look it up
split := strings.Split(inService, ".")
vip, err = resolver.ForwardLookup(split[0])
if err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "unable to resolve %s", inService))
if dp.Spec.IsIngress() {
for _, service := range dp.Spec.Networking.Ingress.AvailableServices {
inService := service.Tags[mesh_proto.ServiceTag]
if _, found := serviceVIPMap[inService]; !found {
vip, err := resolver.ForwardLookup(inService)
if err == nil {
serviceVIPMap[inService] = vip
services = append(services, inService)
}
}
}
} else {
for _, inbound := range dp.Spec.Networking.Inbound {
inService := inbound.GetTags()[mesh_proto.ServiceTag]
if _, found := serviceVIPMap[inService]; !found {
vip, err := resolver.ForwardLookup(inService)
if err == nil {
serviceVIPMap[inService] = vip
services = append(services, inService)
}
}
serviceVIPMap[inService] = vip
services = append(services, inService)
}
}
}
Expand Down

0 comments on commit 7f026b9

Please sign in to comment.