def packets_per_second(self, interval):
    """Return the average number of packets received per second.

    `interval` is the flush interval in seconds; the result is the
    running packet count divided by it, rounded to two decimals.
    """
    # float() guards against Python 2 integer division.
    rate = float(self.count) / interval
    return round(rate, 2)
def _unescape_event_text(self, string):
    """Restore literal newlines that the wire format escapes as '\\n'."""
    return string.replace('\\n', '\n')

def parse_event_packet(self, packet):
    """Parse a dogstatsd event packet into an event dict.

    Wire syntax: _e{<title_len>,<text_len>}:<title>|<text>|<meta>
    where <meta> is a '|'-separated list of optional fields:
    d:<date_happened> h:<hostname> k:<aggregation_key> p:<priority>
    s:<source_type_name> t:<alert_type> #tag1,tag2

    Title and text are sliced by the lengths declared in the header,
    so '|' may appear inside either. Raises Exception on any packet
    that cannot be parsed.
    """
    try:
        name_and_metadata = packet.split(':', 1)
        if len(name_and_metadata) != 2:
            raise Exception(u'Unparseable event packet: %s' % packet)
        # e.g. _e{5,4}:title|body|meta
        name = name_and_metadata[0]
        metadata = unicode(name_and_metadata[1])
        # Header carries the declared title/text lengths: '_e{5,4}'.
        title_length, text_length = name.split(',')
        title_length = int(title_length[3:])
        text_length = int(text_length[:-1])

        event = {
            'title': metadata[:title_length],
            'text': self._unescape_event_text(
                metadata[title_length + 1:title_length + text_length + 1]),
        }
        # Everything past the declared text is optional metadata.
        meta = metadata[title_length + text_length + 1:]
        for m in meta.split('|')[1:]:
            if m[0] == u't':
                event['alert_type'] = m[2:]
            elif m[0] == u'k':
                event['aggregation_key'] = m[2:]
            elif m[0] == u's':
                event['source_type_name'] = m[2:]
            elif m[0] == u'd':
                event['date_happened'] = int(m[2:])
            elif m[0] == u'p':
                event['priority'] = m[2:]
            elif m[0] == u'h':
                event['hostname'] = m[2:]
            elif m[0] == u'#':
                event['tags'] = sorted(m[1:].split(u','))
        return event
    except (IndexError, ValueError):
        # BUG FIX: the original `except IndexError, ValueError:` is the
        # Python 2 binding form -- it catches only IndexError and binds
        # the instance to the name `ValueError`, so a malformed length
        # header (int() failure) escaped as a raw ValueError instead of
        # the uniform "Unparseable event packet" error.
        raise Exception(u'Unparseable event packet: %s' % packet)
def submit_packets(self, packets):
    """Dispatch a datagram payload, one packet per line.

    Lines starting with '_e' are event packets; everything else is
    treated as a metric packet. Blank lines are skipped.
    """
    for packet in packets.split("\n"):
        if not packet.strip():
            continue

        if packet.startswith('_e'):
            # Event packet: parse and buffer until the next flush.
            self.event_count += 1
            self.events.append(self.parse_event_packet(packet))
        else:
            # Metric packet: parse and hand off to the aggregator.
            self.count += 1
            name, value, mtype, tags, sample_rate = \
                self.parse_metric_packet(packet)
            self.submit_metric(name, value, mtype, tags=tags,
                               sample_rate=sample_rate)
def flush_events(self):
    """Return every event buffered since the last flush and reset.

    The event buffer and per-interval event counter are cleared; the
    counter is folded into the running total before resetting.
    """
    flushed, self.events = self.events, []

    self.total_count += self.event_count
    self.event_count = 0

    log.debug("Received %d events since last flush" % len(flushed))

    return flushed
def serialize_event(event):
    """Serialize a single event dict to the JSON body the events API expects."""
    return json.dumps(event)
