From 6c285f900fa9bde8fd0835103aaaed4cc8265fe4 Mon Sep 17 00:00:00 2001 From: Dorian Zaccaria Date: Wed, 6 May 2015 17:13:57 -0400 Subject: [PATCH] [mesos] Support multiple versions and Updates for the review * Change mesos stats endpoint for versions above 0.22.0 * Simplify syntax * Update tests for @degemer refactor --- checks.d/mesos_master.py | 117 ++++++++---------- checks.d/mesos_slave.py | 106 +++++++--------- requirements.txt | 2 +- .../integration}/test_mesos_master.py | 4 +- .../integration}/test_mesos_slave.py | 2 +- 5 files changed, 106 insertions(+), 125 deletions(-) rename tests/{ => checks/integration}/test_mesos_master.py (98%) rename tests/{ => checks/integration}/test_mesos_slave.py (99%) diff --git a/checks.d/mesos_master.py b/checks.d/mesos_master.py index e6a954834a..07edbdbe68 100644 --- a/checks.d/mesos_master.py +++ b/checks.d/mesos_master.py @@ -6,12 +6,12 @@ from hashlib import md5 import time -# project -from checks import AgentCheck - # 3rd party import requests +# project +from checks import AgentCheck, CheckException + class MesosMaster(AgentCheck): GAUGE = AgentCheck.gauge @@ -34,13 +34,14 @@ class MesosMaster(AgentCheck): # These metrics are aggregated only on the elected master CLUSTER_TASKS_METRICS = { - 'staged_tasks' : ('mesos.cluster.staged_tasks', GAUGE), - 'started_tasks' : ('mesos.cluster.started_tasks', GAUGE), - 'finished_tasks' : ('mesos.cluster.finished_tasks', GAUGE), - 'killed_tasks' : ('mesos.cluster.killed_tasks', GAUGE), - 'failed_tasks' : ('mesos.cluster.failed_tasks', GAUGE), - 'lost_tasks' : ('mesos.cluster.lost_tasks', GAUGE), - 'active_tasks_gauge' : ('mesos.cluster.active_tasks_gauge', GAUGE), + 'master/tasks_error' : ('mesos.cluster.tasks_error', GAUGE), + 'master/tasks_failed' : ('mesos.cluster.tasks_failed', GAUGE), + 'master/tasks_finished' : ('mesos.cluster.tasks_finished', GAUGE), + 'master/tasks_killed' : ('mesos.cluster.tasks_killed', GAUGE), + 'master/tasks_lost' : ('mesos.cluster.tasks_lost', GAUGE), + 'master/tasks_running' : ('mesos.cluster.tasks_running', GAUGE), + 'master/tasks_staging' : ('mesos.cluster.tasks_staging', GAUGE), + 'master/tasks_starting' : ('mesos.cluster.tasks_starting', GAUGE), } # These metrics are aggregated only on the elected master @@ -109,9 +110,6 @@ class MesosMaster(AgentCheck): # These metrics are aggregated only on the elected master STATS_METRICS = { - 'active_schedulers' : ('mesos.cluster.active_schedulers', GAUGE), - 'total_schedulers' : ('mesos.cluster.total_schedulers', GAUGE), - 'outstanding_offers' : ('mesos.cluster.outstanding_offers', GAUGE), 'master/dropped_messages' : ('mesos.cluster.dropped_messages', GAUGE), 'master/outstanding_offers' : ('mesos.cluster.outstanding_offers', GAUGE), 'master/event_queue_dispatches' : ('mesos.cluster.event_queue_dispatches', GAUGE), @@ -125,24 +123,6 @@ class MesosMaster(AgentCheck): 'master/valid_status_updates' : ('mesos.cluster.valid_status_updates', GAUGE), } - def _timeout_event(self, url, timeout, aggregation_key): - self.event({ - 'timestamp': int(time.time()), - 'event_type': 'http_check', - 'msg_title': 'URL timeout', - 'msg_text': '%s timed out after %s seconds.' % (url, timeout), - 'aggregation_key': aggregation_key - }) - - def _status_code_event(self, url, r, aggregation_key): - self.event({ - 'timestamp': int(time.time()), - 'event_type': 'http_check', - 'msg_title': 'Invalid reponse code for %s' % url, - 'msg_text': '%s returned a status of %s' % (url, r.status_code), - 'aggregation_key': aggregation_key - }) - def _get_json(self, url, timeout): # Use a hash of the URL as an aggregation key aggregation_key = md5(url).hexdigest() @@ -152,7 +132,6 @@ def _get_json(self, url, timeout): try: r = requests.get(url, timeout=timeout) if r.status_code != 200: - self._status_code_event(url, r, aggregation_key) status = AgentCheck.CRITICAL msg = "Got %s when hitting %s" % (r.status_code, url) else: @@ -160,11 +139,10 @@ def _get_json(self, url, timeout): msg = "Mesos master instance detected at %s " % url except requests.exceptions.Timeout as e: # If there's a timeout - self._timeout_event(url, timeout, aggregation_key) msg = "%s seconds timeout when hitting %s" % (timeout, url) status = AgentCheck.CRITICAL except Exception as e: - msg = e.message + msg = str(e) status = AgentCheck.CRITICAL finally: if self.SERVICE_CHECK_NEEDED: @@ -172,8 +150,9 @@ def _get_json(self, url, timeout): message=msg) self.SERVICE_CHECK_NEEDED = False if status is AgentCheck.CRITICAL: - self.warning(msg) - return None + self.service_check(self.SERVICE_CHECK_NAME, status, tags=tags, + message=msg) + raise CheckException("Cannot connect to mesos, please check your configuration.") return r.json() @@ -181,19 +160,25 @@ def _get_master_state(self, url, timeout): return self._get_json(url + '/state.json', timeout) def _get_master_stats(self, url, timeout): - return self._get_json(url + '/stats.json', timeout) + if self.version >= [0, 22, 0]: + endpoint = '/metrics/snapshot' + else: + endpoint = '/stats.json' + return self._get_json(url + endpoint, timeout) def _get_master_roles(self, url, timeout): return self._get_json(url + '/roles.json', timeout) def _check_leadership(self, url, timeout): - json = self._get_master_state(url, timeout) + state_metrics = self._get_master_state(url, timeout) - if json is not None and json['leader'] == json['pid']: - self.leader = True + if state_metrics is not None: + self.version = map(int, state_metrics['version'].split('.')) + if state_metrics['leader'] == state_metrics['pid']: + self.leader = True else: self.leader = False - return json + return state_metrics def check(self, instance): if 'url' not in instance: @@ -204,38 +189,44 @@ def check(self, instance): default_timeout = self.init_config.get('default_timeout', 5) timeout = float(instance.get('timeout', default_timeout)) - json = self._check_leadership(url, timeout) - if json: - tags = ['mesos_cluster:' + json['cluster'], 'mesos_pid:' + json['pid'], 'mesos_id:' + json['id'], 'mesos_node:master'] + instance_tags + state_metrics = self._check_leadership(url, timeout) + if state_metrics: + tags = [ + 'mesos_cluster:{0}'.format(state_metrics['cluster']), + 'mesos_pid:{0}'.format(state_metrics['pid']), + 'mesos_node:master' + ] + tags += instance_tags if self.leader: - self.GAUGE('mesos.cluster.total_frameworks', len(json['frameworks']), tags=tags) + self.GAUGE('mesos.cluster.total_frameworks', len(state_metrics['frameworks']), tags=tags) - for framework in json['frameworks']: - framework_tags = ['framework:' + framework['id'], 'framework_name:' + framework['name']] + tags + for framework in state_metrics['frameworks']: + framework_tags = ['framework_name:' + framework['name']] + tags self.GAUGE('mesos.framework.total_tasks', len(framework['tasks']), tags=framework_tags) resources = framework['used_resources'] - [v[1](self, v[0], resources[k], tags=framework_tags) for k, v in self.FRAMEWORK_METRICS.iteritems()] + for key_name, (metric_name, metric_func) in self.FRAMEWORK_METRICS.iteritems(): + metric_func(self, metric_name, resources[key_name], tags=framework_tags) - json = self._get_master_roles(url, timeout) - if json is not None: - for role in json['roles']: + role_metrics = self._get_master_roles(url, timeout) + if role_metrics is not None: + for role in role_metrics['roles']: role_tags = ['mesos_role:' + role['name']] + tags - self.GAUGE('mesos.role.frameworks', len(role['frameworks']), tags=role_tags) + self.GAUGE('mesos.role.frameworks.count', len(role['frameworks']), tags=role_tags) self.GAUGE('mesos.role.weight', role['weight'], tags=role_tags) - [v[1](self, v[0], role['resources'][k], tags=role_tags) for k, v in self.ROLE_RESOURCES_METRICS.iteritems()] + for key_name, (metric_name, metric_func) in self.ROLE_RESOURCES_METRICS.iteritems(): + metric_func(self, metric_name, role['resources'][key_name], tags=role_tags) - json = self._get_master_stats(url, timeout) - if json is not None: + stats_metrics = self._get_master_stats(url, timeout) + if stats_metrics is not None: + metrics = [self.SYSTEM_METRICS] if self.leader: - metrics = {} - for d in (self.CLUSTER_TASKS_METRICS, self.CLUSTER_SLAVES_METRICS, - self.CLUSTER_RESOURCES_METRICS, self.CLUSTER_REGISTRAR_METRICS, - self.CLUSTER_FRAMEWORK_METRICS, self.SYSTEM_METRICS, self.STATS_METRICS): - metrics.update(d) - else: - metrics = self.SYSTEM_METRICS - [v[1](self, v[0], json[k], tags=tags) for k, v in metrics.iteritems()] + metrics += [self.CLUSTER_TASKS_METRICS, self.CLUSTER_SLAVES_METRICS, + self.CLUSTER_RESOURCES_METRICS, self.CLUSTER_REGISTRAR_METRICS, + self.CLUSTER_FRAMEWORK_METRICS, self.STATS_METRICS] + for m in metrics: + for key_name, (metric_name, metric_func) in m.iteritems(): + metric_func(self, metric_name, stats_metrics[key_name], tags=tags) self.SERVICE_CHECK_NEEDED = True diff --git a/checks.d/mesos_slave.py b/checks.d/mesos_slave.py index b4e83e24e5..fd5bb4ec3e 100644 --- a/checks.d/mesos_slave.py +++ b/checks.d/mesos_slave.py @@ -6,12 +6,12 @@ from hashlib import md5 import time -# project -from checks import AgentCheck - # 3rd party import requests +# project +from checks import AgentCheck + class MesosSlave(AgentCheck): GAUGE = AgentCheck.gauge @@ -37,14 +37,13 @@ class MesosSlave(AgentCheck): } SLAVE_TASKS_METRICS = { - 'failed_tasks' : ('mesos.slave.failed_tasks', GAUGE), - 'finished_tasks' : ('mesos.slave.finished_tasks', GAUGE), - 'killed_tasks' : ('mesos.slave.killed_tasks', GAUGE), - 'lost_tasks' : ('mesos.slave.lost_tasks', GAUGE), - 'staged_tasks' : ('mesos.slave.staged_tasks', GAUGE), - 'started_tasks' : ('mesos.slave.started_tasks', GAUGE), - 'launched_tasks_gauge' : ('mesos.slave.launched_tasks_gauge', GAUGE), - 'queued_tasks_gauge' : ('mesos.slave.queued_tasks_gauge', GAUGE), + 'slave/tasks_failed' : ('mesos.slave.tasks_failed', GAUGE), + 'slave/tasks_finished' : ('mesos.slave.tasks_finished', GAUGE), + 'slave/tasks_killed' : ('mesos.slave.tasks_killed', GAUGE), + 'slave/tasks_lost' : ('mesos.slave.tasks_lost', GAUGE), + 'slave/tasks_running' : ('mesos.slave.tasks_running', GAUGE), + 'slave/tasks_staging' : ('mesos.slave.tasks_staging', GAUGE), + 'slave/tasks_starting' : ('mesos.slave.tasks_starting', GAUGE), } SYSTEM_METRICS = { @@ -78,7 +77,6 @@ class MesosSlave(AgentCheck): } STATS_METRICS = { - 'total_frameworks' : ('mesos.slave.total_frameworks', GAUGE), 'slave/frameworks_active' : ('mesos.slave.frameworks_active', GAUGE), 'slave/invalid_framework_messages' : ('mesos.slave.invalid_framework_messages', GAUGE), 'slave/invalid_status_updates' : ('mesos.slave.invalid_status_updates', GAUGE), @@ -87,25 +85,9 @@ class MesosSlave(AgentCheck): 'slave/valid_status_updates' : ('mesos.slave.valid_status_updates', GAUGE), } - cluster_name = None - - def _timeout_event(self, url, timeout, aggregation_key): - self.event({ - 'timestamp': int(time.time()), - 'event_type': 'http_check', - 'msg_title': 'URL timeout', - 'msg_text': '%s timed out after %s seconds.' % (url, timeout), - 'aggregation_key': aggregation_key - }) - - def _status_code_event(self, url, r, aggregation_key): - self.event({ - 'timestamp': int(time.time()), - 'event_type': 'http_check', - 'msg_title': 'Invalid reponse code for %s' % url, - 'msg_text': '%s returned a status of %s' % (url, r.status_code), - 'aggregation_key': aggregation_key - }) + def __init__(self, name, init_config, agentConfig, instances=None): + AgentCheck.__init__(self, name, init_config, agentConfig, instances) + self.cluster_name = None def _get_json(self, url, timeout): # Use a hash of the URL as an aggregation key @@ -116,7 +98,6 @@ def _get_json(self, url, timeout): try: r = requests.get(url, timeout=timeout) if r.status_code != 200: - self._status_code_event(url, r, aggregation_key) status = AgentCheck.CRITICAL msg = "Got %s when hitting %s" % (r.status_code, url) else: @@ -124,19 +105,17 @@ def _get_json(self, url, timeout): msg = "Mesos master instance detected at %s " % url except requests.exceptions.Timeout as e: # If there's a timeout - self._timeout_event(url, timeout, aggregation_key) msg = "%s seconds timeout when hitting %s" % (timeout, url) status = AgentCheck.CRITICAL except Exception as e: - msg = e.message + msg = str(e) status = AgentCheck.CRITICAL finally: if self.SERVICE_CHECK_NEEDED: self.service_check(self.SERVICE_CHECK_NAME, status, tags=tags, message=msg) self.SERVICE_CHECK_NEEDED = False if status is AgentCheck.CRITICAL: - self.warning(msg) - return None + raise CheckException("Cannot connect to mesos, please check your configuration.") return r.json() @@ -144,18 +123,23 @@ def _get_state(self, url, timeout): return self._get_json(url + '/state.json', timeout) def _get_stats(self, url, timeout): - return self._get_json(url + '/stats.json', timeout) + if self.version >= [0, 22, 0]: + endpoint = '/metrics/snapshot' + else: + endpoint = '/stats.json' + return self._get_json(url + endpoint, timeout) def _get_constant_attributes(self, url, timeout): - json = None + state_metrics = None if self.cluster_name is None: - json = self._get_state(url, timeout) - if json is not None: - master_state = self._get_state('http://' + json['master_hostname'] + ':5050', timeout) + state_metrics = self._get_state(url, timeout) + if state_metrics is not None: + self.version = map(int, state_metrics['version'].split('.')) + master_state = self._get_state('http://' + state_metrics['master_hostname'] + ':5050', timeout) if master_state is not None: self.cluster_name = master_state['cluster'] - return json + return state_metrics def check(self, instance): if 'url' not in instance: @@ -167,30 +151,36 @@ def check(self, instance): default_timeout = self.init_config.get('default_timeout', 5) timeout = float(instance.get('timeout', default_timeout)) - json = self._get_constant_attributes(url, timeout) + state_metrics = self._get_constant_attributes(url, timeout) tags = None - if json is None: - json = self._get_state(url, timeout) - if json: - tags = ['mesos_cluster:' + self.cluster_name, 'mesos_id:' + json['id'], 'mesos_pid:' + json['pid'], 'mesos_node:slave'] + instance_tags + if state_metrics is None: + state_metrics = self._get_state(url, timeout) + if state_metrics: + tags = [ + 'mesos_cluster:{0}'.format(self.cluster_name), + 'mesos_pid:{0}'.format(state_metrics['pid']), + 'mesos_node:slave' + ] + tags += instance_tags for task in tasks: - for framework in json['frameworks']: + for framework in state_metrics['frameworks']: for executor in framework['executors']: for t in executor['tasks']: - if task.lower() in t['name'].lower() and t['slave_id'] == json['id']: - task_tags = ['framework_id:' + t['framework_id'], 'executor_id:' + t['executor_id'], 'task_name:' + t['name']] + tags + if task.lower() in t['name'].lower() and t['slave_id'] == state_metrics['id']: + task_tags = ['task_name:' + t['name']] + tags self.service_check(t['name'] + '.ok', self.TASK_STATUS[t['state']], tags=task_tags) - [v[1](self, v[0], t['resources'][k], tags=task_tags) for k, v in self.TASK_METRICS.iteritems()] + for key_name, (metric_name, metric_func) in self.TASK_METRICS.iteritems(): + metric_func(self, metric_name, t['resources'][key_name], tags=task_tags) - json = self._get_stats(url, timeout) - if json: + stats_metrics = self._get_stats(url, timeout) + if stats_metrics: tags = tags if tags else instance_tags - metrics = {} - for d in (self.SLAVE_TASKS_METRICS, self.SYSTEM_METRICS, self.SLAVE_RESOURCE_METRICS, - self.SLAVE_EXECUTORS_METRICS, self.STATS_METRICS): - metrics.update(d) - [v[1](self, v[0], json[k], tags=tags) for k, v in metrics.iteritems()] + metrics = [self.SLAVE_TASKS_METRICS, self.SYSTEM_METRICS, self.SLAVE_RESOURCE_METRICS, + self.SLAVE_EXECUTORS_METRICS, self.STATS_METRICS] + for m in metrics: + for key_name, (metric_name, metric_func) in m.iteritems(): + metric_func(self, metric_name, stats_metrics[key_name], tags=tags) self.SERVICE_CHECK_NEEDED = True diff --git a/requirements.txt b/requirements.txt index 88afeff46c..af5775f485 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ ########################################################### # These modules are the deps needed by the -# agent core, meaning every module that is +# agent core, meaning every module that is # not a check # They're installed in the CI and when doing # a source install diff --git a/tests/test_mesos_master.py b/tests/checks/integration/test_mesos_master.py similarity index 98% rename from tests/test_mesos_master.py rename to tests/checks/integration/test_mesos_master.py index 177556b95c..89a38259ec 100644 --- a/tests/test_mesos_master.py +++ b/tests/checks/integration/test_mesos_master.py @@ -1,4 +1,4 @@ -from tests.common import AgentCheckTest, get_check_class +from tests.checks.common import AgentCheckTest, get_check_class from nose.plugins.attrib import attr from mock import patch @@ -305,5 +305,5 @@ def test_checks(self): [self.assertMetric(v[0]) for k, v in check.ROLE_RESOURCES_METRICS.iteritems()] self.assertMetric('mesos.cluster.total_frameworks') self.assertMetric('mesos.framework.total_tasks') - self.assertMetric('mesos.role.frameworks') + self.assertMetric('mesos.role.frameworks.count') self.assertMetric('mesos.role.weight') diff --git a/tests/test_mesos_slave.py b/tests/checks/integration/test_mesos_slave.py similarity index 99% rename from tests/test_mesos_slave.py rename to tests/checks/integration/test_mesos_slave.py index c74c6174d3..47b2e46d78 100644 --- a/tests/test_mesos_slave.py +++ b/tests/checks/integration/test_mesos_slave.py @@ -1,4 +1,4 @@ -from tests.common import AgentCheckTest, get_check_class +from tests.checks.common import AgentCheckTest, get_check_class from nose.plugins.attrib import attr from mock import patch