Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kuma-cp) outbound generation and dns service map #860

Merged
merged 1 commit into from
Jun 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions api/mesh/v1alpha1/dataplane_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,15 +392,15 @@ func (d *Dataplane) HasAvailableServices() bool {
return len(d.Networking.Ingress.AvailableServices) != 0
}

func (d *Dataplane) IsRemoteIngress() bool {
func (d *Dataplane) IsRemoteIngress(localClusterName string) bool {
if !d.IsIngress() {
return false
}
// todo: take into account value itself, not just presence of the 'cluster' tag
if _, ok := d.Networking.Inbound[0].Tags["cluster"]; ok {
return true
cluster, ok := d.Networking.Inbound[0].Tags["cluster"]
if !ok {
return false
}
return false
return cluster != localClusterName
}

func (t MultiValueTagSet) String() string {
Expand Down
11 changes: 10 additions & 1 deletion pkg/dns/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,20 @@ var _ = Describe("DNS sync", func() {
Spec: mesh_proto.Dataplane{
Networking: &mesh_proto.Dataplane_Networking{
Address: "192.168.0.1",
Ingress: &mesh_proto.Dataplane_Networking_Ingress{
AvailableServices: []*mesh_proto.Dataplane_Networking_Ingress_AvailableService{
{
Tags: map[string]string{
"service": "backend",
},
},
},
},
Inbound: []*mesh_proto.Dataplane_Networking_Inbound{
{
Port: 1234,
Tags: map[string]string{
"service": "backend",
"cluster": "cluster-2",
},
},
},
Expand Down
11 changes: 9 additions & 2 deletions pkg/dns/vips_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
"go.uber.org/multierr"

mesh_proto "github.com/Kong/kuma/api/mesh/v1alpha1"
"github.com/Kong/kuma/pkg/core"
core_mesh "github.com/Kong/kuma/pkg/core/resources/apis/mesh"
"github.com/Kong/kuma/pkg/core/resources/manager"
Expand Down Expand Up @@ -83,8 +84,14 @@ func (d *vipsAllocator) synchronize() error {

// TODO: Do we need to reflect somehow the fact this service belongs to a particular `mesh`
for _, dp := range dataplanes.Items {
for _, inbound := range dp.Spec.Networking.Inbound {
serviceMap[inbound.GetService()] = true
if dp.Spec.IsIngress() {
for _, service := range dp.Spec.Networking.Ingress.AvailableServices {
serviceMap[service.Tags[mesh_proto.ServiceTag]] = true
}
} else {
for _, inbound := range dp.Spec.Networking.Inbound {
serviceMap[inbound.GetService()] = true
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/xds/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec
return err
}
destinations := ingress.BuildDestinationMap(dataplane)
endpoints := xds_topology.BuildEndpointMap(destinations, dataplanes.Items)
endpoints := xds_topology.BuildEndpointMap(destinations, dataplanes.Items, rt.Config().General.ClusterName)
proxy := xds.Proxy{
Id: proxyID,
Dataplane: dataplane,
Expand Down Expand Up @@ -170,7 +170,7 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec
destinations := xds_topology.BuildDestinationMap(dataplane, routes)

// resolve all endpoints that match given selectors
outbound, err := xds_topology.GetOutboundTargets(destinations, dataplanes)
outbound, err := xds_topology.GetOutboundTargets(destinations, dataplanes, rt.Config().General.ClusterName)
if err != nil {
return err
}
Expand Down
64 changes: 33 additions & 31 deletions pkg/xds/topology/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,57 +7,59 @@ import (
)

// GetOutboundTargets resolves all endpoints reachable from a given dataplane.
func GetOutboundTargets(destinations core_xds.DestinationMap, dataplanes *mesh_core.DataplaneResourceList) (core_xds.EndpointMap, error) {
func GetOutboundTargets(destinations core_xds.DestinationMap, dataplanes *mesh_core.DataplaneResourceList, localClusterName string) (core_xds.EndpointMap, error) {
if len(destinations) == 0 {
return nil, nil
}
return BuildEndpointMap(destinations, dataplanes.Items), nil
return BuildEndpointMap(destinations, dataplanes.Items, localClusterName), nil
}

// BuildEndpointMap creates a map of all endpoints that match given selectors.
func BuildEndpointMap(destinations core_xds.DestinationMap, dataplanes []*mesh_core.DataplaneResource) core_xds.EndpointMap {
func BuildEndpointMap(destinations core_xds.DestinationMap, dataplanes []*mesh_core.DataplaneResource, localClusterName string) core_xds.EndpointMap {
if len(destinations) == 0 {
return nil
}
outbound := core_xds.EndpointMap{}
for _, dataplane := range dataplanes {
if dataplane.Spec.IsRemoteIngress() && dataplane.Spec.HasAvailableServices() {
for _, ingress := range dataplane.Spec.Networking.GetIngress().GetAvailableServices() {
service := ingress.Tags[mesh_proto.ServiceTag]
if dataplane.Spec.IsIngress() {
if dataplane.Spec.IsRemoteIngress(localClusterName) {
for _, ingress := range dataplane.Spec.Networking.GetIngress().GetAvailableServices() {
service := ingress.Tags[mesh_proto.ServiceTag]
selectors, ok := destinations[service]
if !ok {
continue
}
if !selectors.Matches(ingress.Tags) {
continue
}
outbound[service] = append(outbound[service], core_xds.Endpoint{
Target: dataplane.Spec.Networking.Address,
Port: dataplane.Spec.Networking.Inbound[0].Port,
Tags: ingress.Tags,
Weight: ingress.Instances,
})
}
}
} else {
for _, inbound := range dataplane.Spec.Networking.GetInbound() {
service := inbound.Tags[mesh_proto.ServiceTag]
selectors, ok := destinations[service]
if !ok {
continue
}
if !selectors.Matches(ingress.Tags) {
if !selectors.Matches(inbound.Tags) {
continue
}
iface := dataplane.Spec.Networking.ToInboundInterface(inbound)
// TODO(yskopets): do we need to dedup?
// TODO(yskopets): sort ?
outbound[service] = append(outbound[service], core_xds.Endpoint{
Target: dataplane.Spec.Networking.Address,
Port: dataplane.Spec.Networking.Inbound[0].Port,
Tags: ingress.Tags,
Weight: ingress.Instances,
Target: iface.DataplaneIP,
Port: iface.DataplanePort,
Tags: inbound.Tags,
Weight: 1,
})
}
continue
}
for _, inbound := range dataplane.Spec.Networking.GetInbound() {
service := inbound.Tags[mesh_proto.ServiceTag]
selectors, ok := destinations[service]
if !ok {
continue
}
if !selectors.Matches(inbound.Tags) {
continue
}
iface := dataplane.Spec.Networking.ToInboundInterface(inbound)
// TODO(yskopets): do we need to dedup?
// TODO(yskopets): sort ?
outbound[service] = append(outbound[service], core_xds.Endpoint{
Target: iface.DataplaneIP,
Port: iface.DataplanePort,
Tags: inbound.Tags,
Weight: 1,
})
}
}
return outbound
Expand Down
4 changes: 2 additions & 2 deletions pkg/xds/topology/outbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ var _ = Describe("TrafficRoute", func() {
}

// when
targets, err := GetOutboundTargets(destinations, dataplanes)
targets, err := GetOutboundTargets(destinations, dataplanes, "cluster-1")

// then
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -168,7 +168,7 @@ var _ = Describe("TrafficRoute", func() {
DescribeTable("should include only those dataplanes that match given selectors",
func(given testCase) {
// when
endpoints := BuildEndpointMap(given.destinations, given.dataplanes)
endpoints := BuildEndpointMap(given.destinations, given.dataplanes, "cluster-1")
// then
Expect(endpoints).To(Equal(given.expected))
},
Expand Down
37 changes: 19 additions & 18 deletions pkg/xds/topology/vip_outbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ package topology

import (
"sort"
"strings"

"github.com/pkg/errors"
"go.uber.org/multierr"

mesh_proto "github.com/Kong/kuma/api/mesh/v1alpha1"
mesh_core "github.com/Kong/kuma/pkg/core/resources/apis/mesh"
Expand All @@ -23,22 +19,27 @@ func PatchDataplaneWithVIPOutbounds(dataplane *mesh_core.DataplaneResource,
continue
}

for _, inbound := range dp.Spec.Networking.Inbound {
inService := inbound.GetTags()[mesh_proto.ServiceTag]

if _, found := serviceVIPMap[inService]; !found {
vip, err := resolver.ForwardLookup(inService)
if err != nil {
// TODO: remove this additional lookup once the service tag contains a `flat` service name
// try to get the first part of the FQDN service and look it up
split := strings.Split(inService, ".")
vip, err = resolver.ForwardLookup(split[0])
if err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "unable to resolve %s", inService))
if dp.Spec.IsIngress() {
for _, service := range dp.Spec.Networking.Ingress.AvailableServices {
inService := service.Tags[mesh_proto.ServiceTag]
if _, found := serviceVIPMap[inService]; !found {
vip, err := resolver.ForwardLookup(inService)
if err == nil {
serviceVIPMap[inService] = vip
services = append(services, inService)
}
}
}
} else {
for _, inbound := range dp.Spec.Networking.Inbound {
inService := inbound.GetTags()[mesh_proto.ServiceTag]
if _, found := serviceVIPMap[inService]; !found {
vip, err := resolver.ForwardLookup(inService)
if err == nil {
serviceVIPMap[inService] = vip
services = append(services, inService)
}
}
serviceVIPMap[inService] = vip
services = append(services, inService)
}
}
}
Expand Down