Skip to content

Commit

Permalink
add etcd increased-traffic check
Browse files Browse the repository at this point in the history
  • Loading branch information
juanvallejo committed Jul 5, 2017
1 parent 75a46c1 commit 1372c59
Show file tree
Hide file tree
Showing 4 changed files with 435 additions and 0 deletions.
151 changes: 151 additions & 0 deletions roles/openshift_health_checker/library/search_journalctl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#!/usr/bin/python
"""Interface to journalctl."""

from time import time
import json
import re
import subprocess

from ansible.module_utils.basic import AnsibleModule


class InvalidMatcherRegexp(Exception):
    """Signal that a log matcher carried a regular expression that is not valid."""


class InvalidLogEntry(Exception):
    """Signal that a log entry was invalid, i.e. it was not JSON."""


class LogInputSubprocessError(Exception):
    """Signal that an error occurred while executing a subprocess."""


def main():
    """Scan a given list of "log_matchers" for journalctl messages containing given patterns.

    Module arguments:
        log_matchers (list, required): dicts consisting of three keys that
            help fine-tune log searching: 'start_regexp', 'regexp', and 'unit'.
        log_count_limit (int, default 500): forwarded to get_log_matches as
            the per-matcher cap on inspected log entries.

    Sample "log_matchers" list:

        [
            {
                'start_regexp': r'Beginning of systemd unit',
                'regexp': r'the specific log message to find',
                'unit': 'etcd',
            }
        ]

    The module exits with these result keys:
        changed: always False.
        failed:  True when at least one error was collected, False otherwise.
        errors:  list of error strings collected while searching.
        matched: list of the 'regexp' values that were found in the logs.
    """
    module = AnsibleModule(
        argument_spec=dict(
            log_count_limit=dict(type="int", default=500),
            log_matchers=dict(type="list", required=True),
        ),
    )

    # Cutoff handed to the search: one hour before the current time.
    timestamp_limit_seconds = time() - 60 * 60  # 1 hour

    log_count_limit = module.params["log_count_limit"]
    log_matchers = module.params["log_matchers"]

    matched_regexp, errors = get_log_matches(log_matchers, log_count_limit, timestamp_limit_seconds)

    module.exit_json(
        changed=False,
        # Report a real boolean rather than the number of errors.
        failed=bool(errors),
        errors=errors,
        matched=matched_regexp,
    )


def get_log_matches(matchers, log_count_limit, timestamp_limit_seconds):
    """Search the log output of every matcher for that matcher's pattern.

    For each matcher the log input is fetched and scanned. When the scan
    reports a match, the matcher's "regexp" value (or "" if it has none) is
    recorded. Failures to obtain the log input, invalid matcher regular
    expressions and invalid log entries are recorded as error strings
    instead of being raised, and processing moves on to the next matcher.

    Return a tuple of two lists: (matched regular expressions, error strings).
    """
    found_patterns = []
    failures = []

    for entry in matchers:
        try:
            lines = get_log_input(entry)
        except LogInputSubprocessError as exc:
            # Nothing to scan for this matcher; record why and move on.
            failures.append(str(exc))
            continue

        try:
            hit = find_matches(lines, entry, log_count_limit, timestamp_limit_seconds)
        except (InvalidMatcherRegexp, InvalidLogEntry) as exc:
            failures.append(str(exc))
        else:
            if hit:
                found_patterns.append(entry.get("regexp", ""))

    return found_patterns, failures


