From d93b26580eea0b9ceb61f57ce3a42482dec90482 Mon Sep 17 00:00:00 2001 From: Cheng Xing Date: Wed, 4 Nov 2020 17:02:08 -0800 Subject: [PATCH] Add health checker to leader election library --- leaderelection/leader_election.go | 36 +++++++++++++++++++++++++++++++ metrics/metrics.go | 36 +++++++++++-------------------- metrics/metrics_test.go | 20 ++++++++--------- 3 files changed, 58 insertions(+), 34 deletions(-) diff --git a/leaderelection/leader_election.go b/leaderelection/leader_election.go index f3065c5af..6e891ed2d 100644 --- a/leaderelection/leader_election.go +++ b/leaderelection/leader_election.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io/ioutil" + "net/http" "os" "regexp" "strings" @@ -39,6 +40,9 @@ const ( defaultLeaseDuration = 15 * time.Second defaultRenewDeadline = 10 * time.Second defaultRetryPeriod = 5 * time.Second + healthCheckTimeout = 20 * time.Second + + HealthCheckerAddress = "/healthz/leader-election" ) // leaderElection is a convenience wrapper around client-go's leader election library. @@ -55,6 +59,9 @@ type leaderElection struct { // valid options are resourcelock.LeasesResourceLock, resourcelock.EndpointsResourceLock, // and resourcelock.ConfigMapsResourceLock resourceLock string + // healthCheck reports unhealthy if leader election fails to renew leadership + // within a timeout period. 
+ healthCheck *leaderelection.HealthzAdaptor leaseDuration time.Duration renewDeadline time.Duration @@ -76,6 +83,7 @@ func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string runFunc: runFunc, lockName: lockName, resourceLock: resourcelock.LeasesResourceLock, + healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout), leaseDuration: defaultLeaseDuration, renewDeadline: defaultRenewDeadline, retryPeriod: defaultRetryPeriod, @@ -89,6 +97,7 @@ func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName str runFunc: runFunc, lockName: lockName, resourceLock: resourcelock.EndpointsResourceLock, + healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout), leaseDuration: defaultLeaseDuration, renewDeadline: defaultRenewDeadline, retryPeriod: defaultRetryPeriod, @@ -102,6 +111,7 @@ func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName st runFunc: runFunc, lockName: lockName, resourceLock: resourcelock.ConfigMapsResourceLock, + healthCheck: leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout), leaseDuration: defaultLeaseDuration, renewDeadline: defaultRenewDeadline, retryPeriod: defaultRetryPeriod, @@ -134,6 +144,19 @@ func (l *leaderElection) WithContext(ctx context.Context) { l.ctx = ctx } +// Server represents any type that could serve HTTP requests for the leader +// election health check endpoint. +type Server interface { + Handle(pattern string, handler http.Handler) +} + +// RegisterHealthCheck creates a health check for this leader election object and +// registers its HTTP handler to the given server at the path specified by the +// constant "HealthCheckerAddress". 
+func (l *leaderElection) RegisterHealthCheck(s Server) { + s.Handle(HealthCheckerAddress, adaptCheckToHandler(l.healthCheck.Check)) +} + func (l *leaderElection) Run() error { if l.identity == "" { id, err := defaultLeaderElectionIdentity() @@ -179,6 +202,7 @@ func (l *leaderElection) Run() error { klog.V(3).Infof("new leader detected, current leader: %s", identity) }, }, + WatchDog: l.healthCheck, } ctx := l.ctx @@ -220,3 +244,15 @@ func inClusterNamespace() string { return "default" } + +// adaptCheckToHandler returns an http.HandlerFunc that serves the provided checks. +func adaptCheckToHandler(c func(r *http.Request) error) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + err := c(r) + if err != nil { + http.Error(w, fmt.Sprintf("internal server error: %v", err), http.StatusInternalServerError) + } else { + fmt.Fprint(w, "ok") + } + }) +} \ No newline at end of file diff --git a/metrics/metrics.go b/metrics/metrics.go index 6ca2b9ae6..733e69236 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -27,7 +27,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/component-base/metrics" - "k8s.io/klog/v2" ) const ( @@ -90,10 +89,15 @@ type CSIMetricsManager interface { // driverName - Name of the CSI driver against which this operation was executed. SetDriverName(driverName string) - // StartMetricsEndpoint starts the metrics endpoint at the specified address/path - // for this metrics manager. - // If the metricsAddress is an empty string, this will be a no op. - StartMetricsEndpoint(metricsAddress, metricsPath string) + // RegisterToServer registers an HTTP handler for this metrics manager to the + // given server at the specified path. + RegisterToServer(s Server, metricsPath string) +} + +// Server represents any type that could serve HTTP requests for the metrics +// endpoint. 
+type Server interface { + Handle(pattern string, handler http.Handler) } // MetricsManagerOption is used to pass optional configuration to a @@ -325,27 +329,13 @@ func (cmm *csiMetricsManager) SetDriverName(driverName string) { } } -// StartMetricsEndpoint starts the metrics endpoint at the specified address/path -// for this metrics manager on a new go routine. -// If the metricsAddress is an empty string, this will be a no op. -func (cmm *csiMetricsManager) StartMetricsEndpoint(metricsAddress, metricsPath string) { - if metricsAddress == "" { - klog.Warningf("metrics endpoint will not be started because `metrics-address` was not specified.") - return - } - - http.Handle(metricsPath, metrics.HandlerFor( +// RegisterToServer registers an HTTP handler for this metrics manager to the +// given server at the specified path. +func (cmm *csiMetricsManager) RegisterToServer(s Server, metricsPath string) { + s.Handle(metricsPath, metrics.HandlerFor( cmm.GetRegistry(), metrics.HandlerOpts{ ErrorHandling: metrics.ContinueOnError})) - - // Spawn a new go routine to listen on specified endpoint - go func() { - err := http.ListenAndServe(metricsAddress, nil) - if err != nil { - klog.Fatalf("Failed to start prometheus metrics endpoint on specified address (%q) and path (%q): %s", metricsAddress, metricsPath, err) - } - }() } // VerifyMetricsMatch is a helper function that verifies that the expected and diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index 18233ec81..d6e6d4b0f 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -19,6 +19,7 @@ package metrics import ( "io/ioutil" "net/http" + "net/http/httptest" "strings" "testing" "time" @@ -481,29 +482,26 @@ func TestRecordMetrics_Negative(t *testing.T) { } } -func TestStartMetricsEndPoint_Noop(t *testing.T) { +func TestRegisterToServer_Noop(t *testing.T) { // Arrange cmm := NewCSIMetricsManagerForSidecar( "fake.csi.driver.io" /* driverName */) operationDuration, _ := 
time.ParseDuration("20s") + mux := http.NewServeMux() // Act - cmm.StartMetricsEndpoint(":8080", "/metrics") + cmm.RegisterToServer(mux, "/metrics") cmm.RecordMetrics( "/csi.v1.Controller/ControllerGetCapabilities", /* operationName */ nil, /* operationErr */ operationDuration /* operationDuration */) // Assert - request, err := http.NewRequest("GET", "http://localhost:8080/metrics", strings.NewReader("")) - if err != nil { - t.Fatalf("Creating request for metrics endpoint failed: %v", err) - } - client := &http.Client{} - resp, err := client.Do(request) - if err != nil { - t.Fatalf("Failed to GET metrics. Error: %v", err) - } + request := httptest.NewRequest("GET", "/metrics", strings.NewReader("")) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, request) + resp := rec.Result() + if resp.StatusCode != 200 { t.Fatalf("/metrics response status not 200. Response was: %+v", resp) }