
Commit

decrease pod_log_timeout by curl request time
juanvallejo committed Jun 23, 2017
1 parent c0918bc commit 8d40f83
Showing 2 changed files with 57 additions and 56 deletions.
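The functional change named in the commit title: instead of sleeping a fixed interval between Elasticsearch queries, the check now timestamps each query and subtracts the measured duration from the remaining timeout, so a slow curl request no longer stretches the overall wait past openshift_check_pod_logs_timeout. A minimal standalone sketch of that countdown pattern (query_once and its boolean return are illustrative stand-ins, not code from this repository):

import time


def wait_for_match(query_once, timeout_secs=30):
    """Poll query_once() until it reports a hit, charging each poll's own
    duration against the remaining timeout (the pattern this commit adopts)."""
    remaining = timeout_secs
    while remaining > 0:
        start = time.time()
        found = query_once()  # e.g. an Elasticsearch _count query that returns True on a match
        remaining -= time.time() - start
        if found:
            return True
    raise RuntimeError("no match found after {}s".format(timeout_secs))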
@@ -1,5 +1,5 @@
"""
Module for ensuring logs from pods can be queried in a reasonable amount of time.
Check for ensuring logs from pods can be queried in a reasonable amount of time.
"""

import json
@@ -11,31 +11,23 @@
 from openshift_checks.logging.logging import LoggingCheck


-ES_CMD_TIMEOUT_SECONDS = 600
-ES_DEFAULT_HOST = "logging-es"
-ES_DEFAULT_INDEX = "project.logging*"
+ES_CMD_TIMEOUT_SECONDS = 30


-class PodLogQueryTime(LoggingCheck):
-    """Check that logs from ElasticSearch pods can be queried within a reasonable amount of time."""
-
-    name = "pod_log_query_time"
+class LoggingIndexTime(LoggingCheck):
+    """Check that pod logs are aggregated and indexed in ElasticSearch within a reasonable amount of time."""
+    name = "logging_index_time"
     tags = ["health", "logging"]

     logging_namespace = "logging"
-    pod_es_host = ES_DEFAULT_INDEX
-    pod_es_index = ES_DEFAULT_HOST

     def run(self, tmp, task_vars):
         """Add log entry by making unique request to Kibana. Check for unique entry in the ElasticSearch pod logs."""
-        self.pod_es_host = get_var(task_vars, "openshift_check_pod_elasticsearch_host", default=ES_DEFAULT_HOST)
-        self.pod_es_index = get_var(task_vars, "openshift_check_pod_elasticsearch_index", default=ES_DEFAULT_INDEX)
-
         pod_log_timeout = int(get_var(task_vars, "openshift_check_pod_logs_timeout", default=ES_CMD_TIMEOUT_SECONDS))

         # get all Kibana pods
         self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default=self.logging_namespace)
-        pods, error = super(PodLogQueryTime, self).get_pods_for_component(
+        pods, error = super(LoggingIndexTime, self).get_pods_for_component(
             self.execute_module,
             self.logging_namespace,
             "kibana",
@@ -52,8 +44,7 @@ def run(self, tmp, task_vars):
return {"failed": True, "changed": False, "msg": msg}

# get all Elasticsearch pods
self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default=self.logging_namespace)
pods, error = super(PodLogQueryTime, self).get_pods_for_component(
pods, error = super(LoggingIndexTime, self).get_pods_for_component(
self.execute_module,
self.logging_namespace,
"es",
@@ -69,61 +60,43 @@ def run(self, tmp, task_vars):
                    'At least one Elasticsearch pod is required in order to perform this check.')
             return {"failed": True, "changed": False, "msg": msg}

-        error = self.check_log_delay(running_kibana_pods[0], running_es_pods[0], pod_log_timeout, task_vars)
-        if error:
-            return {"failed": True, "msg": error}
-
+        self.check_log_delay(running_kibana_pods[0], running_es_pods[0], pod_log_timeout, task_vars)
         return {}

     def check_log_delay(self, kibana_pod, es_pod, pod_log_timeout, task_vars):
         """Check delay of logs and return any errors accordingly"""
-        error = None
-
-        try:
-            uid = self.curl_kibana_with_uuid(kibana_pod, task_vars)
-            self.wait_until_cmd_or_err(es_pod, uid, pod_log_timeout, task_vars)
-        except OpenShiftCheckException as error:
-            error = str(error)
-        except KeyError as error:
-            error = "Invalid json response from pod: {}".format(str(error))
-        except ValueError as error:
-            error = "Invalid response from pod {}".format(str(error))
-
-        return error
-
-    @staticmethod
-    def running_pods(pods):
-        """Returns: list of pods in a running state"""
-        return [
-            pod for pod in pods
-            if pod['status']['phase'] == 'Running'
-        ]
+        uid = self.curl_kibana_with_uuid(kibana_pod, task_vars)
+        self.wait_until_cmd_or_err(es_pod, uid, pod_log_timeout, task_vars)

     def wait_until_cmd_or_err(self, es_pod, uid, timeout_secs, task_vars):
         """Wait a maximum of timeout_secs for the uuid logged in Kibana to be found in
-        the Elasticsearch logs. Since we are querying for a message with the the unique
+        the Elasticsearch logs. Since we are querying for a message with the unique
         uuid from earlier, there should only be a single match.
         Raise an OpenShiftCheckException if the timeout is reached before a result is found."""

-        interval = 1 # seconds to wait between retries
-        orig_timeout = timeout_secs
+        timeout_orig = timeout_secs

         while timeout_secs > 0:
-            time.sleep(interval)
-            timeout_secs -= interval
-
+            time_start = int(time.time())
             total = self.query_es_from_es(es_pod, uid, task_vars)
+            time_end = int(time.time())
             if total == 1:
                 return

+            timeout_secs -= (time_end - time_start)
+
         msg = "expecting match in Elasticsearch for message with uuid {}, but no matches were found after {}s."
-        raise OpenShiftCheckException(msg.format(uid, orig_timeout))
+        raise OpenShiftCheckException(msg.format(uid, timeout_orig))

     def curl_kibana_with_uuid(self, kibanna_pod, task_vars):
         """curl Kibana with a unique uuid."""
-        uid = str(self.generate_uuid())
+        uid = self.generate_uuid()
         pod_name = kibanna_pod["metadata"]["name"]
-        exec_cmd = "exec {pod_name} -c kibana -- curl --connect-timeout 60 -s http://localhost:5601/{uid}"
+        exec_cmd = "exec {pod_name} -c kibana -- curl --max-time 30 -s http://localhost:5601/{uid}"

         error_str = self.oc_cmd(
             exec_cmd.format(
@@ -132,7 +105,15 @@ def curl_kibana_with_uuid(self, kibanna_pod, task_vars):
             ),
             [], task_vars,
         )
-        error_code = json.loads(error_str).get("statusCode")

+        try:
+            error_code = json.loads(error_str)["statusCode"]
+        except KeyError:
+            msg = 'invalid json response returned by Kibana pod "{}". Missing "statusCode" key: {}'
+            raise OpenShiftCheckException(msg.format(pod_name, error_str))
+        except ValueError:
+            msg = 'non-json response returned by Kibana pod "{}": {}'
+            raise OpenShiftCheckException(msg.format(pod_name, error_str))
+
         if error_code != 404:
             msg = "expecting server error code 404, but got {} instead."
@@ -143,29 +124,49 @@ def curl_kibana_with_uuid(self, kibanna_pod, task_vars):
     def query_es_from_es(self, es_pod, uid, task_vars):
         """curl the Elasticsearch pod and look for a unique uuid in its logs."""
         pod_name = es_pod["metadata"]["name"]
-        exec_cmd = ("exec {pod_name} -- curl --connect-timeout 60 -s -k -f "
-                    "--cert /etc/elasticsearch/secret/admin-cert "
-                    "--key /etc/elasticsearch/secret/admin-key "
-                    "https://{es_host}:9200/{es_index}/_count?q=message:{uid}")
+        exec_cmd = (
+            "exec {pod_name} -- curl --max-time 30 -s -k -f "
+            "--cert /etc/elasticsearch/secret/admin-cert "
+            "--key /etc/elasticsearch/secret/admin-key "
+            "https://logging-es:9200/project.{namespace}*/_count?q=message:{uid}"
+        )

         result = self.oc_cmd(
             exec_cmd.format(
                 pod_name=pod_name,
-                es_host=self.pod_es_host,
-                es_index=self.pod_es_index,
+                namespace=self.logging_namespace,
                 uid=uid
             ),
             [], task_vars,
         )
-        return json.loads(result)["count"]

+        try:
+            count = json.loads(result)["count"]
+        except KeyError:
+            msg = 'invalid json response returned by Elasticsearch pod "{}". Missing "count" key: {}'
+            raise OpenShiftCheckException(msg.format(pod_name, result))
+        except ValueError:
+            msg = 'non-json response returned by Elasticsearch pod "{}": {}'
+            raise OpenShiftCheckException(msg.format(pod_name, result))
+
+        return count
+
+    @staticmethod
+    def running_pods(pods):
+        """Returns: list of pods in a running state"""
+        return [
+            pod for pod in pods
+            if pod['status']['phase'] == 'Running'
+        ]

     @staticmethod
     def generate_uuid():
-        return uuid4()
+        """Wrap uuid generator. Allows for testing with expected values."""
+        return str(uuid4())

     def oc_cmd(self, cmd_str, extra_args, task_vars):
         """Wrap parent exec_oc method. Allows for testing without actually invoking other modules."""
-        return super(PodLogQueryTime, self).exec_oc(
+        return super(LoggingIndexTime, self).exec_oc(
             self.execute_module,
             self.logging_namespace,
             cmd_str,
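Beyond the timeout accounting, the commit also hardens both JSON lookups (statusCode from Kibana, count from Elasticsearch) so a non-JSON body or a missing key is reported as an OpenShiftCheckException instead of escaping as a bare ValueError or KeyError. A rough standalone sketch of that pattern, with CheckException standing in for the real OpenShiftCheckException:

import json


class CheckException(Exception):
    """Stand-in for OpenShiftCheckException."""


def parse_count(pod_name, raw_response):
    """Extract the "count" field, translating parse failures into a check failure."""
    try:
        return json.loads(raw_response)["count"]
    except KeyError:
        msg = 'invalid json response returned by pod "{}". Missing "count" key: {}'
        raise CheckException(msg.format(pod_name, raw_response))
    except ValueError:
        msg = 'non-json response returned by pod "{}": {}'
        raise CheckException(msg.format(pod_name, raw_response))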

