def test_connection(self, custom_query_text=None):
    """Verify the data source is reachable by executing a trivial query.

    :param custom_query_text: optional query to run instead of the runner's
        built-in ``noop_query`` (used by the health-check task so operators
        can probe with a representative query).
    :raises NotImplementedError: when the runner defines no noop query.
    :raises Exception: when the probe query returns an error.
    """
    if self.noop_query is None:
        raise NotImplementedError()

    probe_query = custom_query_text or self.noop_query
    _, error = self.run_query(probe_query, None)

    if error is not None:
        raise Exception(error)
def dict_from_string(s):
    """Parse *s* as a JSON object, returning ``{}`` when parsing fails.

    Used for optional environment-variable settings (e.g.
    ``REDASH_CUSTOM_HEALTH_QUERIES``) where an unset, empty, or malformed
    value should silently fall back to an empty mapping.

    :param s: JSON text (or ``None``/empty string).
    :returns: the decoded object, or ``{}`` on any parse failure.
    """
    try:
        return json.loads(s)
    # ValueError: empty/malformed JSON; TypeError: s is None or a non-string.
    except (TypeError, ValueError):
        return {}
def update_health_status(data_source_id, data_source_name, query_text, data):
    """Record the outcome of one health-check query in the Redis cache.

    The cache at ``data_sources:health`` maps data-source id ->
    ``{"metadata": {"name": ...}, "queries": {query_text: data, ...},
    "status": "SUCCESS"|"FAIL"}``.  A data source's overall status is FAIL
    when any of its recorded queries failed.

    NOTE(review): this is a read-modify-write on a shared Redis key with no
    locking — concurrent writers could drop each other's updates. Confirm the
    task is scheduled on a single worker, or consider a Redis hash/WATCH.
    """
    key = "data_sources:health"
    cache = json.loads(redis_connection.get(key) or '{}')

    entry = cache.setdefault(data_source_id, {
        "metadata": {"name": data_source_name},
        "queries": {},
    })
    entry["queries"][query_text] = data

    any_failed = any(q["status"] == "FAIL" for q in entry["queries"].values())
    entry["status"] = "FAIL" if any_failed else "SUCCESS"

    redis_connection.set(key, json.dumps(cache))
