Support upgrades for connect refactor #509

Merged
merged 6 commits into from
May 10, 2021
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -31,6 +31,9 @@ IMPROVEMENTS:
* Setting the label `consul.hashicorp.com/transparent-proxy` to `true/false` on a namespace will define the default behavior for pods in that namespace, which do not also have the annotation set.
* The default tproxy behavior will be defined by the value of the `-enable-transparent-proxy` flag to the `consul-k8s inject-connect` command. It can be overridden in a namespace by the label on the namespace or for a pod using the annotation on the pod.

* Connect: support upgrading services deployed before the endpoints controller
to a version of consul-k8s with the endpoints controller. [[GH-509](https://github.com/hashicorp/consul-k8s/pull/509)]

BUG FIXES:
* Connect: Use `runAsNonRoot: false` for connect-init's container when tproxy is enabled. [[GH-493](https://github.com/hashicorp/consul-k8s/pull/493)]
* CRDs: Fix a bug where the `config` field in `ProxyDefaults` CR was not synced to Consul because
9 changes: 9 additions & 0 deletions connect-inject/annotations.go
@@ -5,6 +5,12 @@ const (
// a pod after an injection is done.
keyInjectStatus = "consul.hashicorp.com/connect-inject-status"

// keyManagedBy is the key of the label that is added to pods managed
// by the Endpoints controller. This is to support upgrading from consul-k8s
// without Endpoints controller to consul-k8s with Endpoints controller
// without disrupting services managed the old way.
keyManagedBy = "consul.hashicorp.com/connect-inject-managed-by"

// annotationInject is the key of the annotation that controls whether
// injection is explicitly enabled or disabled for a pod. This should
// be set to a truthy or falsy value, as parseable by strconv.ParseBool
@@ -104,6 +110,9 @@ const (

// injected is used as the annotation value for annotationInjected.
injected = "injected"

// managedByValue is the value for keyManagedBy.
managedByValue = "consul-k8s-endpoints-controller"
)

// Annotations used by Prometheus.
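
Note: the two constants above are what the controller later uses to tell managed pods from legacy ones. The following is a minimal sketch of that check, assuming the pod's labels are available as a plain map. `isManagedByEndpointsController` is a hypothetical helper name and is not part of this PR.

```go
package main

import "fmt"

// These two constants mirror the values added in annotations.go.
const (
	keyManagedBy   = "consul.hashicorp.com/connect-inject-managed-by"
	managedByValue = "consul-k8s-endpoints-controller"
)

// isManagedByEndpointsController is a hypothetical helper (not in the PR).
// It reports whether a pod's labels mark it as managed by the controller.
// A missing label yields "", which never equals managedByValue.
func isManagedByEndpointsController(labels map[string]string) bool {
	return labels[keyManagedBy] == managedByValue
}

func main() {
	legacy := map[string]string{"app": "web"}
	managed := map[string]string{"app": "web", keyManagedBy: managedByValue}

	fmt.Println(isManagedByEndpointsController(legacy))  // false
	fmt.Println(isManagedByEndpointsController(managed)) // true
}
```

Pods injected before the upgrade carry no such label, so they fall through to the legacy path without any change on their side.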
198 changes: 155 additions & 43 deletions connect-inject/endpoints_controller.go
@@ -31,6 +31,7 @@ const (
MetaKeyPodName = "pod-name"
MetaKeyKubeServiceName = "k8s-service-name"
MetaKeyKubeNS = "k8s-namespace"
MetaKeyManagedBy = "managed-by"
kubernetesSuccessReasonMsg = "Kubernetes health checks passing"
envoyPrometheusBindAddr = "envoy_prometheus_bind_addr"

@@ -150,39 +151,55 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}

// Get information from the pod to create service instance registrations.
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus)
if err != nil {
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return ctrl.Result{}, err
var managedByEndpointsController bool
if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue {
managedByEndpointsController = true
}

// Register the service instance with the local agent.
// Note: the order of how we register services is important,
// and the connect-proxy service should come after the "main" service
// because its alias health check depends on the main service existing.
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name)
err = client.Agent().ServiceRegister(serviceRegistration)
if err != nil {
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name)
return ctrl.Result{}, err
}
// For pods managed by this controller, create and register the service instance.
if managedByEndpointsController {
// Get information from the pod to create service instance registrations.
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus)
if err != nil {
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return ctrl.Result{}, err
}

// Register the proxy service instance with the local agent.
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name)
err = client.Agent().ServiceRegister(proxyServiceRegistration)
if err != nil {
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name)
return ctrl.Result{}, err
// Register the service instance with the local agent.
// Note: the order of how we register services is important,
// and the connect-proxy service should come after the "main" service
// because its alias health check depends on the main service existing.
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name)
err = client.Agent().ServiceRegister(serviceRegistration)
if err != nil {
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name)
return ctrl.Result{}, err
}

// Register the proxy service instance with the local agent.
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name)
err = client.Agent().ServiceRegister(proxyServiceRegistration)
if err != nil {
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name)
return ctrl.Result{}, err
}
}

// Update the TTL health check for the service.
// This is required because ServiceRegister() does not update the TTL if the service already exists.
// Update the service TTL health check for both legacy services and services managed by endpoints
// controller. The proxy health checks are registered separately by endpoints controller and
// lifecycle sidecar for legacy services. Here, we always update the health check for legacy and
// newer services idempotently since the service health check is not added as part of the service
// registration.
reason := getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace)
r.Log.Info("updating health check status for service", "name", serviceRegistration.Name, "reason", reason, "status", healthStatus)
err = client.Agent().UpdateTTL(getConsulHealthCheckID(pod, serviceRegistration.ID), reason, healthStatus)
serviceName := getServiceName(pod, serviceEndpoints)
r.Log.Info("updating health check status for service", "name", serviceName, "reason", reason, "status", healthStatus)
serviceID := getServiceID(pod, serviceEndpoints)
proxyServiceName := getProxyServiceName(pod, serviceEndpoints)
proxyServiceID := getProxyServiceID(pod, serviceEndpoints)
healthCheckID := getConsulHealthCheckID(pod, serviceID)
err = r.upsertHealthCheck(pod, client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, healthStatus)
if err != nil {
r.Log.Error(err, "failed to update health check status for service", "name", serviceRegistration.Name)
r.Log.Error(err, "failed to update health check status for service", "name", serviceName)
return ctrl.Result{}, err
}
}
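
Note: the control flow introduced in this hunk can be summarised as "register only when managed, always upsert the service health check". Below is a rough sketch of that ordering, not the controller's actual code. `planActions` and its action strings are hypothetical and exist only for illustration.

```go
package main

import "fmt"

const (
	keyManagedBy   = "consul.hashicorp.com/connect-inject-managed-by"
	managedByValue = "consul-k8s-endpoints-controller"
)

// planActions is a hypothetical function that lists, in order, the steps the
// reconcile loop in this diff would take for a pod with the given labels.
func planActions(labels map[string]string) []string {
	var actions []string
	if labels[keyManagedBy] == managedByValue {
		// The main service is registered first because the proxy's alias
		// health check depends on the main service existing.
		actions = append(actions, "register service", "register proxy service")
	}
	// The service health check is upserted for legacy and managed pods alike.
	actions = append(actions, "upsert service health check")
	return actions
}

func main() {
	fmt.Println(planActions(map[string]string{"app": "web"}))
	fmt.Println(planActions(map[string]string{keyManagedBy: managedByValue}))
}
```

A legacy pod yields only the health check step, which is why the check registration had to move out of the service registration itself.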
@@ -215,6 +232,111 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
).Complete(r)
}

// getServiceCheck will return the health check for this pod and service if it exists.
func getServiceCheck(client *api.Client, healthCheckID string) (*api.AgentCheck, error) {
filter := fmt.Sprintf("CheckID == `%s`", healthCheckID)
Review comment (Contributor):
I'm wondering if we may encounter a bug similar to the one where we weren't using %q when defining a filter in connect-init? I'm guessing the `'s around "%s" make it work?

Reply (@ndhanushkodi, Contributor Author, May 10, 2021):
yup! And I didn't notice issues testing it end to end, and that codepath would be used for updating the health check which I did test :)


checks, err := client.Agent().ChecksWithFilter(filter)
if err != nil {
return nil, err
}
// This will be nil (does not exist) or an actual check.
return checks[healthCheckID], nil
}
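
Note: to make the quoting question from the review thread concrete, here is a small sketch that builds the filter both ways. The function names are hypothetical. To my understanding Consul's filter expressions accept both backtick-quoted and double-quoted string values, but treat that as an assumption and check it against the Consul filtering documentation.

```go
package main

import "fmt"

// backtickFilter builds the filter the way getServiceCheck does in this diff:
// the value is wrapped in backticks and inserted with %s.
func backtickFilter(checkID string) string {
	return fmt.Sprintf("CheckID == `%s`", checkID)
}

// quotedFilter is the alternative raised in review: %q wraps the value in
// double quotes and escapes any embedded quotes or backslashes.
func quotedFilter(checkID string) string {
	return fmt.Sprintf("CheckID == %q", checkID)
}

func main() {
	// Hypothetical check ID, used only for illustration.
	id := "default/web-7d9f-web/kubernetes-health-check"

	fmt.Println(backtickFilter(id))
	// CheckID == `default/web-7d9f-web/kubernetes-health-check`
	fmt.Println(quotedFilter(id))
	// CheckID == "default/web-7d9f-web/kubernetes-health-check"
}
```

Either form keeps characters such as `/` and `-` inside a quoted string. The difference shows up only for unusual input: a value containing a backtick would end the backtick-quoted form early, whereas `%q` escapes embedded double quotes.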

// registerConsulHealthCheck registers a TTL health check for the service on this Agent local to the Pod. This will add
// the Pod's readiness status, which will mark the service instance healthy/unhealthy for Consul service mesh
// traffic.
func registerConsulHealthCheck(client *api.Client, consulHealthCheckID, serviceID, status string) error {
// Create a TTL health check in Consul associated with this service and pod.
// The TTL time is 100000h which should ensure that the check never fails due to timeout
// of the TTL check.
err := client.Agent().CheckRegister(&api.AgentCheckRegistration{
ID: consulHealthCheckID,
Name: "Kubernetes Health Check",
ServiceID: serviceID,
AgentServiceCheck: api.AgentServiceCheck{
TTL: "100000h",
Status: status,
SuccessBeforePassing: 1,
FailuresBeforeCritical: 1,
},
})
if err != nil {
// Full error looks like:
// Unexpected response code: 500 (ServiceID "consulnamespace/svc-id" does not exist)
if strings.Contains(err.Error(), fmt.Sprintf("%s\" does not exist", serviceID)) {
return fmt.Errorf("service %q not found in Consul: unable to register health check", serviceID)
}
return fmt.Errorf("registering health check for service %q: %w", serviceID, err)
}

return nil
}

// updateConsulHealthCheckStatus updates the consul health check status.
func (r *EndpointsController) updateConsulHealthCheckStatus(client *api.Client, consulHealthCheckID, status, reason string) error {
r.Log.Info("updating health check", "id", consulHealthCheckID)
err := client.Agent().UpdateTTL(consulHealthCheckID, reason, status)
if err != nil {
return fmt.Errorf("error updating health check: %w", err)
}
return nil
}

// upsertHealthCheck checks if the healthcheck exists for the service, and creates it if it doesn't exist, or updates it
// if it does.
func (r *EndpointsController) upsertHealthCheck(pod corev1.Pod, client *api.Client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, status string) error {
reason := getHealthCheckStatusReason(status, pod.Name, pod.Namespace)
// Retrieve the health check that would exist if the service had one registered for this pod.
serviceCheck, err := getServiceCheck(client, healthCheckID)
if err != nil {
return fmt.Errorf("unable to get agent health checks: serviceID=%s, checkID=%s, %s", serviceID, healthCheckID, err)
}
if serviceCheck == nil {
// Create a new health check.
err = registerConsulHealthCheck(client, healthCheckID, serviceID, status)
if err != nil {
return err
}

// Also update it, the reason this is separate is there is no way to set the Output field of the health check
// at creation time, and this is what is displayed on the UI as opposed to the Notes field.
err = r.updateConsulHealthCheckStatus(client, healthCheckID, status, reason)
if err != nil {
return err
}
} else if serviceCheck.Status != status {
err = r.updateConsulHealthCheckStatus(client, healthCheckID, status, reason)
if err != nil {
return err
}
}
return nil
}
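
Note: the three branches of upsertHealthCheck (create then update, update on change, no-op) can be exercised without a Consul agent. The sketch below uses a hypothetical in-memory `fakeAgent` in place of the real client, so it illustrates only the branching, not the actual API calls.

```go
package main

import "fmt"

// fakeAgent is a hypothetical in-memory stand-in for the Consul agent's
// check store. It maps check ID to status and counts write calls.
type fakeAgent struct {
	checks map[string]string
	writes int
}

// upsert mirrors the branching in upsertHealthCheck: register and then update
// when the check is missing, update when the status differs, and do nothing
// when the stored status already matches.
func (a *fakeAgent) upsert(checkID, status string) string {
	current, exists := a.checks[checkID]
	switch {
	case !exists:
		a.checks[checkID] = status
		// Two writes: the registration, then a separate update, because the
		// check's output cannot be set at creation time.
		a.writes += 2
		return "created"
	case current != status:
		a.checks[checkID] = status
		a.writes++
		return "updated"
	default:
		return "unchanged"
	}
}

func main() {
	agent := &fakeAgent{checks: map[string]string{}}
	fmt.Println(agent.upsert("check-1", "passing"))  // created
	fmt.Println(agent.upsert("check-1", "passing"))  // unchanged
	fmt.Println(agent.upsert("check-1", "critical")) // updated
}
```

Skipping the write when the status is unchanged is what lets the controller run this on every reconcile for both legacy and managed services.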

func getServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
serviceName := serviceEndpoints.Name
if serviceNameFromAnnotation, ok := pod.Annotations[annotationService]; ok && serviceNameFromAnnotation != "" {
serviceName = serviceNameFromAnnotation
}
return serviceName

}

func getServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
return fmt.Sprintf("%s-%s", pod.Name, getServiceName(pod, serviceEndpoints))
}

func getProxyServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
serviceName := getServiceName(pod, serviceEndpoints)
return fmt.Sprintf("%s-sidecar-proxy", serviceName)
}

func getProxyServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
proxyServiceName := getProxyServiceName(pod, serviceEndpoints)
return fmt.Sprintf("%s-%s", pod.Name, proxyServiceName)
}
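
Note: a worked example of the naming scheme these helpers implement, rewritten over plain strings so it runs without the Kubernetes types. The pod and service names are made up for illustration.

```go
package main

import "fmt"

// serviceName returns the annotation override when it is non-empty,
// otherwise the Endpoints object name.
func serviceName(endpointsName, annotationValue string) string {
	if annotationValue != "" {
		return annotationValue
	}
	return endpointsName
}

// serviceID is "<pod name>-<service name>".
func serviceID(podName, svcName string) string {
	return fmt.Sprintf("%s-%s", podName, svcName)
}

// proxyServiceName is "<service name>-sidecar-proxy".
func proxyServiceName(svcName string) string {
	return fmt.Sprintf("%s-sidecar-proxy", svcName)
}

// proxyServiceID is "<pod name>-<proxy service name>".
func proxyServiceID(podName, svcName string) string {
	return fmt.Sprintf("%s-%s", podName, proxyServiceName(svcName))
}

func main() {
	name := serviceName("web", "") // no annotation override
	fmt.Println(serviceID("web-7d9f", name))      // web-7d9f-web
	fmt.Println(proxyServiceName(name))           // web-sidecar-proxy
	fmt.Println(proxyServiceID("web-7d9f", name)) // web-7d9f-web-sidecar-proxy

	// With an annotation override, the Endpoints name is ignored.
	fmt.Println(serviceID("web-7d9f", serviceName("web", "frontend"))) // web-7d9f-frontend
}
```

Because the IDs are derived purely from the pod and Endpoints objects, the controller can compute the health check ID for a legacy service without having built a registration for it.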

// createServiceRegistrations creates the service and proxy service instance registrations with the information from the
// Pod.
func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) {
@@ -235,17 +357,15 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
// Otherwise, the Consul service name should equal the Kubernetes Service name.
// The service name in Consul defaults to the Endpoints object name, and is overridden by the pod
// annotation consul.hashicorp.com/connect-service.
serviceName := serviceEndpoints.Name
if serviceNameFromAnnotation, ok := pod.Annotations[annotationService]; ok && serviceNameFromAnnotation != "" {
serviceName = serviceNameFromAnnotation
}
serviceName := getServiceName(pod, serviceEndpoints)

serviceID := fmt.Sprintf("%s-%s", pod.Name, serviceName)
serviceID := getServiceID(pod, serviceEndpoints)

meta := map[string]string{
MetaKeyPodName: pod.Name,
MetaKeyKubeServiceName: serviceEndpoints.Name,
MetaKeyKubeNS: serviceEndpoints.Namespace,
MetaKeyManagedBy: managedByValue,
}
for k, v := range pod.Annotations {
if strings.HasPrefix(k, annotationMeta) && strings.TrimPrefix(k, annotationMeta) != "" {
@@ -269,21 +389,13 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
Address: pod.Status.PodIP,
Meta: meta,
Namespace: r.consulNamespace(pod.Namespace),
Check: &api.AgentServiceCheck{
Review comment (Member):
Correct me if my understanding is wrong: we're not creating the check here because we're going to create it after the service register so the check also gets created for non-endpoints-ctrl pods?

If so, why don't we create it here and then after serviceRegister if the service isn't managed by endpoints controller we create the check?


CheckID: getConsulHealthCheckID(pod, serviceID),
Name: "Kubernetes Health Check",
TTL: "100000h",
Status: healthStatus,
SuccessBeforePassing: 1,
FailuresBeforeCritical: 1,
},
}
if len(tags) > 0 {
service.Tags = tags
}

proxyServiceName := fmt.Sprintf("%s-sidecar-proxy", serviceName)
proxyServiceID := fmt.Sprintf("%s-%s", pod.Name, proxyServiceName)
proxyServiceName := getProxyServiceName(pod, serviceEndpoints)
proxyServiceID := getProxyServiceID(pod, serviceEndpoints)
proxyConfig := &api.AgentServiceConnectProxyConfig{
DestinationServiceName: serviceName,
DestinationServiceID: serviceID,
@@ -506,8 +618,8 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,
// of services instances that have the provided k8sServiceName and k8sServiceNamespace in their metadata.
func serviceInstancesForK8SServiceNameAndNamespace(k8sServiceName, k8sServiceNamespace string, client *api.Client) (map[string]*api.AgentService, error) {
return client.Agent().ServicesWithFilter(
fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q`,
MetaKeyKubeServiceName, k8sServiceName, MetaKeyKubeNS, k8sServiceNamespace))
fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q and Meta[%q] == %q`,
MetaKeyKubeServiceName, k8sServiceName, MetaKeyKubeNS, k8sServiceNamespace, MetaKeyManagedBy, managedByValue))
}

// processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream
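
Note: the extra `managed-by` clause in the filter above is what keeps legacy service instances out of the set this function returns. A small sketch that prints the resulting filter string follows. `instancesFilter` is a hypothetical wrapper around the same `fmt.Sprintf` call, and the service name and namespace are example values.

```go
package main

import "fmt"

// Constants copied from this diff.
const (
	MetaKeyKubeServiceName = "k8s-service-name"
	MetaKeyKubeNS          = "k8s-namespace"
	MetaKeyManagedBy       = "managed-by"
	managedByValue         = "consul-k8s-endpoints-controller"
)

// instancesFilter is a hypothetical wrapper that builds the same filter
// expression as serviceInstancesForK8SServiceNameAndNamespace.
func instancesFilter(k8sServiceName, k8sServiceNamespace string) string {
	return fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q and Meta[%q] == %q`,
		MetaKeyKubeServiceName, k8sServiceName,
		MetaKeyKubeNS, k8sServiceNamespace,
		MetaKeyManagedBy, managedByValue)
}

func main() {
	fmt.Println(instancesFilter("web", "default"))
	// Meta["k8s-service-name"] == "web" and Meta["k8s-namespace"] == "default" and Meta["managed-by"] == "consul-k8s-endpoints-controller"
}
```

Services registered the old way have no `managed-by` metadata, so they do not match this filter and are left alone by whatever consumes the returned instances.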