def get_log_input(matcher):
    """Run journalctl with the systemd unit specified in the matcher.

    journalctl is started with '-ru <unit>' and '--output json'; the unit is
    the matcher's "unit" value, or "" when the matcher has none.

    Return an iterator over the lines of the command's standard output. The
    pipe is opened in binary mode, so on Python 3 the lines are bytes.

    Raise LogInputSubprocessError if the command cannot be started.
    """
    try:
        cmd_output = subprocess.Popen(
            [
                '/bin/journalctl',
                '-ru', matcher.get("unit", ""),
                '--output', 'json',
            ],
            stdout=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as exc:
        # NOTE(review): Popen itself is not documented to raise this; the
        # handler is retained so the set of translated errors is unchanged.
        msg = "Could not obtain journalctl logs for the specified systemd unit: {}: {}"
        raise LogInputSubprocessError(msg.format(matcher.get("unit", "<missing>"), str(exc)))
    except OSError as exc:
        raise LogInputSubprocessError(str(exc))

    # readline() on a binary pipe returns b'' at end of output. b'' equals ''
    # on Python 2, but on Python 3 a '' sentinel would never compare equal
    # and the iterator would never terminate.
    return iter(cmd_output.stdout.readline, b'')


def find_matches(log_input, matcher, log_count_limit, timestamp_limit_seconds):
    """Scan JSON log entries for a message matching the matcher's "regexp".

    Arguments:
        log_input: iterable of log lines, each a JSON document providing
            "MESSAGE" and "__REALTIME_TIMESTAMP" (microseconds) fields.
        matcher: dict with a "regexp" to look for, an optional
            "start_regexp" marking where to stop looking, and a "unit" that
            is only used in error messages.
        log_count_limit: maximum number of entries to inspect.
        timestamp_limit_seconds: entries older than this timestamp (in
            seconds) end the scan.

    Both patterns are applied with re.match, i.e. anchored at the start of
    the message. The scan stops at the first entry that matches
    "start_regexp", is older than the timestamp limit, or matches "regexp".

    Return the first matching log line, or None if nothing matched.

    Raise InvalidMatcherRegexp if a pattern does not compile, and
    InvalidLogEntry if an entry is not valid JSON or lacks a usable field.
    """
    try:
        regexp = re.compile(matcher.get("regexp", ""))
        # An absent/empty start pattern must not be compiled: the empty
        # pattern matches every message and would end the scan on the very
        # first entry, so such a matcher could never find anything.
        start_pattern = matcher.get("start_regexp")
        start_regexp = re.compile(start_pattern) if start_pattern else None
    except re.error as err:
        msg = "A log matcher object was provided with an invalid regular expression: {}"
        raise InvalidMatcherRegexp(msg.format(str(err)))

    for log_count, line in enumerate(log_input):
        if log_count >= log_count_limit:
            break

        try:
            obj = json.loads(line)
            message = obj["MESSAGE"]

            # don't need to look past the most recent service restart
            if start_regexp is not None and start_regexp.match(message):
                break

            log_timestamp_seconds = float(obj["__REALTIME_TIMESTAMP"]) / 1000000
            if log_timestamp_seconds < timestamp_limit_seconds:
                break

            if regexp.match(message):
                return line

        except ValueError:
            msg = "Log entry for systemd unit {} contained invalid json syntax: {}"
            raise InvalidLogEntry(msg.format(matcher.get("unit"), line))
        except (KeyError, TypeError):
            # Valid JSON, but a required field is absent or is not text.
            # Report it through the error type callers already handle
            # instead of letting the module crash.
            msg = "Log entry for systemd unit {} did not contain the expected fields: {}"
            raise InvalidLogEntry(msg.format(matcher.get("unit"), line))

    return None


# Invoke the module entry point only when this file is executed directly.
if __name__ == "__main__":
    main()
47 changes: 47 additions & 0 deletions roles/openshift_health_checker/openshift_checks/etcd_traffic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Check that scans journalctl for messages caused as a symptom of increased etcd traffic."""

from openshift_checks import OpenShiftCheck, get_var


class EtcdTraffic(OpenShiftCheck):
    """Check if host is being affected by an increase in etcd traffic."""

    name = "etcd_traffic"
    tags = ["health", "etcd"]

    @classmethod
    def is_active(cls, task_vars):
        """Skip hosts that do not have etcd in their group names.

        The check is also limited to hosts whose openshift.common.short_version
        is one of "3.4", "3.5", "1.4" or "1.5".
        """
        group_names = get_var(task_vars, "group_names", default=[])
        valid_group_names = "etcd" in group_names

        version = get_var(task_vars, "openshift", "common", "short_version")
        valid_version = version in ("3.4", "3.5", "1.4", "1.5")

        return super(EtcdTraffic, cls).is_active(task_vars) and valid_group_names and valid_version

    def run(self, tmp, task_vars):
        """Search the etcd unit's journal for the slow-sync warning.

        Runs the "search_journalctl" module against the "etcd_container" unit
        on containerized hosts and the "etcd" unit otherwise.

        Return a dict with "failed" and "msg" when the warning was found or
        the module reported a failure; return an empty dict otherwise.
        """
        is_containerized = get_var(task_vars, "openshift", "common", "is_containerized")
        unit = "etcd_container" if is_containerized else "etcd"

        log_matchers = [{
            "start_regexp": r"Starting Etcd Server",
            "regexp": r"etcd: sync duration of [^,]+, expected less than 1s",
            "unit": unit
        }]

        match = self.execute_module("search_journalctl", {
            "log_matchers": log_matchers,
        }, task_vars)

        if match.get("matched"):
            msg = ("Higher than normal etcd traffic detected.\n"
                   "OpenShift 3.4 introduced an increase in etcd traffic.\n"
                   "Upgrading to OpenShift 3.6 is recommended in order to fix this issue.\n"
                   "Please refer to https://access.redhat.com/solutions/2916381 for more information.")
            return {"failed": True, "msg": msg}

        # Use .get(): a module result is not guaranteed to carry a "failed"
        # key, nor an "errors" list when it failed.
        if match.get("failed"):
            errors = match.get("errors") or [
                match.get("msg") or "The search_journalctl module failed without reporting any errors."
            ]
            return {"failed": True, "msg": "\n".join(errors)}

        return {}
80 changes: 80 additions & 0 deletions roles/openshift_health_checker/test/etcd_traffic_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import pytest

from openshift_checks.etcd_traffic import EtcdTraffic


@pytest.mark.parametrize('group_names,version,is_active', [
    (['masters'], "3.5", False),
    (['masters'], "3.6", False),
    (['nodes'], "3.4", False),
    (['etcd'], "3.4", True),
    (['etcd'], "3.5", True),
    (['etcd'], "3.1", False),
    (['masters', 'nodes'], "3.5", False),
    (['masters', 'etcd'], "3.5", True),
    ([], "3.4", False),
])
def test_is_active(group_names, version, is_active):
    """EtcdTraffic.is_active must give the expected answer for each group/version pair."""
    task_vars = {
        "group_names": group_names,
        "openshift": {
            "common": {"short_version": version},
        },
    }

    assert EtcdTraffic.is_active(task_vars=task_vars) == is_active


@pytest.mark.parametrize('group_names,matched,failed,extra_words', [
    (["masters"], True, True, ["Higher than normal", "traffic"]),
    (["masters", "etcd"], False, False, []),
    (["etcd"], False, False, []),
])
def test_log_matches_high_traffic_msg(group_names, matched, failed, extra_words):
    """The check result must mirror the stubbed module's matched/failed outcome."""
    def execute_module(module_name, args, task_vars):
        # Stub standing in for the real module call: replies with the parametrized outcome.
        return {"matched": matched, "failed": failed}

    common_facts = {"service_type": "origin", "is_containerized": False}
    task_vars = {
        "group_names": group_names,
        "openshift": {"common": common_facts},
    }

    result = EtcdTraffic(execute_module=execute_module).run(tmp=None, task_vars=task_vars)

    for expected_word in extra_words:
        assert expected_word in result.get("msg", "")

    assert result.get("failed", False) == failed


@pytest.mark.parametrize('is_containerized,expected_unit_value', [
    (False, "etcd"),
    (True, "etcd_container"),
])
def test_systemd_unit_matches_deployment_type(is_containerized, expected_unit_value):
    """Every matcher sent to search_journalctl must name the unit for the deployment type."""
    def execute_module(module_name, args, task_vars):
        # Stub that verifies the arguments the check passes to the module.
        assert module_name == "search_journalctl"

        for unit in (entry["unit"] for entry in args["log_matchers"]):
            assert unit == expected_unit_value

        return {"failed": False}

    task_vars = {
        "openshift": {
            "common": {"is_containerized": is_containerized},
        },
    }

    EtcdTraffic(execute_module=execute_module).run(tmp=None, task_vars=task_vars)


def fake_execute_module(*args):
    """Stand-in for execute_module that fails with AssertionError whenever it is invoked."""
    reason = 'this function should not be called'
    raise AssertionError(reason)
Loading

0 comments on commit 1372c59

Please sign in to comment.