Skip to content

Commit

Permalink
Make log pattern check configurable in health checker
Browse files Browse the repository at this point in the history
  • Loading branch information
abansal4032 committed Feb 18, 2021
1 parent fc4f167 commit 100f2bf
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 46 deletions.
3 changes: 3 additions & 0 deletions cmd/healthchecker/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type HealthCheckerOptions struct {
CriSocketPath string
CoolDownTime time.Duration
HealthCheckTimeout time.Duration
LogPatterns types.LogPatternFlag
}

// AddFlags adds health checker command line options to pflag.
Expand All @@ -57,6 +58,8 @@ func (hco *HealthCheckerOptions) AddFlags(fs *pflag.FlagSet) {
"The duration to wait for the service to be up before attempting repair.")
fs.DurationVar(&hco.HealthCheckTimeout, "health-check-timeout", types.DefaultHealthCheckTimeout,
"The time to wait before marking the component as unhealthy.")
fs.Var(&hco.LogPatterns, "log-pattern",
"The log pattern to look for in service journald logs. The format for flag value <failureThresholdCount>:<logPattern>")
}

// IsValid validates health checker command line options.
Expand Down
84 changes: 42 additions & 42 deletions pkg/healthchecker/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

type healthChecker struct {
component string
systemdService string
enableRepair bool
healthCheckFunc func() (bool, error)
// The repair is "best-effort" and ignores the error from the underlying actions.
Expand All @@ -42,6 +43,7 @@ type healthChecker struct {
crictlPath string
healthCheckTimeout time.Duration
coolDownTime time.Duration
logPatternsToCheck map[string]int
}

// NewHealthChecker returns a new health checker configured with the given options.
Expand All @@ -52,6 +54,8 @@ func NewHealthChecker(hco *options.HealthCheckerOptions) (types.HealthChecker, e
crictlPath: hco.CriCtlPath,
healthCheckTimeout: hco.HealthCheckTimeout,
coolDownTime: hco.CoolDownTime,
systemdService: hco.SystemdService,
logPatternsToCheck: hco.LogPatterns.GetLogPatternCountMap(),
}
hc.healthCheckFunc = getHealthCheckFunc(hco)
hc.repairFunc = getRepairFunc(hco)
Expand Down Expand Up @@ -106,7 +110,14 @@ func getRepairFunc(hco *options.HealthCheckerOptions) func() {
func getHealthCheckFunc(hco *options.HealthCheckerOptions) func() (bool, error) {
switch hco.Component {
case types.KubeletComponent:
return getKubeletHealthCheckFunc(hco.HealthCheckTimeout)
return func() (bool, error) {
httpClient := http.Client{Timeout: hco.HealthCheckTimeout}
response, err := httpClient.Get(types.KubeletHealthCheckEndpoint)
if err != nil || response.StatusCode != http.StatusOK {
return false, nil
}
return true, nil
}
case types.DockerComponent:
return func() (bool, error) {
if _, err := execCommand(hco.HealthCheckTimeout, "docker", "ps"); err != nil {
Expand All @@ -132,7 +143,11 @@ func (hc *healthChecker) CheckHealth() (bool, error) {
if err != nil {
return healthy, err
}
if healthy {
logPatternHealthy, err := logPatternHealthCheck(hc.systemdService, hc.logPatternsToCheck)
if err != nil {
return logPatternHealthy, err
}
if healthy && logPatternHealthy {
return true, nil
}
// The service is unhealthy.
Expand Down Expand Up @@ -165,36 +180,38 @@ func execCommand(timeout time.Duration, command string, args ...string) (string,
return strings.TrimSuffix(string(out), "\n"), nil
}

// kubeletHttpHealthCheck checks the health api response on kubelet.
// Returns true for healthy, false otherwise.
func kubeletHttpHealthCheck(healthCheckTimeout time.Duration) bool {
httpClient := http.Client{Timeout: healthCheckTimeout}
response, err := httpClient.Get(types.KubeletHealthCheckEndpoint)
if err != nil || response.StatusCode != http.StatusOK {
glog.Info("kubelet failed http health check")
return false
// logPatternHealthCheck checks for the provided logPattern occurrences in the service logs.
// Returns true if the pattern is empty or does not exist logThresholdCount times since start of service, false otherwise.
func logPatternHealthCheck(service string, logPatternsToCheck map[string]int) (bool, error) {
if len(logPatternsToCheck) == 0 {
return true, nil
}
return true
}

// kubeletConnectionHealthCheck checks for the kubelet-apiserver connection issue
// by checking repeated occurrences of log "use of closed network connection" in kubelet logs.
// Returns true if the pattern does not exist 10 times since start of service or the last 10 min, false otherwise.
func kubeletConnectionHealthCheck() (bool, error) {
kubeletUptimeFunc := getUptimeFunc(types.KubeletComponent)
uptime, err := kubeletUptimeFunc()
uptimeFunc := getUptimeFunc(service)
uptime, err := uptimeFunc()
if err != nil {
return true, err
}
logStartTime := time.Now().Add(-uptime).Format(types.LogParsingTimeLayout)
if err != nil {
return true, err
}
for pattern, count := range logPatternsToCheck {
healthy, err := checkForPattern(service, logStartTime, pattern, count)
if err != nil || !healthy {
return healthy, err
}
}
return true, nil
}

// checkForPattern returns (true, nil) if logPattern occurs atleast logCountThreshold number of times since last
// service restart. (false, nil) otherwise.
func checkForPattern(service, logStartTime, logPattern string, logCountThreshold int) (bool, error) {
out, err := execCommand(types.CmdTimeout, "/bin/sh", "-c",
// Query kubelet logs since the logStartTime
`journalctl --unit kubelet --since "`+logStartTime+
// Grep the pattern for lost connection
`" | grep -i "`+types.KubeletClosedConnectionLogPattern+
// Query service logs since the logStartTime
`journalctl --unit "`+service+`" --since "`+logStartTime+
// Grep the pattern
`" | grep -i "`+logPattern+
// Get the count of occurrences
`" | wc -l`)
if err != nil {
Expand All @@ -204,26 +221,9 @@ func kubeletConnectionHealthCheck() (bool, error) {
if err != nil {
return true, err
}
if occurrences >= types.KubeletClosedConnectionLogPatternThresholdCount {
glog.Infof("kubelet failed apiserver connection check, log pattern occurrences: %v", occurrences)
if occurrences >= logCountThreshold {
glog.Infof("%s failed log pattern check, %s occurrences: %v", service, logPattern, occurrences)
return false, nil
}
return true, nil
}

// getKubeletHealthCheckFunc returns a function that checks for kubelet health and
// return false if identified as unhealthy, true otherwise.
func getKubeletHealthCheckFunc(healthCheckTimeout time.Duration) func() (bool, error) {
return func() (bool, error) {
httpHealthy := kubeletHttpHealthCheck(healthCheckTimeout)
connectionHealthy, err := kubeletConnectionHealthCheck()
// The plugin will return Unknown status code in case there is any error in
// checking kubelet health.
if err != nil {
glog.Infof("Error in determining apiserver connection health: %v", err)
return false, err
}
healthy := httpHealthy && connectionHealthy
return healthy, nil
}
}
65 changes: 61 additions & 4 deletions pkg/healthchecker/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ limitations under the License.

package types

import "time"
import (
"fmt"
"strconv"
"strings"
"time"
)

const (
DefaultCoolDownTime = 2 * time.Minute
Expand All @@ -33,11 +38,63 @@ const (
DockerComponent = "docker"
ContainerdService = "containerd"

KubeletHealthCheckEndpoint = "http://127.0.0.1:10248/healthz"
KubeletClosedConnectionLogPattern = "use of closed network connection"
KubeletClosedConnectionLogPatternThresholdCount = 10
KubeletHealthCheckEndpoint = "http://127.0.0.1:10248/healthz"

LogPatternFlagSeparator = ":"
)

type HealthChecker interface {
CheckHealth() (bool, error)
}

// LogPatternFlag defines the flag for log pattern health check.
// It contains a map of <log pattern> to <failure threshold for the pattern>
type LogPatternFlag struct {
logPatternCountMap map[string]int
}

// String implements the String function for flag.Value interface
func (lpf *LogPatternFlag) String() string {
result := ""
for k, v := range lpf.logPatternCountMap {
if result != "" {
result += " "
}
result += fmt.Sprintf("%v:%v", k, v)
}
return result
}

// Set implements the Set function for flag.Value interface
func (lpf *LogPatternFlag) Set(value string) error {
if lpf.logPatternCountMap == nil {
lpf.logPatternCountMap = make(map[string]int)
}
items := strings.Split(value, ",")
for _, item := range items {
val := strings.SplitN(item, LogPatternFlagSeparator, 2)
if len(val) != 2 {
return fmt.Errorf("invalid format of the flag value: %v", val)
}
countThreshold, err := strconv.Atoi(val[0])
if err != nil || countThreshold == 0 {
return fmt.Errorf("invalid format for the flag value: %v: %v", val, err)
}
pattern := val[1]
if pattern == "" {
return fmt.Errorf("invalid format for the flag value: %v: %v", val, err)
}
lpf.logPatternCountMap[pattern] = countThreshold
}
return nil
}

// Type implements the Type function for flag.Value interface
func (lpf *LogPatternFlag) Type() string {
return "logPatternFlag"
}

// GetLogPatternCountMap returns the stored log count map
func (lpf *LogPatternFlag) GetLogPatternCountMap() map[string]int {
return lpf.logPatternCountMap
}
100 changes: 100 additions & 0 deletions pkg/healthchecker/types/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package types

import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
)

func TestLogPatternFlag(t *testing.T) {
testCases := []struct {
name string
value string
expectedStringVal string
expectedLogPatternCountMap map[string]int
expectSetError bool
}{
{
name: "valid single flag value",
value: "10:pattern1",
expectedStringVal: "pattern1:10",
expectedLogPatternCountMap: map[string]int{"pattern1": 10},
expectSetError: false,
},
{
name: "valid multiple flag values",
value: "10:pattern1,20:pattern2",
expectedStringVal: "pattern1:10 pattern2:20",
expectedLogPatternCountMap: map[string]int{"pattern1": 10, "pattern2": 20},
expectSetError: false,
},
{
name: "empty log pattern",
value: "10:",
expectSetError: true,
},
{
name: "0 failure threshold count",
value: "0:pattern1",
expectSetError: true,
},
{
name: "empty failure threshold count",
value: ":pattern1",
expectSetError: true,
},
{
name: "empty failure threshold count and pattern",
value: ":",
expectSetError: true,
},
{
name: "non integer value in failure threshold",
value: "notAnInteger:pattern1",
expectSetError: true,
},
{
name: "valid log pattern with ':'",
value: "10:pattern1a:pattern1b,20:pattern2",
expectedStringVal: "pattern1a:pattern1b:10 pattern2:20",
expectedLogPatternCountMap: map[string]int{"pattern1a:pattern1b": 10, "pattern2": 20},
expectSetError: false,
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
flag := LogPatternFlag{}
err := flag.Set(test.value)
if test.expectSetError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
actualStringVal := flag.String()
actualLogPatternCountMap := flag.GetLogPatternCountMap()
assert.Equal(t, test.expectedStringVal, actualStringVal)
if !reflect.DeepEqual(test.expectedLogPatternCountMap, actualLogPatternCountMap) {
t.Fatalf("logPatternCountMap mismatch, expected: %v, actual: %v", test.expectedLogPatternCountMap, actualLogPatternCountMap)
}
assert.Equal(t, test.expectedLogPatternCountMap, actualLogPatternCountMap)
}
})
}
}

0 comments on commit 100f2bf

Please sign in to comment.