Skip to content

Commit

Permalink
Use clusterIP for local services
Browse files Browse the repository at this point in the history
Fixes submariner-io#355

Signed-off-by: Vishal Thapar <5137689+vthapar@users.noreply.github.com>
  • Loading branch information
vthapar committed Oct 29, 2020
1 parent adb02bc commit d192abe
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 22 deletions.
48 changes: 35 additions & 13 deletions pkg/gateway/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Controller struct {
queue workqueue.Interface
stopCh chan struct{}
clusterStatusMap atomic.Value
localClusterID atomic.Value
gatewayAvailable bool
}

Expand All @@ -41,6 +42,7 @@ func NewController() *Controller {
gatewayAvailable: true,
}
controller.clusterStatusMap.Store(make(map[string]bool))
controller.localClusterID.Store("")

return controller
}
Expand Down Expand Up @@ -117,10 +119,18 @@ func (c *Controller) processNextGateway(key, name, ns string) (bool, error) {
}

func (c *Controller) gatewayCreatedOrUpdated(obj *unstructured.Unstructured) {
connections, ok := getGatewayStatus(obj)
connections, localClusterID, ok := getGatewayStatus(obj)
if !ok {
return
}

// Updating
c.updateLocalClusterID(localClusterID)

c.updateClusterStatusMap(connections)
}

func (c *Controller) updateClusterStatusMap(connections []interface{}) {
var newMap map[string]bool

currentMap := c.getClusterStatusMap()
Expand All @@ -133,28 +143,28 @@ func (c *Controller) gatewayCreatedOrUpdated(obj *unstructured.Unstructured) {
klog.Errorf("status field not found in %#v", connectionMap)
}

clusterId, found, err := unstructured.NestedString(connectionMap, "endpoint", "cluster_id")
clusterID, found, err := unstructured.NestedString(connectionMap, "endpoint", "cluster_id")
if !found || err != nil {
klog.Errorf("cluster_id field not found in %#v", connectionMap)
continue
}

if status == "connected" {
_, found := currentMap[clusterId]
_, found := currentMap[clusterID]
if !found {
if newMap == nil {
newMap = copyMap(currentMap)
}

newMap[clusterId] = true
newMap[clusterID] = true
}
} else {
_, found = currentMap[clusterId]
_, found = currentMap[clusterID]
if found {
if newMap == nil {
newMap = copyMap(currentMap)
}
delete(newMap, clusterId)
delete(newMap, clusterID)
}
}
}
Expand All @@ -165,22 +175,30 @@ func (c *Controller) gatewayCreatedOrUpdated(obj *unstructured.Unstructured) {
}
}

