Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event support in dogstatsd #532

Merged
merged 4 commits into from
Jul 31, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 130 additions & 42 deletions aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,10 @@ class MetricsAggregator(object):

def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, recent_point_threshold=None):
self.metrics = {}
self.events = []
self.total_count = 0
self.count = 0
self.event_count = 0
self.metric_type_to_class = {
'g': Gauge,
'c': Counter,
Expand All @@ -269,55 +271,104 @@ def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, r
def packets_per_second(self, interval):
    """Return ``self.count`` divided by ``interval``, rounded to 2 decimals.

    ``interval`` is presumably the flush window in seconds (confirm against
    the caller); a zero interval raises ZeroDivisionError.
    """
    rate = float(self.count) / interval
    return round(rate, 2)

def parse_metric_packet(self, packet):
    """Parse one dogstatsd metric packet into its components.

    Expected shape::

        name:value|type[|@sample_rate][|#tag1,tag2,...]

    Returns a ``(name, value, mtype, tags, sample_rate)`` tuple where:

    * ``value`` is an int when possible, otherwise a float, otherwise the
      raw string if ``mtype`` is listed in ``self.ALLOW_STRINGS``;
    * ``tags`` is a sorted tuple of tag strings, or None when absent;
    * ``sample_rate`` defaults to 1.

    Raises Exception when the packet has no ``:`` separator, has fewer than
    two ``|``-separated fields, carries a non-numeric value for a type that
    does not allow strings, or has a sample rate outside ``[0, 1]``.
    A non-numeric sample rate propagates the ValueError from ``float``.
    """
    # We can have colons in tags, so split once.
    name_and_metadata = packet.split(':', 1)
    if len(name_and_metadata) != 2:
        raise Exception('Unparseable metric packet: %s' % packet)

    name = name_and_metadata[0]
    metadata = name_and_metadata[1].split('|')
    if len(metadata) < 2:
        raise Exception('Unparseable metric packet: %s' % packet)

    raw_value, mtype = metadata[0], metadata[1]

    # Try to cast as an int first to avoid precision issues, then as a
    # float.
    try:
        value = int(raw_value)
    except ValueError:
        try:
            value = float(raw_value)
        except ValueError:
            # Only the types in ALLOW_STRINGS may carry a string value.
            if mtype in self.ALLOW_STRINGS:
                value = raw_value
            else:
                raise Exception('Metric value must be a number: %s, %s' % (name, raw_value))

    # Parse the optional values - sample rate & tags.
    sample_rate = 1
    tags = None
    for m in metadata[2:]:
        # startswith() rather than m[0]: an empty segment (e.g. a trailing
        # '|') is skipped instead of raising IndexError.
        if m.startswith('@'):
            sample_rate = float(m[1:])
            # Explicit raise rather than assert, which `python -O` strips.
            if not 0 <= sample_rate <= 1:
                raise Exception('Sample rate must be between 0 and 1: %s' % packet)
        elif m.startswith('#'):
            tags = tuple(sorted(m[1:].split(',')))

    return name, value, mtype, tags, sample_rate

def _unescape_event_text(self, string):
return string.replace('\\n', '\n')

def parse_event_packet(self, packet):
    """Parse a dogstatsd event packet into keyword arguments for ``event``.

    Event syntax::

        _e{<title_length>,<text_length>}:<title>|<text>[|<meta>...]

    Title and text are sliced out by the declared lengths, so they may
    themselves contain ``|``. Each optional meta segment is identified by
    its first character; the value starts at offset 2, after the separator:

    * ``t`` alert_type, ``k`` aggregation_key, ``s`` source_type_name,
      ``p`` priority, ``h`` hostname (stored verbatim);
    * ``d`` date_happened (converted to int);
    * ``#`` comma-separated tags (stored as a sorted list).

    Segments with any other first character are ignored.

    Returns a dict with at least ``title`` and ``text``.

    Raises Exception('Unparseable event packet: ...') when the packet has no
    ``:`` separator, when the length header or a numeric field is malformed,
    or when a meta segment is empty.
    """
    # NOTE(review), carried over from the pull-request discussion: "Is there
    # a way to avoid hard coding these keys and just pass along keys to the
    # server which would do the validation? Having these on the client side
    # makes it difficult to change them."
    plain_fields = {
        u't': 'alert_type',
        u'k': 'aggregation_key',
        u's': 'source_type_name',
        u'p': 'priority',
        u'h': 'hostname',
    }

    try:
        to_text = unicode  # Python 2: convert the payload to a unicode string
    except NameError:
        to_text = str  # Python 3: str is already text

    try:
        name_and_metadata = packet.split(':', 1)
        if len(name_and_metadata) != 2:
            raise Exception(u'Unparseable event packet: %s' % packet)

        name = name_and_metadata[0]
        metadata = to_text(name_and_metadata[1])

        title_length, text_length = name.split(',')
        title_length = int(title_length[3:])  # drop the leading "_e{"
        text_length = int(text_length[:-1])   # drop the trailing "}"

        text_start = title_length + 1  # skip the '|' between title and text
        text_end = text_start + text_length
        event = {
            'title': metadata[:title_length],
            'text': self._unescape_event_text(metadata[text_start:text_end]),
        }

        # What remains starts with '|', so the first split element is empty.
        meta = metadata[text_end:]
        for m in meta.split('|')[1:]:
            prefix = m[0]
            if prefix in plain_fields:
                event[plain_fields[prefix]] = m[2:]
            elif prefix == u'd':
                event['date_happened'] = int(m[2:])
            elif prefix == u'#':
                event['tags'] = sorted(m[1:].split(u','))
        return event
    # The tuple is required: `except IndexError, ValueError:` only catches
    # IndexError (binding it to the name ValueError) in Python 2.
    except (IndexError, ValueError):
        raise Exception(u'Unparseable event packet: %s' % packet)

def submit_packets(self, packets):
    """Parse a newline-separated batch of packets and record each one.

    Lines that are empty or whitespace-only are skipped. A line starting
    with ``_e`` is counted in ``self.event_count``, parsed with
    ``parse_event_packet`` and recorded through ``event``. Every other line
    is counted in ``self.count``, parsed with ``parse_metric_packet`` and
    recorded through ``submit_metric``.

    The counter is incremented before parsing, so a packet that fails to
    parse is still counted. Parse errors propagate to the caller, and the
    remaining lines of the batch are then not processed.
    """
    for packet in packets.split("\n"):

        if not packet.strip():
            continue

        if packet.startswith('_e'):
            self.event_count += 1
            event = self.parse_event_packet(packet)
            self.event(**event)
        else:
            self.count += 1
            name, value, mtype, tags, sample_rate = self.parse_metric_packet(packet)
            self.submit_metric(name, value, mtype, tags=tags, sample_rate=sample_rate)

def submit_metric(self, name, value, mtype, tags=None, hostname=None,
device_name=None, timestamp=None, sample_rate=1):
Expand Down Expand Up @@ -354,6 +405,32 @@ def histogram(self, name, value, tags=None, hostname=None, device_name=None):
def set(self, name, value, tags=None, hostname=None, device_name=None):
    """Submit ``value`` for metric ``name`` using the 's' metric type."""
    self.submit_metric(
        name,
        value,
        's',
        tags=tags,
        hostname=hostname,
        device_name=device_name,
    )

def event(self, title, text, date_happened=None, alert_type=None,
          aggregation_key=None, source_type_name=None, priority=None,
          tags=None, hostname=None):
    """Build an event dict and append it to ``self.events``.

    ``title`` and ``text`` are always stored. ``date_happened`` defaults to
    the current time truncated to an int, and ``host`` defaults to
    ``self.hostname``. The other optional fields are stored only when they
    are not None; ``tags`` is stored as a sorted list.
    """
    payload = {'title': title, 'text': text}
    payload['date_happened'] = int(time()) if date_happened is None else date_happened

    # Optional fields that are copied through untouched when provided.
    passthrough = (
        ('alert_type', alert_type),
        ('aggregation_key', aggregation_key),
        ('source_type_name', source_type_name),
        ('priority', priority),
    )
    for key, supplied in passthrough:
        if supplied is not None:
            payload[key] = supplied

    if tags is not None:
        payload['tags'] = sorted(tags)

    payload['host'] = self.hostname if hostname is None else hostname

    self.events.append(payload)

def flush(self):
timestamp = time()
expiry_timestamp = timestamp - self.expiry_seconds
Expand All @@ -379,6 +456,17 @@ def flush(self):
self.count = 0
return metrics

def flush_events(self):
    """Return the buffered events and reset the event buffer.

    ``self.events`` is replaced with a fresh empty list, the pending
    ``self.event_count`` is added to ``self.total_count`` and then zeroed,
    and the number of returned events is logged at debug level.
    """
    flushed, self.events = self.events, []

    self.total_count += self.event_count
    self.event_count = 0

    log.debug("Received %d events since last flush" % len(flushed))

    return flushed

def send_packet_count(self, metric_name):
self.submit_metric(metric_name, self.count, 'g')

Expand Down
5 changes: 4 additions & 1 deletion checks/check_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,12 +461,13 @@ class DogstatsdStatus(AgentStatus):
NAME = 'Dogstatsd'

def __init__(self, flush_count=0, packet_count=0, packets_per_second=0,
             metric_count=0, event_count=0):
    """Store the dogstatsd counters passed in on this status object.

    All counters default to 0. ``event_count`` is the last parameter and
    has a default, so existing callers that omit it keep working.
    """
    AgentStatus.__init__(self)
    self.flush_count = flush_count
    self.packet_count = packet_count
    self.packets_per_second = packets_per_second
    self.metric_count = metric_count
    self.event_count = event_count

def has_error(self):
    """Return True when the flush, packet and metric counts are all 0."""
    counters = (self.flush_count, self.packet_count, self.metric_count)
    return all(counter == 0 for counter in counters)
Expand All @@ -477,6 +478,7 @@ def body_lines(self):
"Packet Count: %s" % self.packet_count,
"Packets per second: %s" % self.packets_per_second,
"Metric count: %s" % self.metric_count,
"Event count: %s" % self.event_count,
]
return lines

