[postgres] Add and update replication metrics to the postgres check
* Add postgresql.replication_delay_bytes metric
* Add postgresql.total_tables metric
* Update postgresql.replication_delay because it wasn't accurate
* Update the test suite
DorianZaccaria committed Mar 18, 2015
1 parent 3729b72 commit bee8eec
Showing 2 changed files with 104 additions and 41 deletions.
58 changes: 47 additions & 11 deletions checks.d/postgres.py
@@ -172,13 +172,34 @@ class PostgreSql(AgentCheck):
relname = ANY(%s)"""
}

-    REPLICATION_METRICS = {
-        'descriptors': [],
+    COUNT_METRICS = {
+        'descriptors': [
+            ('schemaname', 'schema')
+        ],
        'metrics': {
-            'GREATEST(0, EXTRACT(EPOCH FROM now() - pg_last_xact_replay_timestamp())) AS replication_delay': ('postgresql.replication_delay', GAUGE),
+            'pg_stat_user_tables': ('postgresql.total_tables', GAUGE),
        },
        'relation': False,
        'query': """
+SELECT schemaname, count(*)
+FROM %s
+GROUP BY schemaname
+"""
+    }
+
+    REPLICATION_METRICS_9_1 = {
+        'CASE WHEN pg_last_xlog_receive_location() = pg_last_xlog_replay_location() THEN 0 ELSE GREATEST (0, EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp())) END': ('postgresql.replication_delay', GAUGE),
+    }
+
+    REPLICATION_METRICS_9_2 = {
+        'abs(pg_xlog_location_diff(pg_last_xlog_receive_location(), pg_last_xlog_replay_location())) AS replication_delay_bytes': ('postgres.replication_delay_bytes', GAUGE)
+    }
+
+    REPLICATION_METRICS = {
+        'descriptors': [],
+        'metrics': {},
+        'relation': False,
+        'query': """
SELECT %s
 WHERE (SELECT pg_is_in_recovery())"""
    }
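
A note on the two new expressions above: on an idle standby, pg_last_xact_replay_timestamp() stops advancing, so the old replication_delay expression kept growing even when the replica was fully caught up; the CASE WHEN guard reports 0 as soon as the receive and replay locations match. pg_xlog_location_diff() only exists from 9.2, which is why the byte-based delay lives in its own dict. The sketch below is illustrative rather than taken from the check: it assumes the metric expressions are comma-joined into the query template's %s, and prints the SQL a 9.2+ standby would receive. Because of the WHERE (SELECT pg_is_in_recovery()) clause, the same query returns no rows on a primary, so no replication metrics are emitted there.

# Illustrative only -- not part of the check. Joins the 9.1 and 9.2 metric
# expressions into REPLICATION_METRICS['query'] (assumed comma-joined) and
# prints the SQL that would run against a 9.2+ standby.
REPLICATION_QUERY_TEMPLATE = """
SELECT %s
 WHERE (SELECT pg_is_in_recovery())"""

columns = [
    # 9.1+: report 0 once everything received has been replayed
    "CASE WHEN pg_last_xlog_receive_location() = pg_last_xlog_replay_location() THEN 0 "
    "ELSE GREATEST(0, EXTRACT(EPOCH FROM now() - pg_last_xact_replay_timestamp())) END "
    "AS replication_delay",
    # 9.2+: replication lag expressed in bytes of WAL
    "abs(pg_xlog_location_diff(pg_last_xlog_receive_location(), pg_last_xlog_replay_location())) "
    "AS replication_delay_bytes",
]

print(REPLICATION_QUERY_TEMPLATE % ", ".join(columns))
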
@@ -205,6 +226,7 @@ def __init__(self, name, init_config, agentConfig, instances=None):
self.bgw_metrics = {}
self.db_instance_metrics = []
self.db_bgw_metrics = []
self.replication_metrics = {}

def _get_version(self, key, db):
if key not in self.versions:
@@ -239,7 +261,7 @@ def _get_instance_metrics(self, key, db):
"""
# Extended 9.2+ metrics if needed
metrics = self.instance_metrics.get(key)

if metrics is None:

# Hack to make sure that if we have multiple instances that connect to
@@ -254,7 +276,7 @@ def _get_instance_metrics(self, key, db):

self.db_instance_metrics.append(sub_key)


if self._is_9_2_or_above(key, db):
self.instance_metrics[key] = dict(self.COMMON_METRICS, **self.NEWER_92_METRICS)
else:
@@ -292,6 +314,19 @@ def _get_bgw_metrics(self, key, db):
metrics = self.bgw_metrics.get(key)
return metrics

def _get_replication_metrics(self, key, db):
""" Use either REPLICATION_METRICS_9_1 or REPLICATION_METRICS_9_1 + REPLICATION_METRICS_9_2
depending on the postgres version.
Uses a dictionary to cache the result for each instance
"""
metrics = self.replication_metrics.get(key)
if self._is_9_1_or_above(key, db) and metrics is None:
self.replication_metrics[key] = dict(self.REPLICATION_METRICS_9_1)
if self._is_9_2_or_above(key, db):
self.replication_metrics[key].update(self.REPLICATION_METRICS_9_2)
metrics = self.replication_metrics.get(key)
return metrics

def _collect_stats(self, key, db, instance_tags, relations, custom_metrics):
"""Query pg_stat_* for various metrics
If relations is not an empty list, gather per-relation metrics
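
To make the version gating in _get_replication_metrics above concrete, here is a self-contained restating of what it returns for a few versions; the helper name, the plain version-tuple argument and the stand-in dicts are hypothetical, and the real method additionally caches its result per instance key.

# Stand-ins for the class constants, keyed by (placeholder) SQL expression.
REPLICATION_METRICS_9_1 = {'<delay expression>': ('postgresql.replication_delay', 'GAUGE')}
REPLICATION_METRICS_9_2 = {'<delay-in-bytes expression>': ('postgres.replication_delay_bytes', 'GAUGE')}

def select_replication_metrics(version):
    """version is a tuple such as (9, 2, 9); below 9.1 there is nothing to collect."""
    if version < (9, 1):
        return None
    metrics = dict(REPLICATION_METRICS_9_1)
    if version >= (9, 2):
        metrics.update(REPLICATION_METRICS_9_2)
    return metrics

assert select_replication_metrics((9, 0, 18)) is None
assert len(select_replication_metrics((9, 1, 14))) == 1   # delay only
assert len(select_replication_metrics((9, 2, 9))) == 2    # delay + delay in bytes

Caching per instance key means a Postgres upgrade on an already-monitored server is only picked up once the cache entry is rebuilt (for example on Agent restart), which is consistent with how instance_metrics and bgw_metrics are handled in this check.
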
@@ -305,7 +340,8 @@ def _collect_stats(self, key, db, instance_tags, relations, custom_metrics):
            self.DB_METRICS,
            self.CONNECTION_METRICS,
            self.BGW_METRICS,
-            self.LOCK_METRICS
+            self.LOCK_METRICS,
+            self.COUNT_METRICS
        ]

# Do we need relation-specific metrics?
@@ -316,9 +352,9 @@ def _collect_stats(self, key, db, instance_tags, relations, custom_metrics):
self.SIZE_METRICS
]

-        # Only available for >= 9.1 due to
-        # pg_last_xact_replay_timestamp
-        if self._is_9_1_or_above(key,db):
+        replication_metrics = self._get_replication_metrics(key, db)
+        if replication_metrics is not None:
+            self.REPLICATION_METRICS['metrics'] = replication_metrics
            metric_scope.append(self.REPLICATION_METRICS)

full_metric_scope = list(metric_scope) + custom_metrics
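
Each entry in metric_scope is a small descriptor of one query: the dict keys under 'metrics' are SQL expressions that get substituted into the 'query' template, and 'descriptors' names the leading result columns that become tags. The loop below is a simplified, hypothetical sketch of that pattern; the real _collect_stats also handles the per-relation scopes, custom metrics and error handling, and submits through the Agent instead of yielding tuples.

def run_scope(cursor, scope, instance_tags):
    """Build and run one scope's query with any DB-API cursor, yielding
    (metric_name, value, tags, method) tuples. Illustrative sketch only."""
    cols = list(scope['metrics'])                      # SQL expressions / column names
    cursor.execute(scope['query'] % ', '.join(cols))   # fill the %s placeholder

    n_desc = len(scope['descriptors'])
    for row in cursor.fetchall():
        # Leading columns (e.g. datname, schemaname) become tags...
        tags = list(instance_tags) + [
            '%s:%s' % (tag, value)
            for (_column, tag), value in zip(scope['descriptors'], row[:n_desc])
        ]
        # ...and the remaining columns line up with the (metric_name, method) pairs.
        for (name, method), value in zip(scope['metrics'].values(), row[n_desc:]):
            yield name, value, tags, method

Read this way, COUNT_METRICS is a mildly clever reuse of the mechanism: its single 'metrics' key, pg_stat_user_tables, is a table name rather than a column, and it appears to be what lands in the FROM %s of its query, while the count(*) column maps to postgresql.total_tables.
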
@@ -454,15 +490,15 @@ def _process_customer_metrics(self,custom_metrics):
if param not in m:
raise CheckException("Missing {0} parameter in custom metric"\
.format(param))

self.log.debug("Metric: {0}".format(m))

for k, v in m['metrics'].items():
if v[1].upper() not in ['RATE','GAUGE','MONOTONIC']:
raise CheckException("Collector method {0} is not known."\
"Known methods are RATE,GAUGE,MONOTONIC".format(
v[1].upper()))

m['metrics'][k][1] = getattr(PostgreSql, v[1].upper())
self.log.debug("Method: %s" % (str(v[1])))

87 changes: 57 additions & 30 deletions tests/test_postgresql.py
@@ -1,3 +1,4 @@
import os
import unittest
from tests.common import load_check, AgentCheckTest

@@ -6,6 +7,16 @@
import time
from pprint import pprint


# postgres version: (expected metrics, expected tagged metrics per database)
METRICS = {
'9.4.0': (40, 26),
'9.3.5': (40, 26),
'9.2.9': (40, 26),
'9.1.14': (35, 23),
'9.0.18': (34, 23),
}
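
The lookup added at the end of test_checks (see below) resolves the running server's version to one of these keys. A hypothetical helper showing the intended use, and the failure mode when a version is missing:

def expected_counts(version_array):
    # version_array comes from PostgreSql._get_version, e.g. [9, 1, 14]
    pg_version = '.'.join(str(part) for part in version_array)
    return METRICS[pg_version]          # unknown versions raise KeyError

assert expected_counts([9, 1, 14]) == (35, 23)

Exact-match keys keep the expectations explicit, at the cost of needing a new entry for every Postgres point release the test suite runs against.
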

@attr(requires='postgres')
class TestPostgres(AgentCheckTest):

@@ -33,7 +44,7 @@ def test_checks(self):
"metrics": {
"numbackends": ["custom.numbackends", "Gauge"],
},
"query": "SELECT datname, %s FROM pg_stat_database WHERE datname = 'datadog_test' LIMIT(1)",
"query": "SELECT datname, %s FROM pg_stat_database WHERE datname = 'datadog_test' LIMIT(1)",
"relation": False,
}]
},
@@ -61,42 +72,53 @@ def test_checks(self):
db = self.check.dbs[key]

metrics = self.check.get_metrics()
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.connections']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.dead_rows']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.live_rows']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.table_size']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.index_size']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.total_size']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.max_connections']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.percent_usage_connections']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.db.count']) == 1, pprint(metrics))
# Don't test for locks
# self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.locks']) >= 1, pprint(metrics))
# Brittle tests
# self.assertTrue(4 <= len(metrics) <= 6, metrics)
# self.assertTrue(4 <= len([m for m in metrics if 'db:datadog_test' in str(m[3]['tags']) ]) <= 5, pprint(metrics))
# self.assertTrue(len([m for m in metrics if 'table:persons' in str(m[3]['tags'])]) == 2, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.connections']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.dead_rows']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.live_rows']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.table_size']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.index_size']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.total_size']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.max_connections']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.percent_usage_connections']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.total_tables']) == 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.db.count']) == 1, pprint(metrics))

# Rate metrics, need 2 collection rounds
time.sleep(1)
self.check.run()
metrics = self.check.get_metrics()

exp_metrics = 39
exp_db_tagged_metrics = 26
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.checkpoints_timed']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.checkpoints_requested']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.buffers_checkpoint']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.buffers_clean']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.maxwritten_clean']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.buffers_backend']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.buffers_alloc']) >= 1, pprint(metrics))

self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.commits']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.rollbacks']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.disk_read']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.buffer_hit']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.rows_returned']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.rows_fetched']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.rows_inserted']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.rows_updated']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.rows_deleted']) >= 1, pprint(metrics))

self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.seq_scans']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.seq_rows_read']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.rows_hot_updated']) >= 1, pprint(metrics))

if self.check._is_9_1_or_above(key, db):
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.buffers_backend_fsync']) >= 1, pprint(metrics))

if self.check._is_9_2_or_above(key, db):
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.sync_time']) >= 1, pprint(metrics))
else:
if not self.check._is_9_1_or_above(key, db):
# No replication metric
exp_metrics -= 1

# Not all bgw metrics
exp_metrics -= 2
# Not all common metrics see NEWER_92_METRICS
exp_metrics -= 3
exp_db_tagged_metrics -= 3
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.write_time']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.bgwriter.sync_time']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.deadlocks']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.temp_bytes']) >= 1, pprint(metrics))
self.assertTrue(len([m for m in metrics if m[0] == u'postgresql.temp_files']) >= 1, pprint(metrics))

# Service checks
service_checks = self.check.get_service_checks()
@@ -113,9 +135,14 @@ def test_checks(self):
self.check.run()
metrics = self.check.get_metrics()

self.assertEquals(len([m for m in metrics if 'table:persons' in str(m[3].get('tags', [])) ]), 11, metrics)

pg_version_array = self.check._get_version(key, db)
pg_version = '.'.join(str(x) for x in pg_version_array)
exp_metrics = METRICS[pg_version][0]
exp_db_tagged_metrics = METRICS[pg_version][1]
self.assertEquals(len(metrics), exp_metrics, metrics)
self.assertEquals(len([m for m in metrics if 'db:datadog_test' in str(m[3].get('tags', []))]), exp_db_tagged_metrics, metrics)
self.assertEquals(len([m for m in metrics if 'table:persons' in str(m[3].get('tags', [])) ]), 11, metrics)

self.metrics = metrics
self.assertMetric("custom.numbackends")