Skip to content

Commit

Permalink
[mesos] Support multiple versions and Updates for the review
Browse files Browse the repository at this point in the history
* Change mesos stats endpoint for versions above 0.22.0
* Simplify syntax
* Update tests for @degemer refactor
  • Loading branch information
DorianZaccaria committed May 13, 2015
1 parent 6f9f3c6 commit 6c285f9
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 125 deletions.
117 changes: 54 additions & 63 deletions checks.d/mesos_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from hashlib import md5
import time

# project
from checks import AgentCheck

# 3rd party
import requests

# project
from checks import AgentCheck, CheckException


class MesosMaster(AgentCheck):
GAUGE = AgentCheck.gauge
Expand All @@ -34,13 +34,14 @@ class MesosMaster(AgentCheck):

# These metrics are aggregated only on the elected master
CLUSTER_TASKS_METRICS = {
'staged_tasks' : ('mesos.cluster.staged_tasks', GAUGE),
'started_tasks' : ('mesos.cluster.started_tasks', GAUGE),
'finished_tasks' : ('mesos.cluster.finished_tasks', GAUGE),
'killed_tasks' : ('mesos.cluster.killed_tasks', GAUGE),
'failed_tasks' : ('mesos.cluster.failed_tasks', GAUGE),
'lost_tasks' : ('mesos.cluster.lost_tasks', GAUGE),
'active_tasks_gauge' : ('mesos.cluster.active_tasks_gauge', GAUGE),
'master/tasks_error' : ('mesos.cluster.tasks_error', GAUGE),
'master/tasks_failed' : ('mesos.cluster.tasks_failed', GAUGE),
'master/tasks_finished' : ('mesos.cluster.tasks_finished', GAUGE),
'master/tasks_killed' : ('mesos.cluster.tasks_killed', GAUGE),
'master/tasks_lost' : ('mesos.cluster.tasks_lost', GAUGE),
'master/tasks_running' : ('mesos.cluster.tasks_running', GAUGE),
'master/tasks_staging' : ('mesos.cluster.tasks_staging', GAUGE),
'master/tasks_starting' : ('mesos.cluster.tasks_starting', GAUGE),
}

# These metrics are aggregated only on the elected master
Expand Down Expand Up @@ -109,9 +110,6 @@ class MesosMaster(AgentCheck):

# These metrics are aggregated only on the elected master
STATS_METRICS = {
'active_schedulers' : ('mesos.cluster.active_schedulers', GAUGE),
'total_schedulers' : ('mesos.cluster.total_schedulers', GAUGE),
'outstanding_offers' : ('mesos.cluster.outstanding_offers', GAUGE),
'master/dropped_messages' : ('mesos.cluster.dropped_messages', GAUGE),
'master/outstanding_offers' : ('mesos.cluster.outstanding_offers', GAUGE),
'master/event_queue_dispatches' : ('mesos.cluster.event_queue_dispatches', GAUGE),
Expand All @@ -125,24 +123,6 @@ class MesosMaster(AgentCheck):
'master/valid_status_updates' : ('mesos.cluster.valid_status_updates', GAUGE),
}

def _timeout_event(self, url, timeout, aggregation_key):
self.event({
'timestamp': int(time.time()),
'event_type': 'http_check',
'msg_title': 'URL timeout',
'msg_text': '%s timed out after %s seconds.' % (url, timeout),
'aggregation_key': aggregation_key
})

def _status_code_event(self, url, r, aggregation_key):
self.event({
'timestamp': int(time.time()),
'event_type': 'http_check',
'msg_title': 'Invalid reponse code for %s' % url,
'msg_text': '%s returned a status of %s' % (url, r.status_code),
'aggregation_key': aggregation_key
})

def _get_json(self, url, timeout):
# Use a hash of the URL as an aggregation key
aggregation_key = md5(url).hexdigest()
Expand All @@ -152,48 +132,53 @@ def _get_json(self, url, timeout):
try:
r = requests.get(url, timeout=timeout)
if r.status_code != 200:
self._status_code_event(url, r, aggregation_key)
status = AgentCheck.CRITICAL
msg = "Got %s when hitting %s" % (r.status_code, url)
else:
status = AgentCheck.OK
msg = "Mesos master instance detected at %s " % url
except requests.exceptions.Timeout as e:
# If there's a timeout
self._timeout_event(url, timeout, aggregation_key)
msg = "%s seconds timeout when hitting %s" % (timeout, url)
status = AgentCheck.CRITICAL
except Exception as e:
msg = e.message
msg = str(e)
status = AgentCheck.CRITICAL
finally:
if self.SERVICE_CHECK_NEEDED:
self.service_check(self.SERVICE_CHECK_NAME, status, tags=tags,
message=msg)
self.SERVICE_CHECK_NEEDED = False
if status is AgentCheck.CRITICAL:
self.warning(msg)
return None
self.service_check(self.SERVICE_CHECK_NAME, status, tags=tags,
message=msg)
raise CheckException("Cannot connect to mesos, please check your configuration.")