Expand All @@ -487,6 +489,7 @@ def to_dict(self):
'packet_count': self.packet_count,
'packets_per_second': self.packets_per_second,
'metric_count': self.metric_count,
'event_count': self.event_count,
})
return status_info

Expand Down
48 changes: 44 additions & 4 deletions dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@
UDP_SOCKET_TIMEOUT = 5
LOGGING_INTERVAL = 10

def serialize_metrics(metrics):
    """Return ``metrics`` as a JSON string wrapped in a ``{"series": ...}`` object."""
    return json.dumps({"series" : metrics})


# Backward-compatible alias: this function was previously named `serialize`.
serialize = serialize_metrics

def serialize_event(event):
    """Return a single ``event`` encoded as a JSON string."""
    encoded = json.dumps(event)
    return encoded

class Reporter(threading.Thread):
"""
The reporter periodically sends the aggregated metrics to the
Expand Down Expand Up @@ -109,21 +112,31 @@ def flush(self):
log.info("Flush #%s: flushing %s metrics" % (self.flush_count, count))
self.submit(metrics)

events = self.metrics_aggregator.flush_events()
event_count = len(events)
if not event_count:
log.info("Flush #%s: No events to flush." % self.flush_count)
else:
log.info("Flush #%s: flushing %s events" % (self.flush_count, len(events)))
self.submit_events(events)

# Persist a status message.
packet_count = self.metrics_aggregator.total_count
DogstatsdStatus(
flush_count=self.flush_count,
packet_count=packet_count,
packets_per_second=packets_per_second,
metric_count=count).persist()
metric_count=count,
event_count=event_count
).persist()

except:
log.exception("Error flushing metrics")

def submit(self, metrics):
# HACK - Copy and pasted from dogapi, because it's a bit of a pain to distribute python
# dependencies with the agent.
body = serialize(metrics)
body = serialize_metrics(metrics)
headers = {'Content-Type':'application/json'}
method = 'POST'

Expand All @@ -150,6 +163,33 @@ def submit(self, metrics):
status, method, self.api_host, url, duration))
return duration

def submit_events(self, events):
    """POST each event in ``events`` to ``/api/v1/events``, one request per event.

    A single connection, created with ``self.http_conn_cls(self.api_host)``,
    is used for all requests and is always closed afterwards. When
    ``self.api_key`` is set it is sent as the ``api_key`` query parameter.
    The response status and the request duration in milliseconds are logged
    at debug level; the status is not otherwise checked. An exception raised
    while sending stops the loop, so later events in the batch are not sent.
    """
    request_headers = {'Content-Type':'application/json'}
    http_method = 'POST'

    query = {}
    if self.api_key:
        query['api_key'] = self.api_key
    url = '/api/v1/events?%s' % urlencode(query)

    conn = self.http_conn_cls(self.api_host)
    try:
        for event in events:
            started_at = time()
            body = serialize_event(event)
            log.debug('Sending event: %s' % body)
            conn.request(http_method, url, body, request_headers)

            response = conn.getresponse()
            status = response.status
            response.close()

            elapsed_ms = round((time() - started_at) * 1000.0, 4)
            log.debug("%s %s %s%s (%sms)" % (
                status, http_method, self.api_host, url, elapsed_ms))
    finally:
        conn.close()

class Server(object):
"""
A statsd udp server.
Expand All @@ -160,7 +200,7 @@ def __init__(self, metrics_aggregator, host, port):
self.port = int(port)
self.address = (self.host, self.port)
self.metrics_aggregator = metrics_aggregator
self.buffer_size = 1024
self.buffer_size = 1024 * 8

# IPv4 only
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
Expand Down
1 change: 1 addition & 0 deletions pup/status.html
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ <h2>DogstatsD</h2>
<dt>Packet count</dt> <dd>{{ dogstatsd['packet_count'] }}</dd>
<dt>Packets per second</dt> <dd>{{ dogstatsd['packets_per_second'] }}</dd>
<dt>Metric count</dt> <dd>{{ dogstatsd['metric_count'] }}</dd>
<dt>Event count</dt> <dd>{{ dogstatsd['event_count'] }}</dd>
</dl>

</div>
Expand Down
Loading