Skip to content

Commit

Permalink
Use clusterIP for local services (#356)
Browse files Browse the repository at this point in the history
* Use clusterIP for local services

Fixes #355

Signed-off-by: Vishal Thapar <5137689+vthapar@users.noreply.github.com>
  • Loading branch information
vthapar committed Dec 1, 2020
1 parent 4a91f9c commit 7aea9c7
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 42 deletions.
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/mholt/certmagic v0.8.3/go.mod h1:91uJzK5K8IWtYQqTi5R2tsxV1pCde+wdGfaRaOZi6aQ=
github.com/miekg/dns v1.1.15/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.27/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/miekg/dns v1.1.31 h1:sJFOl9BgwbYAWOGEwr61FU28pqsBNdpRBnhGXtO06Oo=
github.com/miekg/dns v1.1.31/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/miekg/dns v1.1.34 h1:SgTzfkN+oLoIHF1bgUP+C71mzuDl3AhLApHzCCIAMWM=
github.com/miekg/dns v1.1.34/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/miekg/dns v1.1.35 h1:oTfOaDH+mZkdcgdIjH6yBajRGtIwcwcaR+rt23ZSrJs=
github.com/miekg/dns v1.1.35/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
Expand Down
91 changes: 91 additions & 0 deletions pkg/service/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package service

import (
"fmt"

"github.com/submariner-io/admiral/pkg/log"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

type Controller struct {
// Indirection hook for unit tests to supply fake client sets
NewClientset func(kubeConfig *rest.Config) (kubernetes.Interface, error)
svcInformer cache.Controller
svcStore cache.Store
stopCh chan struct{}
}

func NewController() *Controller {
return &Controller{
NewClientset: func(c *rest.Config) (kubernetes.Interface, error) {
return kubernetes.NewForConfig(c)
},
stopCh: make(chan struct{}),
}
}

func (c *Controller) Start(kubeConfig *rest.Config) error {
klog.Infof("Starting Services Controller")

clientSet, err := c.NewClientset(kubeConfig)
if err != nil {
return fmt.Errorf("error creating client set: %v", err)
}

c.svcStore, c.svcInformer = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientSet.CoreV1().Services(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientSet.CoreV1().Services(metav1.NamespaceAll).Watch(options)
},
},
&v1.Service{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(old interface{}, new interface{}) {},
DeleteFunc: func(obj interface{}) {},
},
)

go c.svcInformer.Run(c.stopCh)

return nil
}

func (c *Controller) Stop() {
close(c.stopCh)

klog.Infof("Services Controller stopped")
}

func (c *Controller) GetIP(name, namespace string) (string, bool) {
key := namespace + "/" + name
obj, exists, err := c.svcStore.GetByKey(key)

if err != nil {
klog.V(log.DEBUG).Infof("Error trying to get service for key %q", key)
return "", false
}

if !exists {
return "", false
}

svc := obj.(*v1.Service)

if svc.Spec.Type == v1.ServiceTypeClusterIP && svc.Spec.ClusterIP != "" {
return svc.Spec.ClusterIP, true
}

return "", false
}
16 changes: 8 additions & 8 deletions pkg/serviceimport/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (m *Map) selectIP(queue []clusterInfo, counter *uint64, name, namespace str
}

func (m *Map) GetIP(namespace, name, cluster, localCluster string, checkCluster func(string) bool,
checkEndpoint func(string, string, string) bool) (string, bool) {
checkEndpoint func(string, string, string) bool) (ip string, found, isLocal bool) {
clusterIPs, queue, counter, isHeadless := func() (map[string]string, []clusterInfo, *uint64, bool) {
m.RLock()
defer m.RUnlock()
Expand All @@ -68,13 +68,13 @@ func (m *Map) GetIP(namespace, name, cluster, localCluster string, checkCluster
}()

if clusterIPs == nil || isHeadless {
return "", false
return "", false, false
}

// If a clusterId is specified, we supply it even if the service is not there
if cluster != "" {
ip, found := clusterIPs[cluster]
return ip, found
ip, found = clusterIPs[cluster]
return ip, found, cluster == localCluster
}

// If we are aware of the local cluster
Expand All @@ -83,17 +83,17 @@ func (m *Map) GetIP(namespace, name, cluster, localCluster string, checkCluster
ip, found := clusterIPs[localCluster]

if found && ip != "" && checkEndpoint(name, namespace, localCluster) {
return ip, found
return ip, found, true
}
}

// Fall back to Round-Robin if service is not presented in the local cluster
ip := m.selectIP(queue, counter, name, namespace, checkCluster, checkEndpoint)
ip = m.selectIP(queue, counter, name, namespace, checkCluster, checkEndpoint)
if ip != "" {
return ip, true
return ip, true, false
}

return "", true
return "", true, false
}

func NewMap() *Map {
Expand Down
4 changes: 2 additions & 2 deletions pkg/serviceimport/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ var _ = Describe("ServiceImport Map", func() {
}

expectIPsNotFound := func(ns, service, cluster, localCluster string) {
_, found := serviceImportMap.GetIP(ns, service, cluster, localCluster, checkCluster, checkEndpoint)
_, found, _ := serviceImportMap.GetIP(ns, service, cluster, localCluster, checkCluster, checkEndpoint)
Expect(found).To(BeFalse())
}

getIPExpectFound := func(ns, name, cluster, localCluster string) string {
ip, found := serviceImportMap.GetIP(ns, name, cluster, localCluster, checkCluster, checkEndpoint)
ip, found, _ := serviceImportMap.GetIP(ns, name, cluster, localCluster, checkCluster, checkEndpoint)
Expect(found).To(BeTrue())
return ip
}
Expand Down
26 changes: 20 additions & 6 deletions plugin/lighthouse/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func (lh *Lighthouse) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns
return lh.nextOrFailure(state.Name(), ctx, w, r, dns.RcodeNameError, "Only services supported")
}

var ips []string
var found bool
var (
ips []string
found bool
ip string
)

localClusterID := lh.clusterStatus.LocalClusterID()

ip, found := lh.serviceImports.GetIP(pReq.namespace, pReq.service, pReq.cluster, localClusterID, lh.clusterStatus.IsConnected,
lh.endpointsStatus.IsHealthy)
ip, found = lh.getClusterIpForSvc(pReq)

if !found {
ips, found = lh.endpointSlices.GetIPs(pReq.hostname, pReq.cluster, pReq.namespace, pReq.service, lh.clusterStatus.IsConnected)
Expand Down Expand Up @@ -111,6 +111,20 @@ func (lh *Lighthouse) emptyResponse(state request.Request) (int, error) {
return dns.RcodeSuccess, nil
}

func (lh *Lighthouse) getClusterIpForSvc(pReq recordRequest) (ip string, found bool) {
localClusterID := lh.clusterStatus.LocalClusterID()

ip, found, isLocal := lh.serviceImports.GetIP(pReq.namespace, pReq.service, pReq.cluster, localClusterID, lh.clusterStatus.IsConnected,
lh.endpointsStatus.IsHealthy)

getLocal := isLocal || (pReq.cluster != "" && pReq.cluster == localClusterID)
if found && getLocal {
ip, found = lh.localServices.GetIP(pReq.service, pReq.namespace)
}

return ip, found
}

// Name implements the Handler interface.
func (lh *Lighthouse) Name() string {
return "lighthouse"
Expand Down
Loading

0 comments on commit 7aea9c7

Please sign in to comment.