Skip to content

Commit

Permalink
Merge pull request #1086 from DataDog/conor/zk-service-check
Browse files Browse the repository at this point in the history
Add zookeeper service checks.
  • Loading branch information
Remi Hakim committed Aug 25, 2014
2 parents d169ebf + 59da2e9 commit 389fc10
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 22 deletions.
74 changes: 56 additions & 18 deletions checks.d/zk.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
# project
from checks import AgentCheck

class ZKConnectionFailure(Exception):
    """Signal that a ZooKeeper command could not be completed.

    Raised when the connection cannot be established or the output of a
    command cannot be read back from the server.
    """


class Zookeeper(AgentCheck):
version_pattern = re.compile(r'Zookeeper version: ([^.]+)\.([^.]+)\.([^-]+)', flags=re.I)

Expand All @@ -41,8 +46,51 @@ def check(self, instance):
host = instance.get('host', 'localhost')
port = int(instance.get('port', 2181))
timeout = float(instance.get('timeout', 3.0))
expected_mode = (instance.get('expected_mode') or '').strip()
tags = instance.get('tags', [])
cx_args = (host, port, timeout)

# Send a service check based on the `ruok` response.
try:
ruok_out = self._send_command('ruok', *cx_args)
except ZKConnectionFailure:
# The server should not respond at all if it's not OK.
status = AgentCheck.CRITICAL
message = 'No response from `ruok` command'
self.increment('zookeeper.timeouts')
else:
ruok_out.seek(0)
ruok = ruok_out.readline()
if ruok == 'imok':
status = AgentCheck.OK
else:
status = AgentCheck.WARNING
message = u'Response from the server: %s' % ruok
self.service_check('zookeeper.ruok', status, message=message)

# Read metrics from the `stat` output.
try:
stat_out = self._send_command('stat', *cx_args)
except ZKConnectionFailure:
self.increment('zookeeper.timeouts')
else:
# Parse the response
metrics, new_tags, mode = self.parse_stat(stat_out)

# Write the data
for metric, value in metrics:
self.gauge(metric, value, tags=tags + new_tags)

if expected_mode:
if mode == expected_mode:
status = AgentCheck.OK
message = u"Server is in %s mode" % mode
else:
status = AgentCheck.CRITICAL
message = u"Server is in %s mode but check expects %s mode" % (expected_mode, mode)
self.service_check('zookeeper.mode', status, message=message)

def _send_command(self, command, host, port, timeout):
sock = socket.socket()
sock.settimeout(timeout)
buf = StringIO()
Expand All @@ -66,24 +114,13 @@ def check(self, instance):
chunk = sock.recv(chunk_size)
buf.write(chunk)
num_reads += 1
except socket.timeout:
buf = None
except (socket.timeout, socket.error):
raise ZKConnectionFailure()
finally:
sock.close()
return buf

if buf is not None:
# Parse the response
metrics, new_tags = self.parse_stat(buf)

# Write the data
for metric, value in metrics:
self.gauge(metric, value, tags=tags + new_tags)
else:
# Reading from the client port timed out, track it as a metric
self.increment('zookeeper.timeouts', tags=tags)

@classmethod
def parse_stat(cls, buf):
def parse_stat(self, buf):
''' `buf` is a readable file-like object
returns a tuple: ([(metric_name, value)], tags, mode)
'''
Expand All @@ -94,7 +131,7 @@ def parse_stat(cls, buf):
# body correctly. Particularly, the Connections val was added in
# >= 3.4.4.
start_line = buf.readline()
match = cls.version_pattern.match(start_line)
match = self.version_pattern.match(start_line)
if match is None:
raise Exception("Could not parse version from stat command output: %s" % start_line)
else:
Expand Down Expand Up @@ -156,10 +193,11 @@ def parse_stat(cls, buf):

# Mode: leader
_, value = buf.readline().split(':')
tags = [u'mode:' + value.strip().lower()]
mode = value.strip().lower()
tags = [u'mode:' + mode]

# Node count: 487
_, value = buf.readline().split(':')
metrics.append(('zookeeper.nodes', long(value.strip())))

return metrics, tags
return metrics, tags, mode
5 changes: 5 additions & 0 deletions conf.d/zk.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ instances:
# tags:
# - optional_tag1
# - optional_tag2
#
# # If `expected_mode` is defined we'll also send a `zookeeper.mode` service
# # check: OK when the server's current mode matches it, CRITICAL otherwise.
# # Options: leader, follower, standalone
# expected_mode: leader
39 changes: 35 additions & 4 deletions tests/test_zookeeper.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
import os
import unittest
from StringIO import StringIO
from tests.common import get_check
from checks import AgentCheck

CONFIG = """
init_config:
instances:
- host: 127.0.0.1
port: 2181
expected_mode: follower
tags: []
"""

CONFIG2 = """
init_config:
instances:
- host: 127.0.0.1
port: 2182
tags: []
"""

class TestZookeeper(unittest.TestCase):
def is_travis(self):
return "TRAVIS" in os.environ

def test_zk_stat_parsing_lt_v344(self):
Zookeeper, instances = get_check('zk', CONFIG)
zk, instances = get_check('zk', CONFIG)
stat_response = """Zookeeper version: 3.2.2--1, built on 03/16/2010 07:31 GMT
Clients:
/10.42.114.160:32634[1](queued=0,recved=12,sent=0)
Expand Down Expand Up @@ -45,13 +60,24 @@ def test_zk_stat_parsing_lt_v344(self):
]

buf = StringIO(stat_response)
metrics, tags = Zookeeper.parse_stat(buf)
metrics, tags, mode = zk.parse_stat(buf)

self.assertEquals(tags, ['mode:leader'])
self.assertEquals(metrics, expected)

zk.check(instances[0])
service_checks = zk.get_service_checks()
expected = 1 if self.is_travis() else 2
self.assertEquals(len(service_checks), expected)
self.assertEquals(service_checks[0]['check'], 'zookeeper.ruok')
# Don't check status of ruok because it can vary if ZK is running.

if not self.is_travis():
self.assertEquals(service_checks[1]['check'], 'zookeeper.mode')
self.assertEquals(service_checks[1]['status'], AgentCheck.CRITICAL)

def test_zk_stat_parsing_gte_v344(self):
Zookeeper, instances = get_check('zk', CONFIG)
zk, instances = get_check('zk', CONFIG2)
stat_response = """Zookeeper version: 3.4.5--1, built on 03/16/2010 07:31 GMT
Clients:
/10.42.114.160:32634[1](queued=0,recved=12,sent=0)
Expand Down Expand Up @@ -84,8 +110,13 @@ def test_zk_stat_parsing_gte_v344(self):
]

buf = StringIO(stat_response)
metrics, tags = Zookeeper.parse_stat(buf)
metrics, tags, mode = zk.parse_stat(buf)

self.assertEquals(tags, ['mode:leader'])
self.assertEquals(metrics, expected)

zk.check(instances[0])
service_checks = zk.get_service_checks()
self.assertEquals(len(service_checks), 1)
self.assertEquals(service_checks[0]['check'], 'zookeeper.ruok')
self.assertEquals(service_checks[0]['status'], AgentCheck.CRITICAL)

0 comments on commit 389fc10

Please sign in to comment.