return r.json()

def _get_master_state(self, url, timeout):
return self._get_json(url + '/state.json', timeout)

def _get_master_stats(self, url, timeout):
return self._get_json(url + '/stats.json', timeout)
if self.version >= [0, 22, 0]:
endpoint = '/metrics/snapshot'
else:
endpoint = '/stats.json'
return self._get_json(url + endpoint, timeout)

def _get_master_roles(self, url, timeout):
return self._get_json(url + '/roles.json', timeout)

def _check_leadership(self, url, timeout):
json = self._get_master_state(url, timeout)
state_metrics = self._get_master_state(url, timeout)

if json is not None and json['leader'] == json['pid']:
self.leader = True
if state_metrics is not None:
self.version = map(int, state_metrics['version'].split('.'))
if state_metrics['leader'] == state_metrics['pid']:
self.leader = True
else:
self.leader = False
return json
return state_metrics

def check(self, instance):
if 'url' not in instance:
Expand All @@ -204,38 +189,44 @@ def check(self, instance):
default_timeout = self.init_config.get('default_timeout', 5)
timeout = float(instance.get('timeout', default_timeout))

json = self._check_leadership(url, timeout)
if json:
tags = ['mesos_cluster:' + json['cluster'], 'mesos_pid:' + json['pid'], 'mesos_id:' + json['id'], 'mesos_node:master'] + instance_tags
state_metrics = self._check_leadership(url, timeout)
if state_metrics:
tags = [
'mesos_cluster:{0}'.format(state_metrics['cluster']),
'mesos_pid:{0}'.format(state_metrics['pid']),
'mesos_node:master'
]
tags += instance_tags

if self.leader:
self.GAUGE('mesos.cluster.total_frameworks', len(json['frameworks']), tags=tags)
self.GAUGE('mesos.cluster.total_frameworks', len(state_metrics['frameworks']), tags=tags)

for framework in json['frameworks']:
framework_tags = ['framework:' + framework['id'], 'framework_name:' + framework['name']] + tags
for framework in state_metrics['frameworks']:
framework_tags = ['framework_name:' + framework['name']] + tags
self.GAUGE('mesos.framework.total_tasks', len(framework['tasks']), tags=framework_tags)
resources = framework['used_resources']
[v[1](self, v[0], resources[k], tags=framework_tags) for k, v in self.FRAMEWORK_METRICS.iteritems()]
for key_name, (metric_name, metric_func) in self.FRAMEWORK_METRICS.iteritems():
metric_func(self, metric_name, resources[key_name], tags=framework_tags)

json = self._get_master_roles(url, timeout)
if json is not None:
for role in json['roles']:
role_metrics = self._get_master_roles(url, timeout)
if role_metrics is not None:
for role in role_metrics['roles']:
role_tags = ['mesos_role:' + role['name']] + tags
self.GAUGE('mesos.role.frameworks', len(role['frameworks']), tags=role_tags)
self.GAUGE('mesos.role.frameworks.count', len(role['frameworks']), tags=role_tags)
self.GAUGE('mesos.role.weight', role['weight'], tags=role_tags)
[v[1](self, v[0], role['resources'][k], tags=role_tags) for k, v in self.ROLE_RESOURCES_METRICS.iteritems()]
for key_name, (metric_name, metric_func) in self.ROLE_RESOURCES_METRICS.iteritems():
metric_func(self, metric_name, role['resources'][key_name], tags=role_tags)

json = self._get_master_stats(url, timeout)
if json is not None:
stats_metrics = self._get_master_stats(url, timeout)
if stats_metrics is not None:
metrics = [self.SYSTEM_METRICS]
if self.leader:
metrics = {}
for d in (self.CLUSTER_TASKS_METRICS, self.CLUSTER_SLAVES_METRICS,
self.CLUSTER_RESOURCES_METRICS, self.CLUSTER_REGISTRAR_METRICS,
self.CLUSTER_FRAMEWORK_METRICS, self.SYSTEM_METRICS, self.STATS_METRICS):
metrics.update(d)
else:
metrics = self.SYSTEM_METRICS
[v[1](self, v[0], json[k], tags=tags) for k, v in metrics.iteritems()]
metrics += [self.CLUSTER_TASKS_METRICS, self.CLUSTER_SLAVES_METRICS,
self.CLUSTER_RESOURCES_METRICS, self.CLUSTER_REGISTRAR_METRICS,
self.CLUSTER_FRAMEWORK_METRICS, self.STATS_METRICS]
for m in metrics:
for key_name, (metric_name, metric_func) in m.iteritems():
metric_func(self, metric_name, stats_metrics[key_name], tags=tags)


