From 8558ecafcee5d16f0466771d36eac1a7ca2c3773 Mon Sep 17 00:00:00 2001 From: Ashwin Venkatesh Date: Thu, 8 Apr 2021 10:47:54 -0400 Subject: [PATCH 1/2] Add Consul Enterprise Namespace support to endpoints controller - In order to support Consul namespaces, the controller now accepts values namespacesEnabled, consulDestinationNamespace, mirroringEnabled and mirroringPrefix which determine which Consul namespace a service and it's proxy should be registered in when created. This behavior is fairly straightforward when registering an endpoint but is a little tricker when de-registration is concerned. During de-registration, as we use a consul agent, we need to iterate through the list of all the namespaces in Consul that we register services against and create a Client that targets that namespace and agent to find services registered against the agent in a given namespace. - Additional changes here require the creating the ACL authmethod in the default namespace if namespace mirroring is configured and but in the destination namespace otherwise. This also requires us to explicitly specify the destination namespace of the service in order to query the agent for the service being available accurately. - Update the log format of the endpoints controller to be less verbose --- connect-inject/container_init.go | 7 +- connect-inject/container_init_test.go | 8 +- connect-inject/endpoints_controller.go | 105 +- .../endpoints_controller_ent_test.go | 1230 +++++++++++++++++ connect-inject/endpoints_controller_test.go | 92 +- go.mod | 1 - subcommand/common/common.go | 4 +- subcommand/common/common_test.go | 11 +- subcommand/connect-init/command.go | 20 +- subcommand/connect-init/command_ent_test.go | 307 ++++ subcommand/inject-connect/command.go | 52 +- 11 files changed, 1739 insertions(+), 98 deletions(-) create mode 100644 connect-inject/endpoints_controller_ent_test.go create mode 100644 subcommand/connect-init/command_ent_test.go diff --git a/connect-inject/container_init.go b/connect-inject/container_init.go index 91f4970493..6e225ca90d 100644 --- a/connect-inject/container_init.go +++ b/connect-inject/container_init.go @@ -178,12 +178,15 @@ consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ {{- if .NamespaceMirroringEnabled }} {{- /* If namespace mirroring is enabled, the auth method is defined in the default namespace */}} - -namespace="default" + -auth-method-namespace="default" \ {{- else }} - -namespace="{{ .ConsulNamespace }}" + -auth-method-namespace="{{ .ConsulNamespace }}" \ {{- end }} {{- end }} {{- end }} + {{- if .ConsulNamespace }} + -consul-service-namespace="{{ .ConsulNamespace }}" \ + {{- end }} # Generate the envoy bootstrap code /consul/connect-inject/consul connect envoy \ diff --git a/connect-inject/container_init_test.go b/connect-inject/container_init_test.go index 844168c021..b869ed0d6d 100644 --- a/connect-inject/container_init_test.go +++ b/connect-inject/container_init_test.go @@ -168,6 +168,7 @@ func TestHandlerContainerInit_namespacesEnabled(t *testing.T) { export CONSUL_HTTP_ADDR="${HOST_IP}:8500" export CONSUL_GRPC_ADDR="${HOST_IP}:8502" consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ + -consul-service-namespace="default" \ # Generate the envoy bootstrap code /consul/connect-inject/consul connect envoy \ @@ -192,6 +193,7 @@ consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ export CONSUL_HTTP_ADDR="${HOST_IP}:8500" export CONSUL_GRPC_ADDR="${HOST_IP}:8502" consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ + -consul-service-namespace="non-default" \ # Generate the envoy bootstrap code /consul/connect-inject/consul connect envoy \ @@ -218,7 +220,8 @@ export CONSUL_HTTP_ADDR="${HOST_IP}:8500" export CONSUL_GRPC_ADDR="${HOST_IP}:8502" consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ -acl-auth-method="auth-method" \ - -namespace="non-default" + -auth-method-namespace="non-default" \ + -consul-service-namespace="non-default" \ # Generate the envoy bootstrap code /consul/connect-inject/consul connect envoy \ @@ -247,7 +250,8 @@ export CONSUL_HTTP_ADDR="${HOST_IP}:8500" export CONSUL_GRPC_ADDR="${HOST_IP}:8502" consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ -acl-auth-method="auth-method" \ - -namespace="default" + -auth-method-namespace="default" \ + -consul-service-namespace="k8snamespace" \ # Generate the envoy bootstrap code /consul/connect-inject/consul connect envoy \ diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index 59b20b09e7..793ec25a80 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -8,6 +8,7 @@ import ( "github.com/deckarep/golang-set" "github.com/go-logr/logr" "github.com/hashicorp/consul-k8s/consul" + "github.com/hashicorp/consul-k8s/namespaces" "github.com/hashicorp/consul/api" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -48,6 +49,25 @@ type EndpointsController struct { AllowK8sNamespacesSet mapset.Set // Endpoints in the DenyK8sNamespacesSet are ignored. DenyK8sNamespacesSet mapset.Set + // EnableConsulNamespaces indicates that a user is running Consul Enterprise + // with version 1.7+ which supports namespaces. + EnableConsulNamespaces bool + // ConsulDestinationNamespace is the name of the Consul namespace to create + // all config entries in. If EnableNSMirroring is true this is ignored. + ConsulDestinationNamespace string + // EnableNSMirroring causes Consul namespaces to be created to match the + // k8s namespace of any config entry custom resource. Config entries will + // be created in the matching Consul namespace. + EnableNSMirroring bool + // NSMirroringPrefix is an optional prefix that can be added to the Consul + // namespaces created while mirroring. For example, if it is set to "k8s-", + // then the k8s `default` namespace will be mirrored in Consul's + // `k8s-default` namespace. + NSMirroringPrefix string + // CrossNSACLPolicy is the name of the ACL policy to attach to + // any created Consul namespaces to allow cross namespace service discovery. + // Only necessary if ACLs are enabled. + CrossNSACLPolicy string // ReleaseName is the Consul Helm installation release. ReleaseName string // ReleaseNamespace is the namespace where Consul is installed. @@ -77,11 +97,11 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( } return ctrl.Result{}, nil } else if err != nil { - r.Log.Error(err, "failed to get Endpoints from Kubernetes", "name", req.Name, "namespace", req.Namespace) + r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace) return ctrl.Result{}, err } - r.Log.Info("retrieved Kubernetes Endpoints", "endpoints", serviceEndpoints.Name, "endpoints-namespace", serviceEndpoints.Namespace) + r.Log.Info("retrieved", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) // endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare // against service instances in Consul to deregister them if they are not in the map. @@ -100,13 +120,13 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( var pod corev1.Pod objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace} if err = r.Client.Get(ctx, objectKey, &pod); err != nil { - r.Log.Error(err, "failed to get pod from Kubernetes", "pod-name", address.TargetRef.Name) + r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name) return ctrl.Result{}, err } if hasBeenInjected(pod) { // Create client for Consul agent local to the pod. - client, err := r.remoteConsulClient(pod.Status.HostIP) + client, err := r.remoteConsulClient(pod.Status.HostIP, r.consulNamespace(pod.Namespace)) if err != nil { r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.HostIP) return ctrl.Result{}, err @@ -115,7 +135,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( // Get information from the pod to create service instance registrations. serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints) if err != nil { - r.Log.Error(err, "failed to create service registrations", "endpoints", serviceEndpoints.Name) + r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) return ctrl.Result{}, err } @@ -123,30 +143,31 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( // Note: the order of how we register services is important, // and the connect-proxy service should come after the "main" service // because its alias health check depends on the main service existing. - r.Log.Info("registering service", "service", serviceRegistration.Name) + r.Log.Info("registering service with Consul", "name", serviceRegistration.Name) err = client.Agent().ServiceRegister(serviceRegistration) if err != nil { - r.Log.Error(err, "failed to register service with Consul", "consul-service-name", serviceRegistration.Name) + r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name) return ctrl.Result{}, err } // Register the proxy service instance with the local agent. - r.Log.Info("registering proxy service", "service", proxyServiceRegistration.Name) + r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name) err = client.Agent().ServiceRegister(proxyServiceRegistration) if err != nil { - r.Log.Error(err, "failed to register proxy service with Consul", "consul-proxy-service-name", proxyServiceRegistration.Name) + r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name) return ctrl.Result{}, err } // Update the TTL health check for the service. // This is required because ServiceRegister() does not update the TTL if the service already exists. - r.Log.Info("updating ttl health check", "service", serviceRegistration.Name) + r.Log.Info("updating TTL health check for service", "name", serviceRegistration.Name) status, reason, err := getReadyStatusAndReason(pod) if err != nil { return ctrl.Result{}, err } err = client.Agent().UpdateTTL(getConsulHealthCheckID(pod, serviceRegistration.ID), reason, status) if err != nil { + r.Log.Error(err, "failed to update TTL health check", "name", serviceRegistration.Name) return ctrl.Result{}, err } } @@ -158,7 +179,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( // from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during // the registration codepath. if err = r.deregisterServiceOnAllAgents(ctx, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil { - r.Log.Error(err, "failed to deregister service instances on all agents", "k8s-service-name", serviceEndpoints.Name, "k8s-namespace", serviceEndpoints.Namespace) + r.Log.Error(err, "failed to deregister endpoints on all agents", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) return ctrl.Result{}, err } @@ -240,7 +261,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service Port: servicePort, Address: pod.Status.PodIP, Meta: meta, - Namespace: "", // TODO: namespace support + Namespace: r.consulNamespace(pod.Namespace), Check: &api.AgentServiceCheck{ CheckID: getConsulHealthCheckID(pod, serviceID), Name: "Kubernetes Health Check", @@ -299,7 +320,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service Address: pod.Status.PodIP, TaggedAddresses: nil, // TODO: set cluster IP here (will be done later) Meta: meta, - Namespace: "", // TODO: same as service namespace + Namespace: r.consulNamespace(pod.Namespace), Proxy: proxyConfig, Checks: api.AgentServiceChecks{ { @@ -359,9 +380,8 @@ func getReadyStatusAndReason(pod corev1.Pod) (string, string, error) { // them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map // has addresses, it will only deregister instances not in the map. func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error { - // Get all agents by getting pods with label component=client, app=consul and release= - list := corev1.PodList{} + agents := corev1.PodList{} listOptions := client.ListOptions{ Namespace: r.ReleaseNamespace, LabelSelector: labels.SelectorFromSet(map[string]string{ @@ -370,24 +390,23 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, "release": r.ReleaseName, }), } - if err := r.Client.List(ctx, &list, &listOptions); err != nil { - r.Log.Error(err, "failed to get agent pods from Kubernetes") + if err := r.Client.List(ctx, &agents, &listOptions); err != nil { + r.Log.Error(err, "failed to get Consul client agent pods") return err } // On each agent, we need to get services matching "k8s-service-name" and "k8s-namespace" metadata. - for _, pod := range list.Items { - // Create client for this agent. - client, err := r.remoteConsulClient(pod.Status.PodIP) + for _, agent := range agents.Items { + client, err := r.remoteConsulClient(agent.Status.PodIP, r.consulNamespace(k8sSvcNamespace)) if err != nil { - r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.PodIP) + r.Log.Error(err, "failed to create a new Consul client", "address", agent.Status.PodIP) return err } // Get services matching metadata. svcs, err := serviceInstancesForK8SServiceNameAndNamespace(k8sSvcName, k8sSvcNamespace, client) if err != nil { - r.Log.Error(err, "failed to get service instances", MetaKeyKubeServiceName, k8sSvcName) + r.Log.Error(err, "failed to get service instances", "name", k8sSvcName) return err } @@ -399,13 +418,13 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, if _, ok := endpointsAddressesMap[serviceRegistration.Address]; !ok { // If the service address is not in the Endpoints addresses, deregister it. if err = client.Agent().ServiceDeregister(svcID); err != nil { - r.Log.Error(err, "failed to deregister service instance", "consul-service-id", svcID) + r.Log.Error(err, "failed to deregister service instance", "id", svcID) return err } } } else { if err = client.Agent().ServiceDeregister(svcID); err != nil { - r.Log.Error(err, "failed to deregister service instance", "consul-service-id", svcID) + r.Log.Error(err, "failed to deregister service instance", "id", svcID) return err } } @@ -430,7 +449,7 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, for _, raw := range strings.Split(raw, ",") { parts := strings.SplitN(raw, ":", 3) - var datacenter, serviceName, preparedQuery string + var datacenter, serviceName, preparedQuery, namespace string var port int32 if strings.TrimSpace(parts[0]) == "prepared_query" { port, _ = portValue(pod, strings.TrimSpace(parts[2])) @@ -438,9 +457,17 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, } else { port, _ = portValue(pod, strings.TrimSpace(parts[1])) - // TODO: Parse the namespace if provided - - serviceName = strings.TrimSpace(parts[0]) + // If Consul Namespaces are enabled, attempt to parse the + // upstream for a namespace. + if r.EnableConsulNamespaces { + pieces := strings.SplitN(parts[0], ".", 2) + serviceName = strings.TrimSpace(pieces[0]) + if len(pieces) > 1 { + namespace = strings.TrimSpace(pieces[1]) + } + } else { + serviceName = strings.TrimSpace(parts[0]) + } // parse the optional datacenter if len(parts) > 2 { @@ -469,7 +496,7 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, if port > 0 { upstream := api.Upstream{ DestinationType: api.UpstreamDestTypeService, - DestinationNamespace: "", // todo + DestinationNamespace: namespace, DestinationName: serviceName, Datacenter: datacenter, LocalBindPort: int(port), @@ -488,12 +515,12 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, return upstreams, nil } -// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod. -func (r *EndpointsController) remoteConsulClient(ip string) (*api.Client, error) { +// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod for a provided namespace. +func (r *EndpointsController) remoteConsulClient(ip string, namespace string) (*api.Client, error) { newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort) localConfig := r.ConsulClientCfg localConfig.Address = newAddr - + localConfig.Namespace = namespace return consul.NewClient(localConfig) } @@ -553,20 +580,20 @@ func (r EndpointsController) filterAgentPods(object client.Object) bool { // for client agent pods where the Ready condition is true. func (r EndpointsController) requestsForRunningAgentPods(object client.Object) []ctrl.Request { var consulClientPod corev1.Pod - r.Log.Info("received update for consulClientPod", "podName", object.GetName()) + r.Log.Info("received update for Consul client pod", "name", object.GetName()) err := r.Client.Get(r.Context, types.NamespacedName{Name: object.GetName(), Namespace: object.GetNamespace()}, &consulClientPod) if k8serrors.IsNotFound(err) { // Ignore if consulClientPod is not found. return []ctrl.Request{} } if err != nil { - r.Log.Error(err, "failed to get consulClientPod", "consulClientPod", consulClientPod.Name) + r.Log.Error(err, "failed to get Consul client pod", "name", consulClientPod.Name) return []ctrl.Request{} } // We can ignore the agent pod if it's not running, since // we can't reconcile and register/deregister services against that agent. if consulClientPod.Status.Phase != corev1.PodRunning { - r.Log.Info("ignoring consulClientPod because it's not running", "consulClientPod", consulClientPod.Name) + r.Log.Info("ignoring Consul client pod because it's not running", "name", consulClientPod.Name) return []ctrl.Request{} } // We can ignore the agent pod if it's not yet ready, since @@ -574,7 +601,7 @@ func (r EndpointsController) requestsForRunningAgentPods(object client.Object) [ for _, cond := range consulClientPod.Status.Conditions { if cond.Type == corev1.PodReady && cond.Status != corev1.ConditionTrue { // Ignore if consulClientPod is not ready. - r.Log.Info("ignoring consulClientPod because it's not ready", "consulClientPod", consulClientPod.Name) + r.Log.Info("ignoring Consul client pod because it's not ready", "name", consulClientPod.Name) return []ctrl.Request{} } } @@ -605,6 +632,12 @@ func (r EndpointsController) requestsForRunningAgentPods(object client.Object) [ return requests } +// consulNamespace returns the Consul destination namespace for a provided Kubernetes namespace +// depending on Consul Namespaces being enabled and the value of namespace mirroring. +func (r *EndpointsController) consulNamespace(namespace string) string { + return namespaces.ConsulNamespace(namespace, r.EnableConsulNamespaces, r.ConsulDestinationNamespace, r.EnableNSMirroring, r.NSMirroringPrefix) +} + // hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected. func hasBeenInjected(pod corev1.Pod) bool { if anno, ok := pod.Annotations[keyInjectStatus]; ok { diff --git a/connect-inject/endpoints_controller_ent_test.go b/connect-inject/endpoints_controller_ent_test.go new file mode 100644 index 0000000000..a997d12ead --- /dev/null +++ b/connect-inject/endpoints_controller_ent_test.go @@ -0,0 +1,1230 @@ +// +build enterprise + +package connectinject + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/deckarep/golang-set" + logrtest "github.com/go-logr/logr/testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/hashicorp/consul-k8s/namespaces" + "github.com/hashicorp/consul-k8s/subcommand/common" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +// TestReconcileCreateEndpoint tests the logic to create service instances in Consul from the addresses in the Endpoints +// object. The cases test a basic endpoints object with two addresses. This test verifies that the services and their TTL +// health checks are created in the expected Consul namespace for various combinations of namespace flags. +// This test covers EndpointsController.createServiceRegistrations. +func TestReconcileCreateEndpointWithNamespaces(t *testing.T) { + t.Parallel() + nodeName := "test-node" + cases := map[string]struct { + Mirror bool + MirrorPrefix string + SourceKubeNS string + DestConsulNS string + ExpConsulNS string + }{ + "SourceKubeNS=default, DestConsulNS=default": { + SourceKubeNS: "default", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, DestConsulNS=default": { + SourceKubeNS: "kube", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=default, DestConsulNS=other": { + SourceKubeNS: "default", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=kube, DestConsulNS=other": { + SourceKubeNS: "kube", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=default, Mirror=true": { + SourceKubeNS: "default", + Mirror: true, + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, Mirror=true": { + SourceKubeNS: "kube", + Mirror: true, + ExpConsulNS: "kube", + }, + "SourceKubeNS=default, Mirror=true, Prefix=prefix": { + SourceKubeNS: "default", + Mirror: true, + MirrorPrefix: "prefix-", + ExpConsulNS: "prefix-default", + }, + } + for name, test := range cases { + setup := struct { + consulSvcName string + k8sObjects func() []runtime.Object + initialConsulSvcs []*api.AgentServiceRegistration + expectedNumSvcInstances int + expectedConsulSvcInstances []*api.CatalogService + expectedProxySvcInstances []*api.CatalogService + expectedAgentHealthChecks []*api.AgentCheck + }{ + consulSvcName: "service-created", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", test.SourceKubeNS, "1.2.3.4", true) + pod2 := createPodWithNamespace("pod2", test.SourceKubeNS, "2.2.3.4", true) + endpointWithTwoAddresses := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: test.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: test.SourceKubeNS, + }, + }, + { + IP: "2.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod2", + Namespace: test.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, pod2, endpointWithTwoAddresses} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{}, + expectedNumSvcInstances: 2, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-created", + ServiceName: "service-created", + ServiceAddress: "1.2.3.4", + ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceTags: []string{}, + Namespace: test.ExpConsulNS, + }, + { + ServiceID: "pod2-service-created", + ServiceName: "service-created", + ServiceAddress: "2.2.3.4", + ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceTags: []string{}, + Namespace: test.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-created-sidecar-proxy", + ServiceName: "service-created-sidecar-proxy", + ServiceAddress: "1.2.3.4", + ServicePort: 20000, + ServiceProxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-created", + DestinationServiceID: "pod1-service-created", + }, + ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceTags: []string{}, + Namespace: test.ExpConsulNS, + }, + { + ServiceID: "pod2-service-created-sidecar-proxy", + ServiceName: "service-created-sidecar-proxy", + ServiceAddress: "2.2.3.4", + ServicePort: 20000, + ServiceProxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-created", + DestinationServiceID: "pod2-service-created", + }, + ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceTags: []string{}, + Namespace: test.ExpConsulNS, + }, + }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: fmt.Sprintf("%s/pod1-service-created/kubernetes-health-check", test.SourceKubeNS), + ServiceName: "service-created", + ServiceID: "pod1-service-created", + Name: "Kubernetes Health Check", + Status: api.HealthPassing, + Output: kubernetesSuccessReasonMsg, + Type: ttl, + Namespace: test.ExpConsulNS, + }, + { + CheckID: fmt.Sprintf("%s/pod2-service-created/kubernetes-health-check", test.SourceKubeNS), + ServiceName: "service-created", + ServiceID: "pod2-service-created", + Name: "Kubernetes Health Check", + Status: api.HealthPassing, + Output: kubernetesSuccessReasonMsg, + Type: ttl, + Namespace: test.ExpConsulNS, + }, + }, + } + t.Run(name, func(t *testing.T) { + // The agent pod needs to have the address 127.0.0.1 so when the + // code gets the agent pods via the label component=client, and + // makes requests against the agent API, it will actually hit the + // test server we have on localhost. + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} + + // Create fake k8s client. + k8sObjects := append(setup.k8sObjects(), fakeClientPod) + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + // Create test Consul server. + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.NodeName = nodeName + }) + require.NoError(t, err) + defer consul.Stop() + consul.WaitForLeader(t) + + cfg := &api.Config{ + Address: consul.HTTPAddr, + Namespace: test.ExpConsulNS, + } + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + addr := strings.Split(consul.HTTPAddr, ":") + consulPort := addr[1] + + _, err = namespaces.EnsureExists(consulClient, test.ExpConsulNS, "") + require.NoError(t, err) + + // Register service and proxy in Consul. + for _, svc := range setup.initialConsulSvcs { + err = consulClient.Agent().ServiceRegister(svc) + require.NoError(t, err) + } + + // Create the endpoints controller. + ep := &EndpointsController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: "http", + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ReleaseName: "consul", + ReleaseNamespace: "default", + ConsulClientCfg: cfg, + EnableConsulNamespaces: true, + ConsulDestinationNamespace: test.DestConsulNS, + EnableNSMirroring: test.Mirror, + NSMirroringPrefix: test.MirrorPrefix, + } + namespacedName := types.NamespacedName{ + Namespace: test.SourceKubeNS, + Name: "service-created", + } + + resp, err := ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + require.NoError(t, err) + require.False(t, resp.Requeue) + + // After reconciliation, Consul should have the service with the correct number of instances. + serviceInstances, _, err := consulClient.Catalog().Service(setup.consulSvcName, "", &api.QueryOptions{Namespace: test.ExpConsulNS}) + require.NoError(t, err) + require.Len(t, serviceInstances, setup.expectedNumSvcInstances) + for i, instance := range serviceInstances { + require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceName, instance.ServiceName) + require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress) + require.Equal(t, setup.expectedConsulSvcInstances[i].ServicePort, instance.ServicePort) + require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceMeta, instance.ServiceMeta) + require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceTags, instance.ServiceTags) + } + proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", setup.consulSvcName), "", &api.QueryOptions{ + Namespace: test.ExpConsulNS, + }) + require.NoError(t, err) + require.Len(t, proxyServiceInstances, setup.expectedNumSvcInstances) + for i, instance := range proxyServiceInstances { + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceName, instance.ServiceName) + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress) + require.Equal(t, setup.expectedProxySvcInstances[i].ServicePort, instance.ServicePort) + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceProxy, instance.ServiceProxy) + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceMeta, instance.ServiceMeta) + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceTags, instance.ServiceTags) + } + + _, checkInfos, err := consulClient.Agent().AgentHealthServiceByName(fmt.Sprintf("%s-sidecar-proxy", setup.consulSvcName)) + expectedChecks := []string{"Proxy Public Listener", "Destination Alias"} + require.NoError(t, err) + require.Len(t, checkInfos, setup.expectedNumSvcInstances) + for _, checkInfo := range checkInfos { + checks := checkInfo.Checks + require.Contains(t, expectedChecks, checks[0].Name) + require.Contains(t, expectedChecks, checks[1].Name) + } + + // Check that the Consul health check was created for the k8s pod. + if setup.expectedAgentHealthChecks != nil { + for i := range setup.expectedConsulSvcInstances { + filter := fmt.Sprintf("CheckID == `%s`", setup.expectedAgentHealthChecks[i].CheckID) + newChecks, _ := consulClient.Agent().Checks() + for key, value := range newChecks { + fmt.Printf("%s:%v\n", key, value) + } + check, err := consulClient.Agent().ChecksWithFilter(filter) + require.NoError(t, err) + require.EqualValues(t, 1, len(check)) + // Ignoring Namespace because the response from ENT includes it and OSS does not. + var ignoredFields = []string{"Node", "Definition", "Namespace"} + require.True(t, cmp.Equal(check[setup.expectedAgentHealthChecks[i].CheckID], setup.expectedAgentHealthChecks[i], cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) + } + } + }) + } +} + +// Tests updating an Endpoints object when Consul namespaces are enabled. +// - Tests updates via the register codepath: +// - When an address in an Endpoint is updated, that the corresponding service instance in Consul is updated in the correct Consul namespace. +// - When an address is added to an Endpoint, an additional service instance in Consul is registered in the correct Consul namespace. +// - Tests updates via the deregister codepath: +// - When an address is removed from an Endpoint, the corresponding service instance in Consul is deregistered. +// - When an address is removed from an Endpoint *and there are no addresses left in the Endpoint*, the +// corresponding service instance in Consul is deregistered. +// For the register and deregister codepath, this also tests that they work when the Consul service name is different +// from the K8s service name. +// This test covers EndpointsController.deregisterServiceOnAllAgents when services should be selectively deregistered +// since the map will not be nil. This test also runs each test with ACLs+TLS enabled and disabled, since it covers all the cases where a Consul client is created. +func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { + t.Parallel() + nodeName := "test-node" + cases := map[string]struct { + Mirror bool + MirrorPrefix string + SourceKubeNS string + DestConsulNS string + ExpConsulNS string + }{ + "SourceKubeNS=default, DestConsulNS=default": { + SourceKubeNS: "default", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, DestConsulNS=default": { + SourceKubeNS: "kube", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=default, DestConsulNS=other": { + SourceKubeNS: "default", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=kube, DestConsulNS=other": { + SourceKubeNS: "kube", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=default, Mirror=true": { + SourceKubeNS: "default", + Mirror: true, + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, Mirror=true": { + SourceKubeNS: "kube", + Mirror: true, + ExpConsulNS: "kube", + }, + "SourceKubeNS=default, Mirror=true, Prefix=prefix": { + SourceKubeNS: "default", + Mirror: true, + MirrorPrefix: "prefix-", + ExpConsulNS: "prefix-default", + }, + } + for name, ts := range cases { + cases := []struct { + name string + consulSvcName string + k8sObjects func() []runtime.Object + initialConsulSvcs []*api.AgentServiceRegistration + expectedNumSvcInstances int + expectedConsulSvcInstances []*api.CatalogService + expectedProxySvcInstances []*api.CatalogService + }{ + { + name: "Endpoints has an updated address (pod IP change).", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "4.4.4.4", true) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "4.4.4.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: ts.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "4.4.4.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "4.4.4.4", + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + name: "Different Consul service name: Endpoints has an updated address (pod IP change).", + consulSvcName: "different-consul-svc-name", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "4.4.4.4", true) + pod1.Annotations[annotationService] = "different-consul-svc-name" + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "4.4.4.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: ts.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod1-different-consul-svc-name", + }, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-different-consul-svc-name", + ServiceAddress: "4.4.4.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-different-consul-svc-name-sidecar-proxy", + ServiceAddress: "4.4.4.4", + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + name: "Endpoints has additional address not in Consul.", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true) + pod2 := createPodWithNamespace("pod2", ts.SourceKubeNS, "2.2.3.4", true) + endpointWithTwoAddresses := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: ts.SourceKubeNS, + }, + }, + { + IP: "2.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod2", + Namespace: ts.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, pod2, endpointWithTwoAddresses} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 2, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + { + ServiceID: "pod2-service-updated", + ServiceAddress: "2.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + { + ServiceID: "pod2-service-updated-sidecar-proxy", + ServiceAddress: "2.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + name: "Consul has instances that are not in the Endpoints addresses.", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: ts.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + ID: "pod2-service-updated", + Name: "service-updated", + Port: 80, + Address: "2.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod2-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "2.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod2-service-updated", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + name: "Different Consul service name: Consul has instances that are not in the Endpoints addresses.", + consulSvcName: "different-consul-svc-name", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true) + pod1.Annotations[annotationService] = "different-consul-svc-name" + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: ts.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod1-different-consul-svc-name", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + ID: "pod2-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "2.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod2-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "2.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod2-different-consul-svc-name", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-different-consul-svc-name", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-different-consul-svc-name-sidecar-proxy", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + // When a k8s deployment is deleted but it's k8s service continues to exist, the endpoints has no addresses + // and the instances should be deleted from Consul. + name: "Consul has instances that are not in the endpoints, and the endpoints has no addresses.", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + } + return []runtime.Object{endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + ID: "pod2-service-updated", + Name: "service-updated", + Port: 80, + Address: "2.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod2-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "2.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod2-service-updated", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 0, + expectedConsulSvcInstances: []*api.CatalogService{}, + expectedProxySvcInstances: []*api.CatalogService{}, + }, + { + // With a different Consul service name, when a k8s deployment is deleted but it's k8s service continues to + // exist, the endpoints has no addresses and the instances should be deleted from Consul. + name: "Different Consul service name: Consul has instances that are not in the endpoints, and the endpoints has no addresses.", + consulSvcName: "different-consul-svc-name", + k8sObjects: func() []runtime.Object { + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + } + return []runtime.Object{endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod1-different-consul-svc-name", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + ID: "pod2-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "2.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod2-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "2.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod2-different-consul-svc-name", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 0, + expectedConsulSvcInstances: []*api.CatalogService{}, + expectedProxySvcInstances: []*api.CatalogService{}, + }, + } + for _, secure := range []bool{true, false} { + for _, tt := range cases { + t.Run(fmt.Sprintf("%s: %s - secure: %v", name, tt.name, secure), func(t *testing.T) { + // The agent pod needs to have the address 127.0.0.1 so when the + // code gets the agent pods via the label component=client, and + // makes requests against the agent API, it will actually hit the + // test server we have on localhost. + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} + + // Create fake k8s client. + k8sObjects := append(tt.k8sObjects(), fakeClientPod) + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586" + caFile, certFile, keyFile := common.GenerateServerCerts(t) + // Create test consul server, with ACLs+TLS if necessary. + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + if secure { + c.ACL.Enabled = true + c.ACL.DefaultPolicy = "deny" + c.ACL.Tokens.Master = masterToken + c.CAFile = caFile + c.CertFile = certFile + c.KeyFile = keyFile + } + c.NodeName = nodeName + }) + require.NoError(t, err) + defer consul.Stop() + consul.WaitForSerfCheck(t) + + cfg := &api.Config{ + Scheme: "http", + Address: consul.HTTPAddr, + Namespace: ts.ExpConsulNS, + } + if secure { + cfg.Address = consul.HTTPSAddr + cfg.Scheme = "https" + cfg.TLSConfig = api.TLSConfig{ + CAFile: caFile, + } + cfg.Token = masterToken + } + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + addr := strings.Split(cfg.Address, ":") + consulPort := addr[1] + + _, err = namespaces.EnsureExists(consulClient, ts.ExpConsulNS, "") + require.NoError(t, err) + + // Register service and proxy in Consul. + for _, svc := range tt.initialConsulSvcs { + err = consulClient.Agent().ServiceRegister(svc) + require.NoError(t, err) + } + + // Create the endpoints controller. + ep := &EndpointsController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: cfg.Scheme, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ReleaseName: "consul", + ReleaseNamespace: "default", + ConsulClientCfg: cfg, + EnableConsulNamespaces: true, + EnableNSMirroring: ts.Mirror, + NSMirroringPrefix: ts.MirrorPrefix, + ConsulDestinationNamespace: ts.DestConsulNS, + } + namespacedName := types.NamespacedName{ + Namespace: ts.SourceKubeNS, + Name: "service-updated", + } + + resp, err := ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + require.NoError(t, err) + require.False(t, resp.Requeue) + + // After reconciliation, Consul should have service-updated with the correct number of instances. + serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", &api.QueryOptions{Namespace: ts.ExpConsulNS}) + require.NoError(t, err) + require.Len(t, serviceInstances, tt.expectedNumSvcInstances) + for i, instance := range serviceInstances { + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress) + } + proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", &api.QueryOptions{Namespace: ts.ExpConsulNS}) + require.NoError(t, err) + require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances) + for i, instance := range proxyServiceInstances { + require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress) + } + }) + } + } + } +} + +// Tests deleting an Endpoints object, with and without matching Consul and K8s service names when Consul namespaces are enabled. +// This test covers EndpointsController.deregisterServiceOnAllAgents when the map is nil (not selectively deregistered). +func TestReconcileDeleteEndpointWithNamespaces(t *testing.T) { + t.Parallel() + nodeName := "test-node" + cases := map[string]struct { + Mirror bool + MirrorPrefix string + SourceKubeNS string + DestConsulNS string + ExpConsulNS string + }{ + "SourceKubeNS=default, DestConsulNS=default": { + SourceKubeNS: "default", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, DestConsulNS=default": { + SourceKubeNS: "kube", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=default, DestConsulNS=other": { + SourceKubeNS: "default", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=kube, DestConsulNS=other": { + SourceKubeNS: "kube", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=default, Mirror=true": { + SourceKubeNS: "default", + Mirror: true, + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, Mirror=true": { + SourceKubeNS: "kube", + Mirror: true, + ExpConsulNS: "kube", + }, + "SourceKubeNS=default, Mirror=true, Prefix=prefix": { + SourceKubeNS: "default", + Mirror: true, + MirrorPrefix: "prefix-", + ExpConsulNS: "prefix-default", + }, + } + for name, ts := range cases { + cases := []struct { + name string + consulSvcName string + initialConsulSvcs []*api.AgentServiceRegistration + }{ + { + name: "Consul service name matches K8s service name", + consulSvcName: "service-deleted", + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-deleted", + Name: "service-deleted", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-deleted-sidecar-proxy", + Name: "service-deleted-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-deleted", + DestinationServiceID: "pod1-service-deleted", + }, + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + name: "Consul service name does not match K8s service name", + consulSvcName: "different-consul-svc-name", + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod1-different-consul-svc-name", + }, + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + }, + } + for _, tt := range cases { + t.Run(fmt.Sprintf("%s:%s", name, tt.name), func(t *testing.T) { + // The agent pod needs to have the address 127.0.0.1 so when the + // code gets the agent pods via the label component=client, and + // makes requests against the agent API, it will actually hit the + // test server we have on localhost. + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} + + // Create fake k8s client. + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(fakeClientPod).Build() + + // Create test Consul server. + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.NodeName = nodeName + }) + require.NoError(t, err) + defer consul.Stop() + + consul.WaitForLeader(t) + cfg := &api.Config{ + Address: consul.HTTPAddr, + Namespace: ts.ExpConsulNS, + } + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + addr := strings.Split(consul.HTTPAddr, ":") + consulPort := addr[1] + + _, err = namespaces.EnsureExists(consulClient, ts.ExpConsulNS, "") + require.NoError(t, err) + + // Register service and proxy in consul. + for _, svc := range tt.initialConsulSvcs { + err = consulClient.Agent().ServiceRegister(svc) + require.NoError(t, err) + } + + // Create the endpoints controller. + ep := &EndpointsController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: "http", + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ReleaseName: "consul", + ReleaseNamespace: "default", + ConsulClientCfg: cfg, + EnableConsulNamespaces: true, + EnableNSMirroring: ts.Mirror, + NSMirroringPrefix: ts.MirrorPrefix, + ConsulDestinationNamespace: ts.DestConsulNS, + } + + // Set up the Endpoint that will be reconciled, and reconcile. + namespacedName := types.NamespacedName{ + Namespace: ts.SourceKubeNS, + Name: "service-deleted", + } + resp, err := ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + require.NoError(t, err) + require.False(t, resp.Requeue) + + // After reconciliation, Consul should not have any instances of service-deleted. + serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", &api.QueryOptions{Namespace: ts.ExpConsulNS}) + require.NoError(t, err) + require.Empty(t, serviceInstances) + proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", &api.QueryOptions{Namespace: ts.ExpConsulNS}) + require.NoError(t, err) + require.Empty(t, proxyServiceInstances) + + }) + } + } +} + +func createPodWithNamespace(name, namespace, ip string, inject bool) *corev1.Pod { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: corev1.PodStatus{ + PodIP: ip, + HostIP: "127.0.0.1", + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + if inject { + pod.Labels[keyInjectStatus] = injected + pod.Annotations[keyInjectStatus] = injected + } + return pod + +} diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index f3bf5cf7c2..edea1b20ac 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -186,12 +186,13 @@ func TestProcessUpstreams(t *testing.T) { t.Parallel() nodeName := "test-node" cases := []struct { - name string - pod func() *corev1.Pod - expected []api.Upstream - expErr string - configEntry func() api.ConfigEntry - consulUnavailable bool + name string + pod func() *corev1.Pod + expected []api.Upstream + expErr string + configEntry func() api.ConfigEntry + consulUnavailable bool + consulNamespacesEnabled bool }{ { name: "upstream with datacenter without ProxyDefaults", @@ -200,7 +201,8 @@ func TestProcessUpstreams(t *testing.T) { pod1.Annotations[annotationUpstreams] = "upstream1:1234:dc1" return pod1 }, - expErr: "upstream \"upstream1:1234:dc1\" is invalid: there is no ProxyDefaults config to set mesh gateway mode", + expErr: "upstream \"upstream1:1234:dc1\" is invalid: there is no ProxyDefaults config to set mesh gateway mode", + consulNamespacesEnabled: false, }, { name: "upstream with datacenter with ProxyDefaults whose mesh gateway mode is not local or remote", @@ -216,6 +218,7 @@ func TestProcessUpstreams(t *testing.T) { pd.MeshGateway.Mode = "bad-mode" return pd }, + consulNamespacesEnabled: false, }, { name: "upstream with datacenter with ProxyDefaults and mesh gateway is in local mode", @@ -238,6 +241,7 @@ func TestProcessUpstreams(t *testing.T) { pd.MeshGateway.Mode = api.MeshGatewayModeLocal return pd }, + consulNamespacesEnabled: false, }, { name: "upstream with datacenter with ProxyDefaults and mesh gateway in remote mode", @@ -260,10 +264,10 @@ func TestProcessUpstreams(t *testing.T) { pd.MeshGateway.Mode = api.MeshGatewayModeRemote return pd }, + consulNamespacesEnabled: false, }, { - name: "when consul is unavailable, we don't return an error", - consulUnavailable: true, + name: "when consul is unavailable, we don't return an error", pod: func() *corev1.Pod { pod1 := createPod("pod1", "1.2.3.4", true) pod1.Annotations[annotationUpstreams] = "upstream1:1234:dc1" @@ -284,6 +288,8 @@ func TestProcessUpstreams(t *testing.T) { Datacenter: "dc1", }, }, + consulUnavailable: true, + consulNamespacesEnabled: false, }, { name: "single upstream", @@ -299,6 +305,24 @@ func TestProcessUpstreams(t *testing.T) { LocalBindPort: 1234, }, }, + consulNamespacesEnabled: false, + }, + { + name: "single upstream with namespace", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true) + pod1.Annotations[annotationUpstreams] = "upstream.foo:1234" + return pod1 + }, + expected: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream", + LocalBindPort: 1234, + DestinationNamespace: "foo", + }, + }, + consulNamespacesEnabled: true, }, { name: "multiple upstreams", @@ -319,6 +343,41 @@ func TestProcessUpstreams(t *testing.T) { LocalBindPort: 2234, }, }, + consulNamespacesEnabled: false, + }, + { + name: "multiple upstreams with consul namespaces and datacenters", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true) + pod1.Annotations[annotationUpstreams] = "upstream1:1234, upstream2.bar:2234, upstream3.foo:3234:dc2" + return pod1 + }, + configEntry: func() api.ConfigEntry { + ce, _ := api.MakeConfigEntry(api.ProxyDefaults, "pd") + pd := ce.(*api.ProxyConfigEntry) + pd.MeshGateway.Mode = "remote" + return pd + }, + expected: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream1", + LocalBindPort: 1234, + }, + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream2", + DestinationNamespace: "bar", + LocalBindPort: 2234, + }, { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream3", + DestinationNamespace: "foo", + LocalBindPort: 3234, + Datacenter: "dc2", + }, + }, + consulNamespacesEnabled: true, }, { name: "prepared query upstream", @@ -334,6 +393,7 @@ func TestProcessUpstreams(t *testing.T) { LocalBindPort: 1234, }, }, + consulNamespacesEnabled: false, }, { name: "prepared query and non-query upstreams", @@ -359,6 +419,7 @@ func TestProcessUpstreams(t *testing.T) { LocalBindPort: 8202, }, }, + consulNamespacesEnabled: false, }, } for _, tt := range cases { @@ -387,12 +448,13 @@ func TestProcessUpstreams(t *testing.T) { } ep := &EndpointsController{ - Log: logrtest.TestLogger{T: t}, - ConsulClient: consulClient, - ConsulPort: consulPort, - ConsulScheme: "http", - AllowK8sNamespacesSet: mapset.NewSetWith("*"), - DenyK8sNamespacesSet: mapset.NewSetWith(), + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: "http", + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + EnableConsulNamespaces: tt.consulNamespacesEnabled, } upstreams, err := ep.processUpstreams(*tt.pod()) diff --git a/go.mod b/go.mod index 0dd38a6377..d55fbf83f3 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,6 @@ require ( github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/stretchr/testify v1.6.1 go.uber.org/zap v1.15.0 - golang.org/x/net v0.0.0-20201110031124-69a78807bb2b golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 // indirect golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect golang.org/x/tools v0.0.0-20200616195046-dc31b401abb5 // indirect diff --git a/subcommand/common/common.go b/subcommand/common/common.go index 24f3f81257..70c3000a5b 100644 --- a/subcommand/common/common.go +++ b/subcommand/common/common.go @@ -52,7 +52,7 @@ func ValidateUnprivilegedPort(flagName, flagValue string) error { // ConsulLogin issues an ACL().Login to Consul and writes out the token to tokenSinkFile. // The logic of this is taken from the `consul login` command. -func ConsulLogin(client *api.Client, bearerTokenFile, authMethodName, tokenSinkFile string, meta map[string]string) error { +func ConsulLogin(client *api.Client, bearerTokenFile, authMethodName, tokenSinkFile, namespace string, meta map[string]string) error { if meta == nil { return fmt.Errorf("invalid meta") } @@ -70,7 +70,7 @@ func ConsulLogin(client *api.Client, bearerTokenFile, authMethodName, tokenSinkF BearerToken: bearerToken, Meta: meta, } - tok, _, err := client.ACL().Login(req, nil) + tok, _, err := client.ACL().Login(req, &api.WriteOptions{Namespace: namespace}) if err != nil { return fmt.Errorf("error logging in: %s", err) } diff --git a/subcommand/common/common_test.go b/subcommand/common/common_test.go index 05820b9723..f8cccd8db1 100644 --- a/subcommand/common/common_test.go +++ b/subcommand/common/common_test.go @@ -46,13 +46,7 @@ func TestConsulLogin(t *testing.T) { tokenFile := WriteTempFile(t, "") client := startMockServer(t, &counter) - err := ConsulLogin( - client, - bearerTokenFile, - testAuthMethod, - tokenFile, - testPodMeta, - ) + err := ConsulLogin(client, bearerTokenFile, testAuthMethod, tokenFile, "", testPodMeta) require.NoError(err) require.Equal(counter, 1) // Validate that the token file was written to disk. @@ -71,6 +65,7 @@ func TestConsulLogin_EmptyBearerTokenFile(t *testing.T) { bearerTokenFile, testAuthMethod, "", + "", testPodMeta, ) require.EqualError(err, fmt.Sprintf("no bearer token found in %s", bearerTokenFile)) @@ -85,6 +80,7 @@ func TestConsulLogin_BearerTokenFileDoesNotExist(t *testing.T) { randFileName, testAuthMethod, "", + "", testPodMeta, ) require.Error(err) @@ -103,6 +99,7 @@ func TestConsulLogin_TokenFileUnwritable(t *testing.T) { bearerTokenFile, testAuthMethod, randFileName, + "", testPodMeta, ) require.Error(err) diff --git a/subcommand/connect-init/command.go b/subcommand/connect-init/command.go index 276315ae06..f2da924f28 100644 --- a/subcommand/connect-init/command.go +++ b/subcommand/connect-init/command.go @@ -30,9 +30,11 @@ const ( type Command struct { UI cli.Ui - flagACLAuthMethod string // Auth Method to use for ACLs, if enabled. - flagPodName string // Pod name. - flagPodNamespace string // Pod namespace. + flagACLAuthMethod string // Auth Method to use for ACLs, if enabled. + flagPodName string // Pod name. + flagPodNamespace string // Pod namespace. + flagAuthMethodNamespace string // Consul namespace the auth-method is defined in. + flagConsulServiceNamespace string // Consul destination namespace for the service. bearerTokenFile string // Location of the bearer token. Default is /var/run/secrets/kubernetes.io/serviceaccount/token. tokenSinkFile string // Location to write the output token. Default is defaultTokenSinkFile. @@ -51,6 +53,8 @@ func (c *Command) init() { c.flagSet.StringVar(&c.flagACLAuthMethod, "acl-auth-method", "", "Name of the auth method to login to.") c.flagSet.StringVar(&c.flagPodName, "pod-name", "", "Name of the pod.") c.flagSet.StringVar(&c.flagPodNamespace, "pod-namespace", "", "Name of the pod namespace.") + c.flagSet.StringVar(&c.flagAuthMethodNamespace, "auth-method-namespace", "", "Consul namespace the auth-method is defined in") + c.flagSet.StringVar(&c.flagConsulServiceNamespace, "consul-service-namespace", "", "Consul destination namespace of the service.") if c.bearerTokenFile == "" { c.bearerTokenFile = defaultBearerTokenFile @@ -86,6 +90,7 @@ func (c *Command) Run(args []string) int { } cfg := api.DefaultConfig() + cfg.Namespace = c.flagConsulServiceNamespace c.http.MergeOntoConfig(cfg) consulClient, err := consul.NewClient(cfg) if err != nil { @@ -98,7 +103,7 @@ func (c *Command) Run(args []string) int { // loginMeta is the default metadata that we pass to the consul login API. loginMeta := map[string]string{"pod": fmt.Sprintf("%s/%s", c.flagPodNamespace, c.flagPodName)} err = backoff.Retry(func() error { - err := common.ConsulLogin(consulClient, c.bearerTokenFile, c.flagACLAuthMethod, c.tokenSinkFile, loginMeta) + err := common.ConsulLogin(consulClient, c.bearerTokenFile, c.flagACLAuthMethod, c.tokenSinkFile, c.flagAuthMethodNamespace, loginMeta) if err != nil { c.UI.Error(fmt.Sprintf("Consul login failed; retrying: %s", err)) } @@ -122,12 +127,7 @@ func (c *Command) Run(args []string) int { // which maps to this pod+namespace. var proxyID string err = backoff.Retry(func() error { - filter := fmt.Sprintf( - "Meta[%q] == %q and Meta[%q] == %q", - connectinject.MetaKeyPodName, - c.flagPodName, - connectinject.MetaKeyKubeNS, - c.flagPodNamespace) + filter := fmt.Sprintf("Meta[%q] == %q and Meta[%q] == %q", connectinject.MetaKeyPodName, c.flagPodName, connectinject.MetaKeyKubeNS, c.flagPodNamespace) serviceList, err := consulClient.Agent().ServicesWithFilter(filter) if err != nil { c.UI.Error(fmt.Sprintf("Unable to get Agent services: %s", err)) diff --git a/subcommand/connect-init/command_ent_test.go b/subcommand/connect-init/command_ent_test.go new file mode 100644 index 0000000000..1b0039c8b8 --- /dev/null +++ b/subcommand/connect-init/command_ent_test.go @@ -0,0 +1,307 @@ +// +build enterprise + +package connectinit + +import ( + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/hashicorp/consul-k8s/namespaces" + "github.com/hashicorp/consul-k8s/subcommand/common" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/mitchellh/cli" + "github.com/stretchr/testify/require" +) + +func TestRun_ServicePollingWithACLsAndTLSWithNamespaces(t *testing.T) { + t.Parallel() + cases := []struct { + name string + tls bool + consulServiceNamespace string + authMethod string + authMethodNamespace string + }{ + { + name: "ACLs enabled, no tls, serviceNS=default, authMethodNS=default", + tls: false, + consulServiceNamespace: "default", + authMethodNamespace: "default", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs enabled, tls, serviceNS=default, authMethodNS=default", + tls: true, + consulServiceNamespace: "default", + authMethodNamespace: "default", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs enabled, no tls, serviceNS=default-ns, authMethodNS=default", + tls: false, + consulServiceNamespace: "default-ns", + authMethodNamespace: "default", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs enabled, tls, serviceNS=default-ns, authMethodNS=default", + tls: true, + consulServiceNamespace: "default-ns", + authMethodNamespace: "default", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs enabled, no tls, serviceNS=other, authMethodNS=other", + tls: false, + consulServiceNamespace: "other", + authMethodNamespace: "other", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs enabled, tls, serviceNS=other, authMethodNS=other", + tls: true, + consulServiceNamespace: "other", + authMethodNamespace: "other", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs disabled, no tls, serviceNS=default, authMethodNS=default", + tls: false, + consulServiceNamespace: "default", + authMethodNamespace: "default", + }, + { + name: "ACLs disabled, tls, serviceNS=default, authMethodNS=default", + tls: true, + consulServiceNamespace: "default", + authMethodNamespace: "default", + }, + { + name: "ACLs disabled, no tls, serviceNS=default-ns, authMethodNS=default", + tls: false, + consulServiceNamespace: "default-ns", + authMethodNamespace: "default", + }, + { + name: "ACLs disabled, tls, serviceNS=default-ns, authMethodNS=default", + tls: true, + consulServiceNamespace: "default-ns", + authMethodNamespace: "default", + }, + { + name: "ACLs disabled, no tls, serviceNS=other, authMethodNS=other", + tls: false, + consulServiceNamespace: "other", + authMethodNamespace: "other", + }, + { + name: "ACLs disabled, tls, serviceNS=other, authMethodNS=other", + tls: true, + consulServiceNamespace: "other", + authMethodNamespace: "other", + }, + } + for _, test := range cases { + t.Run(test.name, func(t *testing.T) { + bearerFile := common.WriteTempFile(t, serviceAccountJWTToken) + tokenFile := fmt.Sprintf("/tmp/%d1", rand.Int()) + proxyFile := fmt.Sprintf("/tmp/%d2", rand.Int()) + t.Cleanup(func() { + os.Remove(proxyFile) + os.Remove(tokenFile) + }) + + var caFile, certFile, keyFile string + // Start Consul server with ACLs enabled and default deny policy. + masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586" + server, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + if test.authMethod != "" { + c.ACL.Enabled = true + c.ACL.DefaultPolicy = "deny" + c.ACL.Tokens.Master = masterToken + } + if test.tls { + caFile, certFile, keyFile = common.GenerateServerCerts(t) + c.CAFile = caFile + c.CertFile = certFile + c.KeyFile = keyFile + } + }) + require.NoError(t, err) + defer server.Stop() + server.WaitForLeader(t) + cfg := &api.Config{ + Scheme: "http", + Address: server.HTTPAddr, + Namespace: test.consulServiceNamespace, + } + if test.authMethod != "" { + cfg.Token = masterToken + } + if test.tls { + cfg.Address = server.HTTPSAddr + cfg.Scheme = "https" + cfg.TLSConfig = api.TLSConfig{ + CAFile: caFile, + } + } + + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + + _, err = namespaces.EnsureExists(consulClient, test.consulServiceNamespace, "") + require.NoError(t, err) + + // Start the mock k8s server. + k8sMockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + if r != nil && r.URL.Path == "/apis/authentication.k8s.io/v1/tokenreviews" && r.Method == "POST" { + w.Write([]byte(tokenReviewFoundResponseForNamespaces)) + } + if r != nil && r.URL.Path == "/api/v1/namespaces/default-ns/serviceaccounts/counting" && r.Method == "GET" { + w.Write([]byte(readServiceAccountFoundForNamespaces)) + } + })) + defer k8sMockServer.Close() + + if test.authMethod != "" { + // Set up Consul's auth method. + authMethod := &api.ACLAuthMethod{ + Name: testAuthMethod, + Type: "kubernetes", + Description: "Kubernetes Auth Method", + Config: map[string]interface{}{ + "Host": k8sMockServer.URL, + "CACert": serviceAccountCACert, + "ServiceAccountJWT": serviceAccountJWTToken, + }, + Namespace: test.authMethodNamespace, + } + // This will be the case when we are emulating "namespace mirroring" where the + // authMethodNamespace is not equal to the consulServiceNamespace. + if test.authMethodNamespace != test.consulServiceNamespace { + authMethod.Config["MapNamespaces"] = true + } + _, _, err = consulClient.ACL().AuthMethodCreate(authMethod, &api.WriteOptions{Namespace: test.authMethodNamespace}) + require.NoError(t, err) + + // Create the binding rule. + aclBindingRule := api.ACLBindingRule{ + Description: "Kubernetes binding rule", + AuthMethod: testAuthMethod, + BindType: api.BindingRuleBindTypeService, + BindName: "${serviceaccount.name}", + Selector: "serviceaccount.name!=default", + Namespace: test.authMethodNamespace, + } + _, _, err = consulClient.ACL().BindingRuleCreate(&aclBindingRule, &api.WriteOptions{Namespace: test.authMethodNamespace}) + require.NoError(t, err) + } + + // Register Consul services. + testConsulServices := []api.AgentServiceRegistration{consulCountingSvc, consulCountingSvcSidecar} + for _, svc := range testConsulServices { + require.NoError(t, consulClient.Agent().ServiceRegister(&svc)) + } + + ui := cli.NewMockUi() + cmd := Command{ + UI: ui, + bearerTokenFile: bearerFile, + tokenSinkFile: tokenFile, + proxyIDFile: proxyFile, + serviceRegistrationPollingAttempts: 5, + } + // We build the http-addr because normally it's defined by the init container setting + // CONSUL_HTTP_ADDR when it processes the command template. + flags := []string{"-pod-name", testPodName, + "-pod-namespace", testPodNamespace, + "-acl-auth-method", test.authMethod, + "-http-addr", fmt.Sprintf("%s://%s", cfg.Scheme, cfg.Address), + "-consul-service-namespace", test.consulServiceNamespace, + "-auth-method-namespace", test.authMethodNamespace, + } + // Add the CA File if necessary since we're not setting CONSUL_CACERT in test ENV. + if test.tls { + flags = append(flags, "-ca-file", caFile) + } + // Run the command. + code := cmd.Run(flags) + require.Equal(t, 0, code, ui.ErrorWriter.String()) + + if test.authMethod != "" { + // Validate the ACL token was written. + tokenData, err := ioutil.ReadFile(tokenFile) + require.NoError(t, err) + require.NotEmpty(t, tokenData) + + // Check that the token has the metadata with pod name and pod namespace. + consulClient, err = api.NewClient(&api.Config{Address: server.HTTPAddr, Token: string(tokenData), Namespace: test.consulServiceNamespace}) + require.NoError(t, err) + token, _, err := consulClient.ACL().TokenReadSelf(&api.QueryOptions{Namespace: test.authMethodNamespace}) + require.NoError(t, err) + require.Equal(t, "token created via login: {\"pod\":\"default-ns/counting-pod\"}", token.Description) + } + + // Validate contents of proxyFile. + data, err := ioutil.ReadFile(proxyFile) + require.NoError(t, err) + require.Contains(t, string(data), "counting-counting-sidecar-proxy") + }) + } +} + +// The namespace here is default-ns as the the k8s-auth method +// relies on the namespace in the response from Kubernetes to +// correctly create the token in the same namespace as the Kubernetes +// namespace which is required when mirroring namespace is enabled. +// Note that this namespace is incorrect for other test cases but +// Consul only cares about this namespace when mirroring is enabled. +const ( + readServiceAccountFoundForNamespaces = `{ + "kind": "ServiceAccount", + "apiVersion": "v1", + "metadata": { + "name": "counting", + "namespace": "default-ns", + "selfLink": "/api/v1/namespaces/default-ns/serviceaccounts/counting", + "uid": "9ff51ff4-557e-11e9-9687-48e6c8b8ecb5", + "resourceVersion": "2101", + "creationTimestamp": "2019-04-02T19:36:34Z" + }, + "secrets": [ + { + "name": "counting-token-m9cvn" + } + ] +}` + + tokenReviewFoundResponseForNamespaces = `{ + "kind": "TokenReview", + "apiVersion": "authentication.k8s.io/v1", + "metadata": { + "creationTimestamp": null + }, + "spec": { + "token": "eyJhbGciOiJSUzI1NiIsImtpZCI6IiJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJkZWZhdWx0Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZWNyZXQubmFtZSI6ImRlbW8tdG9rZW4tbTljdm4iLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiZGVtbyIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6IjlmZjUxZmY0LTU1N2UtMTFlOS05Njg3LTQ4ZTZjOGI4ZWNiNSIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpkZWZhdWx0OmRlbW8ifQ.UJEphtrN261gy9WCl4ZKjm2PRDLDkc3Xg9VcDGfzyroOqFQ6sog5dVAb9voc5Nc0-H5b1yGwxDViEMucwKvZpA5pi7VEx_OskK-KTWXSmafM0Xg_AvzpU9Ed5TSRno-OhXaAraxdjXoC4myh1ay2DMeHUusJg_ibqcYJrWx-6MO1bH_ObORtAKhoST_8fzkqNAlZmsQ87FinQvYN5mzDXYukl-eeRdBgQUBkWvEb-Ju6cc0-QE4sUQ4IH_fs0fUyX_xc0om0SZGWLP909FTz4V8LxV8kr6L7irxROiS1jn3Fvyc9ur1PamVf3JOPPrOyfmKbaGRiWJM32b3buQw7cg" + }, + "status": { + "authenticated": true, + "user": { + "username": "system:serviceaccount:default-ns:counting", + "uid": "9ff51ff4-557e-11e9-9687-48e6c8b8ecb5", + "groups": [ + "system:serviceaccounts", + "system:serviceaccounts:default-ns", + "system:authenticated" + ] + } + } +}` +) diff --git a/subcommand/inject-connect/command.go b/subcommand/inject-connect/command.go index 4531c6cb4c..36e952c81b 100644 --- a/subcommand/inject-connect/command.go +++ b/subcommand/inject-connect/command.go @@ -49,14 +49,15 @@ type Command struct { flagEnvoyExtraArgs string // Extra envoy args when starting envoy flagLogLevel string - // Flags to support namespaces - flagEnableNamespaces bool // Use namespacing on all components - flagConsulDestinationNamespace string // Consul namespace to register everything if not mirroring - flagAllowK8sNamespacesList []string // K8s namespaces to explicitly inject - flagDenyK8sNamespacesList []string // K8s namespaces to deny injection (has precedence) - flagEnableK8SNSMirroring bool // Enables mirroring of k8s namespaces into Consul - flagK8SNSMirroringPrefix string // Prefix added to Consul namespaces created when mirroring - flagCrossNamespaceACLPolicy string // The name of the ACL policy to add to every created namespace if ACLs are enabled + flagAllowK8sNamespacesList []string // K8s namespaces to explicitly inject + flagDenyK8sNamespacesList []string // K8s namespaces to deny injection (has precedence) + + // Flags to support Consul namespaces + flagEnableNamespaces bool // Use namespacing on all components + flagConsulDestinationNamespace string // Consul namespace to register everything if not mirroring + flagEnableK8SNSMirroring bool // Enables mirroring of k8s namespaces into Consul + flagK8SNSMirroringPrefix string // Prefix added to Consul namespaces created when mirroring + flagCrossNamespaceACLPolicy string // The name of the ACL policy to add to every created namespace if ACLs are enabled // Flags for endpoints controller. flagReleaseName string @@ -383,19 +384,24 @@ func (c *Command) Run(args []string) int { } if err = (&connectinject.EndpointsController{ - Client: mgr.GetClient(), - ConsulClient: c.consulClient, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - AllowK8sNamespacesSet: allowK8sNamespaces, - DenyK8sNamespacesSet: denyK8sNamespaces, - Log: ctrl.Log.WithName("controller").WithName("endpoints-controller"), - Scheme: mgr.GetScheme(), - ReleaseName: c.flagReleaseName, - ReleaseNamespace: c.flagReleaseNamespace, - MetricsConfig: metricsConfig, - Context: ctx, - ConsulClientCfg: cfg, + Client: mgr.GetClient(), + ConsulClient: c.consulClient, + ConsulScheme: consulURL.Scheme, + ConsulPort: consulURL.Port(), + AllowK8sNamespacesSet: allowK8sNamespaces, + DenyK8sNamespacesSet: denyK8sNamespaces, + MetricsConfig: metricsConfig, + ConsulClientCfg: cfg, + EnableConsulNamespaces: c.flagEnableNamespaces, + ConsulDestinationNamespace: c.flagConsulDestinationNamespace, + EnableNSMirroring: c.flagEnableK8SNSMirroring, + NSMirroringPrefix: c.flagK8SNSMirroringPrefix, + CrossNSACLPolicy: c.flagCrossNamespaceACLPolicy, + Log: ctrl.Log.WithName("controller").WithName("endpoints"), + Scheme: mgr.GetScheme(), + ReleaseName: c.flagReleaseName, + ReleaseNamespace: c.flagReleaseNamespace, + Context: ctx, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", connectinject.EndpointsController{}) return 1 @@ -417,16 +423,16 @@ func (c *Command) Run(args []string) int { DefaultProxyCPULimit: sidecarProxyCPULimit, DefaultProxyMemoryRequest: sidecarProxyMemoryRequest, DefaultProxyMemoryLimit: sidecarProxyMemoryLimit, + MetricsConfig: metricsConfig, InitContainerResources: initResources, ConsulSidecarResources: consulSidecarResources, - EnableNamespaces: c.flagEnableNamespaces, AllowK8sNamespacesSet: allowK8sNamespaces, DenyK8sNamespacesSet: denyK8sNamespaces, + EnableNamespaces: c.flagEnableNamespaces, ConsulDestinationNamespace: c.flagConsulDestinationNamespace, EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy, - MetricsConfig: metricsConfig, Log: logger.Named("handler"), }}) From 926a056da76985db38655e590f7b1e4b2190befa Mon Sep 17 00:00:00 2001 From: Iryna Shustava Date: Mon, 12 Apr 2021 17:13:32 -0700 Subject: [PATCH 2/2] Apply suggestions from code review --- subcommand/connect-init/command_ent_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/subcommand/connect-init/command_ent_test.go b/subcommand/connect-init/command_ent_test.go index 1b0039c8b8..394b473284 100644 --- a/subcommand/connect-init/command_ent_test.go +++ b/subcommand/connect-init/command_ent_test.go @@ -257,10 +257,10 @@ func TestRun_ServicePollingWithACLsAndTLSWithNamespaces(t *testing.T) { } } -// The namespace here is default-ns as the the k8s-auth method +// The namespace here is default-ns as the k8s-auth method // relies on the namespace in the response from Kubernetes to // correctly create the token in the same namespace as the Kubernetes -// namespace which is required when mirroring namespace is enabled. +// namespace which is required when namespace mirroring is enabled. // Note that this namespace is incorrect for other test cases but // Consul only cares about this namespace when mirroring is enabled. const (