runtime + }) diff --git a/redash/worker.py b/redash/worker.py index 629180b1f1..668da00735 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -16,6 +16,10 @@ include='redash.tasks') celery_schedule = { + 'health_status': { + 'task': 'redash.tasks.health_status', + 'schedule': timedelta(minutes=settings.HEALTH_QUERIES_REFRESH_SCHEDULE) + }, 'refresh_queries': { 'task': 'redash.tasks.refresh_queries', 'schedule': timedelta(seconds=30) diff --git a/tests/tasks/test_health.py b/tests/tasks/test_health.py new file mode 100644 index 0000000000..37e4f9ea7d --- /dev/null +++ b/tests/tasks/test_health.py @@ -0,0 +1,136 @@ +import json +import mock +from tests import BaseTestCase + +from redash import redis_connection +from redash.tasks.health import update_health_status, health_status + + +class TestHealthStatus(BaseTestCase): + def setUp(self): + super(TestHealthStatus, self).setUp() + self.patched_custom_queries = self._setup_mock('redash.tasks.health.settings') + self.patched_updated_health_status = self._setup_mock('redash.tasks.health.update_health_status') + self.patched_run_query = self._setup_mock('redash.query_runner.pg.PostgreSQL.run_query') + + self.patched_run_query.return_value = ("some_data", None) + self.patched_custom_queries.CUSTOM_HEALTH_QUERIES = "" + + def _setup_mock(self, function_to_patch): + patcher = mock.patch(function_to_patch) + patched_function = patcher.start() + self.addCleanup(patcher.stop) + return patched_function + + def test_update_health_status_sets_correct_keys(self): + current_health = redis_connection.get('data_sources:health') + self.assertEqual(None, current_health) + + DATA_SOURCE = self.factory.create_data_source() + QUERY_SUCCESS = "SELECT 1" + QUERY_FAIL = "SELECT meep" + SOME_DATA_FAIL = {"a": "b", "foo": "bar", "status": "FAIL"} + SOME_DATA_SUCCESS = {"a": "b", "foo": "bar", "status": "SUCCESS"} + update_health_status(str(DATA_SOURCE.id), DATA_SOURCE.name, QUERY_FAIL, SOME_DATA_FAIL) + 
def test_update_health_status_sets_correct_keys(self):
    """update_health_status() builds the expected nested cache structure.

    Expected cached format::

        {
            "<ds_id>": {
                "metadata": {"name": ...},
                "queries": {"<query_text>": {...}, ...},
                "status": "SUCCESS" | "FAIL"
            },
            ...
        }
    """
    current_health = redis_connection.get('data_sources:health')
    self.assertEqual(None, current_health)

    DATA_SOURCE = self.factory.create_data_source()
    QUERY_SUCCESS = "SELECT 1"
    QUERY_FAIL = "SELECT meep"
    SOME_DATA_FAIL = {"a": "b", "foo": "bar", "status": "FAIL"}
    SOME_DATA_SUCCESS = {"a": "b", "foo": "bar", "status": "SUCCESS"}
    update_health_status(str(DATA_SOURCE.id), DATA_SOURCE.name, QUERY_FAIL, SOME_DATA_FAIL)
    update_health_status(str(DATA_SOURCE.id), DATA_SOURCE.name, QUERY_SUCCESS, SOME_DATA_SUCCESS)

    current_health = json.loads(redis_connection.get('data_sources:health'))

    # There is exactly one data source in the cache.
    self.assertEqual(1, len(current_health.keys()))
    # keys() is a non-subscriptable view on Python 3 — materialize it first.
    self.assertEqual(DATA_SOURCE.id, int(list(current_health.keys())[0]))

    # The data source entry has "metadata", "queries" and "status" keys.
    ds_id = str(DATA_SOURCE.id)
    self.assertEqual(3, len(current_health[ds_id].keys()))
    self.assertTrue("metadata" in current_health[ds_id].keys())
    self.assertTrue("queries" in current_health[ds_id].keys())
    self.assertTrue("status" in current_health[ds_id].keys())

    # Both queries were recorded with their data; one failure flips the
    # data source's overall status to FAIL.
    self.assertEqual(2, len(current_health[ds_id]["queries"]))
    self.assertTrue(QUERY_SUCCESS in current_health[ds_id]["queries"].keys())
    self.assertTrue(QUERY_FAIL in current_health[ds_id]["queries"].keys())
    self.assertEqual(SOME_DATA_FAIL, current_health[ds_id]["queries"][QUERY_FAIL])
    self.assertEqual(SOME_DATA_SUCCESS, current_health[ds_id]["queries"][QUERY_SUCCESS])
    self.assertEqual(SOME_DATA_FAIL["status"], current_health[ds_id]["status"])

def test_health_status_success(self):
    """health_status() updates every data source with a SUCCESS payload."""
    data_sources = []
    for i in range(5):
        data_sources.append(self.factory.create_data_source())

    health_status()

    # Status is updated for each of the 5 data sources.
    self.assertEqual(self.patched_updated_health_status.call_count, 5)

    # The data source id and name are passed in the last call.
    args, kwargs = self.patched_updated_health_status.call_args
    self.assertEqual(str(data_sources[-1].id), args[0])
    self.assertEqual(data_sources[-1].name, args[1])

    # All expected status keys are present and populated.
    EXPECTED_KEYS = ["status", "last_run", "last_run_human", "runtime"]
    NEW_STATUS = args[3]
    self.assertEqual(set(EXPECTED_KEYS), set(NEW_STATUS.keys()))

    self.assertEqual("SUCCESS", NEW_STATUS["status"])
    for key in EXPECTED_KEYS[1:]:
        self.assertIsNotNone(NEW_STATUS[key])
def test_health_status_run_query_throws_exception(self):
    """A failing probe query yields a FAIL payload with a None runtime."""
    data_source = self.factory.create_data_source()

    def exception_raiser(*args, **kwargs):
        raise Exception

    self.patched_run_query.side_effect = exception_raiser
    health_status()

    # Status is still updated for the one data source despite the failure.
    self.assertEqual(self.patched_updated_health_status.call_count, 1)

    # The id, name and (default noop) query text are passed through.
    args, kwargs = self.patched_updated_health_status.call_args
    self.assertEqual(str(data_source.id), args[0])
    self.assertEqual(data_source.name, args[1])
    self.assertEqual(data_source.query_runner.noop_query, args[2])

    # All expected status keys are present.
    EXPECTED_KEYS = ['status', 'last_run', 'last_run_human', 'runtime']
    NEW_STATUS = args[3]
    self.assertEqual(set(EXPECTED_KEYS), set(NEW_STATUS.keys()))

    self.assertEqual('FAIL', NEW_STATUS['status'])
    self.assertIsNotNone(NEW_STATUS['last_run'])
    self.assertIsNotNone(NEW_STATUS['last_run_human'])
    self.assertIsNone(NEW_STATUS['runtime'])

def test_health_status_custom_query(self):
    """A data source listed in CUSTOM_HEALTH_QUERIES is probed with its custom query."""
    CUSTOM_QUERY = "select * from table"
    data_source = self.factory.create_data_source()
    # Key on the actual id rather than hardcoding "1" — fixture ids are not
    # guaranteed to start at 1 when other tests have created rows.
    self.patched_custom_queries.CUSTOM_HEALTH_QUERIES = {
        str(data_source.id): CUSTOM_QUERY,
    }

    health_status()

    args, kwargs = self.patched_updated_health_status.call_args
    self.assertNotEqual(data_source.query_runner.noop_query, args[2])
    self.assertEqual(CUSTOM_QUERY, args[2])