From 677417fe12b1914e4322ac2c1fd1645cb0f1de31 Mon Sep 17 00:00:00 2001 From: Conor Branagan Date: Tue, 6 Jan 2015 16:25:36 +0000 Subject: [PATCH] Fix host tagging for external Elasticsearch clusters. When using an external Elasticsearch cluster, we should tag the hostname by the node's given hostname in the stats output. Otherwise we end up using only the last node's values because each subsequent gauge metric will stomp on the one before it. The test is updated to make sure the hostname given in the stats (which should be the FQDN) matches what we output at the end. Finally, this change also splits the cluster health metrics into a separate dict. This has two benefits: (a) it will clean up a lot of debugging logging and extra work for `_process_health_data` and (b) makes it easy to check against just the stats metrics for tests. --- checks.d/elastic.py | 46 ++++++++++++++++++++++++++++--------------- tests/test_elastic.py | 19 ++++++++++++++++-- 2 files changed, 47 insertions(+), 18 deletions(-) diff --git a/checks.d/elastic.py b/checks.d/elastic.py index 3824a54a18..c523b6134b 100644 --- a/checks.d/elastic.py +++ b/checks.d/elastic.py @@ -34,7 +34,7 @@ class ESCheck(AgentCheck): DEFAULT_TIMEOUT = 5 - METRICS = { # Metrics that are common to all Elasticsearch versions + STATS_METRICS = { # Metrics that are common to all Elasticsearch versions "elasticsearch.docs.count": ("gauge", "indices.docs.count"), "elasticsearch.docs.deleted": ("gauge", "indices.docs.deleted"), "elasticsearch.store.size": ("gauge", "indices.store.size_in_bytes"), @@ -115,6 +115,9 @@ class ESCheck(AgentCheck): "jvm.mem.non_heap_used": ("gauge", "jvm.mem.non_heap_used_in_bytes"), "jvm.threads.count": ("gauge", "jvm.threads.count"), "jvm.threads.peak_count": ("gauge", "jvm.threads.peak_count"), + } + + CLUSTER_HEALTH_METRICS = { "elasticsearch.number_of_nodes": ("gauge", "number_of_nodes"), "elasticsearch.number_of_data_nodes": ("gauge", "number_of_data_nodes"), 
"elasticsearch.active_primary_shards": ("gauge", "active_primary_shards"), @@ -176,7 +179,7 @@ def check(self, instance): # Check ES version for this instance and define parameters # (URLs and metrics) accordingly version = self._get_es_version() - self._define_params(version) + self._define_params(version, self.curr_config.is_external) # Load stats data. stats_url = urlparse.urljoin(self.curr_config.url, self.STATS_URL) @@ -212,16 +215,21 @@ def _get_es_version(self): self.log.debug("Elasticsearch version is %s" % version) return version - def _define_params(self, version): + def _define_params(self, version, is_external): """ Define the set of URLs and METRICS to use depending on the running ES version. """ if version >= [0,90,10]: # ES versions 0.90.10 and above self.HEALTH_URL = "/_cluster/health?pretty=true" - self.STATS_URL = "/_nodes/_local/stats?all=true" self.NODES_URL = "/_nodes?network=true" + # For "external" clusters, we want to collect from all nodes. + if is_external: + self.STATS_URL = "/_nodes/stats?all=true" + else: + self.STATS_URL = "/_nodes/_local/stats?all=true" + additional_metrics = { "jvm.gc.collectors.young.count": ("gauge", "jvm.gc.collectors.young.collection_count"), "jvm.gc.collectors.young.collection_time": ("gauge", "jvm.gc.collectors.young.collection_time_in_millis", lambda v: float(v)/1000), @@ -242,7 +250,7 @@ def _define_params(self, version): "jvm.gc.collection_time": ("gauge", "jvm.gc.collection_time_in_millis", lambda v: float(v)/1000), } - self.METRICS.update(additional_metrics) + self.STATS_METRICS.update(additional_metrics) if version >= [0,90,5]: # ES versions 0.90.5 and above @@ -264,7 +272,7 @@ def _define_params(self, version): "elasticsearch.cache.filter.size": ("gauge", "indices.cache.filter_size_in_bytes"), } - self.METRICS.update(additional_metrics) + self.STATS_METRICS.update(additional_metrics) def _get_data(self, url): """ Hit a given URL and return the parsed json @@ -295,17 +303,22 @@ def _get_data(self, url): 
return resp.json() def _process_stats_data(self, data): + is_external = self.curr_config.is_external for node_name in data['nodes']: node_data = data['nodes'][node_name] # On newer version of ES it's "host" not "hostname" node_hostname = node_data.get('hostname', node_data.get('host', None)) - should_process = self.curr_config.is_external\ - or self.should_process_node(node_name, node_hostname) + should_process = is_external \ + or self.should_process_node(node_name, node_hostname) + + # Override the metric hostname if we're hitting an external cluster. + metric_hostname = node_hostname if is_external else None + if should_process: - for metric in self.METRICS: - desc = self.METRICS[metric] + for metric in self.STATS_METRICS: + desc = self.STATS_METRICS[metric] self._process_metric(node_data, metric, *desc, - tags=self.curr_config.tags) + tags=self.curr_config.tags, hostname=metric_hostname) def should_process_node(self, node_name, node_hostname): """ The node stats API will return stats for every node so we @@ -373,7 +386,8 @@ def _host_matches_node(self, primary_addrs): # Check the interface addresses against the primary address return primary_addrs in ips - def _process_metric(self, data, metric, xtype, path, xform=None, tags=None): + def _process_metric(self, data, metric, xtype, path, xform=None, + tags=None, hostname=None): """data: dictionary containing all the stats metric: datadog metric path: corresponding path in data, flattened, e.g. 
thread_pool.bulk.queue @@ -391,9 +405,9 @@ def _process_metric(self, data, metric, xtype, path, xform=None, tags=None): if value is not None: if xform: value = xform(value) if xtype == "gauge": - self.gauge(metric, value, tags=tags) + self.gauge(metric, value, tags=tags, hostname=hostname) else: - self.rate(metric, value, tags=tags) + self.rate(metric, value, tags=tags, hostname=hostname) else: self._metric_not_found(metric, path) @@ -409,9 +423,9 @@ def _process_health_data(self, data): event = self._create_event(data['status']) self.event(event) - for metric in self.METRICS: + for metric in self.CLUSTER_HEALTH_METRICS: # metric description - desc = self.METRICS[metric] + desc = self.CLUSTER_HEALTH_METRICS[metric] self._process_metric(data, metric, *desc, tags=self.curr_config.tags) # Process the service check diff --git a/tests/test_elastic.py b/tests/test_elastic.py index 3626032fca..8baa011144 100644 --- a/tests/test_elastic.py +++ b/tests/test_elastic.py @@ -1,4 +1,5 @@ # stdlib +import socket import unittest # 3p @@ -36,11 +37,15 @@ def test_bad_config(self): def test_check(self): agentConfig = { 'version': '0.1', - 'api_key': 'toto' + 'api_key': 'toto', + 'hostname': 'agent-es-test' } conf = { - 'instances': [{'url': 'http://localhost:%s' % PORT}] + 'instances': [ + {'url': 'http://localhost:%s' % PORT}, + {'url': 'http://localhost:%s' % PORT, 'is_external': True} + ] } # Initialize the check from checks.d @@ -48,6 +53,7 @@ def test_check(self): self.check.check(conf['instances'][0]) r = self.check.get_metrics() + self.check.get_events() self.assertTrue(type(r) == type([])) self.assertTrue(len(r) > 0) @@ -86,4 +92,13 @@ def test_check(self): self.check.cluster_status[conf['instances'][0].get('url')] = "red" self.check.check(conf['instances'][0]) events = self.check.get_events() + self.check.get_metrics() self.assertEquals(len(events),1,events) + + # Check an "external" cluster + self.check.check(conf['instances'][1]) + r = self.check.get_metrics() + 
expected_hostname = socket.gethostname() + for m in r: + if m[0] not in self.check.CLUSTER_HEALTH_METRICS: + self.assertEquals(m[3]['hostname'], expected_hostname)