From b6daa27562529bb922b30b54669a13eff7c5b932 Mon Sep 17 00:00:00 2001 From: Michael Zalimeni Date: Wed, 6 Sep 2023 00:30:03 -0400 Subject: [PATCH] review feedback, add tests, cleanup TODOs --- .../endpointsv2/endpoints_controller.go | 92 ++- .../endpointsv2/endpoints_controller_test.go | 615 ++++++++++++++++-- 2 files changed, 607 insertions(+), 100 deletions(-) diff --git a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go index 25177c6225..508af61933 100644 --- a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go +++ b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go @@ -25,8 +25,6 @@ import ( "github.com/hashicorp/go-multierror" ) -//TODO(zalimeni) When MVP of this controller is done, put tickets for remaining TODOs in Jira. - const ( metaKeyManagedBy = "managed-by" kindReplicaSet = "ReplicaSet" @@ -114,12 +112,28 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } r.Log.Info("retrieved Service", "name", req.Name, "ns", req.Namespace) - // Determine Consul service selector values from pod names. + workloadSelector, err := r.getWorkloadSelectorFromEndpoints(ctx, &ClientPodFetcher{client: r.Client}, &endpoints) + if err != nil { + errs = multierror.Append(errs, err) + } + + //TODO: Maybe check service-enable label here on service/deployments/other pod owners + if err = r.registerService(ctx, resourceClient, service, workloadSelector); err != nil { + errs = multierror.Append(errs, err) + } + + return ctrl.Result{}, errs +} + +// getWorkloadSelectorFromEndpoints calculates a Consul service WorkloadSelector from Endpoints based on pod names and +// owners. +func (r *Controller) getWorkloadSelectorFromEndpoints(ctx context.Context, pf PodFetcher, endpoints *corev1.Endpoints) (*pbcatalog.WorkloadSelector, error) { podPrefixes := make(map[string]any) podExactNames := make(map[string]any) + var errs error for address := range allAddresses(endpoints.Subsets) { if address.TargetRef != nil && address.TargetRef.Kind == "Pod" { - podName := types.NamespacedName{Name: address.TargetRef.Name, Namespace: req.Namespace} + podName := types.NamespacedName{Name: address.TargetRef.Name, Namespace: endpoints.Namespace} // Accumulate owner prefixes and exact pod names for Consul workload selector. // If this pod is already covered by a known owner prefix, skip it. @@ -127,15 +141,15 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // If not, add the pod name to exact name matches. maybePodOwnerPrefix := getOwnerPrefixFromPodName(podName.Name) if _, ok := podPrefixes[maybePodOwnerPrefix]; !ok { - var pod corev1.Pod - if err = r.Client.Get(ctx, podName, &pod); err != nil { - r.Log.Error(err, "failed to get pod", "name", podName.Name, "ns", req.Namespace) + pod, err := pf.GetPod(ctx, podName) + if err != nil { + r.Log.Error(err, "failed to get pod", "name", podName.Name, "ns", endpoints.Namespace) errs = multierror.Append(errs, err) continue } // Add to workload selector values. // Pods can appear more than once in Endpoints subsets, so we use a set for exact names as well. - if prefix := getOwnerPrefixFromPod(&pod); prefix != "" { + if prefix := getOwnerPrefixFromPod(pod); prefix != "" { podPrefixes[prefix] = true } else { podExactNames[podName.Name] = true @@ -143,15 +157,7 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } } } - - //TODO(zalimeni) Do we need to check service-enable label here on service/deployments/replicasets? - // Should we expect owner is annotated, and if so, is that consistent for non-replicaset workloads? - // If so, update to register and deregister conditionally. - if err = r.registerService(ctx, resourceClient, service, podPrefixes, podExactNames); err != nil { - errs = multierror.Append(errs, err) - } - - return ctrl.Result{}, errs + return getWorkloadSelector(podPrefixes, podExactNames), errs } // allAddresses combines all Endpoints subset addresses to a single set. Service registration by this controller @@ -193,10 +199,10 @@ func getOwnerPrefixFromPod(pod *corev1.Pod) string { } // registerService creates a Consul service registration from the provided Kuberetes service and endpoint information. -func (r *Controller) registerService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, service corev1.Service, podPrefixes, podExactNames map[string]any) error { +func (r *Controller) registerService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, service corev1.Service, selector *pbcatalog.WorkloadSelector) error { serviceResource := r.getServiceResource( &pbcatalog.Service{ - Workloads: getWorkloadSelector(podPrefixes, podExactNames), + Workloads: selector, Ports: getServicePorts(service), VirtualIps: r.getServiceVIPs(service), }, @@ -207,8 +213,7 @@ func (r *Controller) registerService(ctx context.Context, resourceClient pbresou ) r.Log.Info("registering service with Consul", getLogFieldsForResource(serviceResource.Id)...) - //TODO(zalimeni) Read from Consul or some other state to determine whether a write is needed. - // For now, we blindly write state on each reconcile to simplify implementation. + //TODO: Maybe attempt to debounce redundant writes. For now, we blindly rewrite state on each reconcile. _, err := resourceClient.Write(ctx, &pbresource.WriteRequest{Resource: serviceResource}) if err != nil { r.Log.Error(err, "failed to register service", getLogFieldsForResource(serviceResource.Id)...) @@ -249,14 +254,19 @@ func getServicePorts(service corev1.Service) []*pbcatalog.ServicePort { for _, p := range service.Spec.Ports { ports = append(ports, &pbcatalog.ServicePort{ VirtualPort: uint32(p.Port), - TargetPort: p.TargetPort.String(), - Protocol: common.GetPortProtocol(p.AppProtocol), + //TODO: We may require that this aligns w/ Endpoints ports, rather than following the K8s model + // of a bimodal string-or-int. If so, that should be documented as a constraint. Regardless, + // this value should match the string form of the K8s ServicePort.TargetPort. + TargetPort: p.TargetPort.String(), + Protocol: common.GetPortProtocol(p.AppProtocol), }) } + //TODO: Error check reserved "mesh" target port + // Append Consul service mesh port in addition to discovered ports. - //TODO(zalimeni) Omit if zero mesh ports present in service endpoints? Or could there be some - // use of mesh-inject at the service level in the webhook as well? + //TODO: Maybe omit if zero mesh ports present in service endpoints, or if some + // use of mesh-inject/other label should cause this to be excluded. ports = append(ports, &pbcatalog.ServicePort{ TargetPort: "mesh", Protocol: pbcatalog.Protocol_PROTOCOL_MESH, @@ -283,16 +293,7 @@ func getServiceMeta(service corev1.Service) map[string]string { constants.MetaKeyKubeNS: service.Namespace, metaKeyManagedBy: constants.ManagedByEndpointsValue, } - //TODO(zalimeni) This is adapted from v1 pod-based meta injection - verify correct and needed - for k, v := range service.Annotations { - if strings.HasPrefix(k, constants.AnnotationMeta) && strings.TrimPrefix(k, constants.AnnotationMeta) != "" { - if v == "$SERVICE_NAME" { - meta[strings.TrimPrefix(k, constants.AnnotationMeta)] = service.Name - } else { - meta[strings.TrimPrefix(k, constants.AnnotationMeta)] = v - } - } - } + //TODO: Support arbitrary meta injection via annotation? (see v1) return meta } @@ -344,7 +345,26 @@ func (r *Controller) getConsulPartition() string { func getLogFieldsForResource(id *pbresource.ID) []any { return []any{ "name", id.Name, - "namespace", id.Tenancy.Namespace, + "ns", id.Tenancy.Namespace, "partition", id.Tenancy.Partition, } } + +// PodFetcher fetches pods by NamespacedName. This interface primarily exists for testing. +type PodFetcher interface { + GetPod(context.Context, types.NamespacedName) (*corev1.Pod, error) +} + +// ClientPodFetcher wraps a Kubernetes client to implement PodFetcher. This is the only implementation outside of tests. +type ClientPodFetcher struct { + client client.Client +} + +func (c *ClientPodFetcher) GetPod(ctx context.Context, name types.NamespacedName) (*corev1.Pod, error) { + var pod corev1.Pod + err := c.client.Get(ctx, name, &pod) + if err != nil { + return nil, err + } + return &pod, nil +} diff --git a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go index d0f141018c..15a1a2c986 100644 --- a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go +++ b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go @@ -6,6 +6,7 @@ package endpointsv2 import ( "context" "fmt" + "github.com/google/go-cmp/cmp/cmpopts" "testing" mapset "github.com/deckarep/golang-set" @@ -36,25 +37,28 @@ import ( ) var ( - appProtocolHttp = "http" - appProtocolGrpc = "grpc" + appProtocolHttp = "http" + appProtocolHttp2 = "http2" + appProtocolGrpc = "grpc" ) -// TODO(zalimeni): -// - "Only fetches one pod per replicaset" -// - "Allow/deny namespaces" -// - "With additional meta" +type reconcileCase struct { + name string + svcName string + k8sObjects func() []runtime.Object + existingResource *pbresource.Resource + expectedResource *pbresource.Resource + targetConsulNs string + targetConsulPartition string + expErr string +} + +// TODO: Allow/deny namespaces for reconcile tests +// TODO: ConsulDestinationNamespace and EnableNSMirroring +/- prefix + func TestReconcile_CreateService(t *testing.T) { t.Parallel() - cases := []struct { - name string - svcName string - targetConsulNs string - targetConsulPartition string - k8sObjects func() []runtime.Object - expectedResource *pbresource.Resource - expErr string - }{ + cases := []reconcileCase{ { // In this test, we expect the same service registration as the "basic" // case, but without any workload selector values due to missing endpoints. @@ -174,7 +178,7 @@ func TestReconcile_CreateService(t *testing.T) { }, { Name: "10001", - Port: 10234, + Port: 10001, }, }, }, @@ -261,68 +265,551 @@ func TestReconcile_CreateService(t *testing.T) { } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - // Add the default namespace. - ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}} - // Create fake k8s client - k8sObjects := append(tc.k8sObjects(), &ns) + runReconcileCase(t, tc) + }) + } +} + +func TestReconcile_UpdateService(t *testing.T) { + t.Parallel() + cases := []reconcileCase{ + { + name: "Pods changed", + svcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") + pod2 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-klmno") + pod3 := createServicePod("DaemonSet", "service-created-ds", "12345") + pod4 := createServicePod("DaemonSet", "service-created-ds", "34567") + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(pod1, pod2, pod3, pod4), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + }, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + { + Name: "public", + Port: 8080, + TargetPort: intstr.FromString("my-http-port"), + AppProtocol: &appProtocolHttp, + }, + }, + }, + } + return []runtime.Object{pod1, pod2, pod3, pod4, endpoints, service} + }, + existingResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{ + "service-created-rs-abcde", // Retained + "service-created-rs-fghij", // Removed + }, + Names: []string{ + "service-created-ds-12345", // Retained + "service-created-ds-23456", // Removed + }, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + + Prefixes: []string{ + "service-created-rs-abcde", // Retained + "service-created-rs-klmno", // New + }, + Names: []string{ + "service-created-ds-12345", // Retained + "service-created-ds-34567", // New + }, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + { + name: "Service ports changed", + svcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") + pod2 := createServicePod("DaemonSet", "service-created-ds", "12345") + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(pod1, pod2), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + { + Name: "my-api-port", + AppProtocol: &appProtocolHttp, + Port: 6789, + }, + }, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + { + Name: "public", + Port: 8080, + TargetPort: intstr.FromString("new-http-port"), + AppProtocol: &appProtocolHttp2, + }, + { + Name: "api", + Port: 9091, + TargetPort: intstr.FromString("my-grpc-port"), + AppProtocol: &appProtocolGrpc, + }, + }, + }, + } + return []runtime.Object{pod1, pod2, endpoints, service} + }, + existingResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-updated", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + VirtualPort: 9090, + TargetPort: "my-grpc-port", + Protocol: pbcatalog.Protocol_PROTOCOL_GRPC, + }, + { + VirtualPort: 10001, + TargetPort: "10001", + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"service-created-rs-abcde"}, + Names: []string{"service-created-ds-12345"}, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-updated", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "new-http-port", // Updated + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP2, // Updated + }, + { + VirtualPort: 9091, // Updated + TargetPort: "my-grpc-port", + Protocol: pbcatalog.Protocol_PROTOCOL_GRPC, + }, + // Port 10001 removed + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"service-created-rs-abcde"}, + Names: []string{"service-created-ds-12345"}, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + runReconcileCase(t, tc) + }) + } +} - fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() +func TestReconcile_DeleteService(t *testing.T) { + t.Parallel() + cases := []reconcileCase{ + { + name: "Basic Endpoints not found (service deleted)", + svcName: "service-deleted", + existingResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"service-created-rs-abcde"}, + Names: []string{"service-created-ds-12345"}, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + runReconcileCase(t, tc) + }) + } +} - // Create test consulServer server. - testClient := test.TestServerWithMockConnMgrWatcher(t, func(c *testutil.TestServerConfig) { - c.Experiments = []string{"resource-apis"} - }) +func TestGetWorkloadSelectorFromEndpoints(t *testing.T) { + t.Parallel() - // Create the endpoints controller. - ep := &Controller{ - Client: fakeClient, - Log: logrtest.New(t), - ConsulServerConnMgr: testClient.Watcher, - AllowK8sNamespacesSet: mapset.NewSetWith("*"), - DenyK8sNamespacesSet: mapset.NewSetWith(), - } - resourceClient, err := consul.NewResourceServiceClient(ep.ConsulServerConnMgr) - require.NoError(t, err) + ctx := context.Background() - namespacedName := types.NamespacedName{ - Name: tc.svcName, - Namespace: "default", - } + type testCase struct { + name string + endpoints *corev1.Endpoints + responses map[types.NamespacedName]*corev1.Pod + expected *pbcatalog.WorkloadSelector + mockFn func(*testing.T, *MockPodFetcher) + } - resp, err := ep.Reconcile(context.Background(), ctrl.Request{ - NamespacedName: namespacedName, - }) - if tc.expErr != "" { - require.EqualError(t, err, tc.expErr) - } else { - require.NoError(t, err) - } - require.False(t, resp.Requeue) + rsPods := []*corev1.Pod{ + createServicePod(kindReplicaSet, "svc-rs-abcde", "12345"), + createServicePod(kindReplicaSet, "svc-rs-abcde", "23456"), + createServicePod(kindReplicaSet, "svc-rs-abcde", "34567"), + createServicePod(kindReplicaSet, "svc-rs-fghij", "12345"), + createServicePod(kindReplicaSet, "svc-rs-fghij", "23456"), + createServicePod(kindReplicaSet, "svc-rs-fghij", "34567"), + } + otherPods := []*corev1.Pod{ + createServicePod("DaemonSet", "svc-ds", "12345"), + createServicePod("DaemonSet", "svc-ds", "23456"), + createServicePod("DaemonSet", "svc-ds", "34567"), + createServicePod("StatefulSet", "svc-ss", "12345"), + createServicePod("StatefulSet", "svc-ss", "23456"), + createServicePod("StatefulSet", "svc-ss", "34567"), + } + podsByName := make(map[types.NamespacedName]*corev1.Pod) + for _, p := range rsPods { + podsByName[types.NamespacedName{Name: p.Name, Namespace: p.Namespace}] = p + } + for _, p := range otherPods { + podsByName[types.NamespacedName{Name: p.Name, Namespace: p.Namespace}] = p + } - // Default if not specified in test - if tc.targetConsulNs == "" { - tc.targetConsulNs = constants.DefaultConsulNS - } - if tc.targetConsulPartition == "" { - tc.targetConsulPartition = constants.DefaultConsulPartition + cases := []testCase{ + { + name: "Pod is fetched once per ReplicaSet", + endpoints: &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(rsPods...), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + }, + }, + }, + }, + responses: podsByName, + expected: getWorkloadSelector( + // Selector should consist of prefixes only. + map[string]any{ + "svc-rs-abcde": true, + "svc-rs-fghij": true, + }, + map[string]any{}), + mockFn: func(t *testing.T, pf *MockPodFetcher) { + // Assert called once per set. + require.Equal(t, 2, len(pf.calls)) + }, + }, + { + name: "Pod is fetched once per other pod owner type", + endpoints: &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(otherPods...), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + }, + }, + }, + }, + responses: podsByName, + expected: getWorkloadSelector( + // Selector should consist of exact name matches only. + map[string]any{}, + map[string]any{ + "svc-ds-12345": true, + "svc-ds-23456": true, + "svc-ds-34567": true, + "svc-ss-12345": true, + "svc-ss-23456": true, + "svc-ss-34567": true, + }), + mockFn: func(t *testing.T, pf *MockPodFetcher) { + // Assert called once per pod. + require.Equal(t, len(otherPods), len(pf.calls)) + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // Create mock pod fetcher. + pf := MockPodFetcher{responses: tc.responses} + + // Create the Endpoints controller. + ep := &Controller{ + Log: logrtest.New(t), } - expectedServiceMatches(t, resourceClient, tc.svcName, tc.targetConsulNs, tc.targetConsulPartition, tc.expectedResource) + resp, err := ep.getWorkloadSelectorFromEndpoints(ctx, &pf, tc.endpoints) + require.NoError(t, err) + + // We don't care about order, so configure cmp.Diff to ignore slice order. + sorter := func(a, b string) bool { return a < b } + if diff := cmp.Diff(tc.expected, resp, protocmp.Transform(), cmpopts.SortSlices(sorter)); diff != "" { + t.Errorf("unexpected difference:\n%v", diff) + } + tc.mockFn(t, &pf) }) } } -// TODO(zalimeni) -// - "Allow/deny namespaces" -// - "With additional meta updated" -func TestReconcile_UpdateService(t *testing.T) { +type MockPodFetcher struct { + calls []types.NamespacedName + responses map[types.NamespacedName]*corev1.Pod +} +func (m *MockPodFetcher) GetPod(_ context.Context, name types.NamespacedName) (*corev1.Pod, error) { + m.calls = append(m.calls, name) + if v, ok := m.responses[name]; !ok { + panic(fmt.Errorf("test is missing response for passed pod name: %v", name)) + } else { + return v, nil + } } -// TODO(zalimeni) -// - "Allow/deny namespaces" -// - "Endpoints not found" -func TestReconcile_DeleteService(t *testing.T) { +func runReconcileCase(t *testing.T, tc reconcileCase) { + t.Helper() + + // Create fake k8s client + var k8sObjects []runtime.Object + if tc.k8sObjects != nil { + k8sObjects = tc.k8sObjects() + } + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + // Create test Consul server. + testClient := test.TestServerWithMockConnMgrWatcher(t, func(c *testutil.TestServerConfig) { + c.Experiments = []string{"resource-apis"} + }) + + // Create the Endpoints controller. + ep := &Controller{ + Client: fakeClient, + Log: logrtest.New(t), + ConsulServerConnMgr: testClient.Watcher, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + } + resourceClient, err := consul.NewResourceServiceClient(ep.ConsulServerConnMgr) + require.NoError(t, err) + + // Default ns and partition if not specified in test. + if tc.targetConsulNs == "" { + tc.targetConsulNs = constants.DefaultConsulNS + } + if tc.targetConsulPartition == "" { + tc.targetConsulPartition = constants.DefaultConsulPartition + } + + // If existing resource specified, create it and ensure it exists. + if tc.existingResource != nil { + writeReq := &pbresource.WriteRequest{Resource: tc.existingResource} + _, err = resourceClient.Write(context.Background(), writeReq) + require.NoError(t, err) + test.ResourceHasPersisted(t, resourceClient, tc.existingResource.Id) + } + + // Run actual reconcile and verify results. + resp, err := ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: tc.svcName, + Namespace: tc.targetConsulNs, + }, + }) + if tc.expErr != "" { + require.ErrorContains(t, err, tc.expErr) + } else { + require.NoError(t, err) + } + require.False(t, resp.Requeue) + + expectedServiceMatches(t, resourceClient, tc.svcName, tc.targetConsulNs, tc.targetConsulPartition, tc.expectedResource) } @@ -351,7 +838,7 @@ func expectedServiceMatches(t *testing.T, client pbresource.ResourceServiceClien err = res.GetResource().GetData().UnmarshalTo(actualService) require.NoError(t, err) - if diff := cmp.Diff(actualService, expectedService, protocmp.Transform()); diff != "" { + if diff := cmp.Diff(expectedService, actualService, protocmp.Transform()); diff != "" { t.Errorf("unexpected difference:\n%v", diff) } }