Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dogstatsd] handle properly utf8 packets #1279

Merged
merged 1 commit into from
Jan 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ class Aggregator(object):

def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
histogram_aggregates=None, histogram_percentiles=None,
utf8_decoding=False):
self.events = []
self.total_count = 0
self.count = 0
Expand All @@ -413,6 +414,8 @@ def __init__(self, hostname, interval=1.0, expiry_seconds=300,
}
}

self.utf8_decoding = utf8_decoding

def packets_per_second(self, interval):
if interval == 0:
return 0
Expand Down Expand Up @@ -495,7 +498,7 @@ def parse_event_packet(self, packet):
# Event syntax:
# _e{5,4}:title|body|meta
name = name_and_metadata[0]
metadata = unicode(name_and_metadata[1])
metadata = name_and_metadata[1]
title_length, text_length = name.split(',')
title_length = int(title_length[3:])
text_length = int(text_length[:-1])
Expand Down Expand Up @@ -525,8 +528,15 @@ def parse_event_packet(self, packet):
raise Exception(u'Unparseable event packet: %s' % packet)

def submit_packets(self, packets):
for packet in packets.splitlines():
# We should probably consider that packets are always encoded
# in utf8, but decoding all packets has an perf overhead of 7%
# So we let the user decide if we wants utf8 by default
# Keep a very conservative approach anyhow
# Clients MUST always send UTF-8 encoded content
if self.utf8_decoding:
packets = unicode(packets, 'utf-8', errors='replace')

for packet in packets.splitlines():
if not packet.strip():
continue

Expand All @@ -542,6 +552,7 @@ def submit_packets(self, packets):
self.submit_metric(name, value, mtype, tags=tags, hostname=hostname,
device_name=device_name, sample_rate=sample_rate)


def _extract_magic_tags(self, tags):
"""Magic tags (host, device) override metric hostname and device_name attributes"""
hostname = None
Expand Down Expand Up @@ -620,15 +631,17 @@ class MetricsBucketAggregator(Aggregator):

def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
histogram_aggregates=None, histogram_percentiles=None,
utf8_decoding=False):
super(MetricsBucketAggregator, self).__init__(
hostname,
interval,
expiry_seconds,
formatter,
recent_point_threshold,
histogram_aggregates,
histogram_percentiles
histogram_percentiles,
utf8_decoding
)
self.metric_by_bucket = {}
self.last_sample_time_by_context = {}
Expand Down Expand Up @@ -758,15 +771,17 @@ class MetricsAggregator(Aggregator):

def __init__(self, hostname, interval=1.0, expiry_seconds=300,
formatter=None, recent_point_threshold=None,
histogram_aggregates=None, histogram_percentiles=None):
histogram_aggregates=None, histogram_percentiles=None,
utf8_decoding=False):
super(MetricsAggregator, self).__init__(
hostname,
interval,
expiry_seconds,
formatter,
recent_point_threshold,
histogram_aggregates,
histogram_percentiles
histogram_percentiles,
utf8_decoding
)
self.metrics = {}
self.metric_type_to_class = {
Expand Down
5 changes: 5 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ def get_config(parse_args=True, cfg_path=None, options=None):
'additional_checksd': '/etc/dd-agent/checks.d/',
'bind_host': get_default_bind_host(),
'statsd_metric_namespace': None,
'utf8_decoding': False
}

# Config handling
Expand Down Expand Up @@ -489,6 +490,10 @@ def get_config(parse_args=True, cfg_path=None, options=None):
if config.has_option("Main", "collect_ec2_tags"):
agentConfig["collect_ec2_tags"] = _is_affirmative(config.get("Main", "collect_ec2_tags"))

agentConfig["utf8_decoding"] = False
if config.has_option("Main", "utf8_decoding"):
agentConfig["utf8_decoding"] = _is_affirmative(config.get("Main", "utf8_decoding"))

except ConfigParser.NoSectionError, e:
sys.stderr.write('Config file not found or incorrectly formatted.\n')
sys.exit(2)
Expand Down
7 changes: 7 additions & 0 deletions datadog.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ use_mount: no
# like `metric.name` will instead become `namespace.metric.name`
# statsd_metric_namespace:

# By default, dogstatsd supports only plain ASCII packets. However, most
# (dog)statsd client support UTF8 by encoding packets before sending them
# this option enables UTF8 decoding in case you need it.
# However, it comes with a performance overhead of ~10% in the dogstatsd
# server. This will be taken care of properly in the new gen agent core.
# utf8_decoding: false