def submit_events(self, events):
    """POST each buffered event to the /api/v1/events endpoint.

    One HTTP request is issued per event over a single connection,
    which is always closed afterwards. Each request's status and
    duration are logged at debug level.
    """
    headers = {'Content-Type': 'application/json'}
    method = 'POST'

    params = {}
    if self.api_key:
        params['api_key'] = self.api_key
    url = '/api/v1/events?%s' % urlencode(params)

    status = None
    conn = self.http_conn_cls(self.api_host)
    try:
        for event in events:
            start_time = time()
            body = serialize_event(event)
            # FIX: was log.warn(body) -- logging every full event body
            # at WARNING level is noisy and may leak payload contents;
            # demote to debug (mirrors the later fix in this series).
            log.debug('Sending event: %s' % body)
            conn.request(method, url, body, headers)

            response = conn.getresponse()
            status = response.status
            response.close()
            duration = round((time() - start_time) * 1000.0, 4)
            log.debug("%s %s %s%s (%sms)" % (
                status, method, self.api_host, url, duration))
    finally:
        # Guarantee the connection is released even if a request fails.
        conn.close()

DogstatsD

Packet count
{{ dogstatsd['packet_count'] }}
Packets per second
{{ dogstatsd['packets_per_second'] }}
Metric count
{{ dogstatsd['metric_count'] }}
+
Event count
{{ dogstatsd['event_count'] }}
From 82b0c377bbd71d9b2760869b34f6bc0cc8f22022 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Thu, 6 Jun 2013 12:05:59 -0400 Subject: [PATCH 2/4] Add tests for events in DogStatsD --- tests/test_dogstatsd.py | 86 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 2 deletions(-) diff --git a/tests/test_dogstatsd.py b/tests/test_dogstatsd.py index 479ff0e9ba..2fea181f0b 100644 --- a/tests/test_dogstatsd.py +++ b/tests/test_dogstatsd.py @@ -1,4 +1,4 @@ - +# -*- coding: utf-8 -*- import random import time @@ -16,6 +16,12 @@ def sort_by(m): return (m['metric'], ','.join(m['tags'] or [])) return sorted(metrics, key=sort_by) + @staticmethod + def sort_events(metrics): + def sort_by(m): + return (m['title'], m['text'], ','.join(m.get('tags', None) or [])) + return sorted(metrics, key=sort_by) + def test_counter_normalization(self): stats = MetricsAggregator('myhost', interval=10) @@ -90,7 +96,7 @@ def test_tags_gh442(self): json = util.generate_minjson_adapter() dogstatsd.json = json - serialized = dogstatsd.serialize([api_formatter("foo", 12, 1, ('tag',), 'host')]) + serialized = dogstatsd.serialize_metrics([api_formatter("foo", 12, 1, ('tag',), 'host')]) assert '"tags": ["tag"]' in serialized def test_counter(self): @@ -317,6 +323,11 @@ def test_bad_packets_throw_errors(self): 'unknown.type:2|z', 'string.value:abc|c', 'string.sample.rate:0|c|@abc', + # Bad event-like packets + '_ev{1,2}:bad_header' + '_e{1,}:invalid|headers', + '_e:missing|size|headers', + '_e:{1,1}:t|t|t:bad_meta|h', ] stats = MetricsAggregator('myhost') @@ -391,5 +402,76 @@ def test_scientific_notation(self): ts, val = metrics[0].get('points')[0] nt.assert_almost_equal(val, 9.512901e-05) + def test_event_tags(self): + stats = MetricsAggregator('myhost') + stats.submit_packets('_e{6,4}:title1|text') + stats.submit_packets('_e{6,4}:title2|text|#t1') + stats.submit_packets('_e{6,4}:title3|text|#t1,t2:v2,t3,t4') + 
stats.submit_packets('_e{6,4}:title4|text|k:key|p:normal|#t1,t2') + + events = self.sort_events(stats.flush_events()) + + assert len(events) == 4 + first, second, third, fourth = events + + try: + first['tags'] + except: + assert True + else: + assert False, "event['tags'] shouldn't be defined when no tags aren't explicited in the packet" + nt.assert_equal(first['title'], 'title1') + nt.assert_equal(first['text'], 'text') + + nt.assert_equal(second['title'], 'title2') + nt.assert_equal(second['text'], 'text') + nt.assert_equal(second['tags'], sorted(['t1'])) + + nt.assert_equal(third['title'], 'title3') + nt.assert_equal(third['text'], 'text') + nt.assert_equal(third['tags'], sorted(['t1', 't2:v2', 't3', 't4'])) + + nt.assert_equal(fourth['title'], 'title4') + nt.assert_equal(fourth['text'], 'text') + nt.assert_equal(fourth['aggregation_key'], 'key') + nt.assert_equal(fourth['priority'], 'normal') + nt.assert_equal(fourth['tags'], sorted(['t1', 't2'])) + + def test_event_title(self): + stats = MetricsAggregator('myhost') + stats.submit_packets('_e{0,4}:|text') + stats.submit_packets(u'_e{9,4}:2intitulé|text') + stats.submit_packets('_e{14,4}:3title content|text') + stats.submit_packets('_e{14,4}:4title|content|text') + stats.submit_packets('_e{13,4}:5title\\ntitle|text') # \n stays escaped + + events = self.sort_events(stats.flush_events()) + + assert len(events) == 5 + first, second, third, fourth, fifth = events + + nt.assert_equal(first['title'], '') + nt.assert_equal(second['title'], u'2intitulé') + nt.assert_equal(third['title'], '3title content') + nt.assert_equal(fourth['title'], '4title|content') + nt.assert_equal(fifth['title'], '5title\\ntitle') + + def test_event_text(self): + stats = MetricsAggregator('myhost') + stats.submit_packets('_e{2,0}:t1|') + stats.submit_packets('_e{2,12}:t2|text|content') + stats.submit_packets('_e{2,23}:t3|First line\\nSecond line') # \n is a newline + stats.submit_packets(u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪') # utf-8 compliant 
+ + events = self.sort_events(stats.flush_events()) + + assert len(events) == 4 + first, second, third, fourth = events + + nt.assert_equal(first['text'], '') + nt.assert_equal(second['text'], 'text|content') + nt.assert_equal(third['text'], 'First line\nSecond line') + nt.assert_equal(fourth['text'], u'♬ †øU †øU ¥ºu T0µ ♪') + if __name__ == "__main__": unittest.main() From b48cd40fcc72ecc4e9047ad8019cf4aa21a7a587 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Tue, 18 Jun 2013 15:36:55 -0400 Subject: [PATCH 3/4] Better use of event API by dogstatsd --- aggregator.py | 31 +++++++++++++++++++++---------- dogstatsd.py | 2 +- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/aggregator.py b/aggregator.py index be6aad552c..c825f3df86 100644 --- a/aggregator.py +++ b/aggregator.py @@ -364,7 +364,7 @@ def submit_packets(self, packets): if packet.startswith('_e'): self.event_count += 1 event = self.parse_event_packet(packet) - self.events.append(event) + self.event(**event) else: self.count += 1 name, value, mtype, tags, sample_rate = self.parse_metric_packet(packet) @@ -405,17 +405,28 @@ def histogram(self, name, value, tags=None, hostname=None, device_name=None): def set(self, name, value, tags=None, hostname=None, device_name=None): self.submit_metric(name, value, 's', tags, hostname, device_name) - def event(self, title, text, date_happened=time(), alert_type=None, aggregation_key=None, source_type_name=None, priority=None, tags=None, hostname=None): - event = {'title': title, + def event(self, title, text, date_happened=int(time()), alert_type=None, aggregation_key=None, source_type_name=None, priority=None, tags=None, hostname=None): + event = { + 'title': title, 'text': text, - 'date_happened': date_happened, - 'alert_type': alert_type, - 'aggregation_key': aggregation_key, - 'source_type_name': source_type_name, - 'priority': priority, - 'tags': sorted(tags) if tags is not None else None, - 'hostname': hostname } + if date_happened is not 
def event(self, title, text, date_happened=None, alert_type=None, aggregation_key=None, source_type_name=None, priority=None, tags=None, hostname=None):
    """Buffer a single event for the next flush_events().

    Only the fields the caller actually supplied are included, keeping
    the API payload minimal. `date_happened` defaults to "now" at call
    time via a None sentinel (a real default argument would be frozen
    when the function is defined). The events API uses the key 'host',
    which falls back to this aggregator's own hostname.
    """
    event = {
        'title': title,
        'text': text,
        # Timestamp defaults at call time, not definition time.
        'date_happened': int(time()) if date_happened is None else date_happened,
    }
    if alert_type is not None:
        event['alert_type'] = alert_type
    if aggregation_key is not None:
        event['aggregation_key'] = aggregation_key
    if source_type_name is not None:
        event['source_type_name'] = source_type_name
    if priority is not None:
        event['priority'] = priority
    if tags is not None:
        # Tags are kept in sorted order for deterministic payloads.
        event['tags'] = sorted(tags)
    event['host'] = self.hostname if hostname is None else hostname

    self.events.append(event)