func getGatewayStatus(obj *unstructured.Unstructured) (connections []interface{}, gwStatus bool) {
func (c *Controller) updateLocalClusterID(clusterID string) {
if clusterID != "" && clusterID != c.localClusterID.Load() {
c.localClusterID.Store(clusterID)
}
}

func getGatewayStatus(obj *unstructured.Unstructured) (connections []interface{}, clusterID string, gwStatus bool) {
status, found, err := unstructured.NestedMap(obj.Object, "status")
if !found || err != nil {
klog.Errorf("status field not found in %#v, err was: %v", obj, err)
return nil, false
return nil, "", false
}

localClusterId, found, err := unstructured.NestedString(status, "localEndpoint", "cluster_id")
localClusterID, found, err := unstructured.NestedString(status, "localEndpoint", "cluster_id")

if !found || err != nil {
klog.Errorf("localEndpoint->cluster_id not found in %#v, err was: %v", status, err)

localClusterID = ""
} else {
connections = append(connections, map[string]interface{}{
"status": "connected",
"endpoint": map[string]interface{}{
"cluster_id": localClusterId,
"cluster_id": localClusterID,
},
})
}
Expand All @@ -189,20 +207,20 @@ func getGatewayStatus(obj *unstructured.Unstructured) (connections []interface{}

if !found || err != nil {
klog.Errorf("haStatus field not found in %#v, err was: %v", status, err)
return connections, true
return connections, localClusterID, true
}

if haStatus == "active" {
rconns, _, err := unstructured.NestedSlice(status, "connections")
if err != nil {
klog.Errorf("connections field not found in %#v, err was: %v", status, err)
return connections, false
return connections, localClusterID, false
}

connections = append(connections, rconns...)
}

return connections, true
return connections, localClusterID, true
}

func (c *Controller) getClusterStatusMap() map[string]bool {
Expand All @@ -213,6 +231,10 @@ func (c *Controller) IsConnected(clusterId string) bool {
return !c.gatewayAvailable || c.getClusterStatusMap()[clusterId]
}

func (c *Controller) LocalClusterID() string {
return c.localClusterID.Load().(string)
}

func (c *Controller) getCheckedClientset(kubeConfig *rest.Config) (dynamic.ResourceInterface, error) {
clientSet, err := c.NewClientset(kubeConfig)
if err != nil {
Expand Down
89 changes: 89 additions & 0 deletions pkg/service/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package service

import (
"fmt"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

type Controller struct {
// Indirection hook for unit tests to supply fake client sets
NewClientset func(kubeConfig *rest.Config) (kubernetes.Interface, error)
svcInformer cache.Controller
svcStore cache.Store
stopCh chan struct{}
}

func NewController() *Controller {
return &Controller{
NewClientset: func(c *rest.Config) (kubernetes.Interface, error) {
return kubernetes.NewForConfig(c)
},
stopCh: make(chan struct{}),
}
}

func (c *Controller) Start(kubeConfig *rest.Config) error {
klog.Infof("Starting Services Controller")

clientSet, err := c.NewClientset(kubeConfig)
if err != nil {
return fmt.Errorf("error creating client set: %v", err)
}

c.svcStore, c.svcInformer = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientSet.CoreV1().Services(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientSet.CoreV1().Services(metav1.NamespaceAll).Watch(options)
},
},
&v1.Service{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(old interface{}, new interface{}) {},
DeleteFunc: func(obj interface{}) {},
},
)

go c.svcInformer.Run(c.stopCh)

return nil
}

func (c *Controller) Stop() {
close(c.stopCh)

klog.Infof("Services Controller stopped")
}

func (c *Controller) GetIp(name, namespace string) (string, bool) {
key := namespace + "/" + name
obj, exists, err := c.svcStore.GetByKey(key)

if err != nil {
return "", false
}

if !exists {
return "", false
}

svc := obj.(*v1.Service)

if svc.Spec.Type == v1.ServiceTypeClusterIP && svc.Spec.ClusterIP != "" {
return svc.Spec.ClusterIP, true
}

return "", false
}
36 changes: 32 additions & 4 deletions plugin/lighthouse/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,17 @@ func (lh *Lighthouse) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns
return lh.nextOrFailure(state.Name(), ctx, w, r, dns.RcodeNameError, "Only services supported")
}

var ips []string
var found bool
ip, found := lh.serviceImports.GetIP(pReq.namespace, pReq.service, pReq.cluster, lh.clusterStatus.IsConnected,
lh.endpointsStatus.IsHealthy)
var (
ips []string
found bool
ip string
)

ip, found = lh.getLocalSvcIp(pReq)
if !found {
ip, found = lh.serviceImports.GetIP(pReq.namespace, pReq.service, pReq.cluster, lh.clusterStatus.IsConnected,
lh.endpointsStatus.IsHealthy)
}

if !found {
ips, found = lh.endpointSlices.GetIPs(pReq.hostname, pReq.cluster, pReq.namespace, pReq.service, lh.clusterStatus.IsConnected)
Expand Down Expand Up @@ -108,6 +115,27 @@ func (lh *Lighthouse) emptyResponse(state request.Request) (int, error) {
return dns.RcodeSuccess, nil
}

func (lh *Lighthouse) getLocalSvcIp(pReq recordRequest) (ip string, found bool) {
var localClusterIP string

localClusterID := lh.clusterStatus.LocalClusterID()

if pReq.cluster == "" || pReq.cluster == localClusterID {
localClusterIP, found = lh.localServices.GetIp(pReq.service, pReq.namespace)
if found {
ip, _ = lh.serviceImports.GetIP(pReq.namespace, pReq.service, localClusterID, lh.clusterStatus.IsConnected,
lh.endpointsStatus.IsHealthy)

if ip != "" {
// for local service, return localClusterIP as ip can be globalIP if globalnet is enabled
return localClusterIP, true
}
}
}

return "", false
}

// Name implements the Handler interface.
func (lh *Lighthouse) Name() string {
return "lighthouse"
Expand Down
19 changes: 19 additions & 0 deletions plugin/lighthouse/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type FailingResponseWriter struct {

type MockClusterStatus struct {
clusterStatusMap map[string]bool
localClusterID string
}

func NewMockClusterStatus() *MockClusterStatus {
Expand All @@ -66,6 +67,22 @@ func (m *MockEndpointStatus) IsHealthy(name, namespace, clusterId string) bool {
return m.endpointStatusMap[clusterId]
}

func (m *MockClusterStatus) LocalClusterID() string {
return m.localClusterID
}

type MockLocalServices struct {
localServiceIp string
}

func NewMockLocalServices() *MockLocalServices {
return &MockLocalServices{localServiceIp: ""}
}

func (m *MockLocalServices) GetIp(name, namespace string) (string, bool) {
return m.localServiceIp, m.localServiceIp != ""
}

func (w *FailingResponseWriter) WriteMsg(m *dns.Msg) error {
return errors.New(w.errorMsg)
}
Expand All @@ -81,13 +98,15 @@ func testWithoutFallback() {
mockCs.clusterStatusMap[clusterID] = true
mockEs := NewMockEndpointStatus()
mockEs.endpointStatusMap[clusterID] = true
mockLs := NewMockLocalServices()

lh = &Lighthouse{
Zones: []string{"clusterset.local."},
serviceImports: setupServiceImportMap(),
endpointSlices: setupEndpointSliceMap(),
clusterStatus: mockCs,
endpointsStatus: mockEs,
localServices: mockLs,
ttl: defaultTtl,
}

Expand Down
7 changes: 7 additions & 0 deletions plugin/lighthouse/lighthouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,17 @@ type Lighthouse struct {
endpointSlices *endpointslice.Map
clusterStatus ClusterStatus
endpointsStatus EndpointsStatus
localServices LocalServices
}

type ClusterStatus interface {
IsConnected(clusterId string) bool

LocalClusterID() string
}

type LocalServices interface {
GetIp(name, namespace string) (string, bool)
}

type EndpointsStatus interface {
Expand Down
11 changes: 10 additions & 1 deletion plugin/lighthouse/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/submariner-io/lighthouse/pkg/endpointslice"
"github.com/submariner-io/lighthouse/pkg/gateway"
"github.com/submariner-io/lighthouse/pkg/service"
"github.com/submariner-io/lighthouse/pkg/serviceimport"
"k8s.io/client-go/tools/clientcmd"
)
Expand Down Expand Up @@ -76,14 +77,22 @@ func lighthouseParse(c *caddy.Controller) (*Lighthouse, error) {
return nil, fmt.Errorf("error starting the Gateway controller: %v", err)
}

svcController := service.NewController()
err = svcController.Start(cfg)
if err != nil {
return nil, fmt.Errorf("error starting the Service controller: %v", err)
}

c.OnShutdown(func() error {
siController.Stop()
epController.Stop()
gwController.Stop()
svcController.Stop()
return nil
})

lh := &Lighthouse{ttl: defaultTtl, serviceImports: siMap, clusterStatus: gwController, endpointSlices: epMap,
endpointsStatus: epController}
endpointsStatus: epController, localServices: svcController}

// Changed `for` to `if` to satisfy golint:
// SA4004: the surrounding loop is unconditionally terminated (staticcheck)
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/discovery/headless_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

var _ = Describe("[discovery] Test Headless Service Discovery Across Clusters", func() {

f := lhframework.NewFramework("discovery")

When("a pod tries to resolve a headless service in a remote cluster", func() {
Expand Down Expand Up @@ -42,6 +43,7 @@ var _ = Describe("[discovery] Test Headless Service Discovery Across Clusters",
}
})
})

})

func RunHeadlessDiscoveryTest(f *lhframework.Framework) {
Expand Down
Loading

0 comments on commit d192abe

Please sign in to comment.