Skip to content

Commit

Permalink
Merge pull request #1279 from DataDog/leo/dogstatsdutf8
Browse files Browse the repository at this point in the history
[dogstatsd] handle properly utf8 packets
  • Loading branch information
LeoCavaille committed Jan 28, 2015
2 parents 9e309ed + ebf3625 commit fbbf4de
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 27 deletions.
29 changes: 22 additions & 7 deletions aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ class Aggregator(object):

def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
histogram_aggregates=None, histogram_percentiles=None,
utf8_decoding=False):
self.events = []
self.total_count = 0
self.count = 0
Expand All @@ -413,6 +414,8 @@ def __init__(self, hostname, interval=1.0, expiry_seconds=300,
}
}

self.utf8_decoding = utf8_decoding

def packets_per_second(self, interval):
if interval == 0:
return 0
Expand Down Expand Up @@ -495,7 +498,7 @@ def parse_event_packet(self, packet):
# Event syntax:
# _e{5,4}:title|body|meta
name = name_and_metadata[0]
metadata = unicode(name_and_metadata[1])
metadata = name_and_metadata[1]
title_length, text_length = name.split(',')
title_length = int(title_length[3:])
text_length = int(text_length[:-1])
Expand Down Expand Up @@ -525,8 +528,15 @@ def parse_event_packet(self, packet):
raise Exception(u'Unparseable event packet: %s' % packet)

def submit_packets(self, packets):
for packet in packets.splitlines():
# We should probably consider that packets are always encoded
# in utf8, but decoding all packets has a perf overhead of 7%
# So we let the user decide if they want utf8 by default
# Keep a very conservative approach anyhow
# Clients MUST always send UTF-8 encoded content
if self.utf8_decoding:
packets = unicode(packets, 'utf-8', errors='replace')

for packet in packets.splitlines():
if not packet.strip():
continue

Expand All @@ -542,6 +552,7 @@ def submit_packets(self, packets):
self.submit_metric(name, value, mtype, tags=tags, hostname=hostname,
device_name=device_name, sample_rate=sample_rate)


def _extract_magic_tags(self, tags):
"""Magic tags (host, device) override metric hostname and device_name attributes"""
hostname = None
Expand Down Expand Up @@ -620,15 +631,17 @@ class MetricsBucketAggregator(Aggregator):

def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
histogram_aggregates=None, histogram_percentiles=None,
utf8_decoding=False):
super(MetricsBucketAggregator, self).__init__(
hostname,
interval,
expiry_seconds,
formatter,
recent_point_threshold,
histogram_aggregates,
histogram_percentiles
histogram_percentiles,
utf8_decoding
)
self.metric_by_bucket = {}
self.last_sample_time_by_context = {}
Expand Down Expand Up @@ -758,15 +771,17 @@ class MetricsAggregator(Aggregator):

def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
histogram_aggregates=None, histogram_percentiles=None,
utf8_decoding=False):
super(MetricsAggregator, self).__init__(
hostname,
interval,
expiry_seconds,
formatter,
recent_point_threshold,
histogram_aggregates,
histogram_percentiles
histogram_percentiles,
utf8_decoding
)
self.metrics = {}
self.metric_type_to_class = {
Expand Down
5 changes: 5 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ def get_config(parse_args=True, cfg_path=None, options=None):
'additional_checksd': '/etc/dd-agent/checks.d/',
'bind_host': get_default_bind_host(),
'statsd_metric_namespace': None,
'utf8_decoding': False
}

# Config handling
Expand Down Expand Up @@ -489,6 +490,10 @@ def get_config(parse_args=True, cfg_path=None, options=None):
if config.has_option("Main", "collect_ec2_tags"):
agentConfig["collect_ec2_tags"] = _is_affirmative(config.get("Main", "collect_ec2_tags"))

agentConfig["utf8_decoding"] = False
if config.has_option("Main", "utf8_decoding"):
agentConfig["utf8_decoding"] = _is_affirmative(config.get("Main", "utf8_decoding"))

except ConfigParser.NoSectionError, e:
sys.stderr.write('Config file not found or incorrectly formatted.\n')
sys.exit(2)
Expand Down
7 changes: 7 additions & 0 deletions datadog.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ use_mount: no
# like `metric.name` will instead become `namespace.metric.name`
# statsd_metric_namespace:

# By default, dogstatsd supports only plain ASCII packets. However, most
# (dog)statsd clients support UTF8 by encoding packets before sending them;
# this option enables UTF8 decoding in case you need it.
# However, it comes with a performance overhead of ~10% in the dogstatsd
# server. This will be taken care of properly in the new gen agent core.
# utf8_decoding: false

# ========================================================================== #
# Service-specific configuration #
# ========================================================================== #
Expand Down
1 change: 1 addition & 0 deletions dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ def init(config_path=None, use_watchdog=False, use_forwarder=False, args=None):
formatter=get_formatter(c),
histogram_aggregates=c.get('histogram_aggregates'),
histogram_percentiles=c.get('histogram_percentiles'),
utf8_decoding=c['utf8_decoding']
)

