Skip to content

Commit

Permalink
provide better feedback on config errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Carlo Cabanilla committed Mar 7, 2014
1 parent e217163 commit a226bf5
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 9 deletions.
52 changes: 43 additions & 9 deletions checks.d/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# be too much work to rewrite, so raise an exception here.
raise Exception('kafka_consumer check requires at least Python 2.6')

from compat import defaultdict
from compat.defaultdict import defaultdict
from checks import AgentCheck
from kafka.client import KafkaClient
from kafka.common import OffsetRequest
Expand All @@ -16,23 +16,26 @@

class KafkaCheck(AgentCheck):
def check(self, instance):
consumer_groups = instance['consumer_groups']
consumer_groups = self.read_config(instance, 'consumer_groups',
cast=self._validate_consumer_groups)
zk_connect_str = self.read_config(instance, 'zk_connect_str')
kafka_host_ports = self.read_config(instance, 'kafka_connect_str',
cast=self._parse_connect_str)

# Construct the Zookeeper path pattern
zk_prefix = instance.get('zk_prefix', '')
zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s'

# Connect to Zookeeper
zk_connect_str = instance['zk_connect_str']
zk_conn = KazooClient(zk_connect_str)
zk_conn.start()

try:
# Query Zookeeper for consumer offsets
consumer_offsets = {}
topics = defaultdict(set)
for consumer_group, topic_partitions in consumer_groups.items():
for topic, partitions in topic_partitions.items():
for consumer_group, topic_partitions in consumer_groups.iteritems():
for topic, partitions in topic_partitions.iteritems():
# Remember the topic partitions that we've see so that we can
# look up their broker offsets later
topics[topic].update(set(partitions))
Expand All @@ -54,10 +57,8 @@ def check(self, instance):
self.log.exception('Error cleaning up Zookeeper connection')

# Connect to Kafka
host_ports = [hp.strip().split(':') for hp
in instance['kafka_connect_str'].split(',')]
kafka_host, kafka_port = random.choice(host_ports)
kafka_conn = KafkaClient(kafka_host, int(kafka_port))
kafka_host, kafka_port = random.choice(kafka_host_ports)
kafka_conn = KafkaClient(kafka_host, kafka_port)

try:
# Query Kafka for the broker offsets
Expand Down Expand Up @@ -92,3 +93,36 @@ def check(self, instance):
self.gauge('kafka.consumer_offset', consumer_offset, tags=tags)
self.gauge('kafka.consumer_lag', broker_offset - consumer_offset,
tags=tags)

# Private config validation/marshalling functions

def _validate_consumer_groups(self, val):
try:
consumer_group, topic_partitions = val.items()[0]
assert isinstance(consumer_group, (str, unicode))
topic, partitions = topic_partitions.items()[0]
assert isinstance(topic, (str, unicode))
assert isinstance(partitions, (list, tuple))
return val
except Exception, e:
self.log.exception(e)
raise Exception('''The `consumer_groups` value must be a mapping of mappings, like this:
consumer_groups:
myconsumer0: # consumer group name
mytopic0: [0, 1] # topic: list of partitions
myconsumer1:
mytopic0: [0, 1, 2]
mytopic1: [10, 12]
''')

def _parse_connect_str(self, val):
try:
host_port_strs = val.split(',')
host_ports = []
for hp in host_port_strs:
host, port = hp.strip().split(':')
host_ports.append((host, int(port)))
return host_ports
except Exception, e:
self.log.exception(e)
raise Exception('Could not parse %s. Must be in the form of `host0:port0,host1:port1,host2:port2`' % val)
12 changes: 12 additions & 0 deletions checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,18 @@ def normalize(self, metric, prefix=None):
else:
return name

@staticmethod
def read_config(instance, key, message=None, cast=None):
try:
val = instance[key]
except KeyError:

This comment has been minimized.

Copy link
@remh

remh Mar 7, 2014

This won't catch the case where the key is present but without any value:
http://yaml-online-parser.appspot.com/?yaml=init_config%3A%0A%0Ainstances%3A%0A+++-+key%3A%0A&type=python

This comment has been minimized.

Copy link
@clofresh

clofresh Mar 7, 2014

Contributor

Might be too presumptuous to put that test into a shared function. If you really cared, you'd pass in a cast function to test for that.

This comment has been minimized.

Copy link
@remh

remh Mar 7, 2014

Well you didn't for zk_connect_str :)

It's a really a small nitpick though.

This comment has been minimized.

Copy link
@clofresh

clofresh Mar 7, 2014

Contributor

That is true 👿

message = message or 'Must provide `%s` value in instance config' % key
raise Exception(message)

if cast is None:
return val
else:
return cast(val)

def agent_formatter(metric, value, timestamp, tags, hostname, device_name=None,
metric_type=None, interval=None):
Expand Down

0 comments on commit a226bf5

Please sign in to comment.