Skip to content

Commit

Permalink
review feedback, add tests, cleanup TODOs
Browse files Browse the repository at this point in the history
  • Loading branch information
zalimeni committed Sep 6, 2023
1 parent 9713daa commit b6daa27
Show file tree
Hide file tree
Showing 2 changed files with 607 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/hashicorp/go-multierror"
)

//TODO(zalimeni) When MVP of this controller is done, put tickets for remaining TODOs in Jira.

const (
metaKeyManagedBy = "managed-by"
kindReplicaSet = "ReplicaSet"
Expand Down Expand Up @@ -114,44 +112,52 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
r.Log.Info("retrieved Service", "name", req.Name, "ns", req.Namespace)

// Determine Consul service selector values from pod names.
workloadSelector, err := r.getWorkloadSelectorFromEndpoints(ctx, &ClientPodFetcher{client: r.Client}, &endpoints)
if err != nil {
errs = multierror.Append(errs, err)
}

//TODO: Maybe check service-enable label here on service/deployments/other pod owners
if err = r.registerService(ctx, resourceClient, service, workloadSelector); err != nil {
errs = multierror.Append(errs, err)
}

return ctrl.Result{}, errs
}

// getWorkloadSelectorFromEndpoints calculates a Consul service WorkloadSelector from Endpoints based on pod names and
// owners.
func (r *Controller) getWorkloadSelectorFromEndpoints(ctx context.Context, pf PodFetcher, endpoints *corev1.Endpoints) (*pbcatalog.WorkloadSelector, error) {
podPrefixes := make(map[string]any)
podExactNames := make(map[string]any)
var errs error
for address := range allAddresses(endpoints.Subsets) {
if address.TargetRef != nil && address.TargetRef.Kind == "Pod" {
podName := types.NamespacedName{Name: address.TargetRef.Name, Namespace: req.Namespace}
podName := types.NamespacedName{Name: address.TargetRef.Name, Namespace: endpoints.Namespace}

// Accumulate owner prefixes and exact pod names for Consul workload selector.
// If this pod is already covered by a known owner prefix, skip it.
// If not, fetch the owner. If the owner has a unique prefix, add it to known prefixes.
// If not, add the pod name to exact name matches.
maybePodOwnerPrefix := getOwnerPrefixFromPodName(podName.Name)
if _, ok := podPrefixes[maybePodOwnerPrefix]; !ok {
var pod corev1.Pod
if err = r.Client.Get(ctx, podName, &pod); err != nil {
r.Log.Error(err, "failed to get pod", "name", podName.Name, "ns", req.Namespace)
pod, err := pf.GetPod(ctx, podName)
if err != nil {
r.Log.Error(err, "failed to get pod", "name", podName.Name, "ns", endpoints.Namespace)
errs = multierror.Append(errs, err)
continue
}
// Add to workload selector values.
// Pods can appear more than once in Endpoints subsets, so we use a set for exact names as well.
if prefix := getOwnerPrefixFromPod(&pod); prefix != "" {
if prefix := getOwnerPrefixFromPod(pod); prefix != "" {
podPrefixes[prefix] = true
} else {
podExactNames[podName.Name] = true
}
}
}
}

//TODO(zalimeni) Do we need to check service-enable label here on service/deployments/replicasets?
// Should we expect owner is annotated, and if so, is that consistent for non-replicaset workloads?
// If so, update to register and deregister conditionally.
if err = r.registerService(ctx, resourceClient, service, podPrefixes, podExactNames); err != nil {
errs = multierror.Append(errs, err)
}

return ctrl.Result{}, errs
return getWorkloadSelector(podPrefixes, podExactNames), errs
}

// allAddresses combines all Endpoints subset addresses to a single set. Service registration by this controller
Expand Down Expand Up @@ -193,10 +199,10 @@ func getOwnerPrefixFromPod(pod *corev1.Pod) string {
}

// registerService creates a Consul service registration from the provided Kuberetes service and endpoint information.
func (r *Controller) registerService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, service corev1.Service, podPrefixes, podExactNames map[string]any) error {
func (r *Controller) registerService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, service corev1.Service, selector *pbcatalog.WorkloadSelector) error {
serviceResource := r.getServiceResource(
&pbcatalog.Service{
Workloads: getWorkloadSelector(podPrefixes, podExactNames),
Workloads: selector,
Ports: getServicePorts(service),
VirtualIps: r.getServiceVIPs(service),
},
Expand All @@ -207,8 +213,7 @@ func (r *Controller) registerService(ctx context.Context, resourceClient pbresou
)

r.Log.Info("registering service with Consul", getLogFieldsForResource(serviceResource.Id)...)
//TODO(zalimeni) Read from Consul or some other state to determine whether a write is needed.
// For now, we blindly write state on each reconcile to simplify implementation.
//TODO: Maybe attempt to debounce redundant writes. For now, we blindly rewrite state on each reconcile.
_, err := resourceClient.Write(ctx, &pbresource.WriteRequest{Resource: serviceResource})
if err != nil {
r.Log.Error(err, "failed to register service", getLogFieldsForResource(serviceResource.Id)...)
Expand Down Expand Up @@ -249,14 +254,19 @@ func getServicePorts(service corev1.Service) []*pbcatalog.ServicePort {
for _, p := range service.Spec.Ports {
ports = append(ports, &pbcatalog.ServicePort{
VirtualPort: uint32(p.Port),
TargetPort: p.TargetPort.String(),
Protocol: common.GetPortProtocol(p.AppProtocol),
//TODO: We may require that this aligns w/ Endpoints ports, rather than following the K8s model
// of a bimodal string-or-int. If so, that should be documented as a constraint. Regardless,
// this value should match the string form of the K8s ServicePort.TargetPort.
TargetPort: p.TargetPort.String(),
Protocol: common.GetPortProtocol(p.AppProtocol),
})
}

//TODO: Error check reserved "mesh" target port

// Append Consul service mesh port in addition to discovered ports.
//TODO(zalimeni) Omit if zero mesh ports present in service endpoints? Or could there be some
// use of mesh-inject at the service level in the webhook as well?
//TODO: Maybe omit if zero mesh ports present in service endpoints, or if some
// use of mesh-inject/other label should cause this to be excluded.
ports = append(ports, &pbcatalog.ServicePort{
TargetPort: "mesh",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
Expand All @@ -283,16 +293,7 @@ func getServiceMeta(service corev1.Service) map[string]string {
constants.MetaKeyKubeNS: service.Namespace,
metaKeyManagedBy: constants.ManagedByEndpointsValue,
}
//TODO(zalimeni) This is adapted from v1 pod-based meta injection - verify correct and needed
for k, v := range service.Annotations {
if strings.HasPrefix(k, constants.AnnotationMeta) && strings.TrimPrefix(k, constants.AnnotationMeta) != "" {
if v == "$SERVICE_NAME" {
meta[strings.TrimPrefix(k, constants.AnnotationMeta)] = service.Name
} else {
meta[strings.TrimPrefix(k, constants.AnnotationMeta)] = v
}
}
}
//TODO: Support arbitrary meta injection via annotation? (see v1)
return meta
}

Expand Down Expand Up @@ -344,7 +345,26 @@ func (r *Controller) getConsulPartition() string {
func getLogFieldsForResource(id *pbresource.ID) []any {
return []any{
"name", id.Name,
"namespace", id.Tenancy.Namespace,
"ns", id.Tenancy.Namespace,
"partition", id.Tenancy.Partition,
}
}

// PodFetcher fetches pods by NamespacedName. This interface primarily exists for testing.
type PodFetcher interface {
GetPod(context.Context, types.NamespacedName) (*corev1.Pod, error)
}

// ClientPodFetcher wraps a Kubernetes client to implement PodFetcher. This is the only implementation outside of tests.
type ClientPodFetcher struct {
client client.Client
}

func (c *ClientPodFetcher) GetPod(ctx context.Context, name types.NamespacedName) (*corev1.Pod, error) {
var pod corev1.Pod
err := c.client.Get(ctx, name, &pod)
if err != nil {
return nil, err
}
return &pod, nil
}
Loading

0 comments on commit b6daa27

Please sign in to comment.