self.SERVICE_CHECK_NEEDED = True
106 changes: 48 additions & 58 deletions checks.d/mesos_slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from hashlib import md5
import time

# project
from checks import AgentCheck

# 3rd party
import requests

# project
from checks import AgentCheck


class MesosSlave(AgentCheck):
GAUGE = AgentCheck.gauge
Expand All @@ -37,14 +37,13 @@ class MesosSlave(AgentCheck):
}

SLAVE_TASKS_METRICS = {
'failed_tasks' : ('mesos.slave.failed_tasks', GAUGE),
'finished_tasks' : ('mesos.slave.finished_tasks', GAUGE),
'killed_tasks' : ('mesos.slave.killed_tasks', GAUGE),
'lost_tasks' : ('mesos.slave.lost_tasks', GAUGE),
'staged_tasks' : ('mesos.slave.staged_tasks', GAUGE),
'started_tasks' : ('mesos.slave.started_tasks', GAUGE),
'launched_tasks_gauge' : ('mesos.slave.launched_tasks_gauge', GAUGE),
'queued_tasks_gauge' : ('mesos.slave.queued_tasks_gauge', GAUGE),
'slave/tasks_failed' : ('mesos.slave.tasks_failed', GAUGE),
'slave/tasks_finished' : ('mesos.slave.tasks_finished', GAUGE),
'slave/tasks_killed' : ('mesos.slave.tasks_killed', GAUGE),
'slave/tasks_lost' : ('mesos.slave.tasks_lost', GAUGE),
'slave/tasks_running' : ('mesos.slave.tasks_running', GAUGE),
'slave/tasks_staging' : ('mesos.slave.tasks_staging', GAUGE),
'slave/tasks_starting' : ('mesos.slave.tasks_starting', GAUGE),
}

SYSTEM_METRICS = {
Expand Down Expand Up @@ -78,7 +77,6 @@ class MesosSlave(AgentCheck):
}

STATS_METRICS = {
'total_frameworks' : ('mesos.slave.total_frameworks', GAUGE),
'slave/frameworks_active' : ('mesos.slave.frameworks_active', GAUGE),
'slave/invalid_framework_messages' : ('mesos.slave.invalid_framework_messages', GAUGE),
'slave/invalid_status_updates' : ('mesos.slave.invalid_status_updates', GAUGE),
Expand All @@ -87,25 +85,9 @@ class MesosSlave(AgentCheck):
'slave/valid_status_updates' : ('mesos.slave.valid_status_updates', GAUGE),
}

cluster_name = None

def _timeout_event(self, url, timeout, aggregation_key):
self.event({
'timestamp': int(time.time()),
'event_type': 'http_check',
'msg_title': 'URL timeout',
'msg_text': '%s timed out after %s seconds.' % (url, timeout),
'aggregation_key': aggregation_key
})

def _status_code_event(self, url, r, aggregation_key):
self.event({
'timestamp': int(time.time()),
'event_type': 'http_check',
'msg_title': 'Invalid reponse code for %s' % url,
'msg_text': '%s returned a status of %s' % (url, r.status_code),
'aggregation_key': aggregation_key
})
def __init__(self, name, init_config, agentConfig, instances=None):
    """Initialize the check and reset the cached cluster name.

    cluster_name starts as None and is resolved lazily on the first
    successful run of _get_constant_attributes, then reused.
    """
    AgentCheck.__init__(self, name, init_config, agentConfig, instances)
    self.cluster_name = None

def _get_json(self, url, timeout):
# Use a hash of the URL as an aggregation key
Expand All @@ -116,46 +98,48 @@ def _get_json(self, url, timeout):
try:
r = requests.get(url, timeout=timeout)
if r.status_code != 200:
self._status_code_event(url, r, aggregation_key)
status = AgentCheck.CRITICAL
msg = "Got %s when hitting %s" % (r.status_code, url)
else:
status = AgentCheck.OK
msg = "Mesos master instance detected at %s " % url
except requests.exceptions.Timeout as e:
# If there's a timeout
self._timeout_event(url, timeout, aggregation_key)
msg = "%s seconds timeout when hitting %s" % (timeout, url)
status = AgentCheck.CRITICAL
except Exception as e:
msg = e.message
msg = str(e)
status = AgentCheck.CRITICAL
finally:
if self.SERVICE_CHECK_NEEDED:
self.service_check(self.SERVICE_CHECK_NAME, status, tags=tags, message=msg)
self.SERVICE_CHECK_NEEDED = False
if status is AgentCheck.CRITICAL:
self.warning(msg)
return None
raise CheckException("Cannot connect to mesos, please check your configuration.")

