diff --git a/checks.d/kafka_consumer.py b/checks.d/kafka_consumer.py
index 792e8ecd27..f10c9172ff 100644
--- a/checks.d/kafka_consumer.py
+++ b/checks.d/kafka_consumer.py
@@ -6,7 +6,7 @@
     # be too much work to rewrite, so raise an exception here.
     raise Exception('kafka_consumer check requires at least Python 2.6')
 
-from compat import defaultdict
+from compat.defaultdict import defaultdict
 from checks import AgentCheck
 from kafka.client import KafkaClient
 from kafka.common import OffsetRequest
@@ -16,14 +16,17 @@ class KafkaCheck(AgentCheck):
 
     def check(self, instance):
-        consumer_groups = instance['consumer_groups']
+        consumer_groups = self.read_config(instance, 'consumer_groups',
+                                           cast=self._validate_consumer_groups)
+        zk_connect_str = self.read_config(instance, 'zk_connect_str')
+        kafka_host_ports = self.read_config(instance, 'kafka_connect_str',
+                                            cast=self._parse_connect_str)
 
         # Construct the Zookeeper path pattern
         zk_prefix = instance.get('zk_prefix', '')
         zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s'
 
         # Connect to Zookeeper
-        zk_connect_str = instance['zk_connect_str']
         zk_conn = KazooClient(zk_connect_str)
         zk_conn.start()
@@ -31,8 +34,8 @@ def check(self, instance):
         # Query Zookeeper for consumer offsets
         consumer_offsets = {}
         topics = defaultdict(set)
-        for consumer_group, topic_partitions in consumer_groups.items():
-            for topic, partitions in topic_partitions.items():
+        for consumer_group, topic_partitions in consumer_groups.iteritems():
+            for topic, partitions in topic_partitions.iteritems():
                 # Remember the topic partitions that we've seen so that we can
                 # look up their broker offsets later
                 topics[topic].update(set(partitions))
@@ -54,10 +57,8 @@ def check(self, instance):
             self.log.exception('Error cleaning up Zookeeper connection')
 
         # Connect to Kafka
-        host_ports = [hp.strip().split(':') for hp
-                      in instance['kafka_connect_str'].split(',')]
-        kafka_host, kafka_port = random.choice(host_ports)
-        kafka_conn = KafkaClient(kafka_host, int(kafka_port))
+        kafka_host, kafka_port = random.choice(kafka_host_ports)
+        kafka_conn = KafkaClient(kafka_host, kafka_port)
 
         try:
             # Query Kafka for the broker offsets
@@ -92,3 +93,36 @@ def check(self, instance):
             self.gauge('kafka.consumer_offset', consumer_offset, tags=tags)
             self.gauge('kafka.consumer_lag', broker_offset - consumer_offset,
                        tags=tags)
+
+    # Private config validation/marshalling functions
+
+    def _validate_consumer_groups(self, val):
+        try:
+            consumer_group, topic_partitions = val.items()[0]
+            assert isinstance(consumer_group, (str, unicode))
+            topic, partitions = topic_partitions.items()[0]
+            assert isinstance(topic, (str, unicode))
+            assert isinstance(partitions, (list, tuple))
+            return val
+        except Exception, e:
+            self.log.exception(e)
+            raise Exception('''The `consumer_groups` value must be a mapping of mappings, like this:
+consumer_groups:
+  myconsumer0: # consumer group name
+    mytopic0: [0, 1] # topic: list of partitions
+  myconsumer1:
+    mytopic0: [0, 1, 2]
+    mytopic1: [10, 12]
+''')
+
+    def _parse_connect_str(self, val):
+        try:
+            host_port_strs = val.split(',')
+            host_ports = []
+            for hp in host_port_strs:
+                host, port = hp.strip().split(':')
+                host_ports.append((host, int(port)))
+            return host_ports
+        except Exception, e:
+            self.log.exception(e)
+            raise Exception('Could not parse %s. Must be in the form of `host0:port0,host1:port1,host2:port2`' % val)
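
Note on the kafka_consumer.py hunks above: the new `_parse_connect_str` helper turns the comma-separated `kafka_connect_str` into a list of `(host, port)` tuples with the port already cast to an int, which is why the `KafkaClient(...)` call no longer needs `int(kafka_port)`. A minimal standalone sketch of that contract, using an illustrative module-level stand-in named `parse_connect_str` (the real code is the `KafkaCheck._parse_connect_str` method in the diff):

    # Stand-in mirroring the method body above, minus logging.
    def parse_connect_str(val):
        host_ports = []
        for hp in val.split(','):
            host, port = hp.strip().split(':')
            host_ports.append((host, int(port)))
        return host_ports

    # Ports come back as ints, so random.choice() yields (host, port)
    # pairs that KafkaClient can take directly:
    assert parse_connect_str('kafka0:9092, kafka1:9093') == \
        [('kafka0', 9092), ('kafka1', 9093)]
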
diff --git a/checks/__init__.py b/checks/__init__.py
index 2693b81199..3c971b74f2 100644
--- a/checks/__init__.py
+++ b/checks/__init__.py
@@ -534,6 +534,18 @@ def normalize(self, metric, prefix=None):
         else:
             return name
 
+    @staticmethod
+    def read_config(instance, key, message=None, cast=None):
+        try:
+            val = instance[key]
+        except KeyError:
+            message = message or 'Must provide `%s` value in instance config' % key
+            raise Exception(message)
+
+        if cast is None:
+            return val
+        else:
+            return cast(val)
 
 def agent_formatter(metric, value, timestamp, tags, hostname, device_name=None,
                     metric_type=None, interval=None):
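
The `read_config` helper added to `AgentCheck` centralizes the required-instance-key pattern: it raises a readable error when the key is missing and optionally passes the value through a `cast` callable, which is how the kafka_consumer check plugs in its validators. A minimal usage sketch, assuming a hypothetical `instance` dict like the one the agent would load from YAML:

    from checks import AgentCheck

    # Hypothetical instance config (illustrative values).
    instance = {'zk_connect_str': 'localhost:2181', 'port': '9092'}

    # Without `cast`, the raw value is returned as-is.
    assert AgentCheck.read_config(instance, 'zk_connect_str') == 'localhost:2181'

    # With `cast`, the value is passed through the callable before returning.
    assert AgentCheck.read_config(instance, 'port', cast=int) == 9092

    # A missing key raises, using the default or a caller-supplied message.
    try:
        AgentCheck.read_config(instance, 'kafka_connect_str')
    except Exception, e:  # Python 2 idiom, matching the agent's codebase
        assert 'kafka_connect_str' in str(e)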