Skip to content

Commit

Permalink
etcdserver: add livez and ready http endpoints for etcd.
Browse files Browse the repository at this point in the history
Add two separate probes, one for liveness and one for readiness. The liveness probe would check that the local individual node is up and running, or else restart the node, while the readiness probe would check that the cluster is ready to serve traffic. This would make etcd health-check fully Kubernetes API complient.

Signed-off-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
siyuanfoundation committed Sep 28, 2023
1 parent e85949d commit f588767
Show file tree
Hide file tree
Showing 3 changed files with 405 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ func (e *Etcd) serveClients() (err error) {
etcdhttp.HandleVersion(mux, e.Server)
etcdhttp.HandleMetrics(mux)
etcdhttp.HandleHealth(e.cfg.logger, mux, e.Server)
etcdhttp.InstallHealthEndpoints(e.cfg.logger, mux, e.Server)

var gopts []grpc.ServerOption
if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
Expand Down
211 changes: 211 additions & 0 deletions server/etcdserver/api/etcdhttp/readiness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdhttp

import (
"bytes"
"context"
"fmt"
"net/http"
"strings"
"time"

"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/auth"
)

func InstallHealthEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
readyz := ReadyzCheckRegistry(server)
readyz.InstallHttpEndpoints(mux, lg)
livez := LivezCheckRegistry(server)
livez.InstallHttpEndpoints(mux, lg)
}

type HealthCheck func(ctx context.Context) error

type CheckRegistry struct {
path string

checks map[string]HealthCheck
}

func LivezCheckRegistry(server ServerHealth) *CheckRegistry {
reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck)}
reg.Register("ping", ping)
reg.Register("serializable_read", checkSerializableRead(server))
return &reg
}

func ReadyzCheckRegistry(server ServerHealth) *CheckRegistry {
reg := CheckRegistry{path: "/readyz", checks: make(map[string]HealthCheck)}
reg.Register("ping", ping)
reg.Register("data_corruption", checkAlarm(server, etcdserverpb.AlarmType_CORRUPT))
reg.Register("serializable_read", checkSerializableRead(server))
return &reg
}

func (reg *CheckRegistry) Register(name string, check HealthCheck) {
reg.checks[name] = check
}

func (reg *CheckRegistry) InstallHttpEndpoints(mux *http.ServeMux, lg *zap.Logger) {
lg.Info(fmt.Sprintf("Installing %s http endpoint.", reg.path))
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
cancel()
mux.Handle(reg.path, reg.handleRootHealth(ctx, lg))
for checkName, check := range reg.checks {
mux.Handle(fmt.Sprintf("%s/%v", reg.path, checkName), adaptCheckToHandler(ctx, check))
}
}

// handleRootHealth returns an http.HandlerFunc that serves the provided checks.
func (reg *CheckRegistry) handleRootHealth(ctx context.Context, lg *zap.Logger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// extracts the health check names to be excludeList from the query param
excludeList, _ := r.URL.Query()["exclude"]
// extracts the health check names to be allowList from the query param
allowList, _ := r.URL.Query()["allowlist"]

if len(excludeList) > 0 && len(allowList) > 0 {
http.Error(w, fmt.Sprintf("do not expect both allowlist: %v and exclude list: %v to be both specified in %s.", allowList, excludeList, reg.path), http.StatusBadRequest)
return
}
excluded := listToStringSet(excludeList)
included := listToStringSet(allowList)
failedChecks := []string{}
var individualCheckOutput bytes.Buffer
for checkName, check := range reg.checks {
if len(allowList) > 0 {
if _, found := included[checkName]; !found {
fmt.Fprintf(&individualCheckOutput, "[+]%s not included: ok\n", checkName)
continue
}
delete(included, checkName)
} else {
// no-op the check if we've specified we want to exclude the check
if _, found := excluded[checkName]; found {
delete(excluded, checkName)
fmt.Fprintf(&individualCheckOutput, "[+]%s excluded: ok\n", checkName)
continue
}
}
if err := check(ctx); err != nil {
fmt.Fprintf(&individualCheckOutput, "[-]%s failed: %v\n", checkName, err)
failedChecks = append(failedChecks, checkName)
} else {
fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", checkName)
}
}
if len(excluded) > 0 {
fmt.Fprintf(&individualCheckOutput, "warn: some health checks cannot be excluded: no matches for %s\n", formatQuoted(excluded.List()...))
}
if len(included) > 0 {
fmt.Fprintf(&individualCheckOutput, "warn: some health checks cannot be included: no matches for %s\n", formatQuoted(included.List()...))
}
// always be verbose on failure
if len(failedChecks) > 0 {
http.Error(w, fmt.Sprintf("%s%s check failed", individualCheckOutput.String(), reg.path), http.StatusInternalServerError)
lg.Error(fmt.Sprintf("%s%s check failed", individualCheckOutput.String(), reg.path))
return
}
lg.Info(fmt.Sprintf("%s check passed:\n%s", reg.path, individualCheckOutput.String()))

w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
if _, found := r.URL.Query()["verbose"]; !found {
fmt.Fprint(w, "ok")
return
}
individualCheckOutput.WriteTo(w)
fmt.Fprintf(w, "%s check passed\n", reg.path)
}
}

// adaptCheckToHandler returns an http.HandlerFunc that serves the provided checks.
func adaptCheckToHandler(ctx context.Context, c HealthCheck) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := c(ctx)
if err != nil {
http.Error(w, fmt.Sprintf("internal server error: %v", err), http.StatusInternalServerError)
} else {
fmt.Fprint(w, "ok")
}
})
}

// formatQuoted returns a formatted string of the health check names,
// preserving the order passed in.
func formatQuoted(names ...string) string {
quoted := make([]string, 0, len(names))
for _, name := range names {
quoted = append(quoted, fmt.Sprintf("%q", name))
}
return strings.Join(quoted, ",")
}

type stringSet map[string]struct{}

func (s stringSet) List() []string {
keys := make([]string, len(s))

i := 0
for k := range s {
keys[i] = k
i++
}
return keys
}

func listToStringSet(list []string) stringSet {
set := make(map[string]struct{})
for _, s := range list {
if len(s) == 0 {
continue
}
set[s] = struct{}{}
}
return set
}

func ping(context.Context) error { return nil }

// checkAlarm checks if a specific alarm type is active in the server.
func checkAlarm(srv ServerHealth, at etcdserverpb.AlarmType) func(context.Context) error {
return func(ctx context.Context) error {
as := srv.Alarms()
if len(as) > 0 {
for _, v := range as {
if v.Alarm == at {
return fmt.Errorf("Alarm active:%s", at.String())
}
}
}
return nil
}
}

func checkSerializableRead(srv ServerHealth) func(ctx context.Context) error {
return func(ctx context.Context) error {
checkCtx, cancel := context.WithTimeout(ctx, time.Second)
_, err := srv.Range(checkCtx, &etcdserverpb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
cancel()
if err != nil && err != auth.ErrUserEmpty && err != auth.ErrPermissionDenied {
return fmt.Errorf("RANGE ERROR:%s", err)
}
return nil
}
}
Loading

0 comments on commit f588767

Please sign in to comment.