From 202959d228b4c870c1fcbc1376635fa8464db240 Mon Sep 17 00:00:00 2001 From: Michael Zalimeni Date: Fri, 25 Aug 2023 10:58:25 -0400 Subject: [PATCH] feat: add v2 endpoints controller Implement the basic requirements of a new Endpoints controller that registers Services via Consul's V2 API. Further tests and TODOs will be addressed in follow-up changes. --- control-plane/connect-inject/common/common.go | 44 + .../connect-inject/common/common_test.go | 85 ++ .../constants/annotations_and_labels.go | 8 +- .../endpointsv2/endpoints_controller.go | 369 ++++++ .../endpoints_controller_ent_test.go | 28 + .../endpointsv2/endpoints_controller_test.go | 1081 +++++++++++++++++ .../inject-connect/v2controllers.go | 20 +- 7 files changed, 1632 insertions(+), 3 deletions(-) create mode 100644 control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go create mode 100644 control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_ent_test.go create mode 100644 control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go diff --git a/control-plane/connect-inject/common/common.go b/control-plane/connect-inject/common/common.go index acee282739..b7946d78e0 100644 --- a/control-plane/connect-inject/common/common.go +++ b/control-plane/connect-inject/common/common.go @@ -9,10 +9,13 @@ import ( "strings" mapset "github.com/deckarep/golang-set" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" ) // DetermineAndValidatePort behaves as follows: @@ -116,3 +119,44 @@ func ShouldIgnore(namespace string, denySet, allowSet mapset.Set) bool { func ConsulNodeNameFromK8sNode(nodeName string) string { return fmt.Sprintf("%s-virtual", nodeName) } + +// ******************** +// V2 Exclusive Common Code +// ******************** + +// ToProtoAny is a convenience function for converting proto.Message values to anypb.Any without error handling. +// This should _only_ be used in cases where a nil or valid proto.Message value is _guaranteed_, else it will panic. +// If the type of m is *anypb.Any, that value will be returned unmodified. +func ToProtoAny(m proto.Message) *anypb.Any { + switch v := m.(type) { + case nil: + return nil + case *anypb.Any: + return v + } + a, err := anypb.New(m) + if err != nil { + panic(fmt.Errorf("unexpected error: failed to convert proto message to anypb.Any: %w", err)) + } + return a +} + +// GetPortProtocol matches the Kubernetes EndpointPort.AppProtocol or ServicePort.AppProtocol (*string) to a supported +// Consul catalog port protocol. If nil or unrecognized, the default of `PROTOCOL_UNSPECIFIED` is returned. +func GetPortProtocol(appProtocol *string) pbcatalog.Protocol { + if appProtocol == nil { + return pbcatalog.Protocol_PROTOCOL_UNSPECIFIED + } + switch *appProtocol { + case "tcp": + return pbcatalog.Protocol_PROTOCOL_TCP + case "http": + return pbcatalog.Protocol_PROTOCOL_HTTP + case "http2": + return pbcatalog.Protocol_PROTOCOL_HTTP2 + case "grpc": + return pbcatalog.Protocol_PROTOCOL_GRPC + } + // If unrecognized or empty string, return default + return pbcatalog.Protocol_PROTOCOL_UNSPECIFIED +} diff --git a/control-plane/connect-inject/common/common_test.go b/control-plane/connect-inject/common/common_test.go index 6f623b28db..6cbbab5b88 100644 --- a/control-plane/connect-inject/common/common_test.go +++ b/control-plane/connect-inject/common/common_test.go @@ -7,12 +7,17 @@ import ( "testing" mapset "github.com/deckarep/golang-set" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/anypb" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" "github.com/hashicorp/consul-k8s/control-plane/namespaces" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" ) func TestCommonDetermineAndValidatePort(t *testing.T) { @@ -314,3 +319,83 @@ func TestShouldIgnore(t *testing.T) { }) } } + +func TestToProtoAny(t *testing.T) { + t.Parallel() + + t.Run("nil gets nil", func(t *testing.T) { + require.Nil(t, ToProtoAny(nil)) + }) + + t.Run("anypb.Any gets same value", func(t *testing.T) { + testMsg := &pbresource.Resource{Id: &pbresource.ID{Name: "foo"}} + testAny, err := anypb.New(testMsg) + require.NoError(t, err) + + require.Equal(t, testAny, ToProtoAny(testAny)) + }) + + t.Run("valid proto is successfully serialized", func(t *testing.T) { + testMsg := &pbresource.Resource{Id: &pbresource.ID{Name: "foo"}} + testAny, err := anypb.New(testMsg) + require.NoError(t, err) + + if diff := cmp.Diff(testAny, ToProtoAny(testMsg), protocmp.Transform()); diff != "" { + t.Errorf("unexpected difference:\n%v", diff) + } + }) +} + +func TestGetPortProtocol(t *testing.T) { + t.Parallel() + toStringPtr := func(s string) *string { + return &s + } + cases := []struct { + name string + input *string + expected pbcatalog.Protocol + }{ + { + name: "nil gets UNSPECIFIED", + input: nil, + expected: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + { + name: "tcp gets TCP", + input: toStringPtr("tcp"), + expected: pbcatalog.Protocol_PROTOCOL_TCP, + }, + { + name: "http gets HTTP", + input: toStringPtr("http"), + expected: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + name: "http2 gets HTTP2", + input: toStringPtr("http2"), + expected: pbcatalog.Protocol_PROTOCOL_HTTP2, + }, + { + name: "grpc gets GRPC", + input: toStringPtr("grpc"), + expected: pbcatalog.Protocol_PROTOCOL_GRPC, + }, + { + name: "case sensitive", + input: toStringPtr("gRPC"), + expected: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + { + name: "unknown gets UNSPECIFIED", + input: toStringPtr("foo"), + expected: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + } + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + actual := GetPortProtocol(tt.input) + require.Equal(t, tt.expected, actual) + }) + } +} diff --git a/control-plane/connect-inject/constants/annotations_and_labels.go b/control-plane/connect-inject/constants/annotations_and_labels.go index 76b83eaf62..5c14cb15f2 100644 --- a/control-plane/connect-inject/constants/annotations_and_labels.go +++ b/control-plane/connect-inject/constants/annotations_and_labels.go @@ -203,6 +203,7 @@ const ( Enabled = "enabled" // ManagedByValue is the value for keyManagedBy. + //TODO(zalimeni) rename this to ManagedByLegacyEndpointsValue. ManagedByValue = "consul-k8s-endpoints-controller" ) @@ -220,8 +221,13 @@ const ( // a pod after an injection is done. KeyMeshInjectStatus = "consul.hashicorp.com/mesh-inject-status" + // ManagedByEndpointsValue is used in Consul metadata to identify the manager + // of resources. The 'v2' suffix is used to differentiate from the legacy + // endpoints controller of the same name. + ManagedByEndpointsValue = "consul-k8s-endpoints-controller-v2" + // ManagedByPodValue is used in Consul metadata to identify the manager - // of this resource. + // of resources. ManagedByPodValue = "consul-k8s-pod-controller" ) diff --git a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go new file mode 100644 index 0000000000..ad32510861 --- /dev/null +++ b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go @@ -0,0 +1,369 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 +package endpointsv2 + +import ( + "context" + "net" + "strings" + + mapset "github.com/deckarep/golang-set" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/common" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/consul" + "github.com/hashicorp/consul-k8s/control-plane/namespaces" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/go-multierror" +) + +const ( + metaKeyManagedBy = "managed-by" + kindReplicaSet = "ReplicaSet" +) + +type Controller struct { + client.Client + // ConsulServerConnMgr is the watcher for the Consul server addresses used to create Consul API v2 clients. + ConsulServerConnMgr consul.ServerConnectionManager + // Only endpoints in the AllowK8sNamespacesSet are reconciled. + AllowK8sNamespacesSet mapset.Set + // Endpoints in the DenyK8sNamespacesSet are ignored. + DenyK8sNamespacesSet mapset.Set + // EnableConsulPartitions indicates that a user is running Consul Enterprise. + EnableConsulPartitions bool + // ConsulPartition is the Consul Partition to which this controller belongs. + ConsulPartition string + // EnableConsulNamespaces indicates that a user is running Consul Enterprise. + EnableConsulNamespaces bool + // ConsulDestinationNamespace is the name of the Consul namespace to create + // all config entries in. If EnableNSMirroring is true this is ignored. + ConsulDestinationNamespace string + // EnableNSMirroring causes Consul namespaces to be created to match the + // k8s namespace of any config entry custom resource. Config entries will + // be created in the matching Consul namespace. + EnableNSMirroring bool + // NSMirroringPrefix is an optional prefix that can be added to the Consul + // namespaces created while mirroring. For example, if it is set to "k8s-", + // then the k8s `default` namespace will be mirrored in Consul's + // `k8s-default` namespace. + NSMirroringPrefix string + + Log logr.Logger + + Scheme *runtime.Scheme + context.Context +} + +func (r *Controller) Logger(name types.NamespacedName) logr.Logger { + return r.Log.WithValues("request", name) +} + +func (r *Controller) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Endpoints{}). + Complete(r) +} + +// Reconcile reads the state of an Endpoints object for a Kubernetes Service and reconciles Consul services which +// correspond to the Kubernetes Service. These events are driven by changes to the Pods backing the Kube service. +func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + var errs error + var endpoints corev1.Endpoints + var service corev1.Service + + // Ignore the request if the namespace of the endpoint is not allowed. + if common.ShouldIgnore(req.Namespace, r.DenyK8sNamespacesSet, r.AllowK8sNamespacesSet) { + return ctrl.Result{}, nil + } + + // Create Consul resource service client for this reconcile. + resourceClient, err := consul.NewResourceServiceClient(r.ConsulServerConnMgr) + if err != nil { + r.Log.Error(err, "failed to create Consul resource client", "name", req.Name, "ns", req.Namespace) + return ctrl.Result{}, err + } + + // If the Endpoints object has been deleted (and we get an IsNotFound error), + // we need to deregister that service from Consul. + err = r.Client.Get(ctx, req.NamespacedName, &endpoints) + if k8serrors.IsNotFound(err) { + err = r.deregisterService(ctx, resourceClient, req.Name, r.getConsulNamespace(req.Namespace), r.getConsulPartition()) + return ctrl.Result{}, err + } else if err != nil { + r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace) + return ctrl.Result{}, err + } + r.Log.Info("retrieved Endpoints", "name", req.Name, "ns", req.Namespace) + + // We expect this to succeed if the Endpoints fetch for the Service succeeded. + err = r.Client.Get(r.Context, types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}, &service) + if err != nil { + r.Log.Error(err, "failed to get Service", "name", req.Name, "ns", req.Namespace) + return ctrl.Result{}, err + } + r.Log.Info("retrieved Service", "name", req.Name, "ns", req.Namespace) + + workloadSelector, err := r.getWorkloadSelectorFromEndpoints(ctx, &ClientPodFetcher{client: r.Client}, &endpoints) + if err != nil { + errs = multierror.Append(errs, err) + } + + //TODO: Maybe check service-enable label here on service/deployments/other pod owners + if err = r.registerService(ctx, resourceClient, service, workloadSelector); err != nil { + errs = multierror.Append(errs, err) + } + + return ctrl.Result{}, errs +} + +// getWorkloadSelectorFromEndpoints calculates a Consul service WorkloadSelector from Endpoints based on pod names and +// owners. +func (r *Controller) getWorkloadSelectorFromEndpoints(ctx context.Context, pf PodFetcher, endpoints *corev1.Endpoints) (*pbcatalog.WorkloadSelector, error) { + podPrefixes := make(map[string]any) + podExactNames := make(map[string]any) + var errs error + for address := range allAddresses(endpoints.Subsets) { + if address.TargetRef != nil && address.TargetRef.Kind == "Pod" { + podName := types.NamespacedName{Name: address.TargetRef.Name, Namespace: endpoints.Namespace} + + // Accumulate owner prefixes and exact pod names for Consul workload selector. + // If this pod is already covered by a known owner prefix, skip it. + // If not, fetch the owner. If the owner has a unique prefix, add it to known prefixes. + // If not, add the pod name to exact name matches. + maybePodOwnerPrefix := getOwnerPrefixFromPodName(podName.Name) + if _, ok := podPrefixes[maybePodOwnerPrefix]; !ok { + pod, err := pf.GetPod(ctx, podName) + if err != nil { + r.Log.Error(err, "failed to get pod", "name", podName.Name, "ns", endpoints.Namespace) + errs = multierror.Append(errs, err) + continue + } + // Add to workload selector values. + // Pods can appear more than once in Endpoints subsets, so we use a set for exact names as well. + if prefix := getOwnerPrefixFromPod(pod); prefix != "" { + podPrefixes[prefix] = true + } else { + podExactNames[podName.Name] = true + } + } + } + } + return getWorkloadSelector(podPrefixes, podExactNames), errs +} + +// allAddresses combines all Endpoints subset addresses to a single set. Service registration by this controller +// operates independent of health, and an address can appear in multiple subsets if it has a mix of ready and not-ready +// ports, so we combine them here to simplify iteration. +func allAddresses(subsets []corev1.EndpointSubset) map[corev1.EndpointAddress]any { + m := make(map[corev1.EndpointAddress]any) + for _, sub := range subsets { + for _, readyAddress := range sub.Addresses { + m[readyAddress] = true + } + for _, notReadyAddress := range sub.NotReadyAddresses { + m[notReadyAddress] = true + } + } + return m +} + +// getOwnerPrefixFromPodName extracts the owner name prefix from a pod name. +func getOwnerPrefixFromPodName(podName string) string { + podNameParts := strings.Split(podName, "-") + return strings.Join(podNameParts[:len(podNameParts)-1], "-") +} + +// getOwnerPrefixFromPod returns the common name prefix of the pod, if the pod is a member of a set with a unique name +// prefix. Currently, this only applies to ReplicaSets. +// +// We have to fetch the owner and check its type because pod names cannot be disambiguated from pod owner names due to +// the `-` delimiter and unique ID parts also being valid name components. +// +// If the pod owner does not have a unique name, the empty string is returned. +func getOwnerPrefixFromPod(pod *corev1.Pod) string { + for _, ref := range pod.OwnerReferences { + if ref.Kind == "ReplicaSet" { + return ref.Name + } + } + return "" +} + +// registerService creates a Consul service registration from the provided Kuberetes service and endpoint information. +func (r *Controller) registerService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, service corev1.Service, selector *pbcatalog.WorkloadSelector) error { + serviceResource := r.getServiceResource( + &pbcatalog.Service{ + Workloads: selector, + Ports: getServicePorts(service), + VirtualIps: r.getServiceVIPs(service), + }, + service.Name, // Consul and Kubernetes service name will always match + r.getConsulNamespace(service.Namespace), + r.getConsulPartition(), + getServiceMeta(service), + ) + + r.Log.Info("registering service with Consul", getLogFieldsForResource(serviceResource.Id)...) + //TODO: Maybe attempt to debounce redundant writes. For now, we blindly rewrite state on each reconcile. + _, err := resourceClient.Write(ctx, &pbresource.WriteRequest{Resource: serviceResource}) + if err != nil { + r.Log.Error(err, "failed to register service", getLogFieldsForResource(serviceResource.Id)...) + return err + } + + return nil +} + +// getServiceResource converts the given Consul service and metadata as a Consul resource API record. +func (r *Controller) getServiceResource(svc *pbcatalog.Service, name, namespace, partition string, meta map[string]string) *pbresource.Resource { + return &pbresource.Resource{ + Id: getServiceID(name, namespace, partition), + Data: common.ToProtoAny(svc), + Metadata: meta, + } +} + +func getServiceID(name, namespace, partition string) *pbresource.ID { + return &pbresource.ID{ + Name: name, + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Partition: partition, + Namespace: namespace, + }, + } +} + +// getServicePorts converts Kubernetes Service ports data into Consul service ports. +func getServicePorts(service corev1.Service) []*pbcatalog.ServicePort { + ports := make([]*pbcatalog.ServicePort, 0, len(service.Spec.Ports)+1) + + for _, p := range service.Spec.Ports { + ports = append(ports, &pbcatalog.ServicePort{ + VirtualPort: uint32(p.Port), + //TODO: If the value is a number, infer the correct name value based on + // the most prevalent endpoint subset for the port (best-effot, inspect a pod). + TargetPort: p.TargetPort.String(), + Protocol: common.GetPortProtocol(p.AppProtocol), + }) + } + + //TODO: Error check reserved "mesh" target port + + // Append Consul service mesh port in addition to discovered ports. + //TODO: Maybe omit if zero mesh ports present in service endpoints, or if some + // use of mesh-inject/other label should cause this to be excluded. + ports = append(ports, &pbcatalog.ServicePort{ + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }) + + return ports +} + +// getServiceVIPs returns the VIPs to associate with the registered Consul service. This will contain the Kubernetes +// Service ClusterIP if it exists. +// +// Note that we always provide this data regardless of whether TProxy is enabled, deferring to individual proxy configs +// to decide whether it's used. +func (r *Controller) getServiceVIPs(service corev1.Service) []string { + if parsedIP := net.ParseIP(service.Spec.ClusterIP); parsedIP == nil { + r.Log.Info("skipping service registration virtual IP assignment due to invalid or unset ClusterIP", "name", service.Name, "ns", service.Namespace, "ip", service.Spec.ClusterIP) + return nil + } + return []string{service.Spec.ClusterIP} +} + +func getServiceMeta(service corev1.Service) map[string]string { + meta := map[string]string{ + constants.MetaKeyKubeNS: service.Namespace, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + } + //TODO: Support arbitrary meta injection via annotation? (see v1) + return meta +} + +func getWorkloadSelector(podPrefixes, podExactNames map[string]any) *pbcatalog.WorkloadSelector { + workloads := &pbcatalog.WorkloadSelector{} + for v := range podPrefixes { + workloads.Prefixes = append(workloads.Prefixes, v) + } + for v := range podExactNames { + workloads.Names = append(workloads.Names, v) + } + return workloads +} + +// deregisterService deletes the service resource corresponding to the given name and namespace from Consul. +// This operation is idempotent and can be executed for non-existent services. +func (r *Controller) deregisterService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, name, namespace, partition string) error { + _, err := resourceClient.Delete(ctx, &pbresource.DeleteRequest{ + Id: getServiceID(name, namespace, partition), + }) + return err +} + +// getConsulNamespace returns the Consul destination namespace for a provided Kubernetes namespace +// depending on Consul Namespaces being enabled and the value of namespace mirroring. +func (r *Controller) getConsulNamespace(kubeNamespace string) string { + ns := namespaces.ConsulNamespace( + kubeNamespace, + r.EnableConsulNamespaces, + r.ConsulDestinationNamespace, + r.EnableNSMirroring, + r.NSMirroringPrefix, + ) + + // TODO: remove this if and when the default namespace of resources is no longer required to be set explicitly. + if ns == "" { + ns = constants.DefaultConsulNS + } + return ns +} + +func (r *Controller) getConsulPartition() string { + if !r.EnableConsulPartitions || r.ConsulPartition == "" { + return constants.DefaultConsulPartition + } + return r.ConsulPartition +} + +func getLogFieldsForResource(id *pbresource.ID) []any { + return []any{ + "name", id.Name, + "ns", id.Tenancy.Namespace, + "partition", id.Tenancy.Partition, + } +} + +// PodFetcher fetches pods by NamespacedName. This interface primarily exists for testing. +type PodFetcher interface { + GetPod(context.Context, types.NamespacedName) (*corev1.Pod, error) +} + +// ClientPodFetcher wraps a Kubernetes client to implement PodFetcher. This is the only implementation outside of tests. +type ClientPodFetcher struct { + client client.Client +} + +func (c *ClientPodFetcher) GetPod(ctx context.Context, name types.NamespacedName) (*corev1.Pod, error) { + var pod corev1.Pod + err := c.client.Get(ctx, name, &pod) + if err != nil { + return nil, err + } + return &pod, nil +} diff --git a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_ent_test.go b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_ent_test.go new file mode 100644 index 0000000000..6d4e5460c4 --- /dev/null +++ b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_ent_test.go @@ -0,0 +1,28 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +//go:build enterprise + +package endpointsv2 + +import ( + "testing" +) + +// TODO(zalimeni) +// Tests new Service registration in a non-default NS and Partition with namespaces set to mirroring +func TestReconcile_CreateService_WithNamespaces(t *testing.T) { + +} + +// TODO(zalimeni) +// Tests updating Service registration in a non-default NS and Partition with namespaces set to mirroring +func TestReconcile_UpdateService_WithNamespaces(t *testing.T) { + +} + +// TODO(zalimeni) +// Tests removing Service registration in a non-default NS and Partition with namespaces set to mirroring +func TestReconcile_DeleteService_WithNamespaces(t *testing.T) { + +} diff --git a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go new file mode 100644 index 0000000000..706a9d60a0 --- /dev/null +++ b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go @@ -0,0 +1,1081 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package endpointsv2 + +import ( + "context" + "fmt" + "github.com/google/go-cmp/cmp/cmpopts" + "testing" + + mapset "github.com/deckarep/golang-set" + logrtest "github.com/go-logr/logr/testr" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/anypb" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/common" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/consul" + "github.com/hashicorp/consul-k8s/control-plane/helper/test" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/go-uuid" +) + +var ( + appProtocolHttp = "http" + appProtocolHttp2 = "http2" + appProtocolGrpc = "grpc" +) + +type reconcileCase struct { + name string + svcName string + k8sObjects func() []runtime.Object + existingResource *pbresource.Resource + expectedResource *pbresource.Resource + targetConsulNs string + targetConsulPartition string + expErr string +} + +// TODO: Allow/deny namespaces for reconcile tests +// TODO: ConsulDestinationNamespace and EnableNSMirroring +/- prefix + +func TestReconcile_CreateService(t *testing.T) { + t.Parallel() + cases := []reconcileCase{ + { + // In this test, we expect the same service registration as the "basic" + // case, but without any workload selector values due to missing endpoints. + name: "Empty endpoints", + svcName: "service-created", + k8sObjects: func() []runtime.Object { + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{}, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + { + Name: "public", + Port: 8080, + TargetPort: intstr.FromString("my-http-port"), + AppProtocol: &appProtocolHttp, + }, + { + Name: "api", + Port: 9090, + TargetPort: intstr.FromString("my-grpc-port"), + AppProtocol: &appProtocolGrpc, + }, + { + Name: "other", + Port: 10001, + TargetPort: intstr.FromString("10001"), + // no protocol specified + }, + }, + }, + } + return []runtime.Object{endpoints, service} + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + VirtualPort: 9090, + TargetPort: "my-grpc-port", + Protocol: pbcatalog.Protocol_PROTOCOL_GRPC, + }, + { + VirtualPort: 10001, + TargetPort: "10001", + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{}, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + { + name: "Basic endpoints", + svcName: "service-created", + k8sObjects: func() []runtime.Object { + pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") + pod2 := createServicePod("DaemonSet", "service-created-ds", "12345") + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(pod1, pod2), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + { + Name: "my-grpc-port", + AppProtocol: &appProtocolGrpc, + Port: 6789, + }, + { + Name: "10001", + Port: 10001, + }, + }, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + { + Name: "public", + Port: 8080, + TargetPort: intstr.FromString("my-http-port"), + AppProtocol: &appProtocolHttp, + }, + { + Name: "api", + Port: 9090, + TargetPort: intstr.FromString("my-grpc-port"), + AppProtocol: &appProtocolGrpc, + }, + { + Name: "other", + Port: 10001, + TargetPort: intstr.FromString("10001"), + // no protocol specified + }, + }, + }, + } + return []runtime.Object{pod1, pod2, endpoints, service} + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + VirtualPort: 9090, + TargetPort: "my-grpc-port", + Protocol: pbcatalog.Protocol_PROTOCOL_GRPC, + }, + { + VirtualPort: 10001, + TargetPort: "10001", + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"service-created-rs-abcde"}, + Names: []string{"service-created-ds-12345"}, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + { + name: "Unhealthy endpoints should be registered", + svcName: "service-created", + k8sObjects: func() []runtime.Object { + pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") + pod2 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-fghij") + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + // Split addresses between ready and not-ready + Addresses: addressesForPods(pod1), + NotReadyAddresses: addressesForPods(pod2), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + }, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + { + Name: "public", + Port: 8080, + TargetPort: intstr.FromString("my-http-port"), + AppProtocol: &appProtocolHttp, + }, + }, + }, + } + return []runtime.Object{pod1, pod2, endpoints, service} + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + // Both replicasets (ready and not ready) should be present + Prefixes: []string{ + "service-created-rs-abcde", + "service-created-rs-fghij", + }, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + { + name: "Pods with only some service ports should be registered", + svcName: "service-created", + k8sObjects: func() []runtime.Object { + pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") + pod2 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-fghij") + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + // Two separate endpoint subsets w/ each of 2 ports served by a different replicaset + { + Addresses: addressesForPods(pod1), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + }, + }, + { + Addresses: addressesForPods(pod2), + Ports: []corev1.EndpointPort{ + { + Name: "my-grpc-port", + AppProtocol: &appProtocolGrpc, + Port: 6789, + }, + }, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + { + Name: "public", + Port: 8080, + TargetPort: intstr.FromString("my-http-port"), + AppProtocol: &appProtocolHttp, + }, + { + Name: "api", + Port: 9090, + TargetPort: intstr.FromString("my-grpc-port"), + AppProtocol: &appProtocolGrpc, + }, + }, + }, + } + return []runtime.Object{pod1, pod2, endpoints, service} + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + VirtualPort: 9090, + TargetPort: "my-grpc-port", + Protocol: pbcatalog.Protocol_PROTOCOL_GRPC, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + // Both replicasets should be present even though neither serves both ports + Prefixes: []string{ + "service-created-rs-abcde", + "service-created-rs-fghij", + }, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + runReconcileCase(t, tc) + }) + } +} + +func TestReconcile_UpdateService(t *testing.T) { + t.Parallel() + cases := []reconcileCase{ + { + name: "Pods changed", + svcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") + pod2 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-klmno") + pod3 := createServicePod("DaemonSet", "service-created-ds", "12345") + pod4 := createServicePod("DaemonSet", "service-created-ds", "34567") + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(pod1, pod2, pod3, pod4), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + }, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + { + Name: "public", + Port: 8080, + TargetPort: intstr.FromString("my-http-port"), + AppProtocol: &appProtocolHttp, + }, + }, + }, + } + return []runtime.Object{pod1, pod2, pod3, pod4, endpoints, service} + }, + existingResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{ + "service-created-rs-abcde", // Retained + "service-created-rs-fghij", // Removed + }, + Names: []string{ + "service-created-ds-12345", // Retained + "service-created-ds-23456", // Removed + }, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + + Prefixes: []string{ + "service-created-rs-abcde", // Retained + "service-created-rs-klmno", // New + }, + Names: []string{ + "service-created-ds-12345", // Retained + "service-created-ds-34567", // New + }, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + { + name: "Service ports changed", + svcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") + pod2 := createServicePod("DaemonSet", "service-created-ds", "12345") + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(pod1, pod2), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + { + Name: "my-grpc-port", + AppProtocol: &appProtocolHttp, + Port: 6789, + }, + }, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + { + Name: "public", + Port: 8080, + TargetPort: intstr.FromString("new-http-port"), + AppProtocol: &appProtocolHttp2, + }, + { + Name: "api", + Port: 9091, + TargetPort: intstr.FromString("my-grpc-port"), + AppProtocol: &appProtocolGrpc, + }, + }, + }, + } + return []runtime.Object{pod1, pod2, endpoints, service} + }, + existingResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-updated", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + VirtualPort: 9090, + TargetPort: "my-grpc-port", + Protocol: pbcatalog.Protocol_PROTOCOL_GRPC, + }, + { + VirtualPort: 10001, + TargetPort: "10001", + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"service-created-rs-abcde"}, + Names: []string{"service-created-ds-12345"}, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-updated", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "new-http-port", // Updated + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP2, // Updated + }, + { + VirtualPort: 9091, // Updated + TargetPort: "my-grpc-port", + Protocol: pbcatalog.Protocol_PROTOCOL_GRPC, + }, + // Port 10001 removed + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"service-created-rs-abcde"}, + Names: []string{"service-created-ds-12345"}, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + runReconcileCase(t, tc) + }) + } +} + +func TestReconcile_DeleteService(t *testing.T) { + t.Parallel() + cases := []reconcileCase{ + { + name: "Basic Endpoints not found (service deleted)", + svcName: "service-deleted", + existingResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-http-port", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"service-created-rs-abcde"}, + Names: []string{"service-created-ds-12345"}, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + runReconcileCase(t, tc) + }) + } +} + +func TestGetWorkloadSelectorFromEndpoints(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testCase struct { + name string + endpoints *corev1.Endpoints + responses map[types.NamespacedName]*corev1.Pod + expected *pbcatalog.WorkloadSelector + mockFn func(*testing.T, *MockPodFetcher) + } + + rsPods := []*corev1.Pod{ + createServicePod(kindReplicaSet, "svc-rs-abcde", "12345"), + createServicePod(kindReplicaSet, "svc-rs-abcde", "23456"), + createServicePod(kindReplicaSet, "svc-rs-abcde", "34567"), + createServicePod(kindReplicaSet, "svc-rs-fghij", "12345"), + createServicePod(kindReplicaSet, "svc-rs-fghij", "23456"), + createServicePod(kindReplicaSet, "svc-rs-fghij", "34567"), + } + otherPods := []*corev1.Pod{ + createServicePod("DaemonSet", "svc-ds", "12345"), + createServicePod("DaemonSet", "svc-ds", "23456"), + createServicePod("DaemonSet", "svc-ds", "34567"), + createServicePod("StatefulSet", "svc-ss", "12345"), + createServicePod("StatefulSet", "svc-ss", "23456"), + createServicePod("StatefulSet", "svc-ss", "34567"), + } + podsByName := make(map[types.NamespacedName]*corev1.Pod) + for _, p := range rsPods { + podsByName[types.NamespacedName{Name: p.Name, Namespace: p.Namespace}] = p + } + for _, p := range otherPods { + podsByName[types.NamespacedName{Name: p.Name, Namespace: p.Namespace}] = p + } + + cases := []testCase{ + { + name: "Pod is fetched once per ReplicaSet", + endpoints: &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(rsPods...), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + }, + }, + }, + }, + responses: podsByName, + expected: getWorkloadSelector( + // Selector should consist of prefixes only. + map[string]any{ + "svc-rs-abcde": true, + "svc-rs-fghij": true, + }, + map[string]any{}), + mockFn: func(t *testing.T, pf *MockPodFetcher) { + // Assert called once per set. + require.Equal(t, 2, len(pf.calls)) + }, + }, + { + name: "Pod is fetched once per other pod owner type", + endpoints: &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(otherPods...), + Ports: []corev1.EndpointPort{ + { + Name: "my-http-port", + AppProtocol: &appProtocolHttp, + Port: 2345, + }, + }, + }, + }, + }, + responses: podsByName, + expected: getWorkloadSelector( + // Selector should consist of exact name matches only. + map[string]any{}, + map[string]any{ + "svc-ds-12345": true, + "svc-ds-23456": true, + "svc-ds-34567": true, + "svc-ss-12345": true, + "svc-ss-23456": true, + "svc-ss-34567": true, + }), + mockFn: func(t *testing.T, pf *MockPodFetcher) { + // Assert called once per pod. + require.Equal(t, len(otherPods), len(pf.calls)) + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // Create mock pod fetcher. + pf := MockPodFetcher{responses: tc.responses} + + // Create the Endpoints controller. + ep := &Controller{ + Log: logrtest.New(t), + } + + resp, err := ep.getWorkloadSelectorFromEndpoints(ctx, &pf, tc.endpoints) + require.NoError(t, err) + + // We don't care about order, so configure cmp.Diff to ignore slice order. + sorter := func(a, b string) bool { return a < b } + if diff := cmp.Diff(tc.expected, resp, protocmp.Transform(), cmpopts.SortSlices(sorter)); diff != "" { + t.Errorf("unexpected difference:\n%v", diff) + } + tc.mockFn(t, &pf) + }) + } +} + +type MockPodFetcher struct { + calls []types.NamespacedName + responses map[types.NamespacedName]*corev1.Pod +} + +func (m *MockPodFetcher) GetPod(_ context.Context, name types.NamespacedName) (*corev1.Pod, error) { + m.calls = append(m.calls, name) + if v, ok := m.responses[name]; !ok { + panic(fmt.Errorf("test is missing response for passed pod name: %v", name)) + } else { + return v, nil + } +} + +func runReconcileCase(t *testing.T, tc reconcileCase) { + t.Helper() + + // Create fake k8s client + var k8sObjects []runtime.Object + if tc.k8sObjects != nil { + k8sObjects = tc.k8sObjects() + } + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + // Create test Consul server. + testClient := test.TestServerWithMockConnMgrWatcher(t, func(c *testutil.TestServerConfig) { + c.Experiments = []string{"resource-apis"} + }) + + // Create the Endpoints controller. + ep := &Controller{ + Client: fakeClient, + Log: logrtest.New(t), + ConsulServerConnMgr: testClient.Watcher, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + } + resourceClient, err := consul.NewResourceServiceClient(ep.ConsulServerConnMgr) + require.NoError(t, err) + + // Default ns and partition if not specified in test. + if tc.targetConsulNs == "" { + tc.targetConsulNs = constants.DefaultConsulNS + } + if tc.targetConsulPartition == "" { + tc.targetConsulPartition = constants.DefaultConsulPartition + } + + // If existing resource specified, create it and ensure it exists. + if tc.existingResource != nil { + writeReq := &pbresource.WriteRequest{Resource: tc.existingResource} + _, err = resourceClient.Write(context.Background(), writeReq) + require.NoError(t, err) + test.ResourceHasPersisted(t, resourceClient, tc.existingResource.Id) + } + + // Run actual reconcile and verify results. + resp, err := ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: tc.svcName, + Namespace: tc.targetConsulNs, + }, + }) + if tc.expErr != "" { + require.ErrorContains(t, err, tc.expErr) + } else { + require.NoError(t, err) + } + require.False(t, resp.Requeue) + + expectedServiceMatches(t, resourceClient, tc.svcName, tc.targetConsulNs, tc.targetConsulPartition, tc.expectedResource) + +} + +func expectedServiceMatches(t *testing.T, client pbresource.ResourceServiceClient, name, namespace, partition string, expectedResource *pbresource.Resource) { + req := &pbresource.ReadRequest{Id: getServiceID(name, namespace, partition)} + + res, err := client.Read(context.Background(), req) + + if expectedResource == nil { + require.Error(t, err) + s, ok := status.FromError(err) + require.True(t, ok) + require.Equal(t, codes.NotFound, s.Code()) + return + } + + require.NoError(t, err) + require.NotNil(t, res) + require.NotNil(t, res.GetResource().GetData()) + + expectedService := &pbcatalog.Service{} + err = anypb.UnmarshalTo(expectedResource.Data, expectedService, proto.UnmarshalOptions{}) + require.NoError(t, err) + + actualService := &pbcatalog.Service{} + err = res.GetResource().GetData().UnmarshalTo(actualService) + require.NoError(t, err) + + if diff := cmp.Diff(expectedService, actualService, protocmp.Transform()); diff != "" { + t.Errorf("unexpected difference:\n%v", diff) + } +} + +func createServicePodOwnedBy(ownerKind, ownerName string) *corev1.Pod { + return createServicePod(ownerKind, ownerName, randomKubernetesId()) +} + +func createServicePod(ownerKind, ownerName, podId string) *corev1.Pod { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", ownerName, podId), + Namespace: "default", + Labels: map[string]string{}, + Annotations: map[string]string{ + constants.AnnotationConsulK8sVersion: "1.3.0", + }, + OwnerReferences: []metav1.OwnerReference{ + { + Name: ownerName, + Kind: ownerKind, + }, + }, + }, + } + return pod +} + +func addressesForPods(pods ...*corev1.Pod) []corev1.EndpointAddress { + var addresses []corev1.EndpointAddress + for i, p := range pods { + addresses = append(addresses, corev1.EndpointAddress{ + IP: fmt.Sprintf("1.2.3.%d", i), + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: p.Name, + Namespace: p.Namespace, + }, + }) + } + return addresses +} + +func randomKubernetesId() string { + u, err := uuid.GenerateUUID() + if err != nil { + panic(err) + } + return u[:5] +} diff --git a/control-plane/subcommand/inject-connect/v2controllers.go b/control-plane/subcommand/inject-connect/v2controllers.go index a9f29cd6a5..df3830a799 100644 --- a/control-plane/subcommand/inject-connect/v2controllers.go +++ b/control-plane/subcommand/inject-connect/v2controllers.go @@ -5,11 +5,11 @@ package connectinject import ( "context" - "github.com/hashicorp/consul-server-connection-manager/discovery" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/controllers/endpointsv2" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/controllers/pod" "github.com/hashicorp/consul-k8s/control-plane/subcommand/flags" ) @@ -59,7 +59,23 @@ func (c *Command) configureV2Controllers(ctx context.Context, mgr manager.Manage return err } - // TODO: V2 Endpoints Controller + if err := (&endpointsv2.Controller{ + Client: mgr.GetClient(), + ConsulServerConnMgr: watcher, + AllowK8sNamespacesSet: allowK8sNamespaces, + DenyK8sNamespacesSet: denyK8sNamespaces, + EnableConsulPartitions: c.flagEnablePartitions, + EnableConsulNamespaces: c.flagEnableNamespaces, + ConsulDestinationNamespace: c.flagConsulDestinationNamespace, + EnableNSMirroring: c.flagEnableK8SNSMirroring, + NSMirroringPrefix: c.flagK8SNSMirroringPrefix, + Log: ctrl.Log.WithName("controller").WithName("endpoints"), + Scheme: mgr.GetScheme(), + Context: ctx, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", endpointsv2.Controller{}) + return err + } // TODO: Nodes Controller