diff --git a/aggregator.py b/aggregator.py index 9b3ec450c9..90cb0e14ee 100644 --- a/aggregator.py +++ b/aggregator.py @@ -391,7 +391,8 @@ class Aggregator(object): def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, recent_point_threshold=None, - histogram_aggregates=None, histogram_percentiles=None): + histogram_aggregates=None, histogram_percentiles=None, + utf8_decoding=False): self.events = [] self.total_count = 0 self.count = 0 @@ -413,6 +414,8 @@ def __init__(self, hostname, interval=1.0, expiry_seconds=300, } } + self.utf8_decoding = utf8_decoding + def packets_per_second(self, interval): if interval == 0: return 0 @@ -495,7 +498,7 @@ def parse_event_packet(self, packet): # Event syntax: # _e{5,4}:title|body|meta name = name_and_metadata[0] - metadata = unicode(name_and_metadata[1]) + metadata = name_and_metadata[1] title_length, text_length = name.split(',') title_length = int(title_length[3:]) text_length = int(text_length[:-1]) @@ -525,8 +528,15 @@ def parse_event_packet(self, packet): raise Exception(u'Unparseable event packet: %s' % packet) def submit_packets(self, packets): - for packet in packets.splitlines(): + # We should probably consider that packets are always encoded + # in utf8, but decoding all packets has a perf overhead of 7%, + # so we let the user decide if they want utf8 by default. + # Keep a very conservative approach anyhow: + # Clients MUST always send UTF-8 encoded content + if self.utf8_decoding: + packets = unicode(packets, 'utf-8', errors='replace') + for packet in packets.splitlines(): if not packet.strip(): continue @@ -542,6 +552,7 @@ def submit_packets(self, packets): self.submit_metric(name, value, mtype, tags=tags, hostname=hostname, device_name=device_name, sample_rate=sample_rate) + def _extract_magic_tags(self, tags): """Magic tags (host, device) override metric hostname and device_name attributes""" hostname = None @@ -620,7 +631,8 @@ class MetricsBucketAggregator(Aggregator): def __init__(self, 
hostname, interval=1.0, expiry_seconds=300, formatter=None, recent_point_threshold=None, - histogram_aggregates=None, histogram_percentiles=None): + histogram_aggregates=None, histogram_percentiles=None, + utf8_decoding=False): super(MetricsBucketAggregator, self).__init__( hostname, interval, @@ -628,7 +640,8 @@ def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter, recent_point_threshold, histogram_aggregates, - histogram_percentiles + histogram_percentiles, + utf8_decoding ) self.metric_by_bucket = {} self.last_sample_time_by_context = {} @@ -758,7 +771,8 @@ class MetricsAggregator(Aggregator): def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter=None, recent_point_threshold=None, - histogram_aggregates=None, histogram_percentiles=None): + histogram_aggregates=None, histogram_percentiles=None, + utf8_decoding=False): super(MetricsAggregator, self).__init__( hostname, interval, @@ -766,7 +780,8 @@ def __init__(self, hostname, interval=1.0, expiry_seconds=300, formatter, recent_point_threshold, histogram_aggregates, - histogram_percentiles + histogram_percentiles, + utf8_decoding ) self.metrics = {} self.metric_type_to_class = { diff --git a/config.py b/config.py index b3741a36fe..9c977c0f9b 100644 --- a/config.py +++ b/config.py @@ -289,6 +289,7 @@ def get_config(parse_args=True, cfg_path=None, options=None): 'additional_checksd': '/etc/dd-agent/checks.d/', 'bind_host': get_default_bind_host(), 'statsd_metric_namespace': None, + 'utf8_decoding': False } # Config handling @@ -489,6 +490,10 @@ def get_config(parse_args=True, cfg_path=None, options=None): if config.has_option("Main", "collect_ec2_tags"): agentConfig["collect_ec2_tags"] = _is_affirmative(config.get("Main", "collect_ec2_tags")) + agentConfig["utf8_decoding"] = False + if config.has_option("Main", "utf8_decoding"): + agentConfig["utf8_decoding"] = _is_affirmative(config.get("Main", "utf8_decoding")) + except ConfigParser.NoSectionError, e: sys.stderr.write('Config 
file not found or incorrectly formatted.\n') sys.exit(2) diff --git a/datadog.conf.example b/datadog.conf.example index 094057d694..bc81f531b4 100644 --- a/datadog.conf.example +++ b/datadog.conf.example @@ -116,6 +116,13 @@ use_mount: no # like `metric.name` will instead become `namespace.metric.name` # statsd_metric_namespace: +# By default, dogstatsd supports only plain ASCII packets. However, most +# (dog)statsd clients support UTF-8 by encoding packets before sending them. +# This option enables UTF-8 decoding in case you need it. +# However, it comes with a performance overhead of ~10% in the dogstatsd +# server. This will be taken care of properly in the new gen agent core. +# utf8_decoding: false + # ========================================================================== # # Service-specific configuration # # ========================================================================== # diff --git a/dogstatsd.py b/dogstatsd.py index 131ab9431b..0caee9cb4c 100755 --- a/dogstatsd.py +++ b/dogstatsd.py @@ -384,6 +384,7 @@ def init(config_path=None, use_watchdog=False, use_forwarder=False, args=None): formatter=get_formatter(c), histogram_aggregates=c.get('histogram_aggregates'), histogram_percentiles=c.get('histogram_percentiles'), + utf8_decoding=c['utf8_decoding'] ) # Start the reporting thread. diff --git a/tests/performance/benchmark_aggregator.py b/tests/performance/benchmark_aggregator.py index dd25ba9f01..d20d8b11c6 100644 --- a/tests/performance/benchmark_aggregator.py +++ b/tests/performance/benchmark_aggregator.py @@ -1,13 +1,10 @@ +# -*- coding: utf-8 -*- """ Performance tests for the agent/dogstatsd metrics aggregator. 
""" - - from aggregator import MetricsAggregator, MetricsBucketAggregator - - class TestAggregatorPerf(object): FLUSH_COUNT = 10 @@ -54,8 +51,63 @@ def test_checksd_aggregation_perf(self): ma.set('set.%s' % j, float(i)) ma.flush() + def create_event_packet(self, title, text): + p = "_e{{{title_len},{text_len}}}:{title}|{text}".format( + title_len=len(title), + text_len=len(text), + title=title, + text=text + ) + return p + + + def test_dogstatsd_utf8_events(self): + ma = MetricsBucketAggregator('my.host') + + for _ in xrange(self.FLUSH_COUNT): + for i in xrange(self.LOOPS_PER_FLUSH): + for j in xrange(self.METRIC_COUNT): + + ma.submit_packets(self.create_event_packet( + 'Τη γλώσσα μου έδωσαν ελληνική', + """τὸ σπίτι φτωχικὸ στὶς ἀμμουδιὲς τοῦ Ὁμήρου. Μονάχη ἔγνοια ἡ γλῶσσα μου στὶς ἀμμουδιὲς τοῦ Ὁμήρου. ἀπὸ τὸ Ἄξιον ἐστί τοῦ Ὀδυσσέα Ἐλύτη""" + )) + ma.submit_packets(self.create_event_packet( + 'ვეპხის ტყაოსანი შოთა რუსთაველი', + """ღმერთსი შემვედრე, ნუთუ კვლა დამხსნას სოფლისა შრომასა, ცეცხლს, წყალსა და მიწასა, ჰაერთა თანა მრომასა; მომცნეს ფრთენი და აღვფრინდე, მივჰხვდე მას ჩემსა ნდომასა, დღისით და ღამით ვჰხედვიდე მზისა ელვათა კრთომაასა. 
+ """ + )) + ma.submit_packets(self.create_event_packet( + 'Traité sur la tolérance', + """Ose supposer qu'un Ministre éclairé & magnanime, un Prélat humain & sage, un Prince qui sait que son intérêt consiste dans le grand nombre de ses Sujets, & sa gloire dans leur bonheur, daigne jetter les yeux sur cet Ecrit informe & défectueux; il y supplée par ses propres lumieres; il se dit à lui-même: Que risquerai-je à voir la terre cultivée & ornée par plus de mains laborieuses, les tributs augmentés, l'Etat plus florissant?""" + )) + + ma.flush() + + def test_dogstatsd_ascii_events(self): + ma = MetricsBucketAggregator('my.host') + + for _ in xrange(self.FLUSH_COUNT): + for i in xrange(self.LOOPS_PER_FLUSH): + for j in xrange(self.METRIC_COUNT): + + ma.submit_packets(self.create_event_packet( + 'asldkfj fdsaljfas dflksjafs fasdfkjaldsfkjasldf', + """alkdjfa slfalskdfjas lkfdjaoisudhfalsdkjbfaksdhfbasjdk fa;sf ljda fsafksadfh alsdjfhaskjdfgahls d;fjasdlkfh9823udjs dlfhaspdf98as ufdaksjhfaisdhufalskdjfhas df""" + )) + ma.submit_packets(self.create_event_packet( + 'kdjfsofuousodifu982309rijdfsljsd dfsdf sdf', + """dflskjdfs8d9fsdfjs sldfjka ;dlfjapfoia jsdflakjsdfp 0adsfuolwejf wflsdjf lsdkjf0saoiufja dlfjasd of;lasdjf ;askdjf asodfhas lkmfbashudf asd,fasdfna s,dfjas lcjx vjaskdlfjals dfkjasdflk jasldfkj asldkfjas ldfkasjdf a""" + )) + ma.submit_packets(self.create_event_packet( + 'asdf askdjf asldfkjsad lfkajsdlfksajd fasdfsdfdf', + """skdfjsld flskdjf alksdjfpasdofuapo sdfjalksdjf ;as.kjdf ;ljLKJL :KJL:KJ l;kdsjf ;lkj :Lkj FLDKFJ LSKFDJ ;LDFJ SLKDJF KSDLjf: Lfjldkj fLKDSJf lSKDjf ls;kdjf s;lkfjs L:KAJ :LFKJDL:DKjf L:SKjf;lKDJfl;SKJDf :LKSDj;lsdfj fsdljfsd ofisunafoialjsflmsdifjas;dlkfaj sdfkasjd flaksjdfnpmsao;difjkas dfnlaksdfa;sodljfas lfdjasdflmajsdlfknaf98wouanepr9qo3ud fadspuf oaisdufpoasid fj askdjn LKJH LKJHFL KJDHSF DSFLHSL JKDFHLSK DJFHLS KJDFHS""" + )) + + ma.flush() if __name__ == '__main__': t = TestAggregatorPerf() - t.test_dogstatsd_aggregation_perf() + 
#t.test_dogstatsd_aggregation_perf() #t.test_checksd_aggregation_perf() + t.test_dogstatsd_utf8_events() diff --git a/tests/test_dogstatsd.py b/tests/test_dogstatsd.py index c2cabb4b15..0960d68adf 100644 --- a/tests/test_dogstatsd.py +++ b/tests/test_dogstatsd.py @@ -624,9 +624,9 @@ def test_event_tags(self): nt.assert_equal(fourth['tags'], sorted(['t1', 't2'])) def test_event_title(self): - stats = MetricsAggregator('myhost') + stats = MetricsAggregator('myhost', utf8_decoding=True) stats.submit_packets('_e{0,4}:|text') - stats.submit_packets(u'_e{9,4}:2intitulé|text') + stats.submit_packets(u'_e{9,4}:2intitulé|text'.encode('utf-8')) # comes from socket stats.submit_packets('_e{14,4}:3title content|text') stats.submit_packets('_e{14,4}:4title|content|text') stats.submit_packets('_e{13,4}:5title\\ntitle|text') # \n stays escaped @@ -634,30 +634,42 @@ def test_event_title(self): events = self.sort_events(stats.flush_events()) assert len(events) == 5 - first, second, third, fourth, fifth = events - nt.assert_equal(first['msg_title'], '') - nt.assert_equal(second['msg_title'], u'2intitulé') - nt.assert_equal(third['msg_title'], '3title content') - nt.assert_equal(fourth['msg_title'], '4title|content') - nt.assert_equal(fifth['msg_title'], '5title\\ntitle') + nt.assert_equal(events[0]['msg_title'], '') + nt.assert_equal(events[1]['msg_title'], u'2intitulé') + nt.assert_equal(events[2]['msg_title'], '3title content') + nt.assert_equal(events[3]['msg_title'], '4title|content') + nt.assert_equal(events[4]['msg_title'], '5title\\ntitle') def test_event_text(self): stats = MetricsAggregator('myhost') stats.submit_packets('_e{2,0}:t1|') stats.submit_packets('_e{2,12}:t2|text|content') stats.submit_packets('_e{2,23}:t3|First line\\nSecond line') # \n is a newline - stats.submit_packets(u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪') # utf-8 compliant events = self.sort_events(stats.flush_events()) - assert len(events) == 4 - first, second, third, fourth = events + assert len(events) == 3 
+ + nt.assert_equal(events[0]['msg_text'], '') + nt.assert_equal(events[1]['msg_text'], 'text|content') + nt.assert_equal(events[2]['msg_text'], 'First line\nSecond line') + + def test_event_text_utf8(self): + stats = MetricsAggregator('myhost', utf8_decoding=True) + # Should raise because content is not encoded + + self.assertRaises(Exception, stats.submit_packets, u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪') + stats.submit_packets(u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪'.encode('utf-8')) # utf-8 compliant + # Normal packet + stats.submit_packets('_e{2,23}:t3|First line\\nSecond line') # \n is a newline + + events = self.sort_events(stats.flush_events()) + + assert len(events) == 2 - nt.assert_equal(first['msg_text'], '') - nt.assert_equal(second['msg_text'], 'text|content') - nt.assert_equal(third['msg_text'], 'First line\nSecond line') - nt.assert_equal(fourth['msg_text'], u'♬ †øU †øU ¥ºu T0µ ♪') + nt.assert_equal(events[0]['msg_text'], 'First line\nSecond line') + nt.assert_equal(events[1]['msg_text'], u'♬ †øU †øU ¥ºu T0µ ♪') def test_recent_point_threshold(self): threshold = 100