From 7aea9c7d2f5575c0773cb6101d56a5b687c3ba03 Mon Sep 17 00:00:00 2001 From: Vishal Thapar <5137689+vthapar@users.noreply.github.com> Date: Wed, 2 Dec 2020 00:33:37 +0530 Subject: [PATCH] Use clusterIP for local services (#356) * Use clusterIP for local services Fixes #355 Signed-off-by: Vishal Thapar <5137689+vthapar@users.noreply.github.com> --- go.sum | 4 + pkg/service/controller.go | 91 ++++++++++++++++++ pkg/serviceimport/map.go | 16 +-- pkg/serviceimport/map_test.go | 4 +- plugin/lighthouse/handler.go | 26 +++-- plugin/lighthouse/handler_test.go | 123 +++++++++++++++++++++++- plugin/lighthouse/lighthouse.go | 5 + plugin/lighthouse/setup.go | 11 ++- test/e2e/discovery/headless_services.go | 1 + test/e2e/discovery/service_discovery.go | 43 +++++---- 10 files changed, 282 insertions(+), 42 deletions(-) create mode 100644 pkg/service/controller.go diff --git a/go.sum b/go.sum index bd4ddfd25..b25fb2b9f 100644 --- a/go.sum +++ b/go.sum @@ -409,6 +409,10 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/mholt/certmagic v0.8.3/go.mod h1:91uJzK5K8IWtYQqTi5R2tsxV1pCde+wdGfaRaOZi6aQ= github.com/miekg/dns v1.1.15/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.1.27/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= +github.com/miekg/dns v1.1.31 h1:sJFOl9BgwbYAWOGEwr61FU28pqsBNdpRBnhGXtO06Oo= +github.com/miekg/dns v1.1.31/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= +github.com/miekg/dns v1.1.34 h1:SgTzfkN+oLoIHF1bgUP+C71mzuDl3AhLApHzCCIAMWM= +github.com/miekg/dns v1.1.34/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= github.com/miekg/dns v1.1.35 h1:oTfOaDH+mZkdcgdIjH6yBajRGtIwcwcaR+rt23ZSrJs= github.com/miekg/dns v1.1.35/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= diff --git a/pkg/service/controller.go b/pkg/service/controller.go new file mode 100644 index 000000000..cb722856b --- /dev/null +++ b/pkg/service/controller.go @@ -0,0 +1,91 @@ +package service + +import ( + "fmt" + + "github.com/submariner-io/admiral/pkg/log" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" +) + +type Controller struct { + // Indirection hook for unit tests to supply fake client sets + NewClientset func(kubeConfig *rest.Config) (kubernetes.Interface, error) + svcInformer cache.Controller + svcStore cache.Store + stopCh chan struct{} +} + +func NewController() *Controller { + return &Controller{ + NewClientset: func(c *rest.Config) (kubernetes.Interface, error) { + return kubernetes.NewForConfig(c) + }, + stopCh: make(chan struct{}), + } +} + +func (c *Controller) Start(kubeConfig *rest.Config) error { + klog.Infof("Starting Services Controller") + + clientSet, err := c.NewClientset(kubeConfig) + if err != nil { + return fmt.Errorf("error creating client set: %v", err) + } + + c.svcStore, c.svcInformer = cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return clientSet.CoreV1().Services(metav1.NamespaceAll).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return clientSet.CoreV1().Services(metav1.NamespaceAll).Watch(options) + }, + }, + &v1.Service{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) {}, + UpdateFunc: func(old interface{}, new interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + }, + ) + + go c.svcInformer.Run(c.stopCh) + + return nil +} + +func (c *Controller) Stop() { + close(c.stopCh) + + klog.Infof("Services Controller stopped") +} + +func (c *Controller) GetIP(name, namespace string) (string, bool) { + key := namespace + "/" + name + obj, exists, err := c.svcStore.GetByKey(key) + + if err != nil { + klog.V(log.DEBUG).Infof("Error trying to get service for key %q", key) + return "", false + } + + if !exists { + return "", false + } + + svc := obj.(*v1.Service) + + if svc.Spec.Type == v1.ServiceTypeClusterIP && svc.Spec.ClusterIP != "" { + return svc.Spec.ClusterIP, true + } + + return "", false +} diff --git a/pkg/serviceimport/map.go b/pkg/serviceimport/map.go index 7f92e56f5..0cb270979 100644 --- a/pkg/serviceimport/map.go +++ b/pkg/serviceimport/map.go @@ -54,7 +54,7 @@ func (m *Map) selectIP(queue []clusterInfo, counter *uint64, name, namespace str } func (m *Map) GetIP(namespace, name, cluster, localCluster string, checkCluster func(string) bool, - checkEndpoint func(string, string, string) bool) (string, bool) { + checkEndpoint func(string, string, string) bool) (ip string, found, isLocal bool) { clusterIPs, queue, counter, isHeadless := func() (map[string]string, []clusterInfo, *uint64, bool) { m.RLock() defer m.RUnlock() @@ -68,13 +68,13 @@ func (m *Map) GetIP(namespace, name, cluster, localCluster string, checkCluster }() if clusterIPs == nil || isHeadless { - return "", false + return "", false, false } // If a clusterId is specified, we supply it even if the service is not there if cluster != "" { - ip, found := clusterIPs[cluster] - return ip, found + ip, found = clusterIPs[cluster] + return ip, found, cluster == localCluster } // If we are aware of the local cluster @@ -83,17 +83,17 @@ func (m *Map) GetIP(namespace, name, cluster, localCluster string, checkCluster ip, found := clusterIPs[localCluster] if found && ip != "" && checkEndpoint(name, namespace, localCluster) { - return ip, found + return ip, found, true } } // Fall back to Round-Robin if service is not presented in the local cluster - ip := m.selectIP(queue, counter, name, namespace, checkCluster, checkEndpoint) + ip = m.selectIP(queue, counter, name, namespace, checkCluster, checkEndpoint) if ip != "" { - return ip, true + return ip, true, false } - return "", true + return "", true, false } func NewMap() *Map { diff --git a/pkg/serviceimport/map_test.go b/pkg/serviceimport/map_test.go index 7bcb9869b..d2e40305e 100644 --- a/pkg/serviceimport/map_test.go +++ b/pkg/serviceimport/map_test.go @@ -40,12 +40,12 @@ var _ = Describe("ServiceImport Map", func() { } expectIPsNotFound := func(ns, service, cluster, localCluster string) { - _, found := serviceImportMap.GetIP(ns, service, cluster, localCluster, checkCluster, checkEndpoint) + _, found, _ := serviceImportMap.GetIP(ns, service, cluster, localCluster, checkCluster, checkEndpoint) Expect(found).To(BeFalse()) } getIPExpectFound := func(ns, name, cluster, localCluster string) string { - ip, found := serviceImportMap.GetIP(ns, name, cluster, localCluster, checkCluster, checkEndpoint) + ip, found, _ := serviceImportMap.GetIP(ns, name, cluster, localCluster, checkCluster, checkEndpoint) Expect(found).To(BeTrue()) return ip } diff --git a/plugin/lighthouse/handler.go b/plugin/lighthouse/handler.go index 97356b7b9..c2d49a52d 100644 --- a/plugin/lighthouse/handler.go +++ b/plugin/lighthouse/handler.go @@ -44,13 +44,13 @@ func (lh *Lighthouse) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns return lh.nextOrFailure(state.Name(), ctx, w, r, dns.RcodeNameError, "Only services supported") } - var ips []string - var found bool + var ( + ips []string + found bool + ip string + ) - localClusterID := lh.clusterStatus.LocalClusterID() - - ip, found := lh.serviceImports.GetIP(pReq.namespace, pReq.service, pReq.cluster, localClusterID, lh.clusterStatus.IsConnected, - lh.endpointsStatus.IsHealthy) + ip, found = lh.getClusterIpForSvc(pReq) if !found { ips, found = lh.endpointSlices.GetIPs(pReq.hostname, pReq.cluster, pReq.namespace, pReq.service, lh.clusterStatus.IsConnected) @@ -111,6 +111,20 @@ func (lh *Lighthouse) emptyResponse(state request.Request) (int, error) { return dns.RcodeSuccess, nil } +func (lh *Lighthouse) getClusterIpForSvc(pReq recordRequest) (ip string, found bool) { + localClusterID := lh.clusterStatus.LocalClusterID() + + ip, found, isLocal := lh.serviceImports.GetIP(pReq.namespace, pReq.service, pReq.cluster, localClusterID, lh.clusterStatus.IsConnected, + lh.endpointsStatus.IsHealthy) + + getLocal := isLocal || (pReq.cluster != "" && pReq.cluster == localClusterID) + if found && getLocal { + ip, found = lh.localServices.GetIP(pReq.service, pReq.namespace) + } + + return ip, found +} + // Name implements the Handler interface. func (lh *Lighthouse) Name() string { return "lighthouse" diff --git a/plugin/lighthouse/handler_test.go b/plugin/lighthouse/handler_test.go index dca3d6fb0..62b786f1e 100644 --- a/plugin/lighthouse/handler_test.go +++ b/plugin/lighthouse/handler_test.go @@ -35,6 +35,7 @@ var _ = Describe("Lighthouse DNS plugin Handler", func() { Context("Fallthrough configured", testWithFallback) Context("Cluster connectivity status", testClusterStatus) Context("Headless services", testHeadlessService) + Context("Local services", testLocalService) }) type FailingResponseWriter struct { @@ -55,10 +56,6 @@ func (m *MockClusterStatus) IsConnected(clusterId string) bool { return m.clusterStatusMap[clusterId] } -func (m *MockClusterStatus) LocalClusterID() string { - return m.localClusterID -} - type MockEndpointStatus struct { endpointStatusMap map[string]bool } @@ -71,6 +68,26 @@ func (m *MockEndpointStatus) IsHealthy(name, namespace, clusterId string) bool { return m.endpointStatusMap[clusterId] } +func (m *MockClusterStatus) LocalClusterID() string { + return m.localClusterID +} + +type MockLocalServices struct { + LocalServicesMap map[string]string +} + +func NewMockLocalServices() *MockLocalServices { + return &MockLocalServices{LocalServicesMap: make(map[string]string)} +} + +func (m *MockLocalServices) GetIP(name, namespace string) (string, bool) { + ip, found := m.LocalServicesMap[getKey(name, namespace)] + return ip, found +} + +func getKey(name, namespace string) string { + return namespace + "/" + name +} func (w *FailingResponseWriter) WriteMsg(m *dns.Msg) error { return errors.New(w.errorMsg) } @@ -86,13 +103,14 @@ func testWithoutFallback() { mockCs.clusterStatusMap[clusterID] = true mockEs := NewMockEndpointStatus() mockEs.endpointStatusMap[clusterID] = true - + mockLs := NewMockLocalServices() lh = &Lighthouse{ Zones: []string{"clusterset.local."}, serviceImports: setupServiceImportMap(), endpointSlices: setupEndpointSliceMap(), clusterStatus: mockCs, endpointsStatus: mockEs, + localServices: mockLs, ttl: defaultTtl, } @@ -214,8 +232,11 @@ func testWithFallback() { BeforeEach(func() { mockCs := NewMockClusterStatus() mockCs.clusterStatusMap[clusterID] = true + mockCs.localClusterID = clusterID mockEs := NewMockEndpointStatus() mockEs.endpointStatusMap[clusterID] = true + mockLs := NewMockLocalServices() + lh = &Lighthouse{ Zones: []string{"clusterset.local."}, Fall: fall.F{Zones: []string{"clusterset.local."}}, @@ -224,6 +245,7 @@ func testWithFallback() { endpointSlices: setupEndpointSliceMap(), clusterStatus: mockCs, endpointsStatus: mockEs, + localServices: mockLs, ttl: defaultTtl, } @@ -297,12 +319,14 @@ func testClusterStatus() { mockEs := NewMockEndpointStatus() mockEs.endpointStatusMap[clusterID] = true mockEs.endpointStatusMap[clusterID2] = true + mockLs := NewMockLocalServices() lh = &Lighthouse{ Zones: []string{"clusterset.local."}, serviceImports: setupServiceImportMap(), endpointSlices: setupEndpointSliceMap(), clusterStatus: mockCs, endpointsStatus: mockEs, + localServices: mockLs, ttl: defaultTtl, } lh.serviceImports.Put(newServiceImport(namespace1, service1, clusterID2, serviceIP2, mcsv1a1.ClusterSetIP)) @@ -399,15 +423,18 @@ func testHeadlessService() { BeforeEach(func() { mockCs = NewMockClusterStatus() mockCs.clusterStatusMap[clusterID] = true + mockCs.localClusterID = clusterID mockEs = NewMockEndpointStatus() mockEs.endpointStatusMap[clusterID] = true mockEs.endpointStatusMap[clusterID2] = true + mockLs := NewMockLocalServices() lh = &Lighthouse{ Zones: []string{"clusterset.local."}, serviceImports: serviceimport.NewMap(), endpointSlices: setupEndpointSliceMap(), clusterStatus: mockCs, endpointsStatus: mockEs, + localServices: mockLs, ttl: defaultTtl, } @@ -500,6 +527,92 @@ func testHeadlessService() { }) } +func testLocalService() { + var ( + rec *dnstest.Recorder + lh *Lighthouse + mockCs *MockClusterStatus + ) + + BeforeEach(func() { + mockCs = NewMockClusterStatus() + mockCs.clusterStatusMap[clusterID] = true + mockCs.clusterStatusMap[clusterID2] = true + mockEs := NewMockEndpointStatus() + mockEs.endpointStatusMap[clusterID] = true + mockEs.endpointStatusMap[clusterID2] = true + mockLs := NewMockLocalServices() + mockCs.localClusterID = clusterID + mockLs.LocalServicesMap[getKey(service1, namespace1)] = serviceIP + lh = &Lighthouse{ + Zones: []string{"clusterset.local."}, + serviceImports: setupServiceImportMap(), + endpointSlices: setupEndpointSliceMap(), + clusterStatus: mockCs, + endpointsStatus: mockEs, + localServices: mockLs, + ttl: defaultTtl, + } + lh.serviceImports.Put(newServiceImport(namespace1, service1, clusterID2, serviceIP2, mcsv1a1.ClusterSetIP)) + + rec = dnstest.NewRecorder(&test.ResponseWriter{}) + }) + + When("service is in local and remote clusters", func() { + It("should succeed and write local cluster's IP as A record response", func() { + executeTestCase(lh, rec, test.Case{ + Qname: service1 + "." + namespace1 + ".svc.clusterset.local.", + Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A(service1 + "." + namespace1 + ".svc.clusterset.local. 5 IN A " + serviceIP), + }, + }) + // Execute again to make sure not round robin + executeTestCase(lh, rec, test.Case{ + Qname: service1 + "." + namespace1 + ".svc.clusterset.local.", + Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A(service1 + "." + namespace1 + ".svc.clusterset.local. 5 IN A " + serviceIP), + }, + }) + }) + }) + + When("service is in local and remote clusters, and remote cluster is requested", func() { + It("should succeed and write remote cluster's IP as A record response", func() { + executeTestCase(lh, rec, test.Case{ + Qname: clusterID2 + "." + service1 + "." + namespace1 + ".svc.clusterset.local.", + Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A(clusterID2 + "." + service1 + "." + namespace1 + ".svc.clusterset.local. 5 IN A " + serviceIP2), + }, + }) + }) + }) + + When("service is in local and remote clusters, and local has no active endpoints", func() { + JustBeforeEach(func() { + mockEs := NewMockEndpointStatus() + mockEs.endpointStatusMap[clusterID] = false + mockEs.endpointStatusMap[clusterID2] = true + lh.endpointsStatus = mockEs + }) + It("should succeed and write remote cluster's IP as A record response", func() { + executeTestCase(lh, rec, test.Case{ + Qname: service1 + "." + namespace1 + ".svc.clusterset.local.", + Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A(service1 + "." + namespace1 + ".svc.clusterset.local. 5 IN A " + serviceIP2), + }, + }) + }) + }) +} + func executeTestCase(lh *Lighthouse, rec *dnstest.Recorder, tc test.Case) { code, err := lh.ServeDNS(context.TODO(), rec, tc.Msg()) diff --git a/plugin/lighthouse/lighthouse.go b/plugin/lighthouse/lighthouse.go index c0a662d88..be2f7e3e9 100644 --- a/plugin/lighthouse/lighthouse.go +++ b/plugin/lighthouse/lighthouse.go @@ -33,6 +33,7 @@ type Lighthouse struct { endpointSlices *endpointslice.Map clusterStatus ClusterStatus endpointsStatus EndpointsStatus + localServices LocalServices } type ClusterStatus interface { @@ -41,6 +42,10 @@ type ClusterStatus interface { LocalClusterID() string } +type LocalServices interface { + GetIP(name, namespace string) (string, bool) +} + type EndpointsStatus interface { IsHealthy(name, namespace, clusterId string) bool } diff --git a/plugin/lighthouse/setup.go b/plugin/lighthouse/setup.go index 9b85e6cfd..803be0042 100644 --- a/plugin/lighthouse/setup.go +++ b/plugin/lighthouse/setup.go @@ -10,6 +10,7 @@ import ( "github.com/coredns/coredns/plugin" "github.com/submariner-io/lighthouse/pkg/endpointslice" "github.com/submariner-io/lighthouse/pkg/gateway" + "github.com/submariner-io/lighthouse/pkg/service" "github.com/submariner-io/lighthouse/pkg/serviceimport" "k8s.io/client-go/tools/clientcmd" ) @@ -76,14 +77,22 @@ func lighthouseParse(c *caddy.Controller) (*Lighthouse, error) { return nil, fmt.Errorf("error starting the Gateway controller: %v", err) } + svcController := service.NewController() + err = svcController.Start(cfg) + if err != nil { + return nil, fmt.Errorf("error starting the Service controller: %v", err) + } + c.OnShutdown(func() error { siController.Stop() + epController.Stop() gwController.Stop() + svcController.Stop() return nil }) lh := &Lighthouse{ttl: defaultTtl, serviceImports: siMap, clusterStatus: gwController, endpointSlices: epMap, - endpointsStatus: epController} + endpointsStatus: epController, localServices: svcController} // Changed `for` to `if` to satisfy golint: // SA4004: the surrounding loop is unconditionally terminated (staticcheck) diff --git a/test/e2e/discovery/headless_services.go b/test/e2e/discovery/headless_services.go index bfb836a3c..9341fef90 100644 --- a/test/e2e/discovery/headless_services.go +++ b/test/e2e/discovery/headless_services.go @@ -11,6 +11,7 @@ import ( ) var _ = Describe("[discovery] Test Headless Service Discovery Across Clusters", func() { + f := lhframework.NewFramework("discovery") When("a pod tries to resolve a headless service in a remote cluster", func() { diff --git a/test/e2e/discovery/service_discovery.go b/test/e2e/discovery/service_discovery.go index 766730522..0ab6c0f54 100644 --- a/test/e2e/discovery/service_discovery.go +++ b/test/e2e/discovery/service_discovery.go @@ -82,14 +82,14 @@ func RunServiceDiscoveryTest(f *lhframework.Framework) { f.AwaitEndpointSlices(framework.ClusterA, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace, 1, 1) } - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, true) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, true) f.DeleteServiceExport(framework.ClusterB, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace) f.AwaitServiceImportDelete(framework.ClusterA, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace) f.DeleteService(framework.ClusterB, nginxServiceClusterB.Name) - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, false) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, false) } func RunServiceDiscoveryLocalTest(f *lhframework.Framework) { @@ -120,7 +120,8 @@ func RunServiceDiscoveryLocalTest(f *lhframework.Framework) { netshootPodList := f.NewNetShootDeployment(framework.ClusterA) clusterADomain := getClusterDomain(f.Framework, framework.ClusterA, netshootPodList) - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterA, netshootPodList, []string{clusterADomain}, true) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterA, nginxServiceClusterA, netshootPodList, + []string{clusterADomain}, true) f.DeleteService(framework.ClusterA, nginxServiceClusterA.Name) @@ -131,14 +132,14 @@ func RunServiceDiscoveryLocalTest(f *lhframework.Framework) { f.AwaitEndpointSlices(framework.ClusterA, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace, 1, 1) } - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, true) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, true) f.DeleteServiceExport(framework.ClusterB, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace) f.AwaitServiceImportDelete(framework.ClusterA, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace) f.DeleteService(framework.ClusterB, nginxServiceClusterB.Name) - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, false) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, false) } func RunServiceExportTest(f *lhframework.Framework) { @@ -167,12 +168,12 @@ func RunServiceExportTest(f *lhframework.Framework) { f.AwaitEndpointSlices(framework.ClusterA, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace, 1, 1) } - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, true) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, true) f.DeleteServiceExport(framework.ClusterB, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace) f.AwaitServiceImportDelete(framework.ClusterA, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace) - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, false) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, false) } func RunServicesPodAvailabilityTest(f *lhframework.Framework) { @@ -201,11 +202,11 @@ func RunServicesPodAvailabilityTest(f *lhframework.Framework) { f.AwaitEndpointSlices(framework.ClusterA, nginxServiceClusterB.Name, nginxServiceClusterB.Namespace, 1, 1) } - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, true) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, true) f.SetNginxReplicaSet(framework.ClusterB, 0) - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, false) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, false) f.SetNginxReplicaSet(framework.ClusterB, 2) - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, true) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, true) } func RunServicesPodAvailabilityMutliClusterTest(f *lhframework.Framework) { @@ -251,26 +252,28 @@ func RunServicesPodAvailabilityMutliClusterTest(f *lhframework.Framework) { f.AwaitEndpointSlices(framework.ClusterA, nginxServiceClusterA.Name, nginxServiceClusterA.Namespace, 2, 2) } - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterA, netshootPodList, checkedDomains, true) - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, false) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterA, nginxServiceClusterA, netshootPodList, checkedDomains, true) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, false) f.SetNginxReplicaSet(framework.ClusterA, 0) + f.AwaitEndpointSlices(framework.ClusterA, nginxServiceClusterA.Name, nginxServiceClusterA.Namespace, 2, 1) - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterA, netshootPodList, checkedDomains, false) - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, true) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterA, nginxServiceClusterA, netshootPodList, checkedDomains, false) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, true) f.SetNginxReplicaSet(framework.ClusterB, 0) f.AwaitEndpointSlices(framework.ClusterA, nginxServiceClusterA.Name, nginxServiceClusterA.Namespace, 2, 0) - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterA, netshootPodList, checkedDomains, false) - verifyServiceIpWithDig(f.Framework, framework.ClusterA, nginxServiceClusterB, netshootPodList, checkedDomains, false) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterA, nginxServiceClusterA, netshootPodList, checkedDomains, false) + verifyServiceIpWithDig(f.Framework, framework.ClusterA, framework.ClusterB, nginxServiceClusterB, netshootPodList, checkedDomains, false) } -func verifyServiceIpWithDig(f *framework.Framework, cluster framework.ClusterIndex, service *corev1.Service, targetPod *corev1.PodList, - domains []string, shouldContain bool) { +func verifyServiceIpWithDig(f *framework.Framework, srcCluster, targetCluster framework.ClusterIndex, service *corev1.Service, + targetPod *corev1.PodList, domains []string, shouldContain bool) { var serviceIP string var ok bool - if serviceIP, ok = service.Annotations[submarinerIpamGlobalIp]; !ok { + serviceIP, ok = service.Annotations[submarinerIpamGlobalIp] + if !ok || srcCluster == targetCluster { serviceIP = service.Spec.ClusterIP } @@ -293,7 +296,7 @@ func verifyServiceIpWithDig(f *framework.Framework, cluster framework.ClusterInd ContainerName: targetPod.Items[0].Spec.Containers[0].Name, CaptureStdout: true, CaptureStderr: true, - }, cluster) + }, srcCluster) if err != nil { return nil, err }