Skip to content

Commit

Permalink
Refactor health resource to match cleanup resource (#437)
Browse files Browse the repository at this point in the history
- refactor Run loop
- use ConsulPort and ConsulScheme instead of ConsulURL
- reconcile() does not return an error since it is logged during that
function
  • Loading branch information
lkysow authored Feb 12, 2021
1 parent f5f0878 commit 1e75484
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 26 deletions.
32 changes: 14 additions & 18 deletions connect-inject/health_check_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package connectinject
import (
"errors"
"fmt"
"net/url"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -38,8 +37,11 @@ type HealthCheckResource struct {
Log hclog.Logger
KubernetesClientset kubernetes.Interface

// ConsulUrl holds the url information for client connections.
ConsulUrl *url.URL
// ConsulScheme is the scheme to use when making API calls to Consul,
// i.e. "http" or "https".
ConsulScheme string
// ConsulPort is the port to make HTTP API calls to Consul agents on.
ConsulPort string
// ReconcilePeriod is the period by which reconcile gets called.
// default to 1 minute.
ReconcilePeriod time.Duration
Expand All @@ -52,25 +54,20 @@ type HealthCheckResource struct {
// It initially reconciles at startup and is then invoked after every
// ReconcilePeriod expires.
func (h *HealthCheckResource) Run(stopCh <-chan struct{}) {
err := h.Reconcile()
if err != nil {
h.Log.Error("reconcile returned an error", "err", err)
}

reconcileTimer := time.NewTimer(h.ReconcilePeriod)
defer reconcileTimer.Stop()

for {
h.reconcile()
reconcileTimer.Reset(h.ReconcilePeriod)

select {
case <-stopCh:
h.Log.Info("received stop signal, shutting down")
return

case <-reconcileTimer.C:
if err := h.Reconcile(); err != nil {
h.Log.Error("reconcile returned an error", "err", err)
}
reconcileTimer.Reset(h.ReconcilePeriod)
// Fall through and continue the loop.
}
}
}
Expand Down Expand Up @@ -106,7 +103,7 @@ func (h *HealthCheckResource) Informer() cache.SharedIndexInformer {
// Two primary use cases are handled, new pods will get a new consul TTL health check
// registered against their respective agent and service, and updates to pods will have
// this TTL health check updated to reflect the pod's readiness status.
func (h *HealthCheckResource) Upsert(key string, raw interface{}) error {
func (h *HealthCheckResource) Upsert(_ string, raw interface{}) error {
pod, ok := raw.(*corev1.Pod)
if !ok {
return fmt.Errorf("failed to cast to a pod object")
Expand All @@ -119,10 +116,10 @@ func (h *HealthCheckResource) Upsert(key string, raw interface{}) error {
return nil
}

// Reconcile iterates through all Pods with the appropriate label and compares the
// reconcile iterates through all Pods with the appropriate label and compares the
// current health check status against that which is stored in Consul and updates
// the consul health check accordingly. If the health check doesn't yet exist it will create it.
func (h *HealthCheckResource) Reconcile() error {
func (h *HealthCheckResource) reconcile() {
h.lock.Lock()
defer h.lock.Unlock()
h.Log.Debug("starting reconcile")
Expand All @@ -131,7 +128,7 @@ func (h *HealthCheckResource) Reconcile() error {
metav1.ListOptions{LabelSelector: labelInject})
if err != nil {
h.Log.Error("unable to get pods", "err", err)
return err
return
}
// Reconcile the state of each pod in the podList.
for _, pod := range podList.Items {
Expand All @@ -141,7 +138,6 @@ func (h *HealthCheckResource) Reconcile() error {
}
}
h.Log.Debug("finished reconcile")
return nil
}

// reconcilePod will reconcile a pod. This is the common work for both Upsert and Reconcile.
Expand Down Expand Up @@ -273,7 +269,7 @@ func (h *HealthCheckResource) getReadyStatusAndReason(pod *corev1.Pod) (string,

// getConsulClient returns an *api.Client that points at the consul agent local to the pod.
func (h *HealthCheckResource) getConsulClient(pod *corev1.Pod) (*api.Client, error) {
newAddr := fmt.Sprintf("%s://%s:%s", h.ConsulUrl.Scheme, pod.Status.HostIP, h.ConsulUrl.Port())
newAddr := fmt.Sprintf("%s://%s:%s", h.ConsulScheme, pod.Status.HostIP, h.ConsulPort)
localConfig := api.DefaultConfig()
localConfig.Address = newAddr
if pod.Annotations[annotationConsulNamespace] != "" {
Expand Down
14 changes: 7 additions & 7 deletions connect-inject/health_check_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,7 @@ func TestReconcile_IgnorePodsWithoutInjectLabel(t *testing.T) {
server, client, resource := testServerAgentResourceAndController(t, pod)
defer server.Stop()
// Start the reconciler, it should not create a health check.
err := resource.Reconcile()
require.NoError(err)
resource.reconcile()
actual := getConsulAgentChecks(t, client, testHealthCheckID)
require.Nil(actual)
}
Expand Down Expand Up @@ -626,7 +625,6 @@ func TestReconcilerShutdown(t *testing.T) {
healthResource := HealthCheckResource{
Log: hclog.Default().Named("healthCheckResource"),
KubernetesClientset: k8sclientset,
ConsulUrl: nil,
ReconcilePeriod: 5 * time.Second,
}

Expand Down Expand Up @@ -691,13 +689,14 @@ func TestReconcileRun(t *testing.T) {
require.NoError(err)
client, err := api.NewClient(clientConfig)
require.NoError(err)
consulUrl, err := url.Parse(serverAddress)
consulURL, err := url.Parse(serverAddress)
require.NoError(err)

healthResource := HealthCheckResource{
Log: hclog.Default().Named("healthCheckResource"),
KubernetesClientset: k8sclientset,
ConsulUrl: consulUrl,
ConsulScheme: consulURL.Scheme,
ConsulPort: consulURL.Port(),
ReconcilePeriod: 100 * time.Millisecond,
}
ctx, cancelFunc := context.WithCancel(context.Background())
Expand Down Expand Up @@ -767,13 +766,14 @@ func testServerAgentResourceAndControllerWithConsulNS(t *testing.T, pod *corev1.
require.NoError(err)

schema := "http://"
consulUrl, err := url.Parse(schema + s.HTTPAddr)
consulURL, err := url.Parse(schema + s.HTTPAddr)
require.NoError(err)

healthResource := HealthCheckResource{
Log: hclog.Default().Named("healthCheckResource"),
KubernetesClientset: fake.NewSimpleClientset(pod),
ConsulUrl: consulUrl,
ConsulScheme: consulURL.Scheme,
ConsulPort: consulURL.Port(),
ReconcilePeriod: 0,
}
return s, client, &healthResource
Expand Down
3 changes: 2 additions & 1 deletion subcommand/inject-connect/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ func (c *Command) Run(args []string) int {
healthResource := connectinject.HealthCheckResource{
Log: logger.Named("healthCheckResource"),
KubernetesClientset: c.clientset,
ConsulUrl: consulURL,
ConsulScheme: consulURL.Scheme,
ConsulPort: consulURL.Port(),
Ctx: ctx,
ReconcilePeriod: c.flagHealthChecksReconcilePeriod,
}
Expand Down

0 comments on commit 1e75484

Please sign in to comment.