return r.json()

def _get_state(self, url, timeout):
return self._get_json(url + '/state.json', timeout)

def _get_stats(self, url, timeout):
return self._get_json(url + '/stats.json', timeout)
if self.version >= [0, 22, 0]:
endpoint = '/metrics/snapshot'
else:
endpoint = '/stats.json'
return self._get_json(url + endpoint, timeout)

def _get_constant_attributes(self, url, timeout):
json = None
state_metrics = None
if self.cluster_name is None:
json = self._get_state(url, timeout)
if json is not None:
master_state = self._get_state('http://' + json['master_hostname'] + ':5050', timeout)
state_metrics = self._get_state(url, timeout)
if state_metrics is not None:
self.version = map(int, state_metrics['version'].split('.'))
master_state = self._get_state('http://' + state_metrics['master_hostname'] + ':5050', timeout)
if master_state is not None:
self.cluster_name = master_state['cluster']

return json
return state_metrics

def check(self, instance):
if 'url' not in instance:
Expand All @@ -167,30 +151,36 @@ def check(self, instance):
default_timeout = self.init_config.get('default_timeout', 5)
timeout = float(instance.get('timeout', default_timeout))

json = self._get_constant_attributes(url, timeout)
state_metrics = self._get_constant_attributes(url, timeout)
tags = None

if json is None:
json = self._get_state(url, timeout)
if json:
tags = ['mesos_cluster:' + self.cluster_name, 'mesos_id:' + json['id'], 'mesos_pid:' + json['pid'], 'mesos_node:slave'] + instance_tags
if state_metrics is None:
state_metrics = self._get_state(url, timeout)
if state_metrics:
tags = [
'mesos_cluster:{0}'.format(self.cluster_name),
'mesos_pid:{0}'.format(state_metrics['pid']),
'mesos_node:slave'
]
tags += instance_tags

for task in tasks:
for framework in json['frameworks']:
for framework in state_metrics['frameworks']:
for executor in framework['executors']:
for t in executor['tasks']:
if task.lower() in t['name'].lower() and t['slave_id'] == json['id']:
task_tags = ['framework_id:' + t['framework_id'], 'executor_id:' + t['executor_id'], 'task_name:' + t['name']] + tags
if task.lower() in t['name'].lower() and t['slave_id'] == state_metrics['id']:
task_tags = ['task_name:' + t['name']] + tags
self.service_check(t['name'] + '.ok', self.TASK_STATUS[t['state']], tags=task_tags)
[v[1](self, v[0], t['resources'][k], tags=task_tags) for k, v in self.TASK_METRICS.iteritems()]
for key_name, (metric_name, metric_func) in self.TASK_METRICS.iteritems():
metric_func(self, metric_name, t['resources'][key_name], tags=task_tags)

json = self._get_stats(url, timeout)
if json:
stats_metrics = self._get_stats(url, timeout)
if stats_metrics:
tags = tags if tags else instance_tags
metrics = {}
for d in (self.SLAVE_TASKS_METRICS, self.SYSTEM_METRICS, self.SLAVE_RESOURCE_METRICS,
self.SLAVE_EXECUTORS_METRICS, self.STATS_METRICS):
metrics.update(d)
[v[1](self, v[0], json[k], tags=tags) for k, v in metrics.iteritems()]
metrics = [self.SLAVE_TASKS_METRICS, self.SYSTEM_METRICS, self.SLAVE_RESOURCE_METRICS,
self.SLAVE_EXECUTORS_METRICS, self.STATS_METRICS]
for m in metrics:
for key_name, (metric_name, metric_func) in m.iteritems():
metric_func(self, metric_name, stats_metrics[key_name], tags=tags)

self.SERVICE_CHECK_NEEDED = True
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
###########################################################
# These modules are the deps needed by the
# agent core, meaning every module that is
# agent core, meaning every module that is
# not a check
# They're installed in the CI and when doing
# a source install
Expand Down
Loading

0 comments on commit 6c285f9

Please sign in to comment.