# Start the reporting thread.
Expand Down
62 changes: 57 additions & 5 deletions tests/performance/benchmark_aggregator.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
# -*- coding: utf-8 -*-
"""
Performance tests for the agent/dogstatsd metrics aggregator.
"""


from aggregator import MetricsAggregator, MetricsBucketAggregator




class TestAggregatorPerf(object):

FLUSH_COUNT = 10
Expand Down Expand Up @@ -54,8 +51,63 @@ def test_checksd_aggregation_perf(self):
ma.set('set.%s' % j, float(i))
ma.flush()

def create_event_packet(self, title, text):
    """Return a dogstatsd event packet string for *title* and *text*.

    The packet has the form ``_e{<title_len>,<text_len>}:<title>|<text>``,
    where both lengths are ``len()`` of the arguments exactly as passed in.
    """
    template = "_e{{{title_len},{text_len}}}:{title}|{text}"
    lengths = {'title_len': len(title), 'text_len': len(text)}
    return template.format(title=title, text=text, **lengths)


# Benchmark: push event packets whose title and text contain non-ASCII
# characters through MetricsBucketAggregator.submit_packets, then flush.
# NOTE(review): the aggregator is built without the utf8_decoding argument,
# which defaults to False in the constructor signature, so the unicode
# decoding branch of submit_packets is not taken here -- confirm that this is
# the measurement intended.
# NOTE(review): indentation is lost in this view, so whether ma.flush() runs
# once per FLUSH_COUNT iteration or once at the very end cannot be confirmed.
def test_dogstatsd_utf8_events(self):
ma = MetricsBucketAggregator('my.host')

# The loop variables i and j are not used; the loops only repeat the work.
for _ in xrange(self.FLUSH_COUNT):
for i in xrange(self.LOOPS_PER_FLUSH):
for j in xrange(self.METRIC_COUNT):

# Three packets per innermost iteration: Greek, Georgian and French samples.
ma.submit_packets(self.create_event_packet(
'Τη γλώσσα μου έδωσαν ελληνική',
"""τὸ σπίτι φτωχικὸ στὶς ἀμμουδιὲς τοῦ Ὁμήρου. Μονάχη ἔγνοια ἡ γλῶσσα μου στὶς ἀμμουδιὲς τοῦ Ὁμήρου. ἀπὸ τὸ Ἄξιον ἐστί τοῦ Ὀδυσσέα Ἐλύτη"""
))
ma.submit_packets(self.create_event_packet(
'ვეპხის ტყაოსანი შოთა რუსთაველი',
"""ღმერთსი შემვედრე, ნუთუ კვლა დამხსნას სოფლისა შრომასა, ცეცხლს, წყალსა და მიწასა, ჰაერთა თანა მრომასა; მომცნეს ფრთენი და აღვფრინდე, მივჰხვდე მას ჩემსა ნდომასა, დღისით და ღამით ვჰხედვიდე მზისა ელვათა კრთომაასა.
"""
))
ma.submit_packets(self.create_event_packet(
'Traité sur la tolérance',
"""Ose supposer qu'un Ministre éclairé & magnanime, un Prélat humain & sage, un Prince qui sait que son intérêt consiste dans le grand nombre de ses Sujets, & sa gloire dans leur bonheur, daigne jetter les yeux sur cet Ecrit informe & défectueux; il y supplée par ses propres lumieres; il se dit à lui-même: Que risquerai-je à voir la terre cultivée & ornée par plus de mains laborieuses, les tributs augmentés, l'Etat plus florissant?"""
))

ma.flush()

# Benchmark: push event packets with ASCII-only titles and text through
# MetricsBucketAggregator.submit_packets, then flush. It uses the same loop
# structure as test_dogstatsd_utf8_events, which makes it usable as a baseline.
# NOTE(review): indentation is lost in this view, so whether ma.flush() runs
# once per FLUSH_COUNT iteration or once at the very end cannot be confirmed.
def test_dogstatsd_ascii_events(self):
ma = MetricsBucketAggregator('my.host')

# The loop variables i and j are not used; the loops only repeat the work.
for _ in xrange(self.FLUSH_COUNT):
for i in xrange(self.LOOPS_PER_FLUSH):
for j in xrange(self.METRIC_COUNT):

