From 7f026b98141b1339d5183f4a15f4cc6af13d069c Mon Sep 17 00:00:00 2001 From: Jakub Dyszkiewicz Date: Sat, 27 Jun 2020 13:55:17 +0200 Subject: [PATCH] fix(kuma-cp) outbound generation and dns service map (#860) --- api/mesh/v1alpha1/dataplane_helpers.go | 10 ++-- pkg/dns/sync_test.go | 11 ++++- pkg/dns/vips_allocator.go | 11 ++++- pkg/xds/server/components.go | 4 +- pkg/xds/topology/outbound.go | 64 +++++++++++++------------- pkg/xds/topology/outbound_test.go | 4 +- pkg/xds/topology/vip_outbounds.go | 37 +++++++-------- 7 files changed, 80 insertions(+), 61 deletions(-) diff --git a/api/mesh/v1alpha1/dataplane_helpers.go b/api/mesh/v1alpha1/dataplane_helpers.go index 8393bc4f611c..0b9073d19b86 100644 --- a/api/mesh/v1alpha1/dataplane_helpers.go +++ b/api/mesh/v1alpha1/dataplane_helpers.go @@ -392,15 +392,15 @@ func (d *Dataplane) HasAvailableServices() bool { return len(d.Networking.Ingress.AvailableServices) != 0 } -func (d *Dataplane) IsRemoteIngress() bool { +func (d *Dataplane) IsRemoteIngress(localClusterName string) bool { if !d.IsIngress() { return false } - // todo: take into account value itself, not just presence of the 'cluster' tag - if _, ok := d.Networking.Inbound[0].Tags["cluster"]; ok { - return true + cluster, ok := d.Networking.Inbound[0].Tags["cluster"] + if !ok { + return false } - return false + return cluster != localClusterName } func (t MultiValueTagSet) String() string { diff --git a/pkg/dns/sync_test.go b/pkg/dns/sync_test.go index a92a4aea39bc..4ec2f86dae49 100644 --- a/pkg/dns/sync_test.go +++ b/pkg/dns/sync_test.go @@ -103,11 +103,20 @@ var _ = Describe("DNS sync", func() { Spec: mesh_proto.Dataplane{ Networking: &mesh_proto.Dataplane_Networking{ Address: "192.168.0.1", + Ingress: &mesh_proto.Dataplane_Networking_Ingress{ + AvailableServices: []*mesh_proto.Dataplane_Networking_Ingress_AvailableService{ + { + Tags: map[string]string{ + "service": "backend", + }, + }, + }, + }, Inbound: []*mesh_proto.Dataplane_Networking_Inbound{ { Port: 1234, Tags: map[string]string{ - "service": "backend", + "cluster": "cluster-2", }, }, }, diff --git a/pkg/dns/vips_allocator.go b/pkg/dns/vips_allocator.go index bbdd3569f4fa..46e0252adecc 100644 --- a/pkg/dns/vips_allocator.go +++ b/pkg/dns/vips_allocator.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "go.uber.org/multierr" + mesh_proto "github.com/Kong/kuma/api/mesh/v1alpha1" "github.com/Kong/kuma/pkg/core" core_mesh "github.com/Kong/kuma/pkg/core/resources/apis/mesh" "github.com/Kong/kuma/pkg/core/resources/manager" @@ -83,8 +84,14 @@ func (d *vipsAllocator) synchronize() error { // TODO: Do we need to reflect somehow the fact this service belongs to a particular `mesh` for _, dp := range dataplanes.Items { - for _, inbound := range dp.Spec.Networking.Inbound { - serviceMap[inbound.GetService()] = true + if dp.Spec.IsIngress() { + for _, service := range dp.Spec.Networking.Ingress.AvailableServices { + serviceMap[service.Tags[mesh_proto.ServiceTag]] = true + } + } else { + for _, inbound := range dp.Spec.Networking.Inbound { + serviceMap[inbound.GetService()] = true + } } } } diff --git a/pkg/xds/server/components.go b/pkg/xds/server/components.go index 85efac8473c8..289adfd5df92 100644 --- a/pkg/xds/server/components.go +++ b/pkg/xds/server/components.go @@ -142,7 +142,7 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec return err } destinations := ingress.BuildDestinationMap(dataplane) - endpoints := xds_topology.BuildEndpointMap(destinations, dataplanes.Items) + endpoints := xds_topology.BuildEndpointMap(destinations, dataplanes.Items, rt.Config().General.ClusterName) proxy := xds.Proxy{ Id: proxyID, Dataplane: dataplane, @@ -170,7 +170,7 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec destinations := xds_topology.BuildDestinationMap(dataplane, routes) // resolve all endpoints that match given selectors - outbound, err := xds_topology.GetOutboundTargets(destinations, dataplanes) + outbound, err := xds_topology.GetOutboundTargets(destinations, dataplanes, rt.Config().General.ClusterName) if err != nil { return err } diff --git a/pkg/xds/topology/outbound.go b/pkg/xds/topology/outbound.go index 7f6217e75729..c22c6afe06ad 100644 --- a/pkg/xds/topology/outbound.go +++ b/pkg/xds/topology/outbound.go @@ -7,57 +7,59 @@ import ( ) // GetOutboundTargets resolves all endpoints reachable from a given dataplane. -func GetOutboundTargets(destinations core_xds.DestinationMap, dataplanes *mesh_core.DataplaneResourceList) (core_xds.EndpointMap, error) { +func GetOutboundTargets(destinations core_xds.DestinationMap, dataplanes *mesh_core.DataplaneResourceList, localClusterName string) (core_xds.EndpointMap, error) { if len(destinations) == 0 { return nil, nil } - return BuildEndpointMap(destinations, dataplanes.Items), nil + return BuildEndpointMap(destinations, dataplanes.Items, localClusterName), nil } // BuildEndpointMap creates a map of all endpoints that match given selectors. -func BuildEndpointMap(destinations core_xds.DestinationMap, dataplanes []*mesh_core.DataplaneResource) core_xds.EndpointMap { +func BuildEndpointMap(destinations core_xds.DestinationMap, dataplanes []*mesh_core.DataplaneResource, localClusterName string) core_xds.EndpointMap { if len(destinations) == 0 { return nil } outbound := core_xds.EndpointMap{} for _, dataplane := range dataplanes { - if dataplane.Spec.IsRemoteIngress() && dataplane.Spec.HasAvailableServices() { - for _, ingress := range dataplane.Spec.Networking.GetIngress().GetAvailableServices() { - service := ingress.Tags[mesh_proto.ServiceTag] + if dataplane.Spec.IsIngress() { + if dataplane.Spec.IsRemoteIngress(localClusterName) { + for _, ingress := range dataplane.Spec.Networking.GetIngress().GetAvailableServices() { + service := ingress.Tags[mesh_proto.ServiceTag] + selectors, ok := destinations[service] + if !ok { + continue + } + if !selectors.Matches(ingress.Tags) { + continue + } + outbound[service] = append(outbound[service], core_xds.Endpoint{ + Target: dataplane.Spec.Networking.Address, + Port: dataplane.Spec.Networking.Inbound[0].Port, + Tags: ingress.Tags, + Weight: ingress.Instances, + }) + } + } + } else { + for _, inbound := range dataplane.Spec.Networking.GetInbound() { + service := inbound.Tags[mesh_proto.ServiceTag] selectors, ok := destinations[service] if !ok { continue } - if !selectors.Matches(ingress.Tags) { + if !selectors.Matches(inbound.Tags) { continue } + iface := dataplane.Spec.Networking.ToInboundInterface(inbound) + // TODO(yskopets): do we need to dedup? + // TODO(yskopets): sort ? outbound[service] = append(outbound[service], core_xds.Endpoint{ - Target: dataplane.Spec.Networking.Address, - Port: dataplane.Spec.Networking.Inbound[0].Port, - Tags: ingress.Tags, - Weight: ingress.Instances, + Target: iface.DataplaneIP, + Port: iface.DataplanePort, + Tags: inbound.Tags, + Weight: 1, }) } - continue - } - for _, inbound := range dataplane.Spec.Networking.GetInbound() { - service := inbound.Tags[mesh_proto.ServiceTag] - selectors, ok := destinations[service] - if !ok { - continue - } - if !selectors.Matches(inbound.Tags) { - continue - } - iface := dataplane.Spec.Networking.ToInboundInterface(inbound) - // TODO(yskopets): do we need to dedup? - // TODO(yskopets): sort ? - outbound[service] = append(outbound[service], core_xds.Endpoint{ - Target: iface.DataplaneIP, - Port: iface.DataplanePort, - Tags: inbound.Tags, - Weight: 1, - }) } } return outbound diff --git a/pkg/xds/topology/outbound_test.go b/pkg/xds/topology/outbound_test.go index 202cf6aad17e..02e66356e06f 100644 --- a/pkg/xds/topology/outbound_test.go +++ b/pkg/xds/topology/outbound_test.go @@ -133,7 +133,7 @@ var _ = Describe("TrafficRoute", func() { } // when - targets, err := GetOutboundTargets(destinations, dataplanes) + targets, err := GetOutboundTargets(destinations, dataplanes, "cluster-1") // then Expect(err).ToNot(HaveOccurred()) @@ -168,7 +168,7 @@ var _ = Describe("TrafficRoute", func() { DescribeTable("should include only those dataplanes that match given selectors", func(given testCase) { // when - endpoints := BuildEndpointMap(given.destinations, given.dataplanes) + endpoints := BuildEndpointMap(given.destinations, given.dataplanes, "cluster-1") // then Expect(endpoints).To(Equal(given.expected)) }, diff --git a/pkg/xds/topology/vip_outbounds.go b/pkg/xds/topology/vip_outbounds.go index 48e906e4eab4..a0e13d5e85ec 100644 --- a/pkg/xds/topology/vip_outbounds.go +++ b/pkg/xds/topology/vip_outbounds.go @@ -2,10 +2,6 @@ package topology import ( "sort" - "strings" - - "github.com/pkg/errors" - "go.uber.org/multierr" mesh_proto "github.com/Kong/kuma/api/mesh/v1alpha1" mesh_core "github.com/Kong/kuma/pkg/core/resources/apis/mesh" @@ -23,22 +19,27 @@ func PatchDataplaneWithVIPOutbounds(dataplane *mesh_core.DataplaneResource, continue } - for _, inbound := range dp.Spec.Networking.Inbound { - inService := inbound.GetTags()[mesh_proto.ServiceTag] - - if _, found := serviceVIPMap[inService]; !found { - vip, err := resolver.ForwardLookup(inService) - if err != nil { - // TODO: remove this additional lookup once the service tag contains a `flat` service name - // try to get the first part of the FQDN service and look it up - split := strings.Split(inService, ".") - vip, err = resolver.ForwardLookup(split[0]) - if err != nil { - errs = multierr.Append(errs, errors.Wrapf(err, "unable to resolve %s", inService)) + if dp.Spec.IsIngress() { + for _, service := range dp.Spec.Networking.Ingress.AvailableServices { + inService := service.Tags[mesh_proto.ServiceTag] + if _, found := serviceVIPMap[inService]; !found { + vip, err := resolver.ForwardLookup(inService) + if err == nil { + serviceVIPMap[inService] = vip + services = append(services, inService) + } + } + } + } else { + for _, inbound := range dp.Spec.Networking.Inbound { + inService := inbound.GetTags()[mesh_proto.ServiceTag] + if _, found := serviceVIPMap[inService]; !found { + vip, err := resolver.ForwardLookup(inService) + if err == nil { + serviceVIPMap[inService] = vip + services = append(services, inService) } } - serviceVIPMap[inService] = vip - services = append(services, inService) } } }