-
Notifications
You must be signed in to change notification settings - Fork 321
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support upgrades for connect refactor #509
Changes from all commits
fc5cd1f
e3e6e59
ecfde64
5d95b23
93526bd
5aa7d0f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ const ( | |
MetaKeyPodName = "pod-name" | ||
MetaKeyKubeServiceName = "k8s-service-name" | ||
MetaKeyKubeNS = "k8s-namespace" | ||
MetaKeyManagedBy = "managed-by" | ||
kubernetesSuccessReasonMsg = "Kubernetes health checks passing" | ||
envoyPrometheusBindAddr = "envoy_prometheus_bind_addr" | ||
|
||
|
@@ -150,39 +151,55 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( | |
return ctrl.Result{}, err | ||
} | ||
|
||
// Get information from the pod to create service instance registrations. | ||
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus) | ||
if err != nil { | ||
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) | ||
return ctrl.Result{}, err | ||
var managedByEndpointsController bool | ||
if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue { | ||
managedByEndpointsController = true | ||
} | ||
|
||
// Register the service instance with the local agent. | ||
// Note: the order of how we register services is important, | ||
// and the connect-proxy service should come after the "main" service | ||
// because its alias health check depends on the main service existing. | ||
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name) | ||
err = client.Agent().ServiceRegister(serviceRegistration) | ||
if err != nil { | ||
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name) | ||
return ctrl.Result{}, err | ||
} | ||
// For pods managed by this controller, create and register the service instance. | ||
if managedByEndpointsController { | ||
// Get information from the pod to create service instance registrations. | ||
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus) | ||
if err != nil { | ||
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) | ||
return ctrl.Result{}, err | ||
} | ||
|
||
// Register the proxy service instance with the local agent. | ||
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name) | ||
err = client.Agent().ServiceRegister(proxyServiceRegistration) | ||
if err != nil { | ||
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name) | ||
return ctrl.Result{}, err | ||
// Register the service instance with the local agent. | ||
// Note: the order of how we register services is important, | ||
// and the connect-proxy service should come after the "main" service | ||
// because its alias health check depends on the main service existing. | ||
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name) | ||
err = client.Agent().ServiceRegister(serviceRegistration) | ||
if err != nil { | ||
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name) | ||
return ctrl.Result{}, err | ||
} | ||
|
||
// Register the proxy service instance with the local agent. | ||
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name) | ||
err = client.Agent().ServiceRegister(proxyServiceRegistration) | ||
if err != nil { | ||
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name) | ||
return ctrl.Result{}, err | ||
} | ||
} | ||
|
||
// Update the TTL health check for the service. | ||
// This is required because ServiceRegister() does not update the TTL if the service already exists. | ||
// Update the service TTL health check for both legacy services and services managed by endpoints | ||
// controller. The proxy health checks are registered separately by endpoints controller and | ||
// lifecycle sidecar for legacy services. Here, we always update the health check for legacy and | ||
// newer services idempotently since the service health check is not added as part of the service | ||
// registration. | ||
reason := getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace) | ||
r.Log.Info("updating health check status for service", "name", serviceRegistration.Name, "reason", reason, "status", healthStatus) | ||
err = client.Agent().UpdateTTL(getConsulHealthCheckID(pod, serviceRegistration.ID), reason, healthStatus) | ||
serviceName := getServiceName(pod, serviceEndpoints) | ||
r.Log.Info("updating health check status for service", "name", serviceName, "reason", reason, "status", healthStatus) | ||
serviceID := getServiceID(pod, serviceEndpoints) | ||
proxyServiceName := getProxyServiceName(pod, serviceEndpoints) | ||
proxyServiceID := getProxyServiceID(pod, serviceEndpoints) | ||
healthCheckID := getConsulHealthCheckID(pod, serviceID) | ||
err = r.upsertHealthCheck(pod, client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, healthStatus) | ||
if err != nil { | ||
r.Log.Error(err, "failed to update health check status for service", "name", serviceRegistration.Name) | ||
r.Log.Error(err, "failed to update health check status for service", "name", serviceName) | ||
return ctrl.Result{}, err | ||
} | ||
} | ||
|
@@ -215,6 +232,111 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error { | |
).Complete(r) | ||
} | ||
|
||
// getServiceCheck will return the health check for this pod and service if it exists. | ||
func getServiceCheck(client *api.Client, healthCheckID string) (*api.AgentCheck, error) { | ||
filter := fmt.Sprintf("CheckID == `%s`", healthCheckID) | ||
checks, err := client.Agent().ChecksWithFilter(filter) | ||
if err != nil { | ||
return nil, err | ||
} | ||
// This will be nil (does not exist) or an actual check. | ||
return checks[healthCheckID], nil | ||
} | ||
|
||
// registerConsulHealthCheck registers a TTL health check for the service on this Agent local to the Pod. This will add | ||
// the Pod's readiness status, which will mark the service instance healthy/unhealthy for Consul service mesh | ||
// traffic. | ||
func registerConsulHealthCheck(client *api.Client, consulHealthCheckID, serviceID, status string) error { | ||
// Create a TTL health check in Consul associated with this service and pod. | ||
// The TTL time is 100000h which should ensure that the check never fails due to timeout | ||
// of the TTL check. | ||
err := client.Agent().CheckRegister(&api.AgentCheckRegistration{ | ||
ID: consulHealthCheckID, | ||
Name: "Kubernetes Health Check", | ||
ServiceID: serviceID, | ||
AgentServiceCheck: api.AgentServiceCheck{ | ||
TTL: "100000h", | ||
Status: status, | ||
SuccessBeforePassing: 1, | ||
FailuresBeforeCritical: 1, | ||
}, | ||
}) | ||
if err != nil { | ||
// Full error looks like: | ||
// Unexpected response code: 500 (ServiceID "consulnamespace/svc-id" does not exist) | ||
if strings.Contains(err.Error(), fmt.Sprintf("%s\" does not exist", serviceID)) { | ||
return fmt.Errorf("service %q not found in Consul: unable to register health check", serviceID) | ||
} | ||
return fmt.Errorf("registering health check for service %q: %w", serviceID, err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// updateConsulHealthCheckStatus updates the consul health check status. | ||
func (r *EndpointsController) updateConsulHealthCheckStatus(client *api.Client, consulHealthCheckID, status, reason string) error { | ||
r.Log.Info("updating health check", "id", consulHealthCheckID) | ||
err := client.Agent().UpdateTTL(consulHealthCheckID, reason, status) | ||
if err != nil { | ||
return fmt.Errorf("error updating health check: %w", err) | ||
} | ||
return nil | ||
} | ||
|
||
// upsertHealthCheck checks if the healthcheck exists for the service, and creates it if it doesn't exist, or updates it | ||
// if it does. | ||
func (r *EndpointsController) upsertHealthCheck(pod corev1.Pod, client *api.Client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, status string) error { | ||
ndhanushkodi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
reason := getHealthCheckStatusReason(status, pod.Name, pod.Namespace) | ||
// Retrieve the health check that would exist if the service had one registered for this pod. | ||
serviceCheck, err := getServiceCheck(client, healthCheckID) | ||
if err != nil { | ||
return fmt.Errorf("unable to get agent health checks: serviceID=%s, checkID=%s, %s", serviceID, healthCheckID, err) | ||
} | ||
if serviceCheck == nil { | ||
// Create a new health check. | ||
err = registerConsulHealthCheck(client, healthCheckID, serviceID, status) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Also update it, the reason this is separate is there is no way to set the Output field of the health check | ||
// at creation time, and this is what is displayed on the UI as opposed to the Notes field. | ||
err = r.updateConsulHealthCheckStatus(client, healthCheckID, status, reason) | ||
if err != nil { | ||
return err | ||
} | ||
} else if serviceCheck.Status != status { | ||
ndhanushkodi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
err = r.updateConsulHealthCheckStatus(client, healthCheckID, status, reason) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func getServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string { | ||
serviceName := serviceEndpoints.Name | ||
if serviceNameFromAnnotation, ok := pod.Annotations[annotationService]; ok && serviceNameFromAnnotation != "" { | ||
serviceName = serviceNameFromAnnotation | ||
} | ||
return serviceName | ||
|
||
} | ||
|
||
func getServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string { | ||
return fmt.Sprintf("%s-%s", pod.Name, getServiceName(pod, serviceEndpoints)) | ||
} | ||
|
||
func getProxyServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string { | ||
serviceName := getServiceName(pod, serviceEndpoints) | ||
return fmt.Sprintf("%s-sidecar-proxy", serviceName) | ||
} | ||
|
||
func getProxyServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string { | ||
proxyServiceName := getProxyServiceName(pod, serviceEndpoints) | ||
return fmt.Sprintf("%s-%s", pod.Name, proxyServiceName) | ||
} | ||
|
||
// createServiceRegistrations creates the service and proxy service instance registrations with the information from the | ||
// Pod. | ||
func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) { | ||
|
@@ -235,17 +357,15 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service | |
// Otherwise, the Consul service name should equal the Kubernetes Service name. | ||
// The service name in Consul defaults to the Endpoints object name, and is overridden by the pod | ||
// annotation consul.hashicorp.com/connect-service.. | ||
serviceName := serviceEndpoints.Name | ||
if serviceNameFromAnnotation, ok := pod.Annotations[annotationService]; ok && serviceNameFromAnnotation != "" { | ||
serviceName = serviceNameFromAnnotation | ||
} | ||
serviceName := getServiceName(pod, serviceEndpoints) | ||
|
||
serviceID := fmt.Sprintf("%s-%s", pod.Name, serviceName) | ||
serviceID := getServiceID(pod, serviceEndpoints) | ||
|
||
meta := map[string]string{ | ||
MetaKeyPodName: pod.Name, | ||
MetaKeyKubeServiceName: serviceEndpoints.Name, | ||
MetaKeyKubeNS: serviceEndpoints.Namespace, | ||
MetaKeyManagedBy: managedByValue, | ||
} | ||
for k, v := range pod.Annotations { | ||
if strings.HasPrefix(k, annotationMeta) && strings.TrimPrefix(k, annotationMeta) != "" { | ||
|
@@ -269,21 +389,13 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service | |
Address: pod.Status.PodIP, | ||
Meta: meta, | ||
Namespace: r.consulNamespace(pod.Namespace), | ||
Check: &api.AgentServiceCheck{ | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Correct me if my understanding is wrong: we're not creating the check here because we're going to create it after the service register, so that the check also gets created for non-endpoints-controller pods? If so, why don't we create it here and then, after ServiceRegister, create the check separately only if the service isn't managed by the endpoints controller? |
||
CheckID: getConsulHealthCheckID(pod, serviceID), | ||
Name: "Kubernetes Health Check", | ||
TTL: "100000h", | ||
Status: healthStatus, | ||
SuccessBeforePassing: 1, | ||
FailuresBeforeCritical: 1, | ||
}, | ||
} | ||
if len(tags) > 0 { | ||
service.Tags = tags | ||
} | ||
|
||
proxyServiceName := fmt.Sprintf("%s-sidecar-proxy", serviceName) | ||
proxyServiceID := fmt.Sprintf("%s-%s", pod.Name, proxyServiceName) | ||
proxyServiceName := getProxyServiceName(pod, serviceEndpoints) | ||
proxyServiceID := getProxyServiceID(pod, serviceEndpoints) | ||
proxyConfig := &api.AgentServiceConnectProxyConfig{ | ||
DestinationServiceName: serviceName, | ||
DestinationServiceID: serviceID, | ||
|
@@ -506,8 +618,8 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, | |
// of services instances that have the provided k8sServiceName and k8sServiceNamespace in their metadata. | ||
func serviceInstancesForK8SServiceNameAndNamespace(k8sServiceName, k8sServiceNamespace string, client *api.Client) (map[string]*api.AgentService, error) { | ||
return client.Agent().ServicesWithFilter( | ||
fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q`, | ||
MetaKeyKubeServiceName, k8sServiceName, MetaKeyKubeNS, k8sServiceNamespace)) | ||
fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q and Meta[%q] == %q`, | ||
MetaKeyKubeServiceName, k8sServiceName, MetaKeyKubeNS, k8sServiceNamespace, MetaKeyManagedBy, managedByValue)) | ||
} | ||
|
||
// processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we may encounter a bug similar to the one where we weren't using `%q` when defining a filter in `connect-init`? I'm guessing the backticks around "%s" make it work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup! And I didn't notice issues testing it end to end, and that codepath would be used for updating the health check which I did test :)