# Three packets per innermost iteration, all plain-ASCII filler text.
ma.submit_packets(self.create_event_packet(
'asldkfj fdsaljfas dflksjafs fasdfkjaldsfkjasldf',
"""alkdjfa slfalskdfjas lkfdjaoisudhfalsdkjbfaksdhfbasjdk fa;sf ljda fsafksadfh alsdjfhaskjdfgahls d;fjasdlkfh9823udjs dlfhaspdf98as ufdaksjhfaisdhufalskdjfhas df"""
))
ma.submit_packets(self.create_event_packet(
'kdjfsofuousodifu982309rijdfsljsd dfsdf sdf',
"""dflskjdfs8d9fsdfjs sldfjka ;dlfjapfoia jsdflakjsdfp 0adsfuolwejf wflsdjf lsdkjf0saoiufja dlfjasd of;lasdjf ;askdjf asodfhas lkmfbashudf asd,fasdfna s,dfjas lcjx vjaskdlfjals dfkjasdflk jasldfkj asldkfjas ldfkasjdf a"""
))
ma.submit_packets(self.create_event_packet(
'asdf askdjf asldfkjsad lfkajsdlfksajd fasdfsdfdf',
"""skdfjsld flskdjf alksdjfpasdofuapo sdfjalksdjf ;as.kjdf ;ljLKJL :KJL:KJ l;kdsjf ;lkj :Lkj FLDKFJ LSKFDJ ;LDFJ SLKDJF KSDLjf: Lfjldkj fLKDSJf lSKDjf ls;kdjf s;lkfjs L:KAJ :LFKJDL:DKjf L:SKjf;lKDJfl;SKJDf :LKSDj;lsdfj fsdljfsd ofisunafoialjsflmsdifjas;dlkfaj sdfkasjd flaksjdfnpmsao;difjkas dfnlaksdfa;sodljfas lfdjasdflmajsdlfknaf98wouanepr9qo3ud fadspuf oaisdufpoasid fj askdjn LKJH LKJHFL KJDHSF DSFLHSL JKDFHLSK DJFHLS KJDFHS"""
))

ma.flush()

if __name__ == '__main__':
t = TestAggregatorPerf()
t.test_dogstatsd_aggregation_perf()
#t.test_dogstatsd_aggregation_perf()
#t.test_checksd_aggregation_perf()
t.test_dogstatsd_utf8_events()
42 changes: 27 additions & 15 deletions tests/test_dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,40 +624,52 @@ def test_event_tags(self):
nt.assert_equal(fourth['tags'], sorted(['t1', 't2']))

def test_event_title(self):
stats = MetricsAggregator('myhost')
stats = MetricsAggregator('myhost', utf8_decoding=True)
stats.submit_packets('_e{0,4}:|text')
stats.submit_packets(u'_e{9,4}:2intitulé|text')
stats.submit_packets(u'_e{9,4}:2intitulé|text'.encode('utf-8')) # comes from socket
stats.submit_packets('_e{14,4}:3title content|text')
stats.submit_packets('_e{14,4}:4title|content|text')
stats.submit_packets('_e{13,4}:5title\\ntitle|text') # \n stays escaped

events = self.sort_events(stats.flush_events())

assert len(events) == 5
first, second, third, fourth, fifth = events

nt.assert_equal(first['msg_title'], '')
nt.assert_equal(second['msg_title'], u'2intitulé')
nt.assert_equal(third['msg_title'], '3title content')
nt.assert_equal(fourth['msg_title'], '4title|content')
nt.assert_equal(fifth['msg_title'], '5title\\ntitle')
nt.assert_equal(events[0]['msg_title'], '')
nt.assert_equal(events[1]['msg_title'], u'2intitulé')
nt.assert_equal(events[2]['msg_title'], '3title content')
nt.assert_equal(events[3]['msg_title'], '4title|content')
nt.assert_equal(events[4]['msg_title'], '5title\\ntitle')

def test_event_text(self):
stats = MetricsAggregator('myhost')
stats.submit_packets('_e{2,0}:t1|')
stats.submit_packets('_e{2,12}:t2|text|content')
stats.submit_packets('_e{2,23}:t3|First line\\nSecond line') # \n is a newline
stats.submit_packets(u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪') # utf-8 compliant

events = self.sort_events(stats.flush_events())

assert len(events) == 4
first, second, third, fourth = events
assert len(events) == 3

nt.assert_equal(events[0]['msg_text'], '')
nt.assert_equal(events[1]['msg_text'], 'text|content')
nt.assert_equal(events[2]['msg_text'], 'First line\nSecond line')

def test_event_text_utf8(self):
stats = MetricsAggregator('myhost', utf8_decoding=True)
# Should raise because content is not encoded

self.assertRaises(Exception, stats.submit_packets, u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪')
stats.submit_packets(u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪'.encode('utf-8')) # utf-8 compliant
# Normal packet
stats.submit_packets('_e{2,23}:t3|First line\\nSecond line') # \n is a newline

events = self.sort_events(stats.flush_events())

assert len(events) == 2

nt.assert_equal(first['msg_text'], '')
nt.assert_equal(second['msg_text'], 'text|content')
nt.assert_equal(third['msg_text'], 'First line\nSecond line')
nt.assert_equal(fourth['msg_text'], u'♬ †øU †øU ¥ºu T0µ ♪')
nt.assert_equal(events[0]['msg_text'], 'First line\nSecond line')
nt.assert_equal(events[1]['msg_text'], u'♬ †øU †øU ¥ºu T0µ ♪')

def test_recent_point_threshold(self):
threshold = 100
Expand Down

0 comments on commit fbbf4de

Please sign in to comment.