# ========================================================================== #
# Service-specific configuration #
# ========================================================================== #
Expand Down
1 change: 1 addition & 0 deletions dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ def init(config_path=None, use_watchdog=False, use_forwarder=False, args=None):
formatter=get_formatter(c),
histogram_aggregates=c.get('histogram_aggregates'),
histogram_percentiles=c.get('histogram_percentiles'),
utf8_decoding=c['utf8_decoding']
)

# Start the reporting thread.
Expand Down
62 changes: 57 additions & 5 deletions tests/performance/benchmark_aggregator.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
# -*- coding: utf-8 -*-
"""
Performance tests for the agent/dogstatsd metrics aggregator.
"""


from aggregator import MetricsAggregator, MetricsBucketAggregator




class TestAggregatorPerf(object):

FLUSH_COUNT = 10
Expand Down Expand Up @@ -54,8 +51,63 @@ def test_checksd_aggregation_perf(self):
ma.set('set.%s' % j, float(i))
ma.flush()

def create_event_packet(self, title, text):
p = "_e{{{title_len},{text_len}}}:{title}|{text}".format(
title_len=len(title),
text_len=len(text),
title=title,
text=text
)
return p


def test_dogstatsd_utf8_events(self):
ma = MetricsBucketAggregator('my.host')

for _ in xrange(self.FLUSH_COUNT):
for i in xrange(self.LOOPS_PER_FLUSH):
for j in xrange(self.METRIC_COUNT):

ma.submit_packets(self.create_event_packet(
'Τη γλώσσα μου έδωσαν ελληνική',
"""τὸ σπίτι φτωχικὸ στὶς ἀμμουδιὲς τοῦ Ὁμήρου. Μονάχη ἔγνοια ἡ γλῶσσα μου στὶς ἀμμουδιὲς τοῦ Ὁμήρου. ἀπὸ τὸ Ἄξιον ἐστί τοῦ Ὀδυσσέα Ἐλύτη"""
))
ma.submit_packets(self.create_event_packet(
'ვეპხის ტყაოსანი შოთა რუსთაველი',
"""ღმერთსი შემვედრე, ნუთუ კვლა დამხსნას სოფლისა შრომასა, ცეცხლს, წყალსა და მიწასა, ჰაერთა თანა მრომასა; მომცნეს ფრთენი და აღვფრინდე, მივჰხვდე მას ჩემსა ნდომასა, დღისით და ღამით ვჰხედვიდე მზისა ელვათა კრთომაასა.
"""
))
ma.submit_packets(self.create_event_packet(
'Traité sur la tolérance',
"""Ose supposer qu'un Ministre éclairé & magnanime, un Prélat humain & sage, un Prince qui sait que son intérêt consiste dans le grand nombre de ses Sujets, & sa gloire dans leur bonheur, daigne jetter les yeux sur cet Ecrit informe & défectueux; il y supplée par ses propres lumieres; il se dit à lui-même: Que risquerai-je à voir la terre cultivée & ornée par plus de mains laborieuses, les tributs augmentés, l'Etat plus florissant?"""
))

ma.flush()

def test_dogstatsd_ascii_events(self):
ma = MetricsBucketAggregator('my.host')

for _ in xrange(self.FLUSH_COUNT):
for i in xrange(self.LOOPS_PER_FLUSH):
for j in xrange(self.METRIC_COUNT):

ma.submit_packets(self.create_event_packet(
'asldkfj fdsaljfas dflksjafs fasdfkjaldsfkjasldf',
"""alkdjfa slfalskdfjas lkfdjaoisudhfalsdkjbfaksdhfbasjdk fa;sf ljda fsafksadfh alsdjfhaskjdfgahls d;fjasdlkfh9823udjs dlfhaspdf98as ufdaksjhfaisdhufalskdjfhas df"""
))
ma.submit_packets(self.create_event_packet(
'kdjfsofuousodifu982309rijdfsljsd dfsdf sdf',
"""dflskjdfs8d9fsdfjs sldfjka ;dlfjapfoia jsdflakjsdfp 0adsfuolwejf wflsdjf lsdkjf0saoiufja dlfjasd of;lasdjf ;askdjf asodfhas lkmfbashudf asd,fasdfna s,dfjas lcjx vjaskdlfjals dfkjasdflk jasldfkj asldkfjas ldfkasjdf a"""
))
ma.submit_packets(self.create_event_packet(
'asdf askdjf asldfkjsad lfkajsdlfksajd fasdfsdfdf',
"""skdfjsld flskdjf alksdjfpasdofuapo sdfjalksdjf ;as.kjdf ;ljLKJL :KJL:KJ l;kdsjf ;lkj :Lkj FLDKFJ LSKFDJ ;LDFJ SLKDJF KSDLjf: Lfjldkj fLKDSJf lSKDjf ls;kdjf s;lkfjs L:KAJ :LFKJDL:DKjf L:SKjf;lKDJfl;SKJDf :LKSDj;lsdfj fsdljfsd ofisunafoialjsflmsdifjas;dlkfaj sdfkasjd flaksjdfnpmsao;difjkas dfnlaksdfa;sodljfas lfdjasdflmajsdlfknaf98wouanepr9qo3ud fadspuf oaisdufpoasid fj askdjn LKJH LKJHFL KJDHSF DSFLHSL JKDFHLSK DJFHLS KJDFHS"""
))

ma.flush()

if __name__ == '__main__':
t = TestAggregatorPerf()
t.test_dogstatsd_aggregation_perf()
#t.test_dogstatsd_aggregation_perf()
#t.test_checksd_aggregation_perf()
t.test_dogstatsd_utf8_events()
42 changes: 27 additions & 15 deletions tests/test_dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,40 +624,52 @@ def test_event_tags(self):
nt.assert_equal(fourth['tags'], sorted(['t1', 't2']))

def test_event_title(self):
stats = MetricsAggregator('myhost')
stats = MetricsAggregator('myhost', utf8_decoding=True)
stats.submit_packets('_e{0,4}:|text')
stats.submit_packets(u'_e{9,4}:2intitulé|text')
stats.submit_packets(u'_e{9,4}:2intitulé|text'.encode('utf-8')) # comes from socket
stats.submit_packets('_e{14,4}:3title content|text')
stats.submit_packets('_e{14,4}:4title|content|text')
stats.submit_packets('_e{13,4}:5title\\ntitle|text') # \n stays escaped

events = self.sort_events(stats.flush_events())

assert len(events) == 5
first, second, third, fourth, fifth = events

nt.assert_equal(first['msg_title'], '')
nt.assert_equal(second['msg_title'], u'2intitulé')
nt.assert_equal(third['msg_title'], '3title content')
nt.assert_equal(fourth['msg_title'], '4title|content')
nt.assert_equal(fifth['msg_title'], '5title\\ntitle')
nt.assert_equal(events[0]['msg_title'], '')
nt.assert_equal(events[1]['msg_title'], u'2intitulé')
nt.assert_equal(events[2]['msg_title'], '3title content')
nt.assert_equal(events[3]['msg_title'], '4title|content')
nt.assert_equal(events[4]['msg_title'], '5title\\ntitle')

def test_event_text(self):
stats = MetricsAggregator('myhost')
stats.submit_packets('_e{2,0}:t1|')
stats.submit_packets('_e{2,12}:t2|text|content')
stats.submit_packets('_e{2,23}:t3|First line\\nSecond line') # \n is a newline
stats.submit_packets(u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪') # utf-8 compliant

events = self.sort_events(stats.flush_events())

assert len(events) == 4
first, second, third, fourth = events
assert len(events) == 3

nt.assert_equal(events[0]['msg_text'], '')
nt.assert_equal(events[1]['msg_text'], 'text|content')
nt.assert_equal(events[2]['msg_text'], 'First line\nSecond line')

def test_event_text_utf8(self):
stats = MetricsAggregator('myhost', utf8_decoding=True)
# Should raise because content is not encoded

self.assertRaises(Exception, stats.submit_packets, u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪')
stats.submit_packets(u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪'.encode('utf-8')) # utf-8 compliant
# Normal packet
stats.submit_packets('_e{2,23}:t3|First line\\nSecond line') # \n is a newline

events = self.sort_events(stats.flush_events())

assert len(events) == 2

nt.assert_equal(first['msg_text'], '')
nt.assert_equal(second['msg_text'], 'text|content')
nt.assert_equal(third['msg_text'], 'First line\nSecond line')
nt.assert_equal(fourth['msg_text'], u'♬ †øU †øU ¥ºu T0µ ♪')
nt.assert_equal(events[0]['msg_text'], 'First line\nSecond line')
nt.assert_equal(events[1]['msg_text'], u'♬ †øU †øU ¥ºu T0µ ♪')

def test_recent_point_threshold(self):
threshold